Skip to content

Commit

Permalink
Merge commit 'a29df097dd23c36a749e5c616f4d1cae1051e853' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
PokIsemaine committed Sep 8, 2024
2 parents 6c85004 + a29df09 commit df58c47
Show file tree
Hide file tree
Showing 43 changed files with 768 additions and 312 deletions.
4 changes: 2 additions & 2 deletions cmake/rocksdb.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ endif()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(rocksdb
facebook/rocksdb v9.5.2
MD5=03489fe4e03a7216d8f52e59409f1ca8
facebook/rocksdb v9.6.1
MD5=ce31144a7e65d8f4f3f9d98986509eb1
)

FetchContent_GetProperties(jemalloc)
Expand Down
7 changes: 7 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -967,5 +967,12 @@ rocksdb.rate_limiter_auto_tuned yes
# Default: yes
# rocksdb.avoid_unnecessary_blocking_io yes

# Specifies the maximum size in bytes for a write batch in RocksDB.
# If set to 0, there is no size limit for write batches.
# This option can help control memory usage and manage large WriteBatch operations more effectively.
#
# Default: 0
# rocksdb.write_options.write_batch_max_bytes 0

################################ NAMESPACE #####################################
# namespace.test change.me
7 changes: 7 additions & 0 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,13 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
LOG(INFO) << "[replication] Succeeded restoring the backup, fullsync was finish";
post_fullsync_cb_();

// Namespaces must be reloaded from the DB after the full sync is done;
// otherwise they are not visible in the replica.
s = srv_->GetNamespace()->LoadAndRewrite();
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to load and rewrite namespace: " << s.Msg();
}

// Switch to psync state machine again
psync_steps_.Start();
return CBState::QUIT;
Expand Down
2 changes: 2 additions & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ Config::Config() {
{"rocksdb.write_options.low_pri", true, new YesNoField(&rocks_db.write_options.low_pri, false)},
{"rocksdb.write_options.memtable_insert_hint_per_batch", true,
new YesNoField(&rocks_db.write_options.memtable_insert_hint_per_batch, false)},
{"rocksdb.write_options.write_batch_max_bytes", false,
new IntField(&rocks_db.write_options.write_batch_max_bytes, 0, 0, INT_MAX)},

/* rocksdb read options */
{"rocksdb.read_options.async_io", false, new YesNoField(&rocks_db.read_options.async_io, true)},
Expand Down
1 change: 1 addition & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ struct Config {
bool no_slowdown;
bool low_pri;
bool memtable_insert_hint_per_batch;
int write_batch_max_bytes;
} write_options;

struct ReadOptions {
Expand Down
64 changes: 46 additions & 18 deletions src/search/hnsw_indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,16 @@ StatusOr<HnswNodeFieldMetadata> HnswNode::DecodeMetadata(engine::Context& ctx, c
return metadata;
}

void HnswNode::PutMetadata(HnswNodeFieldMetadata* node_meta, const SearchKey& search_key, engine::Storage* storage,
rocksdb::WriteBatchBase* batch) const {
Status HnswNode::PutMetadata(HnswNodeFieldMetadata* node_meta, const SearchKey& search_key, engine::Storage* storage,
rocksdb::WriteBatchBase* batch) const {
std::string updated_metadata;
node_meta->Encode(&updated_metadata);
batch->Put(storage->GetCFHandle(ColumnFamilyID::Search), search_key.ConstructHnswNode(level, key), updated_metadata);
auto s = batch->Put(storage->GetCFHandle(ColumnFamilyID::Search), search_key.ConstructHnswNode(level, key),
updated_metadata);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
return Status::OK();
}

void HnswNode::DecodeNeighbours(engine::Context& ctx, const SearchKey& search_key) {
Expand All @@ -75,12 +80,13 @@ void HnswNode::DecodeNeighbours(engine::Context& ctx, const SearchKey& search_ke
Status HnswNode::AddNeighbour(engine::Context& ctx, const NodeKey& neighbour_key, const SearchKey& search_key,
rocksdb::WriteBatchBase* batch) const {
auto edge_index_key = search_key.ConstructHnswEdge(level, key, neighbour_key);
batch->Put(ctx.storage->GetCFHandle(ColumnFamilyID::Search), edge_index_key, Slice());

auto rocket_s = batch->Put(ctx.storage->GetCFHandle(ColumnFamilyID::Search), edge_index_key, Slice());
if (!rocket_s.ok()) {
return {Status::NotOK, rocket_s.ToString()};
}
HnswNodeFieldMetadata node_metadata = GET_OR_RET(DecodeMetadata(ctx, search_key));
node_metadata.num_neighbours++;
PutMetadata(&node_metadata, search_key, ctx.storage, batch);
return Status::OK();
return PutMetadata(&node_metadata, search_key, ctx.storage, batch);
}

Status HnswNode::RemoveNeighbour(engine::Context& ctx, const NodeKey& neighbour_key, const SearchKey& search_key,
Expand All @@ -93,8 +99,7 @@ Status HnswNode::RemoveNeighbour(engine::Context& ctx, const NodeKey& neighbour_

HnswNodeFieldMetadata node_metadata = GET_OR_RET(DecodeMetadata(ctx, search_key));
node_metadata.num_neighbours--;
PutMetadata(&node_metadata, search_key, ctx.storage, batch);
return Status::OK();
return PutMetadata(&node_metadata, search_key, ctx.storage, batch);
}

Status VectorItem::Create(NodeKey key, const kqir::NumericArray& vector, const HnswVectorFieldMetadata* metadata,
Expand Down Expand Up @@ -445,7 +450,10 @@ Status HnswIndex::InsertVectorEntryInternal(engine::Context& ctx, std::string_vi

// Update inserted node metadata
HnswNodeFieldMetadata node_metadata(static_cast<uint16_t>(connected_edges_set.size()), vector);
node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
auto s = node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}

// Update modified nodes metadata
for (const auto& node_edges : deleted_edges_map) {
Expand All @@ -458,14 +466,20 @@ Status HnswIndex::InsertVectorEntryInternal(engine::Context& ctx, std::string_vi
connected_edges_set.erase(current_node_key);
}
current_node_metadata.num_neighbours = new_num_neighbours;
current_node.PutMetadata(&current_node_metadata, search_key, storage, batch.Get());
s = current_node.PutMetadata(&current_node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
}

for (const auto& current_node_key : connected_edges_set) {
auto current_node = HnswNode(current_node_key, level);
HnswNodeFieldMetadata current_node_metadata = GET_OR_RET(current_node.DecodeMetadata(ctx, search_key));
current_node_metadata.num_neighbours++;
current_node.PutMetadata(&current_node_metadata, search_key, storage, batch.Get());
s = current_node.PutMetadata(&current_node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
}

entry_points.clear();
Expand All @@ -476,21 +490,30 @@ Status HnswIndex::InsertVectorEntryInternal(engine::Context& ctx, std::string_vi
} else {
auto node = HnswNode(std::string(key), 0);
HnswNodeFieldMetadata node_metadata(0, vector);
node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
auto s = node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
metadata->num_levels = 1;
}

while (target_level > metadata->num_levels - 1) {
auto node = HnswNode(std::string(key), metadata->num_levels);
HnswNodeFieldMetadata node_metadata(0, vector);
node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
auto s = node.PutMetadata(&node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
metadata->num_levels++;
}

std::string encoded_index_metadata;
metadata->Encode(&encoded_index_metadata);
auto index_meta_key = search_key.ConstructFieldMeta();
batch->Put(cf_handle, index_meta_key, encoded_index_metadata);
auto s = batch->Put(cf_handle, index_meta_key, encoded_index_metadata);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

return Status::OK();
}
Expand Down Expand Up @@ -524,7 +547,10 @@ Status HnswIndex::DeleteVectorEntry(engine::Context& ctx, std::string_view key,
auto neighbour_node = HnswNode(neighbour_key, level);
HnswNodeFieldMetadata neighbour_node_metadata = GET_OR_RET(neighbour_node.DecodeMetadata(ctx, search_key));
neighbour_node_metadata.num_neighbours--;
neighbour_node.PutMetadata(&neighbour_node_metadata, search_key, storage, batch.Get());
auto s = neighbour_node.PutMetadata(&neighbour_node_metadata, search_key, storage, batch.Get());
if (!s.IsOK()) {
return s;
}
}
}

Expand Down Expand Up @@ -559,8 +585,10 @@ Status HnswIndex::DeleteVectorEntry(engine::Context& ctx, std::string_view key,
std::string encoded_index_metadata;
metadata->Encode(&encoded_index_metadata);
auto index_meta_key = search_key.ConstructFieldMeta();
batch->Put(storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key, encoded_index_metadata);

auto s = batch->Put(storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key, encoded_index_metadata);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
return Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions src/search/hnsw_indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ struct HnswNode {
HnswNode(NodeKey key, uint16_t level);

StatusOr<HnswNodeFieldMetadata> DecodeMetadata(engine::Context& ctx, const SearchKey& search_key) const;
void PutMetadata(HnswNodeFieldMetadata* node_meta, const SearchKey& search_key, engine::Storage* storage,
rocksdb::WriteBatchBase* batch) const;
Status PutMetadata(HnswNodeFieldMetadata* node_meta, const SearchKey& search_key, engine::Storage* storage,
rocksdb::WriteBatchBase* batch) const;
void DecodeNeighbours(engine::Context& ctx, const SearchKey& search_key);

// For testing purpose
Expand Down
35 changes: 28 additions & 7 deletions src/search/index_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,28 @@ struct IndexManager {

std::string meta_val;
info->metadata.Encode(&meta_val);
batch->Put(cf, index_key.ConstructIndexMeta(), meta_val);
auto s = batch->Put(cf, index_key.ConstructIndexMeta(), meta_val);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

std::string prefix_val;
info->prefixes.Encode(&prefix_val);
batch->Put(cf, index_key.ConstructIndexPrefixes(), prefix_val);
s = batch->Put(cf, index_key.ConstructIndexPrefixes(), prefix_val);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

for (const auto &[_, field_info] : info->fields) {
SearchKey field_key(info->ns, info->name, field_info.name);

std::string field_val;
field_info.metadata->Encode(&field_val);

batch->Put(cf, field_key.ConstructFieldMeta(), field_val);
s = batch->Put(cf, field_key.ConstructFieldMeta(), field_val);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
}

if (auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); !s.ok()) {
Expand Down Expand Up @@ -231,16 +240,28 @@ struct IndexManager {

auto batch = storage->GetWriteBatchBase();

batch->Delete(cf, index_key.ConstructIndexMeta());
batch->Delete(cf, index_key.ConstructIndexPrefixes());
auto s = batch->Delete(cf, index_key.ConstructIndexMeta());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
s = batch->Delete(cf, index_key.ConstructIndexPrefixes());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

auto begin = index_key.ConstructAllFieldMetaBegin();
auto end = index_key.ConstructAllFieldMetaEnd();
batch->DeleteRange(cf, begin, end);
s = batch->DeleteRange(cf, begin, end);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

begin = index_key.ConstructAllFieldDataBegin();
end = index_key.ConstructAllFieldDataEnd();
batch->DeleteRange(cf, begin, end);
s = batch->DeleteRange(cf, begin, end);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

auto no_txn_ctx = engine::Context::NoTransactionContext(storage);
if (auto s = storage->Write(no_txn_ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); !s.ok()) {
Expand Down
20 changes: 16 additions & 4 deletions src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,19 @@ Status IndexUpdater::UpdateTagIndex(engine::Context &ctx, std::string_view key,
for (const auto &tag : tags_to_delete) {
auto index_key = search_key.ConstructTagFieldData(tag, key);

batch->Delete(cf_handle, index_key);
auto s = batch->Delete(cf_handle, index_key);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
}

for (const auto &tag : tags_to_add) {
auto index_key = search_key.ConstructTagFieldData(tag, key);

batch->Put(cf_handle, index_key, Slice());
auto s = batch->Put(cf_handle, index_key, Slice());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
}

auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
Expand All @@ -253,13 +259,19 @@ Status IndexUpdater::UpdateNumericIndex(engine::Context &ctx, std::string_view k
if (!original.IsNull()) {
auto index_key = search_key.ConstructNumericFieldData(original.Get<kqir::Numeric>(), key);

batch->Delete(cf_handle, index_key);
auto s = batch->Delete(cf_handle, index_key);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
}

if (!current.IsNull()) {
auto index_key = search_key.ConstructNumericFieldData(current.Get<kqir::Numeric>(), key);

batch->Put(cf_handle, index_key, Slice());
auto s = batch->Put(cf_handle, index_key, Slice());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
}
auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
Expand Down
3 changes: 2 additions & 1 deletion src/server/namespace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ bool Namespace::IsAllowModify() const {
Status Namespace::loadFromDB(std::map<std::string, std::string>* db_tokens) const {
std::string value;
engine::Context ctx(storage_);
auto s = storage_->Get(ctx, ctx.GetReadOptions(), cf_, kNamespaceDBKey, &value);
auto cf = storage_->GetCFHandle(ColumnFamilyID::Propagate);
auto s = storage_->Get(ctx, ctx.GetReadOptions(), cf, kNamespaceDBKey, &value);
if (!s.ok()) {
if (s.IsNotFound()) return Status::OK();
return {Status::NotOK, s.ToString()};
Expand Down
4 changes: 1 addition & 3 deletions src/server/namespace.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ constexpr const char *kNamespaceDBKey = "__namespace_keys__";

class Namespace {
public:
explicit Namespace(engine::Storage *storage)
: storage_(storage), cf_(storage_->GetCFHandle(ColumnFamilyID::Propagate)) {}
explicit Namespace(engine::Storage *storage) : storage_(storage) {}

~Namespace() = default;
Namespace(const Namespace &) = delete;
Expand All @@ -45,7 +44,6 @@ class Namespace {

private:
engine::Storage *storage_;
rocksdb::ColumnFamilyHandle *cf_ = nullptr;

std::shared_mutex tokens_mu_;
// mapping from token to namespace name
Expand Down
10 changes: 6 additions & 4 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1353,14 +1353,16 @@ void Server::PrepareRestoreDB() {
// accessing, data migration task should be stopped before restoring DB
WaitNoMigrateProcessing();

// Workers refuse to run commands which may access the DB while this flag is set,
// so we enable it first to stop workers from running new commands, and then wait
// for the exclusive guard to be released to guarantee that no worker is running.
is_loading_ = true;

// To guarantee worker threads don't access the DB, we should release 'ExclusivityGuard'
// ASAP so that users are not left without responses for a long time, because the
// following 'CloseDB' may take a long time to acquire the DB mutex.
LOG(INFO) << "[server] Waiting workers for finishing executing commands...";
{
auto exclusivity = WorkExclusivityGuard();
is_loading_ = true;
}
{ auto exclusivity = WorkExclusivityGuard(); }

// The cron thread, compaction checker thread, and full synchronization thread may
// always run in the background, so we need to close the DB so that they don't actually do any work.
Expand Down
8 changes: 4 additions & 4 deletions src/storage/rdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,14 +483,14 @@ Status RDB::saveRdbObject(engine::Context &ctx, int type, const std::string &key
} else if (type == RDBTypeHash || type == RDBTypeHashListPack || type == RDBTypeHashZipList ||
type == RDBTypeHashZipMap) {
const auto &entries = std::get<std::map<std::string, std::string>>(obj);
std::vector<FieldValue> filed_values;
filed_values.reserve(entries.size());
std::vector<FieldValue> field_values;
field_values.reserve(entries.size());
for (const auto &entry : entries) {
filed_values.emplace_back(entry.first, entry.second);
field_values.emplace_back(entry.first, entry.second);
}
redis::Hash hash_db(storage_, ns_);
uint64_t count = 0;
db_status = hash_db.MSet(ctx, key, filed_values, false /*nx*/, &count);
db_status = hash_db.MSet(ctx, key, field_values, false /*nx*/, &count);
} else if (type == RDBTypeList || type == RDBTypeListZipList || type == RDBTypeListQuickList ||
type == RDBTypeListQuickList2) {
const auto &elements = std::get<std::vector<std::string>>(obj);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class RDB {
Status SaveZSetObject(const std::vector<MemberScore> &member_scores);

// Hash
Status SaveHashObject(const std::vector<FieldValue> &filed_value);
Status SaveHashObject(const std::vector<FieldValue> &field_value);

private:
engine::Storage *storage_;
Expand Down
Loading

0 comments on commit df58c47

Please sign in to comment.