Skip to content

Commit

Permalink
Merge pull request #37 from vidardb/fdw-refactor
Browse files Browse the repository at this point in the history
Refactor kv framework
  • Loading branch information
chenquanzhao authored Sep 26, 2020
2 parents 9af5bec + 555863b commit 46251e9
Show file tree
Hide file tree
Showing 28 changed files with 3,471 additions and 3,150 deletions.
35 changes: 30 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ COMPILE.cxx.bc = $(CLANG) -xc++ -Wno-ignored-attributes $(BITCODE_CXXFLAGS) $(CP
$(LLVM_BINPATH)/opt -module-summary -f $@ -o $@

ifdef VIDARDB
PG_CPPFLAGS += -Wno-declaration-after-statement -DVIDARDB
PG_CPPFLAGS += -Wno-declaration-after-statement -Wno-unused-function -DVIDARDB
SHLIB_LINK = -lvidardb
else
PG_CPPFLAGS += -Wno-declaration-after-statement
PG_CPPFLAGS += -Wno-declaration-after-statement -Wno-unused-function
SHLIB_LINK = -lrocksdb
endif

Expand All @@ -21,7 +21,11 @@ PG_CPPFLAGS += -Wno-deprecated-declarations
SHLIB_LINK += -lstdc++
endif

OBJS = src/kv_fdw.o src/kv_utility.o src/kv_shm.o src/kv_storage.o src/kv_posix.o src/kv_process.o
PG_CPPFLAGS += -Isrc

OBJS = src/kv_fdw.o src/kv_utility.o src/server/kv_storage.o src/ipc/kv_posix.o \
src/ipc/kv_message.o src/ipc/kv_channel.o src/ipc/kv_mq.o \
src/client/kv_client.o src/server/kv_worker.o src/server/kv_manager.o

EXTENSION = kv_fdw
DATA = sql/kv_fdw--0.0.1.sql
Expand All @@ -43,8 +47,29 @@ NETWORK ?= default
APT_OPTS ?=
ENV_EXTS ?=

src/kv_storage.bc:
$(COMPILE.cxx.bc) $(CCFLAGS) $(CPPFLAGS) -fPIC -c -o $@ src/kv_storage.cc
src/server/kv_storage.bc:
$(COMPILE.cxx.bc) $(CCFLAGS) $(CPPFLAGS) -fPIC -c -o $@ src/server/kv_storage.cc

src/ipc/kv_posix.bc:
$(COMPILE.cxx.bc) $(CCFLAGS) $(CPPFLAGS) -fPIC -c -o $@ src/ipc/kv_posix.cc

src/ipc/kv_message.bc:
$(COMPILE.cxx.bc) $(CCFLAGS) $(CPPFLAGS) -fPIC -c -o $@ src/ipc/kv_message.cc

src/ipc/kv_channel.bc:
$(COMPILE.cxx.bc) $(CCFLAGS) $(CPPFLAGS) -fPIC -c -o $@ src/ipc/kv_channel.cc

src/ipc/kv_mq.bc:
$(COMPILE.cxx.bc) $(CCFLAGS) $(CPPFLAGS) -fPIC -c -o $@ src/ipc/kv_mq.cc

src/client/kv_client.bc:
$(COMPILE.cxx.bc) $(CCFLAGS) $(CPPFLAGS) -fPIC -c -o $@ src/client/kv_client.cc

src/server/kv_worker.bc:
$(COMPILE.cxx.bc) $(CCFLAGS) $(CPPFLAGS) -fPIC -c -o $@ src/server/kv_worker.cc

src/server/kv_manager.bc:
$(COMPILE.cxx.bc) $(CCFLAGS) $(CPPFLAGS) -fPIC -c -o $@ src/server/kv_manager.cc

.PHONY: docker-image
docker-image:
Expand Down
132 changes: 132 additions & 0 deletions src/client/kv_client.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/* Copyright 2020-present VidarDB Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <unordered_map>
using namespace std;

#include "kv_api.h"
#include "server/kv_manager.h"

extern "C" {
#include "postgres.h"
#include "miscadmin.h"
}

/*
* In backend process scope
*/

static KVManagerClient* manager = nullptr;
static unordered_map<KVWorkerId, KVWorkerClient*> workers;

/*
* Implementation for kv client
*/

static void InitKVManagerClient() {
manager = new KVManagerClient();
}

static KVWorkerClient* GetKVWorkerClient(KVWorkerId workerId) {
auto it = workers.find(workerId);
if (it != workers.end()) {
return it->second;
}

if (!manager) {
InitKVManagerClient();
}

bool success = manager->Launch(workerId);
if (success) {
KVWorkerClient* worker = new KVWorkerClient(workerId);
workers.insert({workerId, worker});
return worker;
}

ereport(ERROR, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("too many background workers"),
errhint("Up to %d background workers can be registered with"
" the current settings.", max_worker_processes),
errhint("Consider increasing the configuration parameter "
"\"max_worker_processes\".")));
return nullptr;
}

void KVOpenRequest(KVRelationId rid, OpenArgs* args) {
KVWorkerClient* worker = GetKVWorkerClient(rid);
worker->Open(rid, args);
}

void KVCloseRequest(KVRelationId rid) {
KVWorkerClient* worker = GetKVWorkerClient(rid);
worker->Close(rid);
}

uint64 KVCountRequest(KVRelationId rid) {
KVWorkerClient* worker = GetKVWorkerClient(rid);
return worker->Count(rid);
}

bool KVPutRequest(KVRelationId rid, PutArgs* args) {
KVWorkerClient* worker = GetKVWorkerClient(rid);
return worker->Put(rid, args);
}

bool KVGetRequest(KVRelationId rid, GetArgs* args) {
KVWorkerClient* worker = GetKVWorkerClient(rid);
return worker->Get(rid, args);
}

bool KVDeleteRequest(KVRelationId rid, DeleteArgs* args) {
KVWorkerClient* worker = GetKVWorkerClient(rid);
return worker->Delete(rid, args);
}

void KVLoadRequest(KVRelationId rid, PutArgs* args) {
KVWorkerClient* worker = GetKVWorkerClient(rid);
worker->Load(rid, args);
}

bool KVReadBatchRequest(KVRelationId rid, ReadBatchArgs* args) {
KVWorkerClient* worker = GetKVWorkerClient(rid);
return worker->ReadBatch(rid, args);
}

void KVCloseCursorRequest(KVRelationId rid, CloseCursorArgs* args) {
KVWorkerClient* worker = GetKVWorkerClient(rid);
worker->CloseCursor(rid, args);
}

#ifdef VIDARDB
bool KVRangeQueryRequest(KVRelationId rid, RangeQueryArgs* args) {
KVWorkerClient* worker = GetKVWorkerClient(rid);
return worker->RangeQuery(rid, args);
}

void KVClearRangeQueryRequest(KVRelationId rid, RangeQueryArgs* args) {
KVWorkerClient* worker = GetKVWorkerClient(rid);
worker->ClearRangeQuery(rid, args);
}
#endif

void KVTerminateRequest(KVRelationId rid, KVDatabaseId dbId) {
if (!manager) {
InitKVManagerClient();
}

manager->Terminate(rid, dbId);
workers.erase(rid);
}
Loading

0 comments on commit 46251e9

Please sign in to comment.