Skip to content

Commit

Permalink
fix: merge conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
PokIsemaine committed Jul 15, 2024
1 parent 452a631 commit bd67a66
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 26 deletions.
17 changes: 13 additions & 4 deletions src/search/hnsw_indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ namespace redis {
// Constructs an HNSW graph node identified by `key` that lives at layer `level`
// of the index. The key is taken by value and moved into the member to avoid a copy.
HnswNode::HnswNode(NodeKey key, uint16_t level) : key(std::move(key)), level(level) {}

StatusOr<HnswNodeFieldMetadata> HnswNode::DecodeMetadata(const SearchKey& search_key, engine::Storage* storage) const {
// TODO: ctx?
engine::Context ctx(storage);
auto node_index_key = search_key.ConstructHnswNode(level, key);
rocksdb::PinnableSlice value;
auto s = storage->Get(rocksdb::ReadOptions(), storage->GetCFHandle(ColumnFamilyID::Search), node_index_key, &value);
auto s =
storage->Get(ctx, ctx.GetReadOptions(), storage->GetCFHandle(ColumnFamilyID::Search), node_index_key, &value);
if (!s.ok()) return {Status::NotOK, s.ToString()};

HnswNodeFieldMetadata metadata;
Expand All @@ -58,7 +61,9 @@ void HnswNode::PutMetadata(HnswNodeFieldMetadata* node_meta, const SearchKey& se
void HnswNode::DecodeNeighbours(const SearchKey& search_key, engine::Storage* storage) {
neighbours.clear();
auto edge_prefix = search_key.ConstructHnswEdgeWithSingleEnd(level, key);
util::UniqueIterator iter(storage, storage->DefaultScanOptions(), ColumnFamilyID::Search);
// TODO: ctx?
engine::Context ctx(storage);
util::UniqueIterator iter(ctx, storage, storage->DefaultScanOptions(), ColumnFamilyID::Search);
for (iter->Seek(edge_prefix); iter->Valid(); iter->Next()) {
if (!iter->key().starts_with(edge_prefix)) {
break;
Expand Down Expand Up @@ -185,7 +190,9 @@ uint16_t HnswIndex::RandomizeLayer() {

StatusOr<HnswIndex::NodeKey> HnswIndex::DefaultEntryPoint(uint16_t level) const {
auto prefix = search_key.ConstructHnswLevelNodePrefix(level);
util::UniqueIterator it(storage, storage->DefaultScanOptions(), ColumnFamilyID::Search);
// TODO: ctx?
engine::Context ctx(storage);
util::UniqueIterator it(ctx, storage, storage->DefaultScanOptions(), ColumnFamilyID::Search);
it->Seek(prefix);

Slice node_key;
Expand Down Expand Up @@ -515,7 +522,9 @@ Status HnswIndex::DeleteVectorEntry(std::string_view key, ObserverOrUniquePtr<ro

auto has_other_nodes_at_level = [&](uint16_t level, std::string_view skip_key) -> bool {
auto prefix = search_key.ConstructHnswLevelNodePrefix(level);
util::UniqueIterator it(storage, storage->DefaultScanOptions(), ColumnFamilyID::Search);
// TODO: ctx?
engine::Context ctx(storage);
util::UniqueIterator it(ctx, storage, storage->DefaultScanOptions(), ColumnFamilyID::Search);
it->Seek(prefix);

Slice node_key;
Expand Down
7 changes: 5 additions & 2 deletions src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,20 @@ Status IndexUpdater::UpdateHnswVectorIndex(std::string_view key, const kqir::Val
auto storage = indexer->storage;
auto hnsw = HnswIndex(search_key, vector, storage);

// TODO: ctx?
engine::Context ctx(storage);

if (!original.IsNull()) {
auto batch = storage->GetWriteBatchBase();
GET_OR_RET(hnsw.DeleteVectorEntry(key, batch));
auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
}

if (!current.IsNull()) {
auto batch = storage->GetWriteBatchBase();
GET_OR_RET(hnsw.InsertVectorEntry(key, current.Get<kqir::NumericArray>(), batch));
auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
}

Expand Down
3 changes: 2 additions & 1 deletion src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,8 @@ std::string Database::AppendNamespacePrefix(const Slice &user_key) {
return ComposeNamespaceKey(namespace_, user_key, storage_->IsSlotIdEncoded());
}

rocksdb::Status Database::ClearKeysOfSlotRange(engine::Context &ctx, const rocksdb::Slice &ns, const SlotRange &slot_range) {
rocksdb::Status Database::ClearKeysOfSlotRange(engine::Context &ctx, const rocksdb::Slice &ns,
const SlotRange &slot_range) {
if (!storage_->IsSlotIdEncoded()) {
return rocksdb::Status::Aborted("It is not in cluster mode");
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ class Database {
std::string *end_cursor = nullptr, RedisType type = kRedisNone);
[[nodiscard]] rocksdb::Status RandomKey(engine::Context &ctx, const std::string &cursor, std::string *key);
std::string AppendNamespacePrefix(const Slice &user_key);
[[nodiscard]] rocksdb::Status ClearKeysOfSlotRange(engine::Context &ctx, const rocksdb::Slice &ns, const SlotRange &slot_range);
[[nodiscard]] rocksdb::Status ClearKeysOfSlotRange(engine::Context &ctx, const rocksdb::Slice &ns,
const SlotRange &slot_range);
[[nodiscard]] rocksdb::Status KeyExist(engine::Context &ctx, const std::string &key);

// Copy <key,value> to <new_key,value> (already an internal key)
Expand Down
29 changes: 16 additions & 13 deletions tests/cppunit/hnsw_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ TEST_F(HnswIndexTest, DecodeNodesToVectorItems) {
node1.PutMetadata(&metadata1, hnsw_index->search_key, hnsw_index->storage, batch.Get());
node2.PutMetadata(&metadata2, hnsw_index->search_key, hnsw_index->storage, batch.Get());
node3.PutMetadata(&metadata3, hnsw_index->search_key, hnsw_index->storage, batch.Get());
auto s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
engine::Context ctx(storage_.get());
auto s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
ASSERT_TRUE(s.ok());

std::vector<std::string> keys = {node_key1, node_key2, node_key3};
Expand Down Expand Up @@ -265,7 +266,8 @@ TEST_F(HnswIndexTest, SearchLayer) {
node3.PutMetadata(&metadata3, hnsw_index->search_key, hnsw_index->storage, batch.Get());
node4.PutMetadata(&metadata4, hnsw_index->search_key, hnsw_index->storage, batch.Get());
node5.PutMetadata(&metadata5, hnsw_index->search_key, hnsw_index->storage, batch.Get());
auto s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
engine::Context ctx(storage_.get());
auto s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
ASSERT_TRUE(s.ok());

// Add Neighbours
Expand All @@ -286,7 +288,7 @@ TEST_F(HnswIndexTest, SearchLayer) {
ASSERT_TRUE(s7.IsOK());
auto s8 = node5.AddNeighbour("node3", hnsw_index->search_key, hnsw_index->storage, batch.Get());
ASSERT_TRUE(s8.IsOK());
s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
ASSERT_TRUE(s.ok());

redis::VectorItem target_vector;
Expand Down Expand Up @@ -347,14 +349,15 @@ TEST_F(HnswIndexTest, InsertAndDeleteVectorEntry) {
// Insert n1 into layer 1
uint16_t target_level = 1;
auto batch = storage_->GetWriteBatchBase();
engine::Context ctx(storage_.get());
auto s1 = hnsw_index->InsertVectorEntryInternal(key1, vec1, batch, target_level);
ASSERT_TRUE(s1.IsOK());
auto s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
auto s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
ASSERT_TRUE(s.ok());

rocksdb::PinnableSlice value;
auto index_meta_key = hnsw_index->search_key.ConstructFieldMeta();
s = storage_->Get(rocksdb::ReadOptions(), hnsw_index->storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key,
s = storage_->Get(ctx, ctx.GetReadOptions(), hnsw_index->storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key,
&value);
ASSERT_TRUE(s.ok());
redis::HnswVectorFieldMetadata decoded_metadata;
Expand All @@ -378,11 +381,11 @@ TEST_F(HnswIndexTest, InsertAndDeleteVectorEntry) {
target_level = 3;
auto s4 = hnsw_index->InsertVectorEntryInternal(key2, vec2, batch, target_level);
ASSERT_TRUE(s4.IsOK());
s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
ASSERT_TRUE(s.ok());

index_meta_key = hnsw_index->search_key.ConstructFieldMeta();
s = storage_->Get(rocksdb::ReadOptions(), hnsw_index->storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key,
s = storage_->Get(ctx, ctx.GetReadOptions(), hnsw_index->storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key,
&value);
ASSERT_TRUE(s.ok());
decoded_metadata.Decode(&value);
Expand Down Expand Up @@ -423,11 +426,11 @@ TEST_F(HnswIndexTest, InsertAndDeleteVectorEntry) {
target_level = 2;
auto s7 = hnsw_index->InsertVectorEntryInternal(key3, vec3, batch, target_level);
ASSERT_TRUE(s7.IsOK());
s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
ASSERT_TRUE(s.ok());

index_meta_key = hnsw_index->search_key.ConstructFieldMeta();
s = storage_->Get(rocksdb::ReadOptions(), hnsw_index->storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key,
s = storage_->Get(ctx, ctx.GetReadOptions(), hnsw_index->storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key,
&value);
ASSERT_TRUE(s.ok());
decoded_metadata.Decode(&value);
Expand Down Expand Up @@ -458,7 +461,7 @@ TEST_F(HnswIndexTest, InsertAndDeleteVectorEntry) {
target_level = 1;
auto s10 = hnsw_index->InsertVectorEntryInternal(key4, vec4, batch, target_level);
ASSERT_TRUE(s10.IsOK());
s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
ASSERT_TRUE(s.ok());

redis::HnswNode node4_layer0(key4, 0);
Expand Down Expand Up @@ -498,7 +501,7 @@ TEST_F(HnswIndexTest, InsertAndDeleteVectorEntry) {
batch = storage_->GetWriteBatchBase();
auto s15 = hnsw_index->InsertVectorEntryInternal(key5, vec5, batch, target_level);
ASSERT_TRUE(s15.IsOK());
s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
ASSERT_TRUE(s.ok());

auto s16 = node2_layer1.DecodeMetadata(hnsw_index->search_key, hnsw_index->storage);
Expand Down Expand Up @@ -562,11 +565,11 @@ TEST_F(HnswIndexTest, InsertAndDeleteVectorEntry) {
batch = storage_->GetWriteBatchBase();
auto s22 = hnsw_index->DeleteVectorEntry(key2, batch);
ASSERT_TRUE(s22.IsOK());
s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
ASSERT_TRUE(s.ok());

index_meta_key = hnsw_index->search_key.ConstructFieldMeta();
s = storage_->Get(rocksdb::ReadOptions(), hnsw_index->storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key,
s = storage_->Get(ctx, ctx.GetReadOptions(), hnsw_index->storage->GetCFHandle(ColumnFamilyID::Search), index_meta_key,
&value);
ASSERT_TRUE(s.ok());
decoded_metadata.Decode(&value);
Expand Down
12 changes: 7 additions & 5 deletions tests/cppunit/hnsw_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ TEST_F(NodeTest, PutAndDecodeMetadata) {
node1.PutMetadata(&metadata1, search_key, storage_.get(), batch.Get());
node2.PutMetadata(&metadata2, search_key, storage_.get(), batch.Get());
node3.PutMetadata(&metadata3, search_key, storage_.get(), batch.Get());
auto s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
engine::Context ctx(storage_.get());
auto s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
ASSERT_TRUE(s.ok());

auto decoded_metadata1 = node1.DecodeMetadata(search_key, storage_.get());
Expand Down Expand Up @@ -85,7 +86,7 @@ TEST_F(NodeTest, PutAndDecodeMetadata) {
batch->Put(storage_->GetCFHandle(ColumnFamilyID::Search), edge2, Slice());
batch->Put(storage_->GetCFHandle(ColumnFamilyID::Search), edge3, Slice());
batch->Put(storage_->GetCFHandle(ColumnFamilyID::Search), edge4, Slice());
s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
ASSERT_TRUE(s.ok());

node1.DecodeNeighbours(search_key, storage_.get());
Expand Down Expand Up @@ -121,7 +122,8 @@ TEST_F(NodeTest, ModifyNeighbours) {
node2.PutMetadata(&metadata2, search_key, storage_.get(), batch1.Get());
node3.PutMetadata(&metadata3, search_key, storage_.get(), batch1.Get());
node4.PutMetadata(&metadata4, search_key, storage_.get(), batch1.Get());
auto s = storage_->Write(storage_->DefaultWriteOptions(), batch1->GetWriteBatch());
engine::Context ctx(storage_.get());
auto s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch1->GetWriteBatch());
ASSERT_TRUE(s.ok());

// Add Edges
Expand All @@ -134,7 +136,7 @@ TEST_F(NodeTest, ModifyNeighbours) {
ASSERT_TRUE(s3.IsOK());
auto s4 = node3.AddNeighbour("node2", search_key, storage_.get(), batch2.Get());
ASSERT_TRUE(s4.IsOK());
s = storage_->Write(storage_->DefaultWriteOptions(), batch2->GetWriteBatch());
s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch2->GetWriteBatch());
ASSERT_TRUE(s.ok());

node1.DecodeNeighbours(search_key, storage_.get());
Expand All @@ -156,7 +158,7 @@ TEST_F(NodeTest, ModifyNeighbours) {
auto s5 = node2.RemoveNeighbour("node3", search_key, storage_.get(), batch3.Get());
ASSERT_TRUE(s5.IsOK());

s = storage_->Write(storage_->DefaultWriteOptions(), batch3->GetWriteBatch());
s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch3->GetWriteBatch());
ASSERT_TRUE(s.ok());

node2.DecodeNeighbours(search_key, storage_.get());
Expand Down

0 comments on commit bd67a66

Please sign in to comment.