diff --git a/crates/metadata-store/src/lib.rs b/crates/metadata-store/src/lib.rs index be7c13968..da43f361e 100644 --- a/crates/metadata-store/src/lib.rs +++ b/crates/metadata-store/src/lib.rs @@ -12,6 +12,7 @@ mod grpc; mod grpc_svc; pub mod local; pub mod raft; +mod util; use bytestring::ByteString; use restate_core::metadata_store::VersionedValue; diff --git a/crates/metadata-store/src/local/store.rs b/crates/metadata-store/src/local/store.rs index 42b453608..b9060fd3c 100644 --- a/crates/metadata-store/src/local/store.rs +++ b/crates/metadata-store/src/local/store.rs @@ -9,7 +9,7 @@ // by the Apache License, Version 2.0. use crate::{ - MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, + util, MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, }; use bytes::{BufMut, BytesMut}; use bytestring::ByteString; @@ -23,7 +23,7 @@ use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; use restate_types::live::BoxedLiveLoad; use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode}; use restate_types::Version; -use rocksdb::{BoundColumnFamily, DBCompressionType, WriteBatch, WriteOptions, DB}; +use rocksdb::{BoundColumnFamily, WriteBatch, WriteOptions, DB}; use std::sync::Arc; use tokio::sync::mpsc; use tracing::{debug, trace}; @@ -56,13 +56,17 @@ impl LocalMetadataStore { let db_name = DbName::new(DB_NAME); let db_manager = RocksDbManager::get(); let cfs = vec![CfName::new(KV_PAIRS)]; - let db_spec = DbSpecBuilder::new(db_name.clone(), options.data_dir(), db_options(options)) - .add_cf_pattern( - CfPrefixPattern::ANY, - cf_options(options.rocksdb_memory_budget()), - ) - .ensure_column_families(cfs) - .build_as_db(); + let db_spec = DbSpecBuilder::new( + db_name.clone(), + options.data_dir(), + util::db_options(options), + ) + .add_cf_pattern( + CfPrefixPattern::ANY, + util::cf_options(options.rocksdb_memory_budget()), + ) + .ensure_column_families(cfs) + .build_as_db(); let db = db_manager .open_db(updateable_rocksdb_options.clone(), db_spec) @@ -295,43 +299,3 @@ impl LocalMetadataStore { } } } - -fn db_options(_options: &MetadataStoreOptions) -> rocksdb::Options { - rocksdb::Options::default() -} - -fn cf_options( - memory_budget: usize, -) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static { - move |mut opts| { - set_memory_related_opts(&mut opts, memory_budget); - opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); - opts.set_num_levels(3); - - opts.set_compression_per_level(&[ - DBCompressionType::None, - DBCompressionType::None, - DBCompressionType::Zstd, - ]); - - // - opts - } -} - -fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) { - // We set the budget to allow 1 mutable + 3 immutable. - opts.set_write_buffer_size(memtables_budget / 4); - - // merge 2 memtables when flushing to L0 - opts.set_min_write_buffer_number_to_merge(2); - opts.set_max_write_buffer_number(4); - // start flushing L0->L1 as soon as possible. each file on level0 is - // (memtable_memory_budget / 2). This will flush level 0 when it's bigger than - // memtable_memory_budget. - opts.set_level_zero_file_num_compaction_trigger(2); - // doesn't really matter much, but we don't want to create too many files - opts.set_target_file_size_base(memtables_budget as u64 / 8); - // make Level1 size equal to Level0 size, so that L0->L1 compactions are fast - opts.set_max_bytes_for_level_base(memtables_budget as u64); -} diff --git a/crates/metadata-store/src/raft/mod.rs b/crates/metadata-store/src/raft/mod.rs index fcdb70cc0..7c14a5b27 100644 --- a/crates/metadata-store/src/raft/mod.rs +++ b/crates/metadata-store/src/raft/mod.rs @@ -9,5 +9,5 @@ // by the Apache License, Version 2.0. pub mod service; -mod store; mod storage; +mod store; diff --git a/crates/metadata-store/src/raft/service.rs b/crates/metadata-store/src/raft/service.rs index 496320c67..e788c58c7 100644 --- a/crates/metadata-store/src/raft/service.rs +++ b/crates/metadata-store/src/raft/service.rs @@ -33,7 +33,7 @@ impl RaftMetadataStoreService { impl MetadataStoreService for RaftMetadataStoreService { async fn run(mut self) -> Result<(), Error> { let store_options = self.options.live_load(); - let store = RaftMetadataStore::new().map_err(Error::generic)?; + let store = RaftMetadataStore::create().await.map_err(Error::generic)?; let mut builder = GrpcServiceBuilder::default(); diff --git a/crates/metadata-store/src/raft/storage.rs b/crates/metadata-store/src/raft/storage.rs index 801707056..4bbfa07c1 100644 --- a/crates/metadata-store/src/raft/storage.rs +++ b/crates/metadata-store/src/raft/storage.rs @@ -8,45 +8,413 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::util; +use protobuf::{Message, ProtobufError}; +use raft::eraftpb::{ConfState, Entry, Snapshot}; +use raft::prelude::HardState; +use raft::{GetEntriesContext, RaftState, Storage, StorageError}; +use restate_rocksdb::{ + CfName, CfPrefixPattern, DbName, DbSpecBuilder, IoMode, Priority, RocksDb, RocksDbManager, + RocksError, +}; +use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; +use restate_types::live::BoxedLiveLoad; +use rocksdb::{BoundColumnFamily, ReadOptions, WriteBatch, WriteOptions, DB}; +use std::mem::size_of; use std::sync::Arc; -use raft::{GetEntriesContext, RaftState, Storage}; -use raft::eraftpb::{Entry, Snapshot}; -use rocksdb::DB; -use restate_rocksdb::RocksDb; +use std::{error, mem}; -struct RocksDbStorage { +const DB_NAME: &str = "raft-metadata-store"; +const RAFT_CF: &str = "raft"; + +const FIRST_RAFT_INDEX: u64 = 1; + +const RAFT_ENTRY_DISCRIMINATOR: u8 = 0x01; +const HARD_STATE_DISCRIMINATOR: u8 = 0x02; +const CONF_STATE_DISCRIMINATOR: u8 = 0x03; + +const RAFT_ENTRY_KEY_LENGTH: usize = 9; + +#[derive(Debug, thiserror::Error)] +pub enum BuildError { + #[error("failed creating RocksDb: {0}")] + RocksDb(#[from] RocksError), +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed reading/writing from/to RocksDb: {0}")] + RocksDb(#[from] RocksError), + #[error("failed reading/writing from/to raw RocksDb: {0}")] + RocksDbRaw(#[from] rocksdb::Error), + #[error("failed encoding value: {0}")] + Encode(#[from] ProtobufError), + #[error("index '{index}' is out of bounds; last index is '{last_index}'")] + IndexOutOfBounds { index: u64, last_index: u64 }, + #[error("raft log has been compacted; first index is {0}")] + Compacted(u64), +} + +/// Map our internal error type to [`raft::Error`] to fit the [`Storage`] trait definition. +impl From for raft::Error { + fn from(value: Error) -> Self { + match value { + err @ Error::RocksDb(_) + | err @ Error::RocksDbRaw(_) + | err @ Error::IndexOutOfBounds { .. } => storage_error(err), + Error::Encode(err) => raft::Error::CodecError(err), + Error::Compacted(_) => raft::Error::Store(StorageError::Compacted), + } + } +} + +pub struct RocksDbStorage { db: Arc, rocksdb: Arc, + + last_index: u64, + buffer: Vec, } impl RocksDbStorage { - pub fn new() -> Self { - unimplemented!() + pub async fn create( + options: &MetadataStoreOptions, + rocksdb_options: BoxedLiveLoad, + ) -> Result { + let db_name = DbName::new(DB_NAME); + let db_manager = RocksDbManager::get(); + let cfs = vec![CfName::new(RAFT_CF)]; + let db_spec = DbSpecBuilder::new( + db_name.clone(), + options.data_dir(), + util::db_options(options), + ) + .add_cf_pattern( + CfPrefixPattern::ANY, + util::cf_options(options.rocksdb_memory_budget()), + ) + .ensure_column_families(cfs) + .build_as_db(); + + let db = db_manager.open_db(rocksdb_options, db_spec).await?; + let rocksdb = db_manager + .get_db(db_name) + .expect("raft metadata store db is open"); + + let last_index = Self::find_last_index(&db); + + Ok(Self { + db, + rocksdb, + last_index, + buffer: Vec::with_capacity(1024), + }) + } +} + +impl RocksDbStorage { + fn write_options(&self) -> WriteOptions { + let mut write_opts = WriteOptions::default(); + write_opts.disable_wal(false); + // always sync to not lose data + write_opts.set_sync(true); + write_opts + } + + fn find_last_index(db: &DB) -> u64 { + let cf = db.cf_handle(RAFT_CF).expect("RAFT_CF exists"); + let start = Self::raft_entry_key(0); + // end is exclusive so switch to the next discriminator + let mut end = [0; 9]; + end[0] = RAFT_ENTRY_DISCRIMINATOR + 1; + + let mut options = ReadOptions::default(); + options.set_async_io(true); + options.set_iterate_range(start..end); + let mut iterator = db.raw_iterator_cf_opt(&cf, options); + + iterator.seek_to_last(); + + if iterator.valid() { + let key_bytes = iterator.key().expect("key should be present"); + assert_eq!( + key_bytes.len(), + RAFT_ENTRY_KEY_LENGTH, + "raft entry keys must consist of '{}' bytes", + RAFT_ENTRY_KEY_LENGTH + ); + u64::from_be_bytes( + key_bytes[1..(1 + size_of::())] + .try_into() + .expect("buffer should be long enough"), + ) + } else { + // the first valid raft index starts at 1, so 0 means there are no replicated raft entries + 0 + } + } + + pub fn get_hard_state(&self) -> Result { + let key = Self::hard_state_key(); + self.get_value(key) + .map(|hard_state| hard_state.unwrap_or_default()) + } + + pub async fn store_hard_state(&mut self, hard_state: HardState) -> Result<(), Error> { + let key = Self::hard_state_key(); + self.put_value(key, hard_state).await + } + + pub fn get_conf_state(&self) -> Result { + let key = Self::conf_state_key(); + self.get_value(key) + .map(|hard_state| hard_state.unwrap_or_default()) + } + + pub async fn store_conf_state(&mut self, conf_state: ConfState) -> Result<(), Error> { + let key = Self::conf_state_key(); + self.put_value(key, conf_state).await + } + + pub fn get_entry(&self, idx: u64) -> Result, Error> { + let key = Self::raft_entry_key(idx); + self.get_value(key) + } + + fn get_value(&self, key: impl AsRef<[u8]>) -> Result, Error> { + let cf = self.get_cf_handle(); + let bytes = self.db.get_pinned_cf(&cf, key)?; + + if let Some(bytes) = bytes { + let mut value = T::default(); + value.merge_from_bytes(bytes.as_ref())?; + Ok(Some(value)) + } else { + Ok(None) + } + } + + async fn put_value( + &mut self, + key: impl AsRef<[u8]>, + value: T, + ) -> Result<(), Error> { + self.buffer.clear(); + value.write_to_vec(&mut self.buffer)?; + + let cf = self.get_cf_handle(); + let mut write_batch = WriteBatch::default(); + write_batch.put_cf(&cf, key.as_ref(), &self.buffer); + self.rocksdb + .write_batch( + "put_value", + Priority::High, + IoMode::Default, + self.write_options(), + write_batch, + ) + .await + .map_err(Into::into) + } + + pub async fn append(&mut self, entries: &Vec) -> Result<(), Error> { + let mut write_batch = WriteBatch::default(); + let mut buffer = mem::take(&mut self.buffer); + let mut last_index = self.last_index; + + { + let cf = self.get_cf_handle(); + + for entry in entries { + assert_eq!(last_index + 1, entry.index, "Expect raft log w/o holes"); + let key = Self::raft_entry_key(entry.index); + + buffer.clear(); + entry.write_to_vec(&mut buffer)?; + + write_batch.put_cf(&cf, key, &buffer); + last_index = entry.index; + } + } + + let result = self + .rocksdb + .write_batch( + "append", + Priority::High, + IoMode::Default, + self.write_options(), + write_batch, + ) + .await + .map_err(Into::into); + + self.buffer = buffer; + self.last_index = last_index; + + result + } + + fn get_cf_handle(&self) -> Arc { + self.db.cf_handle(RAFT_CF).expect("RAFT_CF exists") + } + + fn raft_entry_key(idx: u64) -> [u8; RAFT_ENTRY_KEY_LENGTH] { + let mut key = [0; RAFT_ENTRY_KEY_LENGTH]; + key[0] = RAFT_ENTRY_DISCRIMINATOR; + key[1..9].copy_from_slice(&idx.to_be_bytes()); + key + } + + fn hard_state_key() -> [u8; 1] { + [HARD_STATE_DISCRIMINATOR] + } + + fn conf_state_key() -> [u8; 1] { + [CONF_STATE_DISCRIMINATOR] + } + + fn check_index(&self, idx: u64) -> Result<(), Error> { + if idx < self.first_index() { + return Err(Error::Compacted(self.first_index())); + } else if idx > self.last_index() { + return Err(Error::IndexOutOfBounds { + index: idx, + last_index: self.last_index(), + }); + } + + Ok(()) + } + + fn check_range(&self, low: u64, high: u64) -> Result<(), Error> { + assert!(low < high, "Low '{low}' must be smaller than high '{high}'"); + + if low < self.first_index() { + return Err(Error::Compacted(self.first_index())); + } + + if high > self.last_index() + 1 { + return Err(Error::IndexOutOfBounds { + index: high, + last_index: self.last_index(), + }); + } + + Ok(()) + } + + fn last_index(&self) -> u64 { + self.last_index + } + + fn first_index(&self) -> u64 { + FIRST_RAFT_INDEX + } + + pub fn apply_snapshot(&mut self, _snapshot: Snapshot) -> Result<(), Error> { + unimplemented!("snapshots are currently not supported"); } } impl Storage for RocksDbStorage { fn initial_state(&self) -> raft::Result { - todo!() + let hard_state = self.get_hard_state()?; + + // fresh instance + if hard_state.commit == 0 { + return Ok(RaftState::new(hard_state, ConfState::default())); + } + + Ok(RaftState::new(hard_state, self.get_conf_state()?)) } - fn entries(&self, low: u64, high: u64, max_size: impl Into>, context: GetEntriesContext) -> raft::Result> { - todo!() + fn entries( + &self, + low: u64, + high: u64, + max_size: impl Into>, + _context: GetEntriesContext, + ) -> raft::Result> { + self.check_range(low, high)?; + let start_key = Self::raft_entry_key(low); + let end_key = Self::raft_entry_key(high); + + let cf = self.get_cf_handle(); + let mut opts = ReadOptions::default(); + opts.set_iterate_range(start_key..end_key); + opts.set_async_io(true); + + let mut iterator = self.db.raw_iterator_cf_opt(&cf, opts); + iterator.seek(start_key); + + let mut result = + Vec::with_capacity(usize::try_from(high - low).expect("u64 fits into usize")); + + let max_size = + usize::try_from(max_size.into().unwrap_or(u64::MAX)).expect("u64 fits into usize"); + let mut size = 0; + let mut expected_idx = low; + + while iterator.valid() { + if size > 0 && size >= max_size { + break; + } + + if let Some(value) = iterator.value() { + let mut entry = Entry::default(); + entry.merge_from_bytes(value)?; + + if expected_idx != entry.index { + if expected_idx == low { + Err(StorageError::Compacted)?; + } else { + // missing raft entries :-( + Err(StorageError::Unavailable)?; + } + } + + result.push(entry); + expected_idx += 1; + size += value.len(); + } + } + + // check for an occurred error + iterator + .status() + .map_err(|err| StorageError::Other(err.into()))?; + + Ok(result) } fn term(&self, idx: u64) -> raft::Result { - todo!() + // todo handle first_index - 1 once truncation is supported + self.check_index(idx)?; + self.get_entry(idx) + .map(|entry| entry.expect("should exist").term) + .map_err(Into::into) } fn first_index(&self) -> raft::Result { - todo!() + Ok(self.first_index()) } fn last_index(&self) -> raft::Result { - todo!() + Ok(self.last_index()) } - fn snapshot(&self, request_index: u64, to: u64) -> raft::Result { - todo!() + fn snapshot(&self, _request_index: u64, _to: u64) -> raft::Result { + // time is relative as some clever people figured out + Err(raft::Error::Store( + StorageError::SnapshotTemporarilyUnavailable, + )) } -} \ No newline at end of file +} + +pub fn storage_error(error: E) -> raft::Error +where + E: Into>, +{ + raft::Error::Store(StorageError::Other(error.into())) +} diff --git a/crates/metadata-store/src/raft/store.rs b/crates/metadata-store/src/raft/store.rs index 7f268a0de..0c8c88ac6 100644 --- a/crates/metadata-store/src/raft/store.rs +++ b/crates/metadata-store/src/raft/store.rs @@ -8,6 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::raft::storage; +use crate::raft::storage::RocksDbStorage; use crate::{ MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, }; @@ -16,10 +18,10 @@ use bytes::Bytes; use bytestring::ByteString; use protobuf::{Message as ProtobufMessage, ProtobufError}; use raft::prelude::{ConfChange, ConfChangeV2, ConfState, Entry, EntryType, Message}; -use raft::storage::MemStorage; use raft::{Config, RawNode}; use restate_core::cancellation_watcher; use restate_core::metadata_store::{Precondition, VersionedValue}; +use restate_types::config::Configuration; use restate_types::storage::{StorageCodec, StorageDecodeError, StorageEncodeError}; use restate_types::{flexbuffers_storage_encode_decode, Version}; use slog::o; @@ -33,8 +35,14 @@ use tracing_slog::TracingSlogDrain; use ulid::Ulid; #[derive(Debug, thiserror::Error)] -#[error("failed creating raft node: {0}")] -pub struct BuildError(#[from] raft::Error); +pub enum BuildError { + #[error("failed creating raft node: {0}")] + Raft(#[from] raft::Error), + #[error("failed creating raft storage: {0}")] + Storage(#[from] storage::BuildError), + #[error("failed bootstrapping conf state: {0}")] + BootstrapConfState(#[from] storage::Error), +} #[derive(Debug, thiserror::Error)] pub enum Error { @@ -46,11 +54,13 @@ pub enum Error { DecodeConf(ProtobufError), #[error("failed applying conf change: {0}")] ApplyConfChange(raft::Error), + #[error("failed reading/writing from/to storage: {0}")] + Storage(#[from] storage::Error), } pub struct RaftMetadataStore { _logger: slog::Logger, - raw_node: RawNode, + raw_node: RawNode, tick_interval: time::Interval, callbacks: HashMap, @@ -61,7 +71,7 @@ pub struct RaftMetadataStore { } impl RaftMetadataStore { - pub fn new() -> Result { + pub async fn create() -> Result { let (request_tx, request_rx) = mpsc::channel(2); let config = Config { @@ -69,7 +79,16 @@ impl RaftMetadataStore { ..Default::default() }; - let store = MemStorage::new_with_conf_state(ConfState::from((vec![1], vec![]))); + let rocksdb_options = Configuration::updateable() + .map(|configuration| &configuration.common.rocksdb) + .boxed(); + let mut metadata_store_options = + Configuration::updateable().map(|configuration| &configuration.metadata_store); + let mut store = + RocksDbStorage::create(metadata_store_options.live_load(), rocksdb_options).await?; + let conf_state = ConfState::from((vec![1], vec![])); + store.store_conf_state(conf_state).await?; + let drain = TracingSlogDrain; let logger = slog::Logger::root(drain, o!()); @@ -125,7 +144,7 @@ impl RaftMetadataStore { } } - self.on_ready()?; + self.on_ready().await?; } debug!("Stop running RaftMetadataStore."); @@ -133,7 +152,7 @@ impl RaftMetadataStore { Ok(()) } - fn on_ready(&mut self) -> Result<(), Error> { + async fn on_ready(&mut self) -> Result<(), Error> { if !self.raw_node.has_ready() { return Ok(()); } @@ -149,8 +168,7 @@ impl RaftMetadataStore { if !ready.snapshot().is_empty() { if let Err(err) = self .raw_node - .store() - .wl() + .mut_store() .apply_snapshot(ready.snapshot().clone()) { warn!("failed applying snapshot: {err}"); @@ -158,14 +176,18 @@ impl RaftMetadataStore { } // then handle committed entries - self.handle_committed_entries(ready.take_committed_entries())?; + self.handle_committed_entries(ready.take_committed_entries()) + .await?; // append new Raft entries to storage - self.raw_node.store().wl().append(ready.entries())?; + self.raw_node.mut_store().append(ready.entries()).await?; // update the hard state if an update was produced (e.g. vote has happened) if let Some(hs) = ready.hs() { - self.raw_node.store().wl().set_hardstate(hs.clone()); + self.raw_node + .mut_store() + .store_hard_state(hs.clone()) + .await?; } // send persisted messages (after entries were appended and hard state was updated) @@ -177,12 +199,8 @@ impl RaftMetadataStore { let mut light_ready = self.raw_node.advance(ready); // update the commit index if it changed - if let Some(commit) = light_ready.commit_index() { - self.raw_node - .store() - .wl() - .mut_hard_state() - .set_commit(commit); + if let Some(_commit) = light_ready.commit_index() { + // update commit index in cached hard_state; no need to persist it though } // send outgoing messages @@ -192,7 +210,8 @@ impl RaftMetadataStore { // handle committed entries if !light_ready.committed_entries().is_empty() { - self.handle_committed_entries(light_ready.take_committed_entries())?; + self.handle_committed_entries(light_ready.take_committed_entries()) + .await?; } self.raw_node.advance_apply(); @@ -208,7 +227,10 @@ impl RaftMetadataStore { // todo: Send messages to other peers } - fn handle_committed_entries(&mut self, committed_entries: Vec) -> Result<(), Error> { + async fn handle_committed_entries( + &mut self, + committed_entries: Vec, + ) -> Result<(), Error> { for entry in committed_entries { if entry.data.is_empty() { // new leader was elected @@ -217,8 +239,8 @@ impl RaftMetadataStore { match entry.get_entry_type() { EntryType::EntryNormal => self.handle_normal_entry(entry)?, - EntryType::EntryConfChange => self.handle_conf_change(entry)?, - EntryType::EntryConfChangeV2 => self.handle_conf_change_v2(entry)?, + EntryType::EntryConfChange => self.handle_conf_change(entry).await?, + EntryType::EntryConfChangeV2 => self.handle_conf_change_v2(entry).await?, } } @@ -338,7 +360,7 @@ impl RaftMetadataStore { Ok(()) } - fn handle_conf_change(&mut self, entry: Entry) -> Result<(), Error> { + async fn handle_conf_change(&mut self, entry: Entry) -> Result<(), Error> { let mut cc = ConfChange::default(); cc.merge_from_bytes(&entry.data) .map_err(Error::DecodeConf)?; @@ -346,11 +368,11 @@ impl RaftMetadataStore { .raw_node .apply_conf_change(&cc) .map_err(Error::ApplyConfChange)?; - self.raw_node.store().wl().set_conf_state(cs); + self.raw_node.mut_store().store_conf_state(cs).await?; Ok(()) } - fn handle_conf_change_v2(&mut self, entry: Entry) -> Result<(), Error> { + async fn handle_conf_change_v2(&mut self, entry: Entry) -> Result<(), Error> { let mut cc = ConfChangeV2::default(); cc.merge_from_bytes(&entry.data) .map_err(Error::DecodeConf)?; @@ -358,7 +380,7 @@ impl RaftMetadataStore { .raw_node .apply_conf_change(&cc) .map_err(Error::ApplyConfChange)?; - self.raw_node.store().wl().set_conf_state(cs); + self.raw_node.mut_store().store_conf_state(cs).await?; Ok(()) } diff --git a/crates/metadata-store/src/util.rs b/crates/metadata-store/src/util.rs new file mode 100644 index 000000000..41b971da4 --- /dev/null +++ b/crates/metadata-store/src/util.rs @@ -0,0 +1,52 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_types::config::MetadataStoreOptions; +use rocksdb::DBCompressionType; + +pub fn db_options(_options: &MetadataStoreOptions) -> rocksdb::Options { + rocksdb::Options::default() +} + +pub fn cf_options( + memory_budget: usize, +) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static { + move |mut opts| { + set_memory_related_opts(&mut opts, memory_budget); + opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); + opts.set_num_levels(3); + + opts.set_compression_per_level(&[ + DBCompressionType::None, + DBCompressionType::None, + DBCompressionType::Zstd, + ]); + + // + opts + } +} + +pub fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) { + // We set the budget to allow 1 mutable + 3 immutable. + opts.set_write_buffer_size(memtables_budget / 4); + + // merge 2 memtables when flushing to L0 + opts.set_min_write_buffer_number_to_merge(2); + opts.set_max_write_buffer_number(4); + // start flushing L0->L1 as soon as possible. each file on level0 is + // (memtable_memory_budget / 2). This will flush level 0 when it's bigger than + // memtable_memory_budget. + opts.set_level_zero_file_num_compaction_trigger(2); + // doesn't really matter much, but we don't want to create too many files + opts.set_target_file_size_base(memtables_budget as u64 / 8); + // make Level1 size equal to Level0 size, so that L0->L1 compactions are fast + opts.set_max_bytes_for_level_base(memtables_budget as u64); +}