storage: enable migration for substores #3412

Merged · 4 commits · Nov 20, 2023
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
@@ -22,7 +22,7 @@ jobs:
# we must duplicate the rustflags from `.cargo/config.toml`.
RUSTFLAGS: "-D warnings --cfg tokio_unstable"
- name: Run tests with nextest
run: cargo nextest run --release
run: cargo nextest run --release --features migration
env:
CARGO_TERM_COLOR: always

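Running the test suite with `--features migration` means the migration-only surface (notably `Storage::commit_in_place`, changed below) is compiled and exercised in CI. As a rough sketch, a test gated the same way might look like the following; the module and test names are illustrative and not part of this PR:

#[cfg(feature = "migration")]
mod migration_tests {
    // Compiled only when the `migration` feature is enabled, matching the
    // `--features migration` flag now passed to nextest in CI.
    #[tokio::test]
    async fn commit_in_place_smoke() {
        // Would exercise `Storage::commit_in_place`, which is itself gated
        // behind `#[cfg(feature = "migration")]`.
    }
}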
24 changes: 18 additions & 6 deletions crates/storage/src/snapshot.rs
@@ -81,11 +81,16 @@ impl Snapshot {
.await?
}

pub fn substore_version(
&self,
prefix: &Arc<store::substore::SubstoreConfig>,
) -> Option<jmt::Version> {
self.0.multistore_cache.get_version(prefix)
pub fn prefix_version(&self, prefix: &str) -> Result<Option<jmt::Version>> {
let config = self
.0
.multistore_cache
.config
.find_substore(prefix.as_bytes());
if prefix != config.prefix {
anyhow::bail!("requested substore (prefix={prefix}) does not exist")
}
Ok(self.substore_version(&config))
}

/// Returns the root hash of the subtree corresponding to the given prefix.
@@ -108,7 +113,7 @@ impl Snapshot {
// However, we do not want to mislead the caller by returning a root hash
// that does not correspond to the queried prefix, so we error out instead.
if prefix != config.prefix {
anyhow::bail!("requested substore does not exist")
anyhow::bail!("requested substore (prefix={prefix}) does not exist")
}

let version = self
@@ -137,6 +142,13 @@ impl Snapshot {
pub async fn root_hash(&self) -> Result<crate::RootHash> {
self.prefix_root_hash("").await
}

pub(crate) fn substore_version(
&self,
prefix: &Arc<store::substore::SubstoreConfig>,
) -> Option<jmt::Version> {
self.0.multistore_cache.get_version(prefix)
}
}

#[async_trait]
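The new public `prefix_version` method looks up a substore by its string prefix and, like `prefix_root_hash`, errors out if the prefix does not name a configured substore, while the `Arc<SubstoreConfig>`-keyed lookup becomes a crate-private helper. A minimal usage sketch; the crate path and the `ibc` prefix are assumptions for illustration:

use penumbra_storage::Snapshot; // assumed crate name

fn report_substore_version(snapshot: &Snapshot) -> anyhow::Result<()> {
    // `Ok(None)` means the substore is configured but has no recorded version yet.
    let version = snapshot.prefix_version("ibc")?;
    println!("ibc substore version: {version:?}");

    // Unknown prefixes now fail loudly instead of silently returning `None`.
    assert!(snapshot.prefix_version("not-a-substore").is_err());
    Ok(())
}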
5 changes: 5 additions & 0 deletions crates/storage/src/snapshot_cache.rs
@@ -78,6 +78,11 @@ impl SnapshotCache {
.map(Clone::clone)
.filter(|s| s.version() == version)
}

/// Empties the cache.
pub fn clear(&mut self) {
self.cache.clear();
}
}

#[cfg(test)]
70 changes: 38 additions & 32 deletions crates/storage/src/storage.rs
@@ -273,20 +273,23 @@ impl Storage {
anyhow::bail!("version mismatch in commit: expected state forked from version {} but found state forked from version {}", old_version, snapshot.version());
}

self.commit_inner(snapshot, changes, new_version, true)
self.commit_inner(snapshot, changes, new_version, false)
.await
}

/// Commits the provided [`StateDelta`] to persistent storage as the latest
/// version of the chain state. If `write_to_snapshot_cache` is `false`, the
/// snapshot will not be written to the snapshot cache, and no subscribers
/// will be notified.
/// Commits the supplied [`Cache`] to persistent storage.
///
/// # Migrations
/// In the case of chain state migrations, we need to commit the new state
/// without incrementing the version. If `perform_migration` is `true`, the
/// snapshot will _not_ be written to the snapshot cache, no subscribers
/// will be notified, and substore versions will not be updated.
async fn commit_inner(
&self,
snapshot: Snapshot,
cache: Cache,
version: jmt::Version,
write_to_snapshot_cache: bool,
perform_migration: bool,
) -> Result<crate::RootHash> {
tracing::debug!(new_jmt_version = ?version, "committing state delta");
let mut changes_by_substore = cache.shard_by_prefix(&self.0.multistore_config);
@@ -303,23 +306,24 @@ impl Storage {
// its own changes to the batch, and we will commit it at the end.
let mut write_batch = rocksdb::WriteBatch::default();

// Note(erwan): since we work over sharded keyspaces/disjoint column families,
// it is tempting to consider a rewrite of this loop into a [`tokio::task::JoinSet`],
// however there are some complications that should be on your radar:
// * overhead: at the time of writing, there is a single digit number of substores,
// so it's implausible that the overhead of a joinset would be worth it.
// * atomicity: unfortunately, `WriteBatch`es are not thread safe, this means that
// to spin-up N tasks, we would either need to:
// Option A: use a single batch, and synchronize access to it between tasks.
// if the number of substore contention grows with the number of substores,
// which is likely the case if you're considering a joinset.
// Option B: use N batches, and find a way to commit to them atomically.
// (better, but not supported) RocksDB does not allow merging batches
// together, and though [`rocksdb::OptimisticTransactionDB`] offers an
// ACID API, it is not compatible with the [`rocksdb::WriteBatch`] API.
// A last option is to relax atomicity constraints, so that each commit task can produce
// its own independent batch, and we can commit them all at once. This means that each batch
// write is atomic, but the overall commit is not.
// Note(erwan): Here, we iterate over each substore and commit its changes.
// Since the substore keyspaces are disjoint, it is tempting to rewrite this
// loop using a [`tokio::task::JoinSet`]; however, note that
// `rocksdb::WriteBatch` is _not_ thread-safe.
//
// This means that to spin up N tasks, we would need to use a
// single batch wrapped in a mutex, or use N batches, and find
// a way to commit to them atomically. Since that is not supported
// by RocksDB, we would have to iterate over each entry in each
// batch, and merge them together.
//
// Another option is to trade atomicity for parallelism by producing
// N batches, and committing them in distinct atomic writes. This is
// dangerous because it could leave the node in an inconsistent state.
//
// Instead of doing that, we lean on the fact that the number of substores
// is small, and that the synchronization overhead of a joinset would exceed
// its benefits.
for config in self.0.multistore_config.iter() {
tracing::debug!(substore_prefix = ?config.prefix, "processing substore");
// If the substore is empty, we need to fetch its initialized version from the cache.
@@ -338,7 +342,11 @@
continue;
};

let version = old_substore_version.wrapping_add(1);
let version = if perform_migration {
old_substore_version
} else {
old_substore_version.wrapping_add(1)
};
new_versions.push(version);
let substore_snapshot = SubstoreSnapshot {
config: config.clone(),
@@ -416,7 +424,7 @@ impl Storage {
multistore_versions.set_version(main_store_config, version);

/* hydrate the snapshot cache */
if !write_to_snapshot_cache {
if perform_migration {
tracing::debug!("skipping snapshot cache update");
return Ok(global_root_hash);
}
@@ -444,11 +452,10 @@ impl Storage {
#[cfg(feature = "migration")]
/// Commits the provided [`StateDelta`] to persistent storage without increasing the version
/// of the chain state.
/// TODO(erwan): with the addition of substores, we need to revisit this API.
pub async fn commit_in_place(&self, delta: StateDelta<Snapshot>) -> Result<crate::RootHash> {
let (snapshot, changes) = delta.flatten();
let old_version = self.latest_version();
self.commit_inner(snapshot, changes, old_version, false)
self.commit_inner(snapshot, changes, old_version, true)
.await
}

@@ -458,12 +465,12 @@ impl Storage {
self.0.db.clone()
}

#[cfg(test)]
/// Consumes the `Inner` storage and waits for all resources to be reclaimed.
/// Shuts down the database and the dispatcher task, and waits for all resources to be reclaimed.
/// Panics if there are still outstanding references to the `Inner` storage.
pub(crate) async fn release(mut self) {
pub async fn release(mut self) {
if let Some(inner) = Arc::get_mut(&mut self.0) {
inner.shutdown().await;
inner.snapshots.write().clear();
// `Inner` is dropped once the call completes.
} else {
panic!("Unable to get mutable reference to Inner");
@@ -472,8 +479,7 @@ impl Storage {
}

impl Inner {
#[cfg(test)]
pub async fn shutdown(&mut self) {
pub(crate) async fn shutdown(&mut self) {
if let Some(jh) = self.jh_dispatcher.take() {
jh.abort();
let _ = jh.await;
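Putting the pieces together, a chain-state migration would now build a `StateDelta`, commit it in place (keeping the main store and every substore at its current version and skipping the snapshot cache), and then release the storage handle via the newly public `release`. A rough sketch, assuming the `migration` feature is enabled; the crate path, key, and value are illustrative:

use penumbra_storage::{StateDelta, StateWrite as _, Storage}; // assumed crate name

#[cfg(feature = "migration")]
async fn migrate_in_place(storage: Storage) -> anyhow::Result<()> {
    let mut delta = StateDelta::new(storage.latest_snapshot());
    // Rewrite whatever state the migration needs to change.
    delta.put_raw("demo/key".to_string(), b"new value".to_vec());

    // Commits at the *current* version: substore versions are not bumped and
    // the snapshot cache is left untouched (`perform_migration == true`).
    let root_hash = storage.commit_in_place(delta).await?;
    println!("post-migration root hash: {root_hash:?}");

    // Shut down the dispatcher task and drop cached snapshots before restart.
    storage.release().await;
    Ok(())
}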