Merge bitcoin#28955: index: block filters sync, reduce disk read operations by caching last header

99afb9d refactor: init, simplify index shutdown code (furszy)
0faafb5 index: decrease ThreadSync cs_main contention (furszy)
f1469eb index: cache last block filter header (furszy)
a6756ec index: blockfilter, decouple header lookup into its own function (furszy)
331f044 index: blockfilter, decouple Write into its own function (furszy)
bcbd7eb bench: basic block filter index initial sync (furszy)

Pull request description:

  Work decoupled from bitcoin#26966 per request.

  The aim is to remove an unnecessary disk read operation that currently takes place for every newly arriving block (or every block scanned during background sync). Instead of reading the last filter header from disk merely to access its hash for constructing the next filter, this work caches it, occupying just 32 more bytes in memory.
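
  As a rough standalone sketch of the caching idea (illustrative names and a toy "hash", not Bitcoin Core code; the actual change is in the src/index/blockfilterindex.cpp hunk further down), the point is simply to keep the last computed header in a 32-byte member and reuse it, instead of reading it back from the on-disk map for every appended block:

#include <array>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

using Header = std::array<std::uint8_t, 32>; // stand-in for uint256

// Pretend "disk" store: height -> filter header.
static std::map<int, Header> g_disk;

struct FilterIndexSketch {
    Header m_last_header{}; // cached; avoids a disk read per appended block

    // Placeholder for BlockFilter::ComputeHeader: derive the next header from
    // the filter contents and the previous header.
    static Header ComputeHeader(const std::string& filter_data, const Header& prev)
    {
        Header out = prev;
        for (std::size_t i = 0; i < filter_data.size(); ++i) {
            out[i % out.size()] ^= static_cast<std::uint8_t>(filter_data[i]);
        }
        return out;
    }

    void Append(int height, const std::string& filter_data)
    {
        // Old approach: Header prev = g_disk.at(height - 1); // disk read on every block
        const Header next = ComputeHeader(filter_data, m_last_header);
        g_disk[height] = next; // still persisted for lookups and reorg handling
        m_last_header = next;  // refresh the in-memory cache
    }
};

int main()
{
    FilterIndexSketch index;
    for (int h = 1; h <= 3; ++h) index.Append(h, "filter@" + std::to_string(h));
    return 0;
}

  In the real index the cache still has to be seeded from disk on startup and refreshed after a rewind, which is what the ReadFilterHeader helper introduced in the diff below is used for.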

  It also reduces `cs_main` lock contention during the index's initial sync and simplifies the indexes' initialization and shutdown procedures.
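
  And a minimal sketch of the lock-scope change (with stand-ins for cs_main and the chain; the actual patch uses Bitcoin Core's WITH_LOCK macro, see the src/index/base.cpp hunk below): the shared lock is held only for the successor lookup rather than for the whole per-block processing step.

#include <cstddef>
#include <mutex>
#include <optional>
#include <vector>

static std::mutex g_chain_mutex;             // stand-in for cs_main
static std::vector<int> g_chain{0, 1, 2, 3}; // stand-in for the active chain

// Caller must hold g_chain_mutex.
static std::optional<int> NextSyncBlock(std::optional<int> prev)
{
    const std::size_t next = prev ? static_cast<std::size_t>(*prev) + 1 : 0;
    if (next >= g_chain.size()) return std::nullopt;
    return g_chain[next];
}

static void ProcessBlock(int /*block*/)
{
    // Expensive per-block work (reading the block, building the filter, ...)
    // runs without the chain lock held.
}

int main()
{
    std::optional<int> pindex;
    while (true) {
        std::optional<int> next;
        {
            std::lock_guard<std::mutex> lock{g_chain_mutex}; // narrow critical section
            next = NextSyncBlock(pindex);
        }
        if (!next) break;
        ProcessBlock(*next);
        pindex = next;
    }
    return 0;
}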

  Testing Note:
  To compare the changes, a basic benchmark was added in the second commit. Alternatively, the changes can be tested by timing the block filter sync from scratch on any network: start the node with `-blockfilterindex` and monitor the logs until the syncing process finishes.

  Local Benchmark Results:

  *Master (c252a0f):
  |               ns/op |                op/s |    err% |     total | benchmark
  |--------------------:|--------------------:|--------:|----------:|:----------
  |      132,042,516.60 |                7.57 |    0.3% |      7.79 | `BlockFilterIndexSync`

  *PR (43a212c):
  |               ns/op |                op/s |    err% |     total | benchmark
  |--------------------:|--------------------:|--------:|----------:|:----------
  |      126,915,841.60 |                7.88 |    0.6% |      7.51 | `BlockFilterIndexSync`

ACKs for top commit:
  Sjors:
    re-ACK 99afb9d
  achow101:
    ACK 99afb9d
  TheCharlatan:
    Re-ACK 99afb9d
  andrewtoth:
    ACK 99afb9d

Tree-SHA512: 927daadd68f4ee1ca781a89519539b895f5185a76ebaf525fbc246ea8dcf40d44a82def00ac34b188640802844b312270067f1b33e65a2479e06be9169c616de
achow101 committed Mar 20, 2024
2 parents 5b9831a + 99afb9d commit 0b96a19
Showing 7 changed files with 119 additions and 59 deletions.
1 change: 1 addition & 0 deletions src/Makefile.bench.include
@@ -34,6 +34,7 @@ bench_bench_bitcoin_SOURCES = \
bench/examples.cpp \
bench/gcs_filter.cpp \
bench/hashpadding.cpp \
bench/index_blockfilter.cpp \
bench/load_external.cpp \
bench/lockedpool.cpp \
bench/logging.cpp \
43 changes: 43 additions & 0 deletions src/bench/index_blockfilter.cpp
@@ -0,0 +1,43 @@
// Copyright (c) 2023-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://www.opensource.org/licenses/mit-license.php.

#include <bench/bench.h>

#include <addresstype.h>
#include <index/blockfilterindex.h>
#include <node/chainstate.h>
#include <node/context.h>
#include <test/util/setup_common.h>
#include <util/strencodings.h>

// Very simple block filter index sync benchmark, only using coinbase outputs.
static void BlockFilterIndexSync(benchmark::Bench& bench)
{
    const auto test_setup = MakeNoLogFileContext<TestChain100Setup>();

    // Create more blocks
    int CHAIN_SIZE = 600;
    CPubKey pubkey{ParseHex("02ed26169896db86ced4cbb7b3ecef9859b5952825adbeab998fb5b307e54949c9")};
    CScript script = GetScriptForDestination(WitnessV0KeyHash(pubkey));
    std::vector<CMutableTransaction> noTxns;
    for (int i = 0; i < CHAIN_SIZE - 100; i++) {
        test_setup->CreateAndProcessBlock(noTxns, script);
        SetMockTime(GetTime() + 1);
    }
    assert(WITH_LOCK(::cs_main, return test_setup->m_node.chainman->ActiveHeight() == CHAIN_SIZE));

    bench.minEpochIterations(5).run([&] {
        BlockFilterIndex filter_index(interfaces::MakeChain(test_setup->m_node), BlockFilterType::BASIC,
                                      /*n_cache_size=*/0, /*f_memory=*/false, /*f_wipe=*/true);
        assert(filter_index.Init());
        assert(!filter_index.BlockUntilSyncedToCurrentChain());
        filter_index.Sync();

        IndexSummary summary = filter_index.GetSummary();
        assert(summary.synced);
        assert(summary.best_block_hash == WITH_LOCK(::cs_main, return test_setup->m_node.chainman->ActiveTip()->GetBlockHash()));
    });
}

BENCHMARK(BlockFilterIndexSync, benchmark::PriorityLevel::HIGH);
33 changes: 15 additions & 18 deletions src/index/base.cpp
@@ -141,7 +141,7 @@ static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev, CChain&
return chain.Next(chain.FindFork(pindex_prev));
}

void BaseIndex::ThreadSync()
void BaseIndex::Sync()
{
const CBlockIndex* pindex = m_best_block_index.load();
if (!m_synced) {
@@ -159,23 +159,20 @@ void BaseIndex::ThreadSync()
return;
}

{
LOCK(cs_main);
const CBlockIndex* pindex_next = NextSyncBlock(pindex, m_chainstate->m_chain);
if (!pindex_next) {
SetBestBlockIndex(pindex);
m_synced = true;
// No need to handle errors in Commit. See rationale above.
Commit();
break;
}
if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
FatalErrorf("%s: Failed to rewind index %s to a previous chain tip",
__func__, GetName());
return;
}
pindex = pindex_next;
const CBlockIndex* pindex_next = WITH_LOCK(cs_main, return NextSyncBlock(pindex, m_chainstate->m_chain));
if (!pindex_next) {
SetBestBlockIndex(pindex);
m_synced = true;
// No need to handle errors in Commit. See rationale above.
Commit();
break;
}
if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
FatalErrorf("%s: Failed to rewind index %s to a previous chain tip", __func__, GetName());
return;
}
pindex = pindex_next;


auto current_time{std::chrono::steady_clock::now()};
if (last_log_time + SYNC_LOG_INTERVAL < current_time) {
@@ -394,7 +391,7 @@ bool BaseIndex::StartBackgroundSync()
{
if (!m_init) throw std::logic_error("Error: Cannot start a non-initialized index");

m_thread_sync = std::thread(&util::TraceThread, GetName(), [this] { ThreadSync(); });
m_thread_sync = std::thread(&util::TraceThread, GetName(), [this] { Sync(); });
return true;
}

16 changes: 8 additions & 8 deletions src/index/base.h
@@ -78,13 +78,6 @@ class BaseIndex : public CValidationInterface
std::thread m_thread_sync;
CThreadInterrupt m_interrupt;

/// Sync the index with the block index starting from the current best block.
/// Intended to be run in its own thread, m_thread_sync, and can be
/// interrupted with m_interrupt. Once the index gets in sync, the m_synced
/// flag is set and the BlockConnected ValidationInterface callback takes
/// over and the sync thread exits.
void ThreadSync();

/// Write the current index state (eg. chain block locator and subclass-specific items) to disk.
///
/// Recommendations for error handling:
@@ -152,9 +145,16 @@ class BaseIndex : public CValidationInterface
/// validation interface so that it stays in sync with blockchain updates.
[[nodiscard]] bool Init();

/// Starts the initial sync process.
/// Starts the initial sync process on a background thread.
[[nodiscard]] bool StartBackgroundSync();

/// Sync the index with the block index starting from the current best block.
/// Intended to be run in its own thread, m_thread_sync, and can be
/// interrupted with m_interrupt. Once the index gets in sync, the m_synced
/// flag is set and the BlockConnected ValidationInterface callback takes
/// over and the sync thread exits.
void Sync();

/// Stops the instance from staying in sync with blockchain updates.
void Stop();

57 changes: 39 additions & 18 deletions src/index/blockfilterindex.cpp
@@ -128,6 +128,16 @@ bool BlockFilterIndex::CustomInit(const std::optional<interfaces::BlockKey>& blo
m_next_filter_pos.nFile = 0;
m_next_filter_pos.nPos = 0;
}

if (block) {
auto op_last_header = ReadFilterHeader(block->height, block->hash);
if (!op_last_header) {
LogError("Cannot read last block filter header; index may be corrupted\n");
return false;
}
m_last_header = *op_last_header;
}

return true;
}

@@ -222,10 +232,25 @@ size_t BlockFilterIndex::WriteFilterToDisk(FlatFilePos& pos, const BlockFilter&
return data_size;
}

std::optional<uint256> BlockFilterIndex::ReadFilterHeader(int height, const uint256& expected_block_hash)
{
std::pair<uint256, DBVal> read_out;
if (!m_db->Read(DBHeightKey(height), read_out)) {
return std::nullopt;
}

if (read_out.first != expected_block_hash) {
LogError("%s: previous block header belongs to unexpected block %s; expected %s\n",
__func__, read_out.first.ToString(), expected_block_hash.ToString());
return std::nullopt;
}

return read_out.second.header;
}

bool BlockFilterIndex::CustomAppend(const interfaces::BlockInfo& block)
{
CBlockUndo block_undo;
uint256 prev_header;

if (block.height > 0) {
// pindex variable gives indexing code access to node internals. It
@@ -234,34 +259,28 @@ bool BlockFilterIndex::CustomAppend(const interfaces::BlockInfo& block)
if (!m_chainstate->m_blockman.UndoReadFromDisk(block_undo, *pindex)) {
return false;
}

std::pair<uint256, DBVal> read_out;
if (!m_db->Read(DBHeightKey(block.height - 1), read_out)) {
return false;
}

uint256 expected_block_hash = *Assert(block.prev_hash);
if (read_out.first != expected_block_hash) {
LogError("%s: previous block header belongs to unexpected block %s; expected %s\n",
__func__, read_out.first.ToString(), expected_block_hash.ToString());
return false;
}

prev_header = read_out.second.header;
}

BlockFilter filter(m_filter_type, *Assert(block.data), block_undo);

const uint256& header = filter.ComputeHeader(m_last_header);
bool res = Write(filter, block.height, header);
if (res) m_last_header = header; // update last header
return res;
}

bool BlockFilterIndex::Write(const BlockFilter& filter, uint32_t block_height, const uint256& filter_header)
{
size_t bytes_written = WriteFilterToDisk(m_next_filter_pos, filter);
if (bytes_written == 0) return false;

std::pair<uint256, DBVal> value;
value.first = block.hash;
value.first = filter.GetBlockHash();
value.second.hash = filter.GetHash();
value.second.header = filter.ComputeHeader(prev_header);
value.second.header = filter_header;
value.second.pos = m_next_filter_pos;

if (!m_db->Write(DBHeightKey(block.height), value)) {
if (!m_db->Write(DBHeightKey(block_height), value)) {
return false;
}

@@ -315,6 +334,8 @@ bool BlockFilterIndex::CustomRewind(const interfaces::BlockKey& current_tip, con
batch.Write(DB_FILTER_POS, m_next_filter_pos);
if (!m_db->WriteBatch(batch)) return false;

// Update cached header
m_last_header = *Assert(ReadFilterHeader(new_tip.height, new_tip.hash));
return true;
}

7 changes: 7 additions & 0 deletions src/index/blockfilterindex.h
@@ -42,8 +42,15 @@ class BlockFilterIndex final : public BaseIndex
/** cache of block hash to filter header, to avoid disk access when responding to getcfcheckpt. */
std::unordered_map<uint256, uint256, FilterHeaderHasher> m_headers_cache GUARDED_BY(m_cs_headers_cache);

// Last computed header to avoid disk reads on every new block.
uint256 m_last_header{};

bool AllowPrune() const override { return true; }

bool Write(const BlockFilter& filter, uint32_t block_height, const uint256& filter_header);

std::optional<uint256> ReadFilterHeader(int height, const uint256& expected_block_hash);

protected:
bool CustomInit(const std::optional<interfaces::BlockKey>& block) override;

21 changes: 6 additions & 15 deletions src/init.cpp
@@ -256,12 +256,8 @@ void Interrupt(NodeContext& node)
InterruptMapPort();
if (node.connman)
node.connman->Interrupt();
if (g_txindex) {
g_txindex->Interrupt();
}
ForEachBlockFilterIndex([](BlockFilterIndex& index) { index.Interrupt(); });
if (g_coin_stats_index) {
g_coin_stats_index->Interrupt();
for (auto* index : node.indexes) {
index->Interrupt();
}
}

@@ -337,16 +333,11 @@ void Shutdown(NodeContext& node)
if (node.validation_signals) node.validation_signals->FlushBackgroundCallbacks();

// Stop and delete all indexes only after flushing background callbacks.
if (g_txindex) {
g_txindex->Stop();
g_txindex.reset();
}
if (g_coin_stats_index) {
g_coin_stats_index->Stop();
g_coin_stats_index.reset();
}
ForEachBlockFilterIndex([](BlockFilterIndex& index) { index.Stop(); });
for (auto* index : node.indexes) index->Stop();
if (g_txindex) g_txindex.reset();
if (g_coin_stats_index) g_coin_stats_index.reset();
DestroyAllBlockFilterIndexes();
node.indexes.clear(); // all instances are nullptr now

// Any future callbacks will be dropped. This should absolutely be safe - if
// missing a callback results in an unrecoverable situation, unclean shutdown
