diff --git a/Cargo.lock b/Cargo.lock index e99d015..858b8d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "ansi_term" version = "0.11.0" @@ -40,6 +55,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + [[package]] name = "bytes" version = "1.1.0" @@ -48,9 +69,9 @@ checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" [[package]] name = "cc" -version = "1.0.70" +version = "1.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d26a6ce4b6a484fa3edb70f7efa6fc430fd2b87285fe8b84304fd0936faa0dc0" +checksum = "e9e8aabfac534be767c909e0690571677d49f41bd8465ae876fe043d52ba5292" [[package]] name = "cfg-if" @@ -58,6 +79,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets", +] + [[package]] name = "clap" version = "2.33.3" @@ -83,6 +118,23 @@ dependencies = [ "winapi", ] +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] +name = "cron" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07" +dependencies = [ + "chrono", + "nom", + "once_cell", +] + [[package]] name = "dtoa" version = "0.4.8" @@ -224,6 +276,29 @@ dependencies = [ "libc", ] +[[package]] +name = "iana-time-zone" +version = "0.1.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "indexmap" version = "1.7.0" @@ -249,6 +324,15 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" +[[package]] +name = "js-sys" +version = "0.3.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "libc" version = "0.2.139" @@ -294,6 +378,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "mio" version = "0.7.13" @@ -329,6 +419,16 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -338,6 +438,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.13.0" @@ -350,9 +459,12 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.8.0" +version = "1.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" +checksum = "82881c4be219ab5faaf2ad5e5e5ecdff8c66bd7402ca3160975c93b24961afd1" +dependencies = [ + "portable-atomic", +] [[package]] name = "parking_lot" @@ -391,6 +503,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -647,6 +765,61 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" +[[package]] +name = "wasm-bindgen" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" +dependencies = [ + "cfg-if", + "once_cell", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.79", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" + [[package]] name = "winapi" version = "0.3.9" @@ -669,6 +842,79 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + [[package]] name = "yaml-rust" version = "0.4.5" @@ -683,8 +929,10 @@ name = "zinit" version = "0.2.0" dependencies = [ "anyhow", + "chrono", "clap", "command-group", + "cron", "fern", "futures", "git-version", diff --git a/Cargo.toml b/Cargo.toml index 55c8c3a..a074adb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,3 +21,5 @@ clap = "2.33" git-version = "0.3.5" command-group = "1.0.8" futures = "0.3.30" +cron = "0.12.1" +chrono = "0.4.38" diff --git a/src/zinit/config.rs b/src/zinit/config.rs index c76229e..e12d344 100644 --- a/src/zinit/config.rs +++ b/src/zinit/config.rs @@ -1,11 +1,13 @@ use anyhow::Result; -use serde::{Deserialize, Serialize}; +use cron::Schedule; +use serde::{Deserialize, Deserializer, Serialize}; use serde_yaml as yaml; use std::collections::HashMap; use std::ffi::OsStr; use std::fs::{self, File}; use std::path::Path; pub type Services = HashMap; +use std::str::FromStr; pub const DEFAULT_SHUTDOWN_TIMEOUT: u64 = 10; // in seconds @@ -52,7 +54,8 @@ pub struct Service { pub log: Log, pub env: HashMap, pub dir: String, - pub cron: Option, + #[serde(default, deserialize_with = "deserialize_cron_option")] + pub cron: Option, } impl Service { @@ -65,14 +68,9 @@ impl Service { Signal::from_str(&self.signal.stop.to_uppercase())?; - // Validate the cron field if present - if let Some(cron_value) = self.cron { - if cron_value == 0 { - bail!("cron value must be greater than zero"); - } - if !self.one_shot { - bail!("cron can only be specified for oneshot services"); - } + // Cron jobs only possible for oneshot services + if self.cron.is_some() && !self.one_shot { + bail!("cron can only be used for oneshot services"); } Ok(()) @@ -128,3 +126,18 @@ pub fn load_dir>(p: T) -> Result { Ok(services) } + +/// Custom deserializer to parse cron expression from string +fn deserialize_cron_option<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + use serde::de::Error; + let s: Option = Option::deserialize(deserializer)?; + if let Some(s) = s { + let schedule = Schedule::from_str(&s).map_err(D::Error::custom)?; + Ok(Some(schedule)) + } else { + Ok(None) + } +} diff --git a/src/zinit/mod.rs b/src/zinit/mod.rs index 75db7fa..d63898d 100644 --- a/src/zinit/mod.rs +++ b/src/zinit/mod.rs @@ -4,6 +4,7 @@ use crate::manager::{Log, Logs, Process, ProcessManager}; use crate::zinit::ord::ProcessDAG; use crate::zinit::ord::{service_dependency_order, DUMMY_ROOT}; use anyhow::Result; +use chrono::{DateTime, Utc}; use config::DEFAULT_SHUTDOWN_TIMEOUT; use nix::sys::reboot::RebootMode; use nix::sys::signal; @@ -17,8 +18,8 @@ use thiserror::Error; use tokio::sync::mpsc; use tokio::sync::watch; use tokio::sync::{Notify, RwLock}; -use tokio::time::timeout; use tokio::time::{self}; +use tokio::time::{sleep_until, timeout, Instant}; use tokio_stream::{wrappers::WatchStream, StreamExt}; pub trait WaitStatusExt { @@ -546,7 +547,13 @@ impl ZInit { let name = name.clone(); let mut service = input.write().await; - if service.target == Target::Down || service.scheduled { + if service.target == Target::Down { + debug!("service '{}' target is down", name); + return; + } + + if service.scheduled { + debug!("service '{}' already scheduled", name); return; } @@ -554,95 +561,168 @@ impl ZInit { drop(service); // Release the lock loop { - { - let service = input.read().await; - if service.target == Target::Down { - // Service target is down; exit the loop - break; - } - let config = service.service.clone(); - drop(service); // Release the lock - - // Wait for dependencies - while !self.can_schedule(&config).await { - let sig = self.notify.notified(); - self.set(&name, Some(State::Blocked), None).await; - sig.await; + let name = name.clone(); + + let service = input.read().await; + // early check if service is down, so we don't have to do extra checks + if service.target == Target::Down { + // we check target in loop in case service have + // been set down. + break; + } + let config = service.service.clone(); + // we need to spawn this service now, but is it ready? + // are all dependent services are running ? + + // so we drop the table to give other services + // chance to acquire the lock and schedule themselves + drop(service); + + 'checks: loop { + let sig = self.notify.notified(); + debug!("checking {} if it can schedule", name); + if self.can_schedule(&config).await { + debug!("service {} can schedule", name); + break 'checks; } - let log = match config.log { - config::Log::None => Log::None, - config::Log::Stdout => Log::Stdout, - config::Log::Ring => Log::Ring(name.clone()), - }; + self.set(&name, Some(State::Blocked), None).await; + // don't even care if i am lagging + // as long i am notified that some services status + // has changed + debug!("service {} is blocked, waiting release", name); + sig.await; + } - let mut service = input.write().await; + // If cron is specified for a oneshot service, schedule the execution + if let Some(ref schedule) = config.cron { + // Get current time + let now: DateTime = Utc::now(); - if service.target == Target::Down { - // Service target is down; exit the loop - break; - } + // Get next scheduled time + if let Some(next_datetime) = schedule.upcoming(Utc).next() { + let duration = next_datetime + .signed_duration_since(now) + .to_std() + .unwrap_or(Duration::from_secs(0)); - let child = match self - .pm - .run( - Process::new(&config.exec, &config.dir, Some(config.env.clone())), - log.clone(), - ) - .await - { - Ok(child) => { - service.state.set(State::Spawned); - service.pid = child.pid; - child - } - Err(err) => { - error!("service {} failed to start: {}", name, err); - service.state.set(State::Failure); - // Decide whether to break or continue based on your logic + // Wait until the next scheduled time + debug!("service {} scheduled to run at {}", name, next_datetime); + sleep_until(Instant::now() + duration).await; + + // Before executing, check if service is still up and not shutting down + if *self.shutdown.read().await || self.is_target_down(&name).await { break; } - }; + } else { + // No upcoming scheduled times; exit the loop + debug!("service '{}' has not more scheduled runs", name); + break; + } + } else if config.one_shot { + // For oneshot services without cron, proceed immediately + debug!( + "service '{}' is a oneshot service without cron; starting immediately", + name + ); + } else { + // For non-oneshot services, proceed as usual + debug!("service '{}' is not a oneshot service: proceeding", name); + } - // Since only oneshot services can have cron, we are in oneshot mode - service.state.set(State::Running); - drop(service); // Release the lock + let log = match config.log { + config::Log::None => Log::None, + config::Log::Stdout => Log::Stdout, + config::Log::Ring => Log::Ring(name.clone()), + }; - // Wait for the child process to finish - let wait_result = child.wait().await; + let mut service = input.write().await; + // we check again in case target has changed. Since we had to release the lock + // earlier to not block locking on this service (for example if a stop was called) + // while the service was waiting for dependencies. + // the lock is kept until the spawning and the update of the pid. + if service.target == Target::Down { + // we check target in loop in case service have + // been set down. + break; + } - let mut service = input.write().await; - service.pid = Pid::from_raw(0); + let child = self + .pm + .run( + Process::new(&config.exec, &config.dir, Some(config.env.clone())), + log.clone(), + ) + .await; + + let child = match child { + Ok(child) => { + service.state.set(State::Spawned); + service.pid = child.pid; + child + } + Err(err) => { + // so, spawning failed. and nothing we can do about it + // this can be duo to a bad command or exe not found. + // set service to failure. + error!("service {} failed to start: {}", name, err); + service.state.set(State::Failure); + break; + } + }; - match wait_result { - Err(err) => { - error!("failed to read service '{}' status: {}", name, err); - service.state.set(State::Unknown); - } - Ok(status) => service.state.set(match status.success() { - true => State::Success, - false => State::Error(status), - }), + service.state.set(State::Running); + drop(service); + + // Wait for the child process to finish + let result = child.wait().await; + + let mut service = input.write().await; + service.pid = Pid::from_raw(0); + + match result { + Err(err) => { + error!("failed to read service '{}' status: {}", name, err); + service.state.set(State::Unknown); } - drop(service); // Release the lock + Ok(status) => { + service.state.set(if status.success() { + State::Success + } else { + State::Error(status) + }); + } + } - // Check if we should schedule the service again based on cron - if let Some(cron_duration) = config.cron { - if *self.shutdown.read().await { - // If shutting down, exit the loop - break; - } - // Wait for the specified cron duration - tokio::time::sleep(std::time::Duration::from_secs(cron_duration)).await; - // Loop will restart and the oneshot service will be executed again + drop(service); + + // For oneshot services with cron, loop to schedule next execution + if config.one_shot { + if config.cron.is_some() { + continue; // Schedule the next execution } else { - // No cron specified, exit the loop after the oneshot execution - break; + self.notify.notify_waiters(); + break; // No cron; exit the loop } + } else { + // For non-oneshot services, handle respawn logic + // Wait before restarting + time::sleep(Duration::from_secs(2)).await; } } let mut service = input.write().await; service.scheduled = false; } + + // Helper function to check if the service target is down + async fn is_target_down(&self, name: &str) -> bool { + let table = self.services.read().await; + if let Some(service) = table.get(name) { + let service = service.read().await; + service.target == Target::Down + } else { + true // Service not found; treat as down + } + } }