Skip to content

Commit

Permalink
add read/write lock to unlock ha parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxxen committed Mar 20, 2024
1 parent 1d3e1dd commit 5edbd3e
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 15 deletions.
51 changes: 37 additions & 14 deletions src/hnsw/hnsw_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,12 @@ struct HNSWIndexScanState : public IndexScanState {
unique_array<row_t> row_ids;
};

unique_ptr<IndexScanState> HNSWIndex::InitializeScan(float *query_vector, idx_t limit) const {
unique_ptr<IndexScanState> HNSWIndex::InitializeScan(float *query_vector, idx_t limit) {
auto state = make_uniq<HNSWIndexScanState>();

// Acquire a shared lock to search the index
auto lock = rwlock.GetSharedLock();

auto search_result = index.search(query_vector, limit);

state->current_row = 0;
Expand All @@ -261,6 +265,9 @@ idx_t HNSWIndex::Scan(IndexScanState &state, Vector &result) {
}

void HNSWIndex::CommitDrop(IndexLock &index_lock) {
// Acquire an exclusive lock to drop the index
auto lock = rwlock.GetExclusiveLock();

index.reset();
// TODO: Maybe we can drop these much earlier?
linked_block_allocator->Reset();
Expand All @@ -282,22 +289,31 @@ void HNSWIndex::Construct(DataChunk &input, Vector &row_ids, idx_t thread_idx) {
auto vec_child_data = FlatVector::GetData<float>(vec_child_vec);
auto rowid_data = FlatVector::GetData<row_t>(row_ids);

// Even if .add is threadsafe, it is not threadsafe in combination with .reserve
// I.e. we need to have enough capacity up front before we can parallelize. If we reserve while adding it will crash.
// So we need to lock here.
// An idea: Buffer everything into a ColumnDataCollection and then add it all in parallel at the end once we have reserved enough space.
static mutex hnsw_index_mutex;
lock_guard<mutex> lock(hnsw_index_mutex);
bool need_to_resize = false;
// Check if we need to resize the index
{
auto lock = rwlock.GetSharedLock();
need_to_resize = index.capacity() < index.size() + count;
}

if(!index.reserve(NextPowerOfTwo(index.size() + count))) {
throw InternalException("Failed to reserve space in the HNSW index");
if(need_to_resize) {
// Acquire an exclusive lock to resize the index
auto lock = rwlock.GetExclusiveLock();
// Check if we still need to resize
if (index.capacity() < index.size() + count) {
index.reserve(NextPowerOfTwo(index.size() + count));
}
}

for (idx_t out_idx = 0; out_idx < count; out_idx++) {
auto rowid = rowid_data[out_idx];
auto result = index.add(rowid, vec_child_data + (out_idx * array_size), thread_idx);
if(!result) {
throw InternalException("Failed to add to the HNSW index: %s", result.error.what());
// For inserting into the index, we just need a shared lock
{
auto lock = rwlock.GetSharedLock();
for (idx_t out_idx = 0; out_idx < count; out_idx++) {
auto rowid = rowid_data[out_idx];
auto result = index.add(rowid, vec_child_data + (out_idx * array_size), thread_idx);
if (!result) {
throw InternalException("Failed to add to the HNSW index: %s", result.error.what());
}
}
}
}
Expand All @@ -314,6 +330,10 @@ void HNSWIndex::Delete(IndexLock &lock, DataChunk &input, Vector &rowid_vec) {
auto count = input.size();
rowid_vec.Flatten(count);
auto row_id_data = FlatVector::GetData<row_t>(rowid_vec);

// For deleting from the index, we need an exclusive lock
auto _lock = rwlock.GetExclusiveLock();

for(idx_t i = 0; i < input.size(); i++) {
auto result = index.remove(row_id_data[i]);
}
Expand All @@ -338,6 +358,9 @@ void HNSWIndex::VerifyAppend(DataChunk &chunk, ConflictManager &conflict_manager
}

void HNSWIndex::PersistToDisk() {
// Acquire an exclusive lock to persist the index
auto lock = rwlock.GetExclusiveLock();

// Write

if (root_block_ptr.Get() == 0) {
Expand Down
2 changes: 2 additions & 0 deletions src/hnsw/hnsw_index_physical_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ SinkResultType PhysicalCreateHNSWIndex::Sink(ExecutionContext &context, DataChun
throw NotImplementedException("Custom index creation only supported for single-column indexes");
}



auto &row_identifiers = chunk.data[1];

// Construct the index
Expand Down
6 changes: 5 additions & 1 deletion src/include/hnsw/hnsw_index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

namespace duckdb {

class StorageLock;

class HNSWIndex : public Index {
public:
// The type name of the HNSWIndex
Expand All @@ -31,7 +33,7 @@ class HNSWIndex : public Index {
//! The allocator used to persist linked blocks
unique_ptr<FixedSizeAllocator> linked_block_allocator;

unique_ptr<IndexScanState> InitializeScan(float *query_vector, idx_t limit) const;
unique_ptr<IndexScanState> InitializeScan(float *query_vector, idx_t limit);
idx_t Scan(IndexScanState &state, Vector &result);

idx_t GetVectorSize() const;
Expand Down Expand Up @@ -80,6 +82,8 @@ class HNSWIndex : public Index {
DataChunk &input) override {
return "Constraint violation in HNSW index";
}
private:
StorageLock rwlock;
};

} // namespace duckdb

0 comments on commit 5edbd3e

Please sign in to comment.