From ae4ed057fc306d967f9cc816906f09fee668549d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=BAc=C3=A1s=20Meier?= Date: Tue, 17 Sep 2024 10:45:22 -0700 Subject: [PATCH] cometindex: speedup by committing event changes in batches of 1000 (#4854) Instead of creating one transaction for each event we need to index, we instead only close this transaction every 1000 events (or when we've caught up to the database). This gives about a 5x improvement in catch-up speed. ## Checklist before requesting a review - [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. Otherwise, I declare my belief that there are not consensus-breaking changes, for the following reason: > indexing only --- crates/util/cometindex/src/indexer.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/util/cometindex/src/indexer.rs b/crates/util/cometindex/src/indexer.rs index fa21a0af1d..962b845dfb 100644 --- a/crates/util/cometindex/src/indexer.rs +++ b/crates/util/cometindex/src/indexer.rs @@ -152,6 +152,7 @@ impl Indexer { let mut relevant_events = 0usize; let mut es = read_events(&src_db, watermark); + let mut dbtx = dst_db.begin().await?; while let Some(event) = es.next().await.transpose()? { if scanned_events % 1000 == 0 { tracing::info!(scanned_events, relevant_events); @@ -178,8 +179,6 @@ impl Indexer { relevant_events += 1; - // Otherwise we have something to process. Make a dbtx - let mut dbtx = dst_db.begin().await?; for index in indexes { if index.is_relevant(&event.as_ref().kind) { tracing::debug!(?event, ?index, "relevant to index"); @@ -188,8 +187,15 @@ impl Indexer { } // Mark that we got to at least this event update_watermark(&mut dbtx, event.local_rowid).await?; - dbtx.commit().await?; + // Only commit in batches of <= 1000 events, for about a 5x performance increase when + // catching up. 
+ if relevant_events % 1000 == 0 { + dbtx.commit().await?; + dbtx = dst_db.begin().await?; + } } + // Flush out the remaining changes. + dbtx.commit().await?; Ok(()) }