Skip to content

Commit

Permalink
Implement simple durable Raft storage based on RocksDB
Browse files Browse the repository at this point in the history
This commit adds RocksDbStorage, which implements raft::Storage.
RocksDbStorage is a durable storage implementation that the
RaftMetadataStore uses to persist the Raft state.

This fixes #1791.
  • Loading branch information
tillrohrmann committed Aug 12, 2024
1 parent 6b19d9d commit eadd1e9
Show file tree
Hide file tree
Showing 7 changed files with 501 additions and 94 deletions.
1 change: 1 addition & 0 deletions crates/metadata-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod grpc;
mod grpc_svc;
pub mod local;
pub mod raft;
mod util;

use bytestring::ByteString;
use restate_core::metadata_store::VersionedValue;
Expand Down
62 changes: 13 additions & 49 deletions crates/metadata-store/src/local/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// by the Apache License, Version 2.0.

use crate::{
MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender,
util, MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender,
};
use bytes::{BufMut, BytesMut};
use bytestring::ByteString;
Expand All @@ -23,7 +23,7 @@ use restate_types::config::{MetadataStoreOptions, RocksDbOptions};
use restate_types::live::BoxedLiveLoad;
use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode};
use restate_types::Version;
use rocksdb::{BoundColumnFamily, DBCompressionType, WriteBatch, WriteOptions, DB};
use rocksdb::{BoundColumnFamily, WriteBatch, WriteOptions, DB};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{debug, trace};
Expand Down Expand Up @@ -56,13 +56,17 @@ impl LocalMetadataStore {
let db_name = DbName::new(DB_NAME);
let db_manager = RocksDbManager::get();
let cfs = vec![CfName::new(KV_PAIRS)];
let db_spec = DbSpecBuilder::new(db_name.clone(), options.data_dir(), db_options(options))
.add_cf_pattern(
CfPrefixPattern::ANY,
cf_options(options.rocksdb_memory_budget()),
)
.ensure_column_families(cfs)
.build_as_db();
let db_spec = DbSpecBuilder::new(
db_name.clone(),
options.data_dir(),
util::db_options(options),
)
.add_cf_pattern(
CfPrefixPattern::ANY,
util::cf_options(options.rocksdb_memory_budget()),
)
.ensure_column_families(cfs)
.build_as_db();

let db = db_manager
.open_db(updateable_rocksdb_options.clone(), db_spec)
Expand Down Expand Up @@ -295,43 +299,3 @@ impl LocalMetadataStore {
}
}
}

/// Builds the database-wide RocksDB options for the metadata store.
///
/// The passed store options are currently not consulted; the RocksDB
/// defaults are returned unchanged.
fn db_options(_options: &MetadataStoreOptions) -> rocksdb::Options {
    Default::default()
}

/// Returns a reusable closure that configures the options of a column family.
///
/// The closure takes a base `rocksdb::Options`, applies the memory-related
/// settings derived from `memory_budget`, selects level-style compaction with
/// three levels, assigns a compression type to each of those levels, and
/// hands the adjusted options back.
fn cf_options(
    memory_budget: usize,
) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static {
    move |base: rocksdb::Options| {
        let mut cf_opts = base;

        set_memory_related_opts(&mut cf_opts, memory_budget);

        cf_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
        cf_opts.set_num_levels(3);

        // One entry per configured level: the first two stay uncompressed,
        // the third one uses Zstd.
        let compression_per_level = [
            DBCompressionType::None,
            DBCompressionType::None,
            DBCompressionType::Zstd,
        ];
        cf_opts.set_compression_per_level(&compression_per_level);

        cf_opts
    }
}

/// Applies the memtable- and level-sizing settings derived from
/// `memtables_budget` to `opts`.
///
/// The budget is split so that one mutable plus three immutable memtables
/// fit into it.
fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) {
    let budget_bytes = memtables_budget as u64;

    // A quarter of the budget per memtable, with at most four of them alive
    // at once (1 mutable + 3 immutable).
    let per_memtable = memtables_budget / 4;
    opts.set_write_buffer_size(per_memtable);

    // Two memtables are merged whenever they are flushed to L0.
    opts.set_min_write_buffer_number_to_merge(2);
    opts.set_max_write_buffer_number(4);

    // Begin L0->L1 compaction as early as possible. Every L0 file is
    // (memtables_budget / 2) in size, so level 0 gets compacted once it
    // grows beyond memtables_budget.
    opts.set_level_zero_file_num_compaction_trigger(2);

    // Not critical, but this keeps the number of created files in check.
    opts.set_target_file_size_base(budget_bytes / 8);

    // L1 is sized like L0 so that L0->L1 compactions stay fast.
    opts.set_max_bytes_for_level_base(budget_bytes);
}
2 changes: 1 addition & 1 deletion crates/metadata-store/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@
// by the Apache License, Version 2.0.

pub mod service;
mod store;
mod storage;
mod store;
2 changes: 1 addition & 1 deletion crates/metadata-store/src/raft/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl RaftMetadataStoreService {
impl MetadataStoreService for RaftMetadataStoreService {
async fn run(mut self) -> Result<(), Error> {
let store_options = self.options.live_load();
let store = RaftMetadataStore::new().map_err(Error::generic)?;
let store = RaftMetadataStore::create().await.map_err(Error::generic)?;

let mut builder = GrpcServiceBuilder::default();

Expand Down
Loading

0 comments on commit eadd1e9

Please sign in to comment.