From a1100bf270b7dd332a65bb88d216045d38cddfa6 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Thu, 27 Jul 2023 23:22:42 +0100 Subject: [PATCH] Support for configuration hot reload and efficient caching and validation --- Cargo.lock | 227 ++++++++-- cronback-lib/Cargo.toml | 2 + cronback-lib/config.rs | 412 +++++++++++++++++-- cronback-lib/default.toml | 50 --- cronback-lib/grpc_client_provider.rs | 13 +- cronback-lib/main.toml | 17 + cronback-lib/service.rs | 45 +- cronback-lib/shutdown.rs | 2 +- cronback-services/src/api/config.toml | 7 + cronback-services/src/api/mod.rs | 9 +- cronback-services/src/dispatcher/config.toml | 6 + cronback-services/src/dispatcher/mod.rs | 1 + cronback-services/src/metadata/config.toml | 6 + cronback-services/src/metadata/mod.rs | 1 + cronback-services/src/scheduler/config.toml | 10 + cronback-services/src/scheduler/mod.rs | 5 +- cronback-services/tests/scheduler_test.rs | 25 +- cronback/src/cli.rs | 4 +- cronback/src/lib.rs | 54 ++- 19 files changed, 725 insertions(+), 171 deletions(-) delete mode 100644 cronback-lib/default.toml create mode 100644 cronback-lib/main.toml create mode 100644 cronback-services/src/api/config.toml create mode 100644 cronback-services/src/dispatcher/config.toml create mode 100644 cronback-services/src/metadata/config.toml create mode 100644 cronback-services/src/scheduler/config.toml diff --git a/Cargo.lock b/Cargo.lock index fafa4b8..4f64408 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -117,7 +117,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" dependencies = [ - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -127,7 +127,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" dependencies = [ "anstyle", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -622,7 +622,7 @@ checksum = 
"2674ec482fbc38012cf31e6c42ba0177b431a0cb6f15fe40efa5aab1bda516f6" dependencies = [ "is-terminal", "lazy_static", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -851,6 +851,8 @@ dependencies = [ "ipext", "iso8601-duration", "metrics", + "notify", + "notify-debouncer-mini", "rand", "sea-orm", "sea-orm-migration", @@ -1201,7 +1203,7 @@ checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" dependencies = [ "errno-dragonfly", "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1222,7 +1224,7 @@ checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" dependencies = [ "cfg-if", "home", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1240,6 +1242,18 @@ dependencies = [ "instant", ] +[[package]] +name = "filetime" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cbc844cecaee9d4443931972e1289c8ff485cb4cc2767cb03ca139ed6885153" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall 0.2.16", + "windows-sys 0.48.0", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -1288,6 +1302,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "funty" version = "2.0.0" @@ -1537,7 +1560,7 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5444c27eef6923071f7ebcc33e3444508466a76f7a2b93da00ed6e19f30c1ddb" dependencies = [ - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1701,6 +1724,26 @@ dependencies = [ "hashbrown 0.14.0", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", 
+ "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.12" @@ -1718,7 +1761,7 @@ checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" dependencies = [ "hermit-abi", "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1740,7 +1783,7 @@ checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi", "rustix 0.38.4", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1776,6 +1819,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kqueue" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c8fc60ba15bf51257aa9807a48a61013db043fcf3a78cb0d916e8e396dcad98" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1968,8 +2031,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", + "log", "wasi", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2045,6 +2109,34 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5738a2795d57ea20abec2d6d76c6081186709c0024187cd5977265eda6598b51" +dependencies = [ + "bitflags 1.3.2", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "mio", + "walkdir", + "windows-sys 
0.45.0", +] + +[[package]] +name = "notify-debouncer-mini" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e55ee272914f4563a2f8b8553eb6811f3c0caea81c756346bad15b7e3ef969f0" +dependencies = [ + "crossbeam-channel", + "notify", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2233,7 +2325,7 @@ dependencies = [ "libc", "redox_syscall 0.3.5", "smallvec", - "windows-targets", + "windows-targets 0.48.1", ] [[package]] @@ -2839,7 +2931,7 @@ dependencies = [ "io-lifetimes", "libc", "linux-raw-sys 0.3.8", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2852,7 +2944,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.3", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2867,13 +2959,22 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe232bdf6be8c8de797b22184ee71118d63780ea42ac85b61d1baa6d3b782ae9" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c3733bf4cf7ea0880754e19cb5a462007c4a8c1914bff372ccc95b464f1df88" dependencies = [ - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -3500,7 +3601,7 @@ dependencies = [ "fastrand", "redox_syscall 0.3.5", "rustix 0.37.23", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -3530,7 +3631,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e6bf6f19e9f8ed8d4048dc22981458ebcf406d67e94cd422e5ecd73d63b3237" dependencies = [ "rustix 0.37.23", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -3627,7 +3728,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -4074,6 
+4175,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "walkdir" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36df944cda56c7d8d8b7496af378e6b16de9284591917d307c9b4d313c44e698" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -4223,7 +4334,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" dependencies = [ - "windows-targets", + "windows-targets 0.48.1", +] + +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", ] [[package]] @@ -4232,7 +4352,22 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.1", +] + +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", ] [[package]] @@ -4241,51 +4376,93 @@ version = "0.48.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - 
"windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" +[[package]] +name = 
"windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.48.0" diff --git a/cronback-lib/Cargo.toml b/cronback-lib/Cargo.toml index eaf482c..3481920 100644 --- a/cronback-lib/Cargo.toml +++ b/cronback-lib/Cargo.toml @@ -49,6 +49,8 @@ validator = { workspace = true } # Unique Dependencies config = { version = "0.13", default-features = false, features = ["toml"] } +notify = { version = "6.0.1" } +notify-debouncer-mini = { version = "0.3.0" } hyper = "0.14.24" tonic-reflection = "0.9.0" url = { workspace = true, features = ["serde"] } diff --git a/cronback-lib/config.rs b/cronback-lib/config.rs index 2a9ddea..c8619e6 100644 --- a/cronback-lib/config.rs +++ b/cronback-lib/config.rs @@ -1,17 +1,54 @@ //! 
Configuration Model +use std::any::Any; use std::collections::{HashMap, HashSet}; +use std::path::PathBuf; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; use config::builder::DefaultState; use config::{ - Config as ConfigRaw, - ConfigBuilder, - ConfigError, + Config as InnerConfig, + ConfigBuilder as InnerBuilder, + ConfigError as InnerConfigError, Environment, File, FileFormat, }; +use notify_debouncer_mini::{ + new_debouncer, + DebouncedEvent, + DebouncedEventKind, +}; use serde::Deserialize; +use thiserror::Error; +use tracing::{debug, error, info, warn}; + +use crate::prelude::CronbackService; +use crate::Shutdown; + +type SectionMap = HashMap>; +type SectionLoaders = HashMap< + String, + Arc< + dyn Fn( + &str, + &InnerConfig, + ) + -> Result, InnerConfigError> + + Send + + Sync, + >, +>; + +#[derive(Error, Debug)] +pub enum ConfigError { + #[error("configuration error for service '{0}'")] + ServiceConfigLoadError(String, #[source] InnerConfigError), + + #[error(transparent)] + ConfigLoadError(#[from] InnerConfigError), +} #[derive(Debug, Clone, Deserialize)] pub struct MainConfig { @@ -23,45 +60,348 @@ pub struct MainConfig { pub metadata_cell_map: HashMap, } -#[derive(Debug)] -pub struct ConfigLoader { - builder: ConfigBuilder, +#[derive(Clone)] +pub struct ConfigBuilder { + env_prefix: String, + builder: InnerBuilder, + file_sources: Vec, + section_loaders: SectionLoaders, } -impl ConfigLoader { - /// Loads a fresh copy of the configuration from source. 
- pub fn load_main(&self) -> Result { - let c = self.builder.build_cloned()?; - c.get("main") +impl ConfigBuilder { + /// creates a new builder configured to load the default from a toml string + pub fn new(env_prefix: impl Into, default_toml_src: &str) -> Self { + let builder = InnerConfig::builder() + .add_source(File::from_str(default_toml_src, FileFormat::Toml)); + let mut loader = ConfigBuilder { + env_prefix: env_prefix.into(), + builder, + file_sources: Vec::default(), + section_loaders: HashMap::default(), + }; + // Register main section loader by default. + loader = loader.register_section_loader::("main"); + loader + } + + /// Adds a static toml configuration string as a config source to the + /// builder + pub fn add_default_toml(self, raw: &str) -> Self { + Self { + builder: self + .builder + .add_source(File::from_str(raw, FileFormat::Toml)), + ..self + } + } + + pub fn add_file_source(mut self, path: impl Into) -> Self { + let path: PathBuf = path.into(); + self.file_sources.push(path.clone()); + Self { + builder: self.builder.add_source(File::from(path)), + ..self + } + } + + pub fn register_service(self) -> Self + where + S: CronbackService, + { + self.add_default_toml(S::DEFAULT_CONFIG_TOML) + .register_section_loader::<::ServiceConfig>( + S::CONFIG_SECTION, + ) + } + + // This needs to be the last source added to the builder to ensure that + // environment variables are not overridden by files or default config. + fn add_env_source(mut self) -> Self { + self.builder = self.builder.add_source( + Environment::with_prefix(&self.env_prefix) + .try_parsing(true) + .separator("__") + .list_separator(","), + ); + self } - /// Loads a fresh copy of a specific configuration section from source. - pub fn load_section<'de, C>(&self, section: &str) -> Result + // Sets an override that will always be applied on top of all sources. This + // is useful in testing scenarios where you want to override a value + // that is normally set in the default config.
+ pub fn set_override( + mut self, + key: K, + value: V, + ) -> Result where - C: Deserialize<'de>, + K: AsRef, + V: Into, { - let c = self.builder.build_cloned()?; - c.get(section) - } - - /// creates a new loader configured to load the default and overlays - /// the user supplied config (if supplied). - /// - /// * `config_file`: The path of the configuration file to load. - pub fn from_path(path: &Option) -> ConfigLoader { - let raw = include_str!("default.toml"); - let mut builder = ConfigRaw::builder() - .add_source(File::from_str(raw, FileFormat::Toml)) - .add_source( - Environment::with_prefix("CRONBACK") - .try_parsing(true) - .separator("__") - .list_separator(",") - .with_list_parse_key("api.admin_api_keys"), - ); - if let Some(path) = path { - builder = builder.add_source(File::with_name(path)); + self.builder = self.builder.set_override(key, value)?; + Ok(self) + } + + /// Instantiate the config without watching the configuration for changes + pub fn build_once(mut self) -> Result { + self = self.add_env_source(); + Config::from_builder(self, None) + } + + pub fn build_and_watch( + mut self, + shutdown: Shutdown, + ) -> Result { + self = self.add_env_source(); + Config::from_builder(self, Some(shutdown)) + } + + /// Registers a function that teaches the builder how to load sections for + /// registered services.
+ pub fn register_section_loader<'de, C>(mut self, section: &str) -> Self + where + C: Deserialize<'de> + Send + Sync + 'static, + { + self.section_loaders.insert( + section.to_owned(), + Arc::new(move |section: &str, inner: &InnerConfig| { + let section = inner.get::(section)?; + let section = Box::new(section) as Box; + Ok(section) + }), + ); + self + } +} + +impl Default for ConfigBuilder { + /// creates a new builder configured to load the default from a toml string + fn default() -> Self { + Self::new("CRONBACK", include_str!("main.toml")) + } +} + +struct CachedConfig { + section_map: SectionMap, + last_modified: Instant, +} + +impl CachedConfig { + fn new(section_map: SectionMap) -> Self { + Self { + section_map, + last_modified: Instant::now(), + } + } +} + +struct ConfigWatcher { + builder: InnerBuilder, + section_loaders: SectionLoaders, + files: Vec, +} + +impl ConfigWatcher { + fn new( + builder: InnerBuilder, + section_loaders: SectionLoaders, + files: Vec, + ) -> Self { + Self { + builder, + section_loaders, + files, + } + } + + fn start_watching( + self, + cache: Arc>, + mut shutdown: Shutdown, + ) { + let (tx, rx) = std::sync::mpsc::channel(); + // Automatically select the best implementation for watching files on + // the current platform. + let mut debouncer = + new_debouncer(Duration::from_secs(2), None, tx).unwrap(); + for source in &self.files { + info!("Installing watcher for file changes: {}", source.display()); + debouncer + .watcher() + .watch(source, notify::RecursiveMode::NonRecursive) + .expect("watch files with notify"); + } + std::thread::Builder::new() + .name("config-watcher".to_owned()) + .spawn(move || { + // It's important that we capture the watcher in the thread, + // otherwise it'll be dropped and we won't be watching anything! 
+ let _debouncer = debouncer; + info!("Configuration watcher thread has started"); + while !shutdown.is_shutdown() { + match rx.recv() { + | Ok(evs) => { + self.handle_events(evs, &cache); + } + | Err(e) => { + error!( + "Cannot continue watching configuration \ + changes: '{}', system will shutdown to avoid \ + crash-looping!", + e + ); + shutdown.broadcast_shutdown(); + } + } + } + info!("Config watcher thread has terminated"); + }) + .unwrap(); + } + + fn load_sections(&self) -> Result { + // load the config + let inner = self.builder.build_cloned()?; + let mut section_map = HashMap::default(); + + for (section, loader) in &self.section_loaders { + debug!("--> Loading configuration section {section}"); + let section_cfg = loader(section, &inner).map_err(|e| { + ConfigError::ServiceConfigLoadError(section.to_owned(), e) + })?; + section_map.insert(section.to_owned(), section_cfg); } - ConfigLoader { builder } + Ok(section_map) + } + + fn handle_events( + &self, + evs: Result, Vec>, + cache: &Arc>, + ) { + match evs { + | Ok(evs) => { + let mut should_update = false; + for event in evs + .into_iter() + .filter(|e| e.kind == DebouncedEventKind::Any) + { + should_update = true; + info!( + "Detected configuration file changes: {:?}", + event.path + ); + } + if should_update { + self.try_update_cache(cache).unwrap_or_else(|e| { + error!( + "Error updating configuration, we will keep the \ + last configuration in memory a valid \ + configuration is loaded: {}", + e + ); + }); + } + } + | Err(e) => { + warn!( + "Error watching configuration file, file changes might \ + not be observed, but the system will continue to operate \ + with the last known configuration: {}", + e.iter() + .map(|e| e.to_string()) + .collect::>() + .join(", ") + ); + } + } + } + + fn try_update_cache( + &self, + cache: &Arc>, + ) -> Result<(), ConfigError> { + let section_map = self.load_sections()?; + let mut cache = cache.write().unwrap(); + cache.section_map = section_map; + info!( +
"Configuration has been updated. Last update was {:?} ago.", + cache.last_modified.elapsed() + ); + cache.last_modified = Instant::now(); + Ok(()) } } + +/// Automatically watches sources and updates the cache as needed. +/// If configuration loading has failed after the first initialisation (due to a +/// watched config file being invalid) the last loaded configuration will remain +/// loaded in cache and we will log the problem at ERROR level to alert the user. +/// +/// Safe to clone, all clones share the same cache and watcher infrastructure. +#[derive(Clone)] +pub struct Config { + cache: Arc>, +} + +impl Config { + fn from_builder( + builder: ConfigBuilder, + shutdown: Option, + ) -> Result { + let loader = ConfigWatcher::new( + builder.builder, + builder.section_loaders, + builder.file_sources, + ); + + let section_map = loader.load_sections()?; + let cache = Arc::new(RwLock::new(CachedConfig::new(section_map))); + + if let Some(shutdown) = shutdown { + // Start the config watcher thread + loader.start_watching(cache.clone(), shutdown); + } + Ok(Self { cache }) + } + + /// Convenience function to get the main config + pub fn get_main(&self) -> MainConfig { + self.get("main") + } + + /// Do not use this method directly, this will panic if the section doesn't + /// exist, or if the type is not correct. Instead, use the helpers in + /// ServiceContext. + pub fn get(&self, section: &str) -> C + where + C: for<'de> Deserialize<'de> + Clone + 'static, + { + // We assume that validation happened at loading time and that the + // section exists and is valid. This function will panic if the section + // doesn't exist in the config. + self.cache + .read() + .unwrap() + .section_map + .get(section) + // downcast_ref returns a reference to the boxed value if it is of + // type T.
+ .and_then(|boxed| { + (&**boxed as &(dyn Any + 'static)).downcast_ref::() + }) + .unwrap() + .clone() + } + + pub fn last_modified(&self) -> Instant { + self.cache.read().unwrap().last_modified + } +} + +const _: () = { + const fn _assert_send() {} + const fn _assert_send_sync() {} + _assert_send_sync::(); + _assert_send::(); +}; diff --git a/cronback-lib/default.toml b/cronback-lib/default.toml deleted file mode 100644 index 8dac8ad..0000000 --- a/cronback-lib/default.toml +++ /dev/null @@ -1,50 +0,0 @@ -[main] -roles = ["api", "dispatcher", "scheduler", "metadata"] -prometheus_address = "0.0.0.0" -prometheus_port = 9000 - -[main.dispatcher_cell_map] -# Maps a cell_id to a dispatcher address -0 = "http://127.0.0.1:9999" - -[main.scheduler_cell_map] -# Maps a cell_id to a scheduler address -0 = "http://127.0.0.1:9811" - - -[main.metadata_cell_map] -# Maps a cell_id to a metadata service address -0 = "http://127.0.0.1:9998" - -[api] -address = "0.0.0.0" -port = 8888 -database_uri = "sqlite://api.sqlite?mode=rwc" -admin_api_keys = [] -log_request_body = true -log_response_body = true - -[dispatcher] -cell_id = 0 -address = "0.0.0.0" -port = 9999 -request_processing_timeout_s = 30 -database_uri = "sqlite://dispatcher.sqlite?mode=rwc" - -[scheduler] -cell_id = 0 -address = "0.0.0.0" -port = 9811 -request_processing_timeout_s = 30 -spinner_yield_max_ms = 250 -max_triggers_per_tick = 100000 -database_uri = "sqlite://scheduler.sqlite?mode=rwc" -db_flush_s = 10 -dangerous_fast_forward = false - -[metadata] -cell_id = 0 -address = "0.0.0.0" -port = 9998 -request_processing_timeout_s = 30 -database_uri = "sqlite://metadata.sqlite?mode=rwc" diff --git a/cronback-lib/grpc_client_provider.rs b/cronback-lib/grpc_client_provider.rs index fd2fadb..71bad7a 100644 --- a/cronback-lib/grpc_client_provider.rs +++ b/cronback-lib/grpc_client_provider.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::str::FromStr; -use std::sync::{Arc, RwLock}; +use std::sync::RwLock; use 
async_trait::async_trait; use derive_more::{Deref, DerefMut}; @@ -11,7 +11,7 @@ use crate::config::MainConfig; use crate::model::ValidShardedId; use crate::prelude::{GrpcRequestInterceptor, Shard}; use crate::types::{ProjectId, RequestId}; -use crate::ConfigLoader; +use crate::Config; #[derive(Debug, Error)] pub enum GrpcClientError { @@ -109,15 +109,15 @@ pub trait GrpcClientFactory: Send + Sync { // A concrete channel-caching implementation of the GrpcClientFactory used in // production. pub struct GrpcClientProvider { - config_loader: Arc, + config: Config, channel_cache: RwLock>, phantom: std::marker::PhantomData, } impl GrpcClientProvider { - pub fn new(config_loader: Arc) -> Self { + pub fn new(config: Config) -> Self { Self { - config_loader, + config, channel_cache: Default::default(), phantom: Default::default(), } @@ -133,8 +133,7 @@ impl GrpcClientFactory for GrpcClientProvider { request_id: &RequestId, project_id: &ValidShardedId, ) -> Result { - // TODO: Will be optimised in a future change. 
- let config = self.config_loader.load_main().expect("main config valid"); + let config = self.config.get_main(); // resolve shard -> cell let address = T::get_address(&config, project_id)?; diff --git a/cronback-lib/main.toml b/cronback-lib/main.toml new file mode 100644 index 0000000..300a340 --- /dev/null +++ b/cronback-lib/main.toml @@ -0,0 +1,17 @@ +[main] +roles = ["api", "dispatcher", "scheduler", "metadata"] +prometheus_address = "0.0.0.0" +prometheus_port = 9000 + +[main.dispatcher_cell_map] +# Maps a cell_id to a dispatcher address +0 = "http://127.0.0.1:9999" + +[main.scheduler_cell_map] +# Maps a cell_id to a scheduler address +0 = "http://127.0.0.1:9811" + + +[main.metadata_cell_map] +# Maps a cell_id to a metadata service address +0 = "http://127.0.0.1:9998" diff --git a/cronback-lib/service.rs b/cronback-lib/service.rs index 285fb6a..748dff1 100644 --- a/cronback-lib/service.rs +++ b/cronback-lib/service.rs @@ -2,11 +2,9 @@ use std::convert::Infallible; use std::error::Error; use std::net::SocketAddr; use std::path::Path; -use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use config::ConfigError; use futures::Stream; use hyper::{Body, Request, Response}; use proto::FILE_DESCRIPTOR_SET; @@ -23,7 +21,7 @@ use tower::Service; use tower_http::trace::{MakeSpan, TraceLayer}; use tracing::{error, error_span, info, Id, Span}; -use crate::config::ConfigLoader; +use crate::config::Config; use crate::consts::{PARENT_SPAN_HEADER, PROJECT_ID_HEADER, REQUEST_ID_HEADER}; use crate::database::{Database, DatabaseError, DbMigration}; use crate::rpc_middleware::CronbackRpcMiddleware; @@ -32,19 +30,33 @@ use crate::MainConfig; #[async_trait] pub trait CronbackService: Send + Sync + Sized + Clone + 'static { - type ServiceConfig: for<'a> Deserialize<'a> + Into + Send; + type ServiceConfig: for<'a> Deserialize<'a> + + Clone + // TODO: Consider better option instead of this bound. 
+ + Into + + Send + + Sync; type Migrator: DbMigration; + /// The role of the service. This must be unique across all services running + /// on the same binary. A role can be enabled or disabled via + /// `main.roles` list in the config file. const ROLE: &'static str; - // Default config section to the role name + /// Default config section to the role name (e.g. `scheduler` which + /// translates to `[scheduler]`) default configuration **must** be in TOML + /// format. const CONFIG_SECTION: &'static str = Self::ROLE; + /// An additional configuration layer that will be added to the default + /// configuration _before_ loading any configuration file externally. + const DEFAULT_CONFIG_TOML: &'static str = ""; + /// Create a new service context. fn make_context( - config_loader: Arc, + config: Config, shutdown: Shutdown, ) -> ServiceContext { - ServiceContext::new(config_loader, shutdown) + ServiceContext::new(config, shutdown) } /// Optional hook to install telemetry for the service. @@ -73,7 +85,7 @@ pub trait CronbackService: Send + Sync + Sized + Clone + 'static { // needs to be parametric by config type. 
#[derive(Clone)] pub struct ServiceContext { - config_loader: Arc, + config: Config, shutdown: Shutdown, _service: std::marker::PhantomData, } @@ -82,9 +94,9 @@ impl ServiceContext where S: CronbackService, { - fn new(config_loader: Arc, shutdown: Shutdown) -> Self { + fn new(config: Config, shutdown: Shutdown) -> Self { Self { - config_loader, + config, shutdown, _service: Default::default(), } @@ -94,12 +106,12 @@ where S::ROLE } - pub fn config_loader(&self) -> Arc { - self.config_loader.clone() + pub fn config(&self) -> &Config { + &self.config } pub fn get_main_config(&self) -> MainConfig { - self.config_loader.load_main().unwrap() + self.config.get_main() } /// Awaits the shutdown signal @@ -112,13 +124,8 @@ where self.shutdown.broadcast_shutdown() } - // TODO: Return ref when we add config caching pub fn service_config(&self) -> S::ServiceConfig { - self.config_loader.load_section(S::CONFIG_SECTION).unwrap() - } - - pub fn load_service_config(&self) -> Result { - self.config_loader.load_section(S::CONFIG_SECTION) + self.config.get(S::CONFIG_SECTION) } } diff --git a/cronback-lib/shutdown.rs b/cronback-lib/shutdown.rs index 5fe63b5..d616448 100644 --- a/cronback-lib/shutdown.rs +++ b/cronback-lib/shutdown.rs @@ -47,7 +47,7 @@ impl Shutdown { /// Returns `true` if the shutdown signal has been received. pub fn is_shutdown(&self) -> bool { - self.shutdown + self.shutdown || !self.watch.is_empty() } /// Receive the shutdown notice, waiting if necessary. 
diff --git a/cronback-services/src/api/config.toml b/cronback-services/src/api/config.toml new file mode 100644 index 0000000..d0539ca --- /dev/null +++ b/cronback-services/src/api/config.toml @@ -0,0 +1,7 @@ +[api] +address = "0.0.0.0" +port = 8888 +database_uri = "sqlite://api.sqlite?mode=rwc" +admin_api_keys = [] +log_request_body = true +log_response_body = true diff --git a/cronback-services/src/api/mod.rs b/cronback-services/src/api/mod.rs index c2dc51a..81ec1cf 100644 --- a/cronback-services/src/api/mod.rs +++ b/cronback-services/src/api/mod.rs @@ -57,6 +57,7 @@ impl CronbackService for ApiService { type Migrator = migration::Migrator; type ServiceConfig = ApiSvcConfig; + const DEFAULT_CONFIG_TOML: &'static str = include_str!("config.toml"); const ROLE: &'static str = "api"; fn install_telemetry() { @@ -77,7 +78,7 @@ impl CronbackService for ApiService { mut context: ServiceContext, db: Database, ) -> anyhow::Result<()> { - let config_loader = context.config_loader(); + let config = context.config(); let svc_config = context.service_config(); let addr = netutils::parse_addr(&svc_config.address, svc_config.port).unwrap(); @@ -86,13 +87,13 @@ impl CronbackService for ApiService { context: context.clone(), authenicator: Authenticator::new(AuthStore::new(db)), scheduler_clients: Box::new(GrpcClientProvider::new( - config_loader.clone(), + config.clone(), )), dispatcher_clients: Box::new(GrpcClientProvider::new( - config_loader.clone(), + config.clone(), )), metadata_svc_clients: Box::new(GrpcClientProvider::new( - config_loader.clone(), + config.clone(), )), }); diff --git a/cronback-services/src/dispatcher/config.toml b/cronback-services/src/dispatcher/config.toml new file mode 100644 index 0000000..2104833 --- /dev/null +++ b/cronback-services/src/dispatcher/config.toml @@ -0,0 +1,6 @@ +[dispatcher] +cell_id = 0 +address = "0.0.0.0" +port = 9999 +request_processing_timeout_s = 30 +database_uri = "sqlite://dispatcher.sqlite?mode=rwc" diff --git 
a/cronback-services/src/dispatcher/mod.rs b/cronback-services/src/dispatcher/mod.rs index 926cdfa..d106c14 100644 --- a/cronback-services/src/dispatcher/mod.rs +++ b/cronback-services/src/dispatcher/mod.rs @@ -29,6 +29,7 @@ impl CronbackService for DispatcherService { type Migrator = migration::Migrator; type ServiceConfig = DispatcherSvcConfig; + const DEFAULT_CONFIG_TOML: &'static str = include_str!("config.toml"); const ROLE: &'static str = "dispatcher"; fn install_telemetry() { diff --git a/cronback-services/src/metadata/config.toml b/cronback-services/src/metadata/config.toml new file mode 100644 index 0000000..e043710 --- /dev/null +++ b/cronback-services/src/metadata/config.toml @@ -0,0 +1,6 @@ +[metadata] +cell_id = 0 +address = "0.0.0.0" +port = 9998 +request_processing_timeout_s = 30 +database_uri = "sqlite://metadata.sqlite?mode=rwc" diff --git a/cronback-services/src/metadata/mod.rs b/cronback-services/src/metadata/mod.rs index 96233f6..f883541 100644 --- a/cronback-services/src/metadata/mod.rs +++ b/cronback-services/src/metadata/mod.rs @@ -22,6 +22,7 @@ impl CronbackService for MetadataService { type Migrator = migration::Migrator; type ServiceConfig = MetadataSvcConfig; + const DEFAULT_CONFIG_TOML: &'static str = include_str!("config.toml"); const ROLE: &'static str = "metadata"; #[tracing::instrument(skip_all, fields(service = context.service_name()))] diff --git a/cronback-services/src/scheduler/config.toml b/cronback-services/src/scheduler/config.toml new file mode 100644 index 0000000..3a22de5 --- /dev/null +++ b/cronback-services/src/scheduler/config.toml @@ -0,0 +1,10 @@ +[scheduler] +cell_id = 0 +address = "0.0.0.0" +port = 9811 +request_processing_timeout_s = 30 +spinner_yield_max_ms = 250 +max_triggers_per_tick = 100000 +database_uri = "sqlite://scheduler.sqlite?mode=rwc" +db_flush_s = 10 +dangerous_fast_forward = false diff --git a/cronback-services/src/scheduler/mod.rs b/cronback-services/src/scheduler/mod.rs index 20d92c9..ec79950 100644 
--- a/cronback-services/src/scheduler/mod.rs +++ b/cronback-services/src/scheduler/mod.rs @@ -30,6 +30,7 @@ impl CronbackService for SchedulerService { type Migrator = migration::Migrator; type ServiceConfig = SchedulerSvcConfig; + const DEFAULT_CONFIG_TOML: &'static str = include_str!("config.toml"); const ROLE: &'static str = "scheduler"; fn install_telemetry() { @@ -60,7 +61,7 @@ impl CronbackService for SchedulerService { let trigger_store = TriggerStore::new(db); let dispatcher_clients = - Arc::new(GrpcClientProvider::new(context.config_loader())); + Arc::new(GrpcClientProvider::new(context.config().clone())); let controller = Arc::new(SpinnerController::new( context.clone(), @@ -122,7 +123,7 @@ pub mod test_helpers { std::fs::remove_file(&*socket).unwrap(); let dispatcher_client_provider = - Arc::new(GrpcClientProvider::new(context.config_loader())); + Arc::new(GrpcClientProvider::new(context.config().clone())); let db = SchedulerService::in_memory_database().await.unwrap(); diff --git a/cronback-services/tests/scheduler_test.rs b/cronback-services/tests/scheduler_test.rs index 86a8191..83cf6cc 100644 --- a/cronback-services/tests/scheduler_test.rs +++ b/cronback-services/tests/scheduler_test.rs @@ -1,11 +1,9 @@ -use std::sync::Arc; - use cronback_services::scheduler::{test_helpers, SchedulerService}; use dto::traits::ProstOptionExt; use lib::clients::ScopedSchedulerSvcClient; use lib::grpc_test_helpers::TestGrpcClientProvider; use lib::prelude::*; -use lib::{ConfigLoader, GrpcClientFactory, Shutdown}; +use lib::{ConfigBuilder, GrpcClientFactory, Shutdown}; use proto::common::{ action, Action, @@ -124,8 +122,11 @@ async fn get_trigger( #[tokio::test] async fn install_trigger_valid_test() { let shutdown = Shutdown::default(); - let config_loader = Arc::new(ConfigLoader::from_path(&None)); - let context = SchedulerService::make_context(config_loader, shutdown); + let config = ConfigBuilder::default() + .register_service::() + .build_once() + .unwrap(); + let 
context = SchedulerService::make_context(config, shutdown); let project = ProjectId::generate(); info!("Initialising test..."); let (_serve_future, client_provider) = @@ -196,8 +197,11 @@ async fn install_trigger_valid_test() { #[ignore] async fn install_trigger_uniqueness_test() { let shutdown = Shutdown::default(); - let config_loader = Arc::new(ConfigLoader::from_path(&None)); - let context = SchedulerService::make_context(config_loader, shutdown); + let config = ConfigBuilder::default() + .register_service::() + .build_once() + .unwrap(); + let context = SchedulerService::make_context(config, shutdown); let project = ProjectId::generate(); let (_serve_future, client_provider) = test_helpers::test_server_and_client(context).await; @@ -352,8 +356,11 @@ async fn install_trigger_uniqueness_test() { #[tokio::test] async fn delete_project_triggers_test() { let shutdown = Shutdown::default(); - let config_loader = Arc::new(ConfigLoader::from_path(&None)); - let context = SchedulerService::make_context(config_loader, shutdown); + let config = ConfigBuilder::default() + .register_service::() + .build_once() + .unwrap(); + let context = SchedulerService::make_context(config, shutdown); let project1 = ProjectId::generate(); let project2 = ProjectId::generate(); diff --git a/cronback/src/cli.rs b/cronback/src/cli.rs index 2368e48..8d99f9a 100644 --- a/cronback/src/cli.rs +++ b/cronback/src/cli.rs @@ -1,3 +1,5 @@ +use std::path::PathBuf; + use clap::Parser; #[derive(clap::ValueEnum, Clone)] @@ -12,7 +14,7 @@ pub enum LogFormat { pub struct CliOpts { /// Sets the custom configuration file. 
#[arg(short, long, value_name = "FILE")] - pub config: Option, + pub config: Option, /// Turn debugging information on #[arg(short, long, action = clap::ArgAction::Count)] diff --git a/cronback/src/lib.rs b/cronback/src/lib.rs index c4d03d0..025f1e2 100644 --- a/cronback/src/lib.rs +++ b/cronback/src/lib.rs @@ -2,7 +2,6 @@ pub mod cli; mod metric_defs; use std::collections::{HashMap, HashSet}; -use std::sync::Arc; use std::time::Duration; use anyhow::{bail, Context, Result}; @@ -12,7 +11,7 @@ use cli::LogFormat; use colored::Colorize; use lib::netutils::parse_addr; use lib::prelude::*; -use lib::{ConfigLoader, MainConfig, Shutdown}; +use lib::{ConfigBuilder, MainConfig, Shutdown}; use metrics_exporter_prometheus::PrometheusBuilder; use metrics_util::MetricKindMask; use tokio::task::JoinSet; @@ -105,12 +104,7 @@ async fn prepare_database( ) -> Result { let service_name = ctx.service_name(); info!(service = service_name, "Preparing database"); - // We load the service configuration to ensure that we have a good - // config. - let config = ctx.load_service_config().with_context(|| { - format!("Failed to load configuration for service '{service_name}") - })?; - + let config = ctx.service_config(); // Service config must implement Into S::prepare_database(config).await.with_context(|| { format!("Failed to prepare database for service '{service_name}") @@ -164,7 +158,11 @@ mod private { #[async_trait] pub trait Cronback: private::Sealed { - async fn run_cronback() -> Result<()>; + async fn run_cronback() -> Result<()> { + // built-in defaults. + Self::run_with_config(ConfigBuilder::default()).await + } + async fn run_with_config(config_builder: ConfigBuilder) -> Result<()>; } macro_rules! impl_cronback_with { @@ -181,7 +179,7 @@ macro_rules! 
impl_cronback_with { impl<$($ty,)*> Cronback for ($($ty,)*) where $($ty: CronbackService),* { - async fn run_cronback() -> Result<()> { + async fn run_with_config(mut config_builder: ConfigBuilder) -> Result<()> { // Load .env file if it exists match dotenvy::dotenv() { | Ok(_) => {} @@ -198,16 +196,19 @@ macro_rules! impl_cronback_with { setup_logging_subscriber(&opts.log_format, &opts.api_tracing_dir); print_banner(); - trace!(config = opts.config, "Loading configuration"); - let config_loader = Arc::new(ConfigLoader::from_path(&opts.config)); - let config_main = config_loader.load_main()?; + trace!(config = ?opts.config, "Loading configuration"); + if let Some(config_path) = opts.config { + config_builder = config_builder.add_file_source(&config_path); + } - // Configure Metric Exporter - setup_prometheus(&config_main)?; + // We need to load the configuration in two steps. First, we load the main section + // only and read "roles". Based on the roles, we will register services along with their + // defaults, section loaders, etc. Then we will re-load the configuration with + // all the sections and that'll be the final config. + let config_main = config_builder.clone().build_once()?.get_main(); // Install metric definitions metric_defs::install_metrics(); - // Init services let mut available_roles: HashSet = HashSet::new(); let mut services: JoinSet<()> = JoinSet::new(); @@ -219,7 +220,6 @@ macro_rules! impl_cronback_with { $ty::ROLE); } - let $ty = $ty::make_context(config_loader.clone(), shutdown.clone()); )* if !config_main.roles.is_subset(&available_roles) { @@ -228,23 +228,43 @@ macro_rules! impl_cronback_with { } info!("Initializing services"); + // Only register the services in "roles" in the config loader. The system cannot + // react to changes to `main.roles` after startup.
+ $( + if config_main.roles.contains($ty::ROLE) + { + config_builder = config_builder.register_service::<$ty>(); + } + )* + + // Create the permanent config + let config = config_builder.build_and_watch(shutdown.clone())?; + // // Initialise services and run database migrations before serving any traffic on // any service. let mut databases: HashMap = HashMap::new(); $( if config_main.roles.contains($ty::ROLE) { + // Create a temporary context for the service. We can't use this context + // when spawning the service due to the scope. + let $ty = $ty::make_context(config.clone(), shutdown.clone()); debug!(service = $ty::ROLE, "Installing telemetry"); $ty::install_telemetry(); databases.insert($ty::ROLE.to_owned(), prepare_database(&$ty).await?); } )* + // Configure Metric Exporter + setup_prometheus(&config_main)?; info!("Services has completed database migrations"); + // spawn the services in the order they were registered $( if config_main.roles.contains($ty::ROLE) { + // Create the permanent context for the service. + let $ty = $ty::make_context(config.clone(), shutdown.clone()); services.spawn(spawn_service($ty, databases.remove($ty::ROLE).unwrap())); } )*