From fa88237588be4df244de35a3a0c4f6041409f27f Mon Sep 17 00:00:00 2001 From: Jorge Prendes Date: Fri, 29 Sep 2023 12:54:30 +0100 Subject: [PATCH] Split shim module into smaller submodules. Signed-off-by: Jorge Prendes --- .github/workflows/ci.yml | 2 + .../containerd-shim-wasm/src/sandbox/cli.rs | 3 +- .../containerd-shim-wasm/src/sandbox/error.rs | 2 +- .../src/sandbox/instance.rs | 57 +- .../containerd-shim-wasm/src/sandbox/mod.rs | 2 +- .../containerd-shim-wasm/src/sandbox/shim.rs | 1278 ----------------- .../src/sandbox/shim/cli.rs | 90 ++ .../src/sandbox/shim/events.rs | 60 + .../src/sandbox/shim/instance_data.rs | 104 ++ .../src/sandbox/shim/instance_option.rs | 48 + .../src/sandbox/shim/local.rs | 402 ++++++ .../src/sandbox/shim/local/tests.rs | 394 +++++ .../src/sandbox/shim/mod.rs | 13 + .../src/sandbox/shim/task_state.rs | 61 + .../containerd-shim-wasm/src/sandbox/stdio.rs | 42 +- .../src/sys/unix/container/instance.rs | 2 +- crates/containerd-shim-wasm/src/testing.rs | 12 +- 17 files changed, 1235 insertions(+), 1337 deletions(-) delete mode 100644 crates/containerd-shim-wasm/src/sandbox/shim.rs create mode 100644 crates/containerd-shim-wasm/src/sandbox/shim/cli.rs create mode 100644 crates/containerd-shim-wasm/src/sandbox/shim/events.rs create mode 100644 crates/containerd-shim-wasm/src/sandbox/shim/instance_data.rs create mode 100644 crates/containerd-shim-wasm/src/sandbox/shim/instance_option.rs create mode 100644 crates/containerd-shim-wasm/src/sandbox/shim/local.rs create mode 100644 crates/containerd-shim-wasm/src/sandbox/shim/local/tests.rs create mode 100644 crates/containerd-shim-wasm/src/sandbox/shim/mod.rs create mode 100644 crates/containerd-shim-wasm/src/sandbox/shim/task_state.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 222b5b980..1d6d45424 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -80,6 +80,7 @@ jobs: name: ${{ matrix.runtime }} needs: [build-ubuntu, test-image] strategy: + fail-fast: false matrix: # 20.04 uses cgroupv1, 22.04 uses cgroupv2 os: ["ubuntu-20.04", "ubuntu-22.04"] @@ -108,6 +109,7 @@ jobs: name: ${{ matrix.runtime }} needs: [build-ubuntu, test-image] strategy: + fail-fast: false matrix: os: ["ubuntu-20.04", "ubuntu-22.04"] runtime: ["wasmtime", "wasmedge", "wasmer"] diff --git a/crates/containerd-shim-wasm/src/sandbox/cli.rs b/crates/containerd-shim-wasm/src/sandbox/cli.rs index ecbe5b337..3d9f370bc 100644 --- a/crates/containerd-shim-wasm/src/sandbox/cli.rs +++ b/crates/containerd-shim-wasm/src/sandbox/cli.rs @@ -6,7 +6,8 @@ use containerd_shim::{parse, run, Config}; use ttrpc::Server; use crate::sandbox::manager::Shim; -use crate::sandbox::{Instance, Local, ManagerService, ShimCli}; +use crate::sandbox::shim::Local; +use crate::sandbox::{Instance, ManagerService, ShimCli}; use crate::services::sandbox_ttrpc::{create_manager, Manager}; pub mod r#impl { diff --git a/crates/containerd-shim-wasm/src/sandbox/error.rs b/crates/containerd-shim-wasm/src/sandbox/error.rs index a9a87f0c0..328371dc9 100644 --- a/crates/containerd-shim-wasm/src/sandbox/error.rs +++ b/crates/containerd-shim-wasm/src/sandbox/error.rs @@ -50,7 +50,7 @@ pub enum Error { Containerd(String), } -pub type Result = ::std::result::Result; +pub type Result = ::std::result::Result; impl From for ttrpc::Error { fn from(e: Error) -> Self { diff --git a/crates/containerd-shim-wasm/src/sandbox/instance.rs b/crates/containerd-shim-wasm/src/sandbox/instance.rs index 2674e304c..eb3d3f56e 100644 --- a/crates/containerd-shim-wasm/src/sandbox/instance.rs +++ b/crates/containerd-shim-wasm/src/sandbox/instance.rs @@ -1,5 +1,6 @@ //! Abstractions for running/managing a wasm/wasi instance. +use std::path::{Path, PathBuf}; use std::time::Duration; use chrono::{DateTime, Utc}; @@ -16,13 +17,13 @@ pub struct InstanceConfig { /// This should be cheap to clone. engine: Engine, /// Optional stdin named pipe path. - stdin: Option, + stdin: PathBuf, /// Optional stdout named pipe path. - stdout: Option, + stdout: PathBuf, /// Optional stderr named pipe path. - stderr: Option, + stderr: PathBuf, /// Path to the OCI bundle directory. - bundle: Option, + bundle: PathBuf, /// Namespace for containerd namespace: String, // /// GRPC address back to main containerd @@ -30,60 +31,66 @@ pub struct InstanceConfig { } impl InstanceConfig { - pub fn new(engine: Engine, namespace: String, containerd_address: String) -> Self { + pub fn new( + engine: Engine, + namespace: impl AsRef, + containerd_address: impl AsRef, + ) -> Self { + let namespace = namespace.as_ref().to_string(); + let containerd_address = containerd_address.as_ref().to_string(); Self { engine, namespace, containerd_address, - stdin: None, - stdout: None, - stderr: None, - bundle: None, + stdin: PathBuf::default(), + stdout: PathBuf::default(), + stderr: PathBuf::default(), + bundle: PathBuf::default(), } } /// set the stdin path for the instance - pub fn set_stdin(&mut self, stdin: String) -> &mut Self { - self.stdin = Some(stdin); + pub fn set_stdin(&mut self, stdin: impl AsRef) -> &mut Self { + self.stdin = stdin.as_ref().to_path_buf(); self } /// get the stdin path for the instance - pub fn get_stdin(&self) -> Option { - self.stdin.clone() + pub fn get_stdin(&self) -> &Path { + &self.stdin } /// set the stdout path for the instance - pub fn set_stdout(&mut self, stdout: String) -> &mut Self { - self.stdout = Some(stdout); + pub fn set_stdout(&mut self, stdout: impl AsRef) -> &mut Self { + self.stdout = stdout.as_ref().to_path_buf(); self } /// get the stdout path for the instance - pub fn get_stdout(&self) -> Option { - self.stdout.clone() + pub fn get_stdout(&self) -> &Path { + &self.stdout } /// set the stderr path for the instance - pub fn set_stderr(&mut self, stderr: String) -> &mut Self { - self.stderr = Some(stderr); + pub fn set_stderr(&mut self, stderr: impl AsRef) -> &mut Self { + self.stderr = stderr.as_ref().to_path_buf(); self } /// get the stderr path for the instance - pub fn get_stderr(&self) -> Option { - self.stderr.clone() + pub fn get_stderr(&self) -> &Path { + &self.stderr } /// set the OCI bundle path for the instance - pub fn set_bundle(&mut self, bundle: String) -> &mut Self { - self.bundle = Some(bundle); + pub fn set_bundle(&mut self, bundle: impl AsRef) -> &mut Self { + self.bundle = bundle.as_ref().to_path_buf(); self } /// get the OCI bundle path for the instance - pub fn get_bundle(&self) -> Option { - self.bundle.clone() + pub fn get_bundle(&self) -> &Path { + &self.bundle } /// get the wasm engine for the instance diff --git a/crates/containerd-shim-wasm/src/sandbox/mod.rs b/crates/containerd-shim-wasm/src/sandbox/mod.rs index 20f87fc86..ef6db1811 100644 --- a/crates/containerd-shim-wasm/src/sandbox/mod.rs +++ b/crates/containerd-shim-wasm/src/sandbox/mod.rs @@ -14,7 +14,7 @@ pub mod sync; pub use error::{Error, Result}; pub use instance::{Instance, InstanceConfig}; pub use manager::{Sandbox as SandboxService, Service as ManagerService}; -pub use shim::{Cli as ShimCli, Local}; +pub use shim::Cli as ShimCli; pub use stdio::Stdio; pub(crate) mod containerd; diff --git a/crates/containerd-shim-wasm/src/sandbox/shim.rs b/crates/containerd-shim-wasm/src/sandbox/shim.rs deleted file mode 100644 index f9de00db3..000000000 --- a/crates/containerd-shim-wasm/src/sandbox/shim.rs +++ /dev/null @@ -1,1278 +0,0 @@ -//! The shim is the entrypoint for the containerd shim API. It is responsible -//! for commmuincating with the containerd daemon and managing the lifecycle of -//! the container/sandbox. - -use std::collections::HashMap; -use std::env::current_dir; -use std::fs::{self, DirBuilder, File}; -use std::ops::Not; -#[cfg(unix)] -use std::os::unix::fs::DirBuilderExt; -use std::path::Path; -use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex, RwLock}; -use std::thread; -use std::time::Duration; - -use anyhow::Context as AnyhowContext; -use chrono::{DateTime, Utc}; -use containerd_shim::error::Error as ShimError; -use containerd_shim::event::Event; -#[cfg(unix)] -use containerd_shim::mount::mount_rootfs; -use containerd_shim::protos::events::task::{TaskCreate, TaskDelete, TaskExit, TaskIO, TaskStart}; -use containerd_shim::protos::protobuf::well_known_types::timestamp::Timestamp; -use containerd_shim::protos::protobuf::{MessageDyn, MessageField}; -use containerd_shim::protos::shim::shim_ttrpc::Task; -use containerd_shim::protos::types::task::Status; -use containerd_shim::publisher::RemotePublisher; -use containerd_shim::util::{timestamp as new_timestamp, write_address, IntoOption}; -use containerd_shim::{self as shim, api, warn, ExitSignal, TtrpcContext, TtrpcResult}; -use log::{debug, error}; -#[cfg(unix)] -use nix::mount::{mount, MsFlags}; -use oci_spec::runtime::Spec; -use shim::api::{StatsRequest, StatsResponse}; -use shim::Flags; -use ttrpc::context::Context; - -use super::instance::{Instance, InstanceConfig, Nop}; -use super::{oci, Error, SandboxService}; -use crate::sys::metrics::get_metrics; -use crate::sys::networking::setup_namespaces; - -enum InstanceOption { - Instance(I), - Nop(Nop), -} - -impl Instance for InstanceOption { - type Engine = (); - - fn new(_id: String, _cfg: Option<&InstanceConfig>) -> Result { - // this is never called - unimplemented!(); - } - - fn start(&self) -> Result { - match self { - Self::Instance(i) => i.start(), - Self::Nop(i) => i.start(), - } - } - - fn kill(&self, signal: u32) -> Result<()> { - match self { - Self::Instance(i) => i.kill(signal), - Self::Nop(i) => i.kill(signal), - } - } - - fn delete(&self) -> Result<()> { - match self { - Self::Instance(i) => i.delete(), - Self::Nop(i) => i.delete(), - } - } - - fn wait_timeout(&self, t: impl Into>) -> Option<(u32, DateTime)> { - match self { - Self::Instance(i) => i.wait_timeout(t), - Self::Nop(i) => i.wait_timeout(t), - } - } -} - -struct InstanceData { - instance: InstanceOption, - cfg: InstanceConfig, - pid: RwLock>, - state: Arc>, -} - -type Result = std::result::Result; - -impl InstanceData { - fn start(&self) -> Result { - let mut s = self.state.write().unwrap(); - s.start()?; - - let res = self.instance.start(); - - // These state transitions are always `Ok(())` because - // we hold the lock since `s.start()` - let _ = match res { - Ok(_) => s.started(), - Err(_) => s.stop(), - }; - - res - } - - fn kill(&self, signal: u32) -> Result<()> { - let mut s = self.state.write().unwrap(); - s.kill()?; - - self.instance.kill(signal) - } - - fn delete(&self) -> Result<()> { - let mut s = self.state.write().unwrap(); - s.delete()?; - - let res = self.instance.delete(); - - if res.is_err() { - // Always `Ok(())` because we hold the lock since `s.delete()` - let _ = s.stop(); - } - - res - } - - fn wait(&self) -> (u32, DateTime) { - let res = self.instance.wait(); - let mut s = self.state.write().unwrap(); - *s = TaskState::Exited; - res - } - - fn wait_timeout(&self, t: impl Into>) -> Option<(u32, DateTime)> { - let res = self.instance.wait_timeout(t); - if res.is_some() { - let mut s = self.state.write().unwrap(); - *s = TaskState::Exited; - } - res - } -} - -type EventSender = Sender<(String, Box)>; - -#[derive(Debug, Clone, Copy)] -enum TaskState { - Created, - Starting, - Started, - Exited, - Deleting, -} - -impl TaskState { - pub fn start(&mut self) -> Result<()> { - *self = match self { - Self::Created => Ok(Self::Starting), - _ => state_transition_error(*self, Self::Starting), - }?; - Ok(()) - } - - pub fn kill(&mut self) -> Result<()> { - *self = match self { - Self::Started => Ok(Self::Started), - _ => state_transition_error(*self, "Killing"), - }?; - Ok(()) - } - - pub fn delete(&mut self) -> Result<()> { - *self = match self { - Self::Created | Self::Exited => Ok(Self::Deleting), - _ => state_transition_error(*self, Self::Deleting), - }?; - Ok(()) - } - - pub fn started(&mut self) -> Result<()> { - *self = match self { - Self::Starting => Ok(Self::Started), - _ => state_transition_error(*self, Self::Started), - }?; - Ok(()) - } - - pub fn stop(&mut self) -> Result<()> { - *self = match self { - Self::Started | Self::Starting => Ok(Self::Exited), - // This is for potential failure cases where we want delete to be able to be retried. - Self::Deleting => Ok(Self::Exited), - _ => state_transition_error(*self, Self::Exited), - }?; - Ok(()) - } -} - -fn state_transition_error(from: impl std::fmt::Debug, to: impl std::fmt::Debug) -> Result { - Err(Error::FailedPrecondition(format!( - "invalid state transition: {from:?} => {to:?}" - ))) -} - -type LocalInstances = Arc>>>>; - -/// Local implements the Task service for a containerd shim. -/// It defers all task operations to the `Instance` implementation. -#[derive(Clone)] -pub struct Local { - engine: T::Engine, - instances: LocalInstances, - events: Arc>, - exit: Arc, - namespace: String, - containerd_address: String, -} - -#[cfg(test)] -mod localtests { - use std::fs::create_dir; - use std::time::Duration; - - use anyhow::Context; - use oci_spec::runtime; - use serde_json as json; - use tempfile::tempdir; - - use super::*; - - struct LocalWithDescrutor { - local: Arc>, - } - - impl LocalWithDescrutor { - fn new(local: Arc>) -> Self { - Self { local } - } - } - - impl Drop for LocalWithDescrutor { - fn drop(&mut self) { - self.local - .instances - .write() - .unwrap() - .iter() - .for_each(|(_, v)| { - let _ = v.kill(9); - v.delete().unwrap(); - }); - } - } - - fn with_cri_sandbox(spec: Option, id: String) -> runtime::Spec { - let mut s = spec.unwrap_or_default(); - let mut annotations = HashMap::new(); - s.annotations().as_ref().map(|a| { - a.iter().map(|(k, v)| { - annotations.insert(k.to_string(), v.to_string()); - }) - }); - annotations.insert("io.kubernetes.cri.sandbox-id".to_string(), id); - - s.set_annotations(Some(annotations)); - s - } - - fn create_bundle(dir: &std::path::Path, spec: Option) -> Result<()> { - create_dir(dir.join("rootfs"))?; - - let s = spec.unwrap_or_default(); - - json::to_writer(File::create(dir.join("config.json"))?, &s) - .context("could not write config.json")?; - Ok(()) - } - - #[test] - fn test_delete_after_create() { - let dir = tempdir().unwrap(); - let id = "test-delete-after-create"; - create_bundle(dir.path(), None).unwrap(); - - let (tx, _rx) = channel(); - let local = Arc::new(Local::::new( - (), - tx, - Arc::new(ExitSignal::default()), - "test_namespace".into(), - "/test/address".into(), - )); - let mut _wrapped = LocalWithDescrutor::new(local.clone()); - - local - .task_create(api::CreateTaskRequest { - id: id.to_string(), - bundle: dir.path().to_str().unwrap().to_string(), - ..Default::default() - }) - .unwrap(); - - local - .task_delete(api::DeleteRequest { - id: id.to_string(), - ..Default::default() - }) - .unwrap(); - } - - #[test] - fn test_cri_task() -> Result<()> { - // Currently the relationship between the "base" container and the "instances" are pretty weak. - // When a cri sandbox is specified we just assume it's the sandbox container and treat it as such by not actually running the code (which is going to be wasm). - let (etx, _erx) = channel(); - let exit_signal = Arc::new(ExitSignal::default()); - let local = Arc::new(Local::::new( - (), - etx, - exit_signal, - "test_namespace".into(), - "/test/address".into(), - )); - - let mut _wrapped = LocalWithDescrutor::new(local.clone()); - - let temp = tempdir().unwrap(); - let dir = temp.path(); - let sandbox_id = "test-cri-task".to_string(); - create_bundle(dir, Some(with_cri_sandbox(None, sandbox_id.clone())))?; - - local.task_create(api::CreateTaskRequest { - id: "testbase".to_string(), - bundle: dir.to_str().unwrap().to_string(), - ..Default::default() - })?; - - let state = local.task_state(api::StateRequest { - id: "testbase".to_string(), - ..Default::default() - })?; - assert_eq!(state.status(), Status::CREATED); - - // A little janky since this is internal data, but check that this is seen as a sandbox container - let i = local.get_instance("testbase")?; - assert!(matches!(i.instance, InstanceOption::Nop(_))); - - local.task_start(api::StartRequest { - id: "testbase".to_string(), - ..Default::default() - })?; - - let state = local.task_state(api::StateRequest { - id: "testbase".to_string(), - ..Default::default() - })?; - assert_eq!(state.status(), Status::RUNNING); - - let ll = local.clone(); - let (base_tx, base_rx) = channel(); - thread::spawn(move || { - let resp = ll.task_wait(api::WaitRequest { - id: "testbase".to_string(), - ..Default::default() - }); - base_tx.send(resp).unwrap(); - }); - base_rx.try_recv().unwrap_err(); - - let temp2 = tempdir().unwrap(); - let dir2 = temp2.path(); - create_bundle(dir2, Some(with_cri_sandbox(None, sandbox_id)))?; - - local.task_create(api::CreateTaskRequest { - id: "testinstance".to_string(), - bundle: dir2.to_str().unwrap().to_string(), - ..Default::default() - })?; - - let state = local.task_state(api::StateRequest { - id: "testinstance".to_string(), - ..Default::default() - })?; - assert_eq!(state.status(), Status::CREATED); - - // again, this is janky since it is internal data, but check that this is seen as a "real" container. - // this is the inverse of the above test case. - let i = local.get_instance("testinstance")?; - assert!(matches!(i.instance, InstanceOption::Instance(_))); - - local.task_start(api::StartRequest { - id: "testinstance".to_string(), - ..Default::default() - })?; - - let state = local.task_state(api::StateRequest { - id: "testinstance".to_string(), - ..Default::default() - })?; - assert_eq!(state.status(), Status::RUNNING); - - let stats = local.task_stats(api::StatsRequest { - id: "testinstance".to_string(), - ..Default::default() - })?; - assert!(stats.has_stats()); - - let ll = local.clone(); - let (instance_tx, instance_rx) = channel(); - std::thread::spawn(move || { - let resp = ll.task_wait(api::WaitRequest { - id: "testinstance".to_string(), - ..Default::default() - }); - instance_tx.send(resp).unwrap(); - }); - instance_rx.try_recv().unwrap_err(); - - local.task_kill(api::KillRequest { - id: "testinstance".to_string(), - signal: 9, - ..Default::default() - })?; - - instance_rx.recv_timeout(Duration::from_secs(5)).unwrap()?; - - let state = local.task_state(api::StateRequest { - id: "testinstance".to_string(), - ..Default::default() - })?; - assert_eq!(state.status(), Status::STOPPED); - local.task_delete(api::DeleteRequest { - id: "testinstance".to_string(), - ..Default::default() - })?; - - match local - .task_state(api::StateRequest { - id: "testinstance".to_string(), - ..Default::default() - }) - .unwrap_err() - { - Error::NotFound(_) => {} - e => return Err(e), - } - - base_rx.try_recv().unwrap_err(); - let state = local.task_state(api::StateRequest { - id: "testbase".to_string(), - ..Default::default() - })?; - assert_eq!(state.status(), Status::RUNNING); - - local.task_kill(api::KillRequest { - id: "testbase".to_string(), - signal: 9, - ..Default::default() - })?; - - base_rx.recv_timeout(Duration::from_secs(5)).unwrap()?; - let state = local.task_state(api::StateRequest { - id: "testbase".to_string(), - ..Default::default() - })?; - assert_eq!(state.status(), Status::STOPPED); - - local.task_delete(api::DeleteRequest { - id: "testbase".to_string(), - ..Default::default() - })?; - match local - .task_state(api::StateRequest { - id: "testbase".to_string(), - ..Default::default() - }) - .unwrap_err() - { - Error::NotFound(_) => {} - e => return Err(e), - } - - Ok(()) - } - - #[test] - fn test_task_lifecycle() -> Result<()> { - let (etx, _erx) = channel(); // TODO: check events - let exit_signal = Arc::new(ExitSignal::default()); - let local = Arc::new(Local::::new( - (), - etx, - exit_signal, - "test_namespace".into(), - "/test/address".into(), - )); - - let mut _wrapped = LocalWithDescrutor::new(local.clone()); - - let temp = tempdir().unwrap(); - let dir = temp.path(); - create_bundle(dir, None)?; - - match local - .task_state(api::StateRequest { - id: "test".to_string(), - ..Default::default() - }) - .unwrap_err() - { - Error::NotFound(_) => {} - e => return Err(e), - } - - local.task_create(api::CreateTaskRequest { - id: "test".to_string(), - bundle: dir.to_str().unwrap().to_string(), - ..Default::default() - })?; - - match local - .task_create(api::CreateTaskRequest { - id: "test".to_string(), - bundle: dir.to_str().unwrap().to_string(), - ..Default::default() - }) - .unwrap_err() - { - Error::AlreadyExists(_) => {} - e => return Err(e), - } - - let state = local.task_state(api::StateRequest { - id: "test".to_string(), - ..Default::default() - })?; - - assert_eq!(state.status(), Status::CREATED); - - local.task_start(api::StartRequest { - id: "test".to_string(), - ..Default::default() - })?; - - let state = local.task_state(api::StateRequest { - id: "test".to_string(), - ..Default::default() - })?; - - assert_eq!(state.status(), Status::RUNNING); - - let (tx, rx) = channel(); - let ll = local.clone(); - thread::spawn(move || { - let resp = ll.task_wait(api::WaitRequest { - id: "test".to_string(), - ..Default::default() - }); - tx.send(resp).unwrap(); - }); - - rx.try_recv().unwrap_err(); - - let res = local.task_stats(api::StatsRequest { - id: "test".to_string(), - ..Default::default() - })?; - assert!(res.has_stats()); - - local.task_kill(api::KillRequest { - id: "test".to_string(), - signal: 9, - ..Default::default() - })?; - - rx.recv_timeout(Duration::from_secs(5)).unwrap()?; - - let state = local.task_state(api::StateRequest { - id: "test".to_string(), - ..Default::default() - })?; - assert_eq!(state.status(), Status::STOPPED); - - local.task_delete(api::DeleteRequest { - id: "test".to_string(), - ..Default::default() - })?; - - match local - .task_state(api::StateRequest { - id: "test".to_string(), - ..Default::default() - }) - .unwrap_err() - { - Error::NotFound(_) => {} - e => return Err(e), - } - - Ok(()) - } -} - -impl Local { - /// Creates a new local task service. - pub fn new( - engine: T::Engine, - tx: Sender<(String, Box)>, - exit: Arc, - namespace: String, - containerd_address: String, - ) -> Self { - Self { - // Note: engine.clone() is a shallow clone, is really cheap to do, and is safe to pass around. - engine, - instances: Arc::new(RwLock::new(HashMap::new())), - events: Arc::new(Mutex::new(tx)), - exit, - namespace, - containerd_address, - } - } - - fn new_base(&self, id: String) -> InstanceData { - let cfg = InstanceConfig::new( - self.engine.clone(), - self.namespace.clone(), - self.containerd_address.clone(), - ); - InstanceData { - instance: InstanceOption::Nop(Nop::new(id, None).unwrap()), - cfg, - pid: RwLock::new(None), - state: Arc::new(RwLock::new(TaskState::Created)), - } - } - - fn send_event(&self, event: impl Event) { - let topic = event.topic(); - self.events - .lock() - .unwrap() - .send((topic.clone(), Box::new(event))) - .unwrap_or_else(|e| warn!("failed to send event for topic {}: {}", topic, e)); - } - - fn get_instance(&self, id: &str) -> Result>> { - self.instances - .read() - .unwrap() - .get(id) - .ok_or_else(|| Error::NotFound(id.to_string())) - .map(|i| i.clone()) - } - - fn instance_exists(&self, id: &str) -> bool { - self.instances.read().unwrap().contains_key(id) - } - - fn is_empty(&self) -> bool { - self.instances.read().unwrap().is_empty() - } - - fn task_create(&self, req: api::CreateTaskRequest) -> Result { - if !req.checkpoint().is_empty() || !req.parent_checkpoint().is_empty() { - return Err(ShimError::Unimplemented("checkpoint is not supported".to_string()).into()); - } - - if req.terminal { - return Err(Error::InvalidArgument( - "terminal is not supported".to_string(), - )); - } - - if self.instance_exists(req.id()) { - return Err(Error::AlreadyExists(req.id)); - } - - let mut spec = Spec::load( - Path::new(req.bundle()) - .join("config.json") - .as_path() - .to_str() - .unwrap(), - ) - .map_err(|err| Error::InvalidArgument(format!("could not load runtime spec: {}", err)))?; - - if self.is_empty() { - // Check if this is a cri container - // If it is cri, then this is the "pause" container, which we don't need to deal with. - // - // TODO: maybe we can just go ahead and execute the actual container with runc? - if spec.annotations().is_some() { - let annotations = spec.annotations().as_ref().unwrap(); - if annotations.contains_key("io.kubernetes.cri.sandbox-id") { - self.instances - .write() - .unwrap() - .insert(req.id.clone(), Arc::new(self.new_base(req.id.clone()))); - self.send_event(TaskCreate { - container_id: req.id, - bundle: req.bundle, - rootfs: req.rootfs, - io: MessageField::some(TaskIO { - stdin: req.stdin, - stdout: req.stdout, - stderr: req.stderr, - ..Default::default() - }), - ..Default::default() - }); - return Ok(api::CreateTaskResponse { - pid: std::process::id(), // TODO: PID - ..Default::default() - }); - } - } - } - - spec.canonicalize_rootfs(req.bundle()).map_err(|err| { - ShimError::InvalidArgument(format!("could not canonicalize rootfs: {}", err)) - })?; - let rootfs = spec - .root() - .as_ref() - .ok_or_else(|| Error::InvalidArgument("rootfs is not set in runtime spec".to_string()))? - .path(); - let mut mkdir = DirBuilder::new(); - mkdir.recursive(true); - #[cfg(unix)] - mkdir.mode(0o755); - if mkdir.create(rootfs).is_ok() { /* ignore */ } - - let rootfs_mounts = req.rootfs().to_vec(); - if !rootfs_mounts.is_empty() { - for m in rootfs_mounts { - let mount_type = m.type_().none_if(|&x| x.is_empty()); - let source = m.source.as_str().none_if(|&x| x.is_empty()); - - #[cfg(unix)] - mount_rootfs(mount_type, source, &m.options.to_vec(), rootfs)?; - } - } - - let default_mounts = vec![]; - let mounts = spec.mounts().as_ref().unwrap_or(&default_mounts); - for m in mounts { - if m.typ().is_some() { - match m.typ().as_ref().unwrap().as_str() { - "tmpfs" | "proc" | "cgroup" | "sysfs" | "devpts" | "mqueue" => continue, - _ => (), - }; - }; - - let source = m.source().as_deref().map(|x| x.to_str()).unwrap_or(None); - let target = m - .destination() - .strip_prefix(std::path::MAIN_SEPARATOR.to_string()) - .map_err(|err| { - ShimError::InvalidArgument(format!("error stripping path prefix: {}", err)) - })?; - - let rootfs_target = Path::new(rootfs).join(target); - - if source.is_some() { - let md = fs::metadata(source.unwrap()).map_err(|err| { - Error::InvalidArgument(format!("could not get metadata for source: {}", err)) - })?; - - if md.is_dir() { - fs::create_dir_all(&rootfs_target).map_err(|err| { - ShimError::Other(format!( - "error creating directory for mount target {}: {}", - target.to_str().unwrap(), - err - )) - })?; - } else { - let parent = rootfs_target.parent(); - if parent.is_some() { - fs::create_dir_all(parent.unwrap()).map_err(|err| { - ShimError::Other(format!( - "error creating parent for mount target {}: {}", - parent.unwrap().to_str().unwrap(), - err - )) - })?; - } - File::create(&rootfs_target) - .map_err(|err| ShimError::Other(format!("{}", err)))?; - } - } - - let mut newopts = vec![]; - let opts = m.options().as_ref(); - if let Some(os) = opts { - for o in os { - newopts.push(o.to_string()); - } - } - - let mut typ = m.typ().as_deref(); - if typ.is_some() && typ.unwrap() == "bind" { - typ = None; - newopts.push("rbind".to_string()); - } - - #[cfg(unix)] - mount_rootfs(typ, source, &newopts, &rootfs_target).map_err(|err| { - ShimError::Other(format!( - "error mounting {} to {} as {}: {}", - source.unwrap_or_default(), - rootfs_target.to_str().unwrap(), - m.typ().as_deref().unwrap_or("none"), - err - )) - })?; - } - - let engine = self.engine.clone(); - let mut builder = InstanceConfig::new( - engine, - self.namespace.clone(), - self.containerd_address.clone(), - ); - builder - .set_stdin(req.stdin().to_string()) - .set_stdout(req.stdout().to_string()) - .set_stderr(req.stderr().to_string()) - .set_bundle(req.bundle().to_string()); - self.instances.write().unwrap().insert( - req.id().to_string(), - Arc::new(InstanceData { - instance: InstanceOption::Instance(T::new(req.id().to_string(), Some(&builder))?), - cfg: builder, - pid: RwLock::new(None), - state: Arc::new(RwLock::new(TaskState::Created)), - }), - ); - - self.send_event(TaskCreate { - container_id: req.id().into(), - bundle: req.bundle().into(), - rootfs: req.rootfs().into(), - io: MessageField::some(TaskIO { - stdin: req.stdin().into(), - stdout: req.stdout().into(), - stderr: req.stderr().into(), - ..Default::default() - }), - ..Default::default() - }); - - debug!("create done"); - - // Per the spec, the prestart hook must be called as part of the create operation - debug!("call prehook before the start"); - oci::setup_prestart_hooks(spec.hooks())?; - - Ok(api::CreateTaskResponse { - pid: std::process::id(), - ..Default::default() - }) - } - - fn task_start(&self, req: api::StartRequest) -> Result { - if req.exec_id().is_empty().not() { - return Err(ShimError::Unimplemented("exec is not supported".to_string()).into()); - } - - let i = self.get_instance(req.id())?; - let pid = i.start()?; - - let mut pid_w = i.pid.write().unwrap(); - *pid_w = Some(pid); - drop(pid_w); - - self.send_event(TaskStart { - container_id: req.id().into(), - pid, - ..Default::default() - }); - - let sender = self.events.clone(); - let id = req.id().to_string(); - - thread::Builder::new() - .name(format!("{}-wait", req.id())) - .spawn(move || { - let exit_code = i.wait(); - - let timestamp = new_timestamp().unwrap(); - let event = TaskExit { - container_id: id.clone(), - exit_status: exit_code.0, - exited_at: MessageField::some(timestamp), - pid, - id, - ..Default::default() - }; - - let topic = event.topic(); - sender - .lock() - .unwrap() - .send((topic.clone(), Box::new(event))) - .unwrap_or_else(|err| { - error!("failed to send event for topic {}: {}", topic, err) - }); - }) - .context("could not spawn thread to wait exit") - .map_err(Error::from)?; - - debug!("started: {:?}", req); - - Ok(api::StartResponse { - pid, - ..Default::default() - }) - } - - fn task_kill(&self, req: api::KillRequest) -> Result<()> { - if req.exec_id().is_empty().not() { - return Err(Error::InvalidArgument("exec is not supported".to_string())); - } - self.get_instance(req.id())?.kill(req.signal())?; - Ok(()) - } - - fn task_delete(&self, req: api::DeleteRequest) -> Result { - if req.exec_id().is_empty().not() { - return Err(Error::InvalidArgument("exec is not supported".to_string())); - } - - let i = self.get_instance(req.id())?; - - i.delete()?; - - let pid = i.pid.read().unwrap().unwrap_or_default(); - - let mut event = TaskDelete { - container_id: req.id().into(), - pid, - ..Default::default() - }; - - let mut resp = api::DeleteResponse { - pid, - ..Default::default() - }; - - if let Some(ec) = i.wait_timeout(Duration::ZERO) { - event.exit_status = ec.0; - resp.exit_status = ec.0; - - let mut ts = Timestamp::new(); - ts.seconds = ec.1.timestamp(); - ts.nanos = ec.1.timestamp_subsec_nanos() as i32; - - let timestamp = new_timestamp()?; - event.set_exited_at(timestamp.clone()); - resp.set_exited_at(timestamp); - } - - self.instances.write().unwrap().remove(req.id()); - - self.send_event(event); - Ok(resp) - } - - fn task_wait(&self, req: api::WaitRequest) -> Result { - if req.exec_id().is_empty().not() { - return Err(Error::InvalidArgument("exec is not supported".to_string())); - } - - let i = self.get_instance(req.id())?; - - let code = i.wait(); - debug!("wait done: {:?}", req); - - let mut timestamp = Timestamp::new(); - timestamp.seconds = code.1.timestamp(); - timestamp.nanos = code.1.timestamp_subsec_nanos() as i32; - - let mut wr = api::WaitResponse { - exit_status: code.0, - ..Default::default() - }; - wr.set_exited_at(timestamp); - Ok(wr) - } - - fn task_state(&self, req: api::StateRequest) -> Result { - if req.exec_id().is_empty().not() { - return Err(Error::InvalidArgument("exec is not supported".to_string())); - } - - let i = self.get_instance(req.id())?; - let mut state = api::StateResponse { - bundle: i.cfg.get_bundle().unwrap_or_default(), - stdin: i.cfg.get_stdin().unwrap_or_default(), - stdout: i.cfg.get_stdout().unwrap_or_default(), - stderr: i.cfg.get_stderr().unwrap_or_default(), - ..Default::default() - }; - - let pid_lock = i.pid.read().unwrap(); - let pid = *pid_lock; - if pid.is_none() { - state.set_status(Status::CREATED); - return Ok(state); - } - drop(pid_lock); - - state.set_pid(pid.unwrap()); - - if let Some(c) = i.wait_timeout(Duration::ZERO) { - state.set_status(Status::STOPPED); - let ec = c; - state.exit_status = ec.0; - - let mut timestamp = Timestamp::new(); - timestamp.seconds = ec.1.timestamp(); - timestamp.nanos = ec.1.timestamp_subsec_nanos() as i32; - state.set_exited_at(timestamp); - } else { - state.set_status(Status::RUNNING); - } - Ok(state) - } - - fn task_stats(&self, req: StatsRequest) -> Result { - let i = self.get_instance(req.id())?; - let pid_lock = i.pid.read().unwrap(); - let pid = *pid_lock; - if pid.is_none() { - return Err(Error::InvalidArgument("task is not running".to_string())); - } - - let metrics = get_metrics(pid.unwrap())?; - - let mut stats = StatsResponse { - ..Default::default() - }; - stats.set_stats(metrics); - Ok(stats) - } -} - -impl SandboxService for Local { - type Instance = T; - fn new( - namespace: String, - containerd_address: String, - _id: String, - engine: T::Engine, - publisher: RemotePublisher, - ) -> Self { - let (tx, rx) = channel::<(String, Box)>(); - forward_events(namespace.clone(), publisher, rx); - Local::::new( - engine, - tx.clone(), - Arc::new(ExitSignal::default()), - namespace, - containerd_address, - ) - } -} - -impl Task for Local { - fn create( - &self, - _ctx: &TtrpcContext, - req: api::CreateTaskRequest, - ) -> TtrpcResult { - debug!("create: {:?}", req); - let resp = self.task_create(req)?; - Ok(resp) - } - - fn start( - &self, - _ctx: &::ttrpc::TtrpcContext, - req: api::StartRequest, - ) -> TtrpcResult { - debug!("start: {:?}", req); - let resp = self.task_start(req)?; - Ok(resp) - } - - fn kill(&self, _ctx: &TtrpcContext, req: api::KillRequest) -> TtrpcResult { - debug!("kill: {:?}", req); - self.task_kill(req)?; - Ok(api::Empty::new()) - } - - fn delete( - &self, - _ctx: &TtrpcContext, - req: api::DeleteRequest, - ) -> TtrpcResult { - debug!("delete: {:?}", req); - let resp = self.task_delete(req)?; - Ok(resp) - } - - fn wait(&self, _ctx: &TtrpcContext, req: api::WaitRequest) -> TtrpcResult { - debug!("wait: {:?}", req); - let resp = self.task_wait(req)?; - Ok(resp) - } - - fn connect( - &self, - _ctx: &TtrpcContext, - req: api::ConnectRequest, - ) -> TtrpcResult { - debug!("connect: {:?}", req); - - let i = self.get_instance(req.id())?; - let pid = *i.pid.read().unwrap().as_ref().unwrap_or(&0); - - Ok(api::ConnectResponse { - shim_pid: std::process::id(), - task_pid: pid, - ..Default::default() - }) - } - - fn state( - &self, - _ctx: &TtrpcContext, - req: api::StateRequest, - ) -> TtrpcResult { - debug!("state: {:?}", req); - let resp = self.task_state(req)?; - Ok(resp) - } - - fn shutdown(&self, _ctx: &TtrpcContext, _req: api::ShutdownRequest) -> TtrpcResult { - debug!("shutdown"); - - if self.is_empty().not() { - return Ok(api::Empty::new()); - } - - self.exit.signal(); - - Ok(api::Empty::new()) - } - - fn stats( - &self, - _ctx: &::ttrpc::TtrpcContext, - req: StatsRequest, - ) -> ::ttrpc::Result { - log::info!("stats: {:?}", req); - let resp = self.task_stats(req)?; - Ok(resp) - } -} - -/// Cli implements the containerd-shim cli interface using `Local` as the task service. -pub struct Cli { - pub engine: T::Engine, - namespace: String, - containerd_address: String, - phantom: std::marker::PhantomData, - exit: Arc, - _id: String, -} - -impl shim::Shim for Cli -where - I: Instance + Sync + Send, - ::Engine: Default, -{ - type T = Local; - - fn new(_runtime_id: &str, args: &Flags, _config: &mut shim::Config) -> Self { - Cli { - engine: Default::default(), - phantom: std::marker::PhantomData, - namespace: args.namespace.to_string(), - containerd_address: args.address.clone(), - exit: Arc::new(ExitSignal::default()), - _id: args.id.to_string(), - } - } - - fn start_shim(&mut self, opts: containerd_shim::StartOpts) -> shim::Result { - let dir = current_dir().map_err(|err| ShimError::Other(err.to_string()))?; - let spec = Spec::load(dir.join("config.json").to_str().unwrap()).map_err(|err| { - shim::Error::InvalidArgument(format!("error loading runtime spec: {}", err)) - })?; - - let default = HashMap::new() as HashMap; - let annotations = spec.annotations().as_ref().unwrap_or(&default); - - let id = opts.id.clone(); - - let grouping = annotations - .get("io.kubernetes.cri.sandbox-id") - .unwrap_or(&id) - .to_string(); - - setup_namespaces(&spec) - .map_err(|e| shim::Error::Other(format!("failed to setup namespaces: {}", e)))?; - - #[cfg(unix)] - mount::( - None, - "/".as_ref(), - None, - MsFlags::MS_REC | MsFlags::MS_SLAVE, - None, - ) - .map_err(|err| { - shim::Error::Other(format!("failed to remount rootfs as secondary: {}", err)) - })?; - - let envs = vec![] as Vec<(&str, &str)>; - let (_child, address) = shim::spawn(opts, &grouping, envs)?; - - write_address(&address)?; - - Ok(address) - } - - fn wait(&mut self) { - self.exit.wait(); - } - - fn create_task_service(&self, publisher: RemotePublisher) -> Self::T { - let (tx, rx) = channel::<(String, Box)>(); - forward_events(self.namespace.to_string(), publisher, rx); - Local::::new( - self.engine.clone(), - tx.clone(), - self.exit.clone(), - self.namespace.clone(), - self.containerd_address.clone(), - ) - } - - fn delete_shim(&mut self) -> shim::Result { - let timestamp = new_timestamp()?; - Ok(api::DeleteResponse { - exit_status: 137, - exited_at: MessageField::some(timestamp), - ..Default::default() - }) - } -} - -fn forward_events( - namespace: String, - publisher: RemotePublisher, - events: Receiver<(String, Box)>, -) { - thread::Builder::new() - .name("event-publisher".to_string()) - .spawn(move || { - for (topic, event) in events.iter() { - publisher - .publish(Context::default(), &topic, &namespace, event) - .unwrap_or_else(|e| warn!("failed to publish event, topic: {}: {}", &topic, e)); - } - }) - .unwrap(); -} diff --git a/crates/containerd-shim-wasm/src/sandbox/shim/cli.rs b/crates/containerd-shim-wasm/src/sandbox/shim/cli.rs new file mode 100644 index 000000000..4cb24094f --- /dev/null +++ b/crates/containerd-shim-wasm/src/sandbox/shim/cli.rs @@ -0,0 +1,90 @@ +use std::env::current_dir; +use std::sync::Arc; + +use chrono::Utc; +use containerd_shim::error::Error as ShimError; +use containerd_shim::publisher::RemotePublisher; +use containerd_shim::util::write_address; +use containerd_shim::{self as shim, api, ExitSignal}; +use oci_spec::runtime::Spec; +use shim::Flags; + +use crate::sandbox::instance::Instance; +use crate::sandbox::shim::events::{RemoteEventSender, ToTimestamp}; +use crate::sandbox::shim::local::Local; +use crate::sys::networking::setup_namespaces; + +/// Cli implements the containerd-shim cli interface using `Local` as the task service. +pub struct Cli { + engine: T::Engine, + namespace: String, + containerd_address: String, + exit: Arc, + _id: String, +} + +impl shim::Shim for Cli +where + I: Instance + Sync + Send, + ::Engine: Default, +{ + type T = Local; + + fn new(_runtime_id: &str, args: &Flags, _config: &mut shim::Config) -> Self { + Cli { + engine: Default::default(), + namespace: args.namespace.to_string(), + containerd_address: args.address.clone(), + exit: Arc::default(), + _id: args.id.to_string(), + } + } + + fn start_shim(&mut self, opts: containerd_shim::StartOpts) -> shim::Result { + let dir = current_dir().map_err(|err| ShimError::Other(err.to_string()))?; + let spec = Spec::load(dir.join("config.json")).map_err(|err| { + shim::Error::InvalidArgument(format!("error loading runtime spec: {}", err)) + })?; + + let id = opts.id.clone(); + let grouping = spec + .annotations() + .as_ref() + .and_then(|a| a.get("io.kubernetes.cri.sandbox-id")) + .unwrap_or(&id); + + setup_namespaces(&spec) + .map_err(|e| shim::Error::Other(format!("failed to setup namespaces: {}", e)))?; + + let (_child, address) = shim::spawn(opts, grouping, vec![])?; + + write_address(&address)?; + + Ok(address) + } + + fn wait(&mut self) { + self.exit.wait(); + } + + fn create_task_service(&self, publisher: RemotePublisher) -> Self::T { + let events = RemoteEventSender::new(&self.namespace, publisher); + let exit = self.exit.clone(); + let engine = self.engine.clone(); + Local::::new( + engine, + events, + exit, + &self.namespace, + &self.containerd_address, + ) + } + + fn delete_shim(&mut self) -> shim::Result { + Ok(api::DeleteResponse { + exit_status: 137, + exited_at: Some(Utc::now().to_timestamp()).into(), + ..Default::default() + }) + } +} diff --git a/crates/containerd-shim-wasm/src/sandbox/shim/events.rs b/crates/containerd-shim-wasm/src/sandbox/shim/events.rs new file mode 100644 index 000000000..c727fe051 --- /dev/null +++ b/crates/containerd-shim-wasm/src/sandbox/shim/events.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; + +use chrono::{DateTime, TimeZone}; +use containerd_shim::event::Event; +use containerd_shim::publisher::RemotePublisher; +use log::warn; +use protobuf::well_known_types::timestamp::Timestamp; + +pub trait EventSender: Clone + Send + Sync + 'static { + fn send(&self, event: impl Event); +} + +#[derive(Clone)] +pub struct RemoteEventSender { + inner: Arc, +} + +struct Inner { + namespace: String, + publisher: RemotePublisher, +} + +impl RemoteEventSender { + pub fn new(namespace: impl AsRef, publisher: RemotePublisher) -> RemoteEventSender { + let namespace = namespace.as_ref().to_string(); + RemoteEventSender { + inner: Arc::new(Inner { + namespace, + publisher, + }), + } + } +} + +impl EventSender for RemoteEventSender { + fn send(&self, event: impl Event) { + let topic = event.topic(); + let event = Box::new(event); + let publisher = &self.inner.publisher; + if let Err(err) = + publisher.publish(Default::default(), &topic, &self.inner.namespace, event) + { + warn!("failed to publish event, topic: {}: {}", &topic, err) + } + } +} + +pub(super) trait ToTimestamp { + fn to_timestamp(self) -> Timestamp; +} + +impl ToTimestamp for DateTime { + fn to_timestamp(self) -> Timestamp { + Timestamp { + seconds: self.timestamp(), + nanos: self.timestamp_subsec_nanos() as i32, + ..Default::default() + } + } +} diff --git a/crates/containerd-shim-wasm/src/sandbox/shim/instance_data.rs b/crates/containerd-shim-wasm/src/sandbox/shim/instance_data.rs new file mode 100644 index 000000000..837fae683 --- /dev/null +++ b/crates/containerd-shim-wasm/src/sandbox/shim/instance_data.rs @@ -0,0 +1,104 @@ +use std::sync::{Arc, OnceLock, RwLock}; +use std::time::Duration; + +use chrono::{DateTime, Utc}; + +use crate::sandbox::instance::Nop; +use crate::sandbox::shim::instance_option::InstanceOption; +use crate::sandbox::shim::task_state::TaskState; +use crate::sandbox::{Instance, InstanceConfig, Result}; + +pub(super) struct InstanceData { + pub instance: InstanceOption, + cfg: InstanceConfig, + pid: OnceLock, + state: Arc>, +} + +impl InstanceData { + pub fn new_instance(id: impl AsRef, cfg: InstanceConfig) -> Result { + let id = id.as_ref().to_string(); + let instance = InstanceOption::Instance(T::new(id, Some(&cfg))?); + Ok(Self { + instance, + cfg, + pid: OnceLock::default(), + state: Arc::new(RwLock::new(TaskState::Created)), + }) + } + + pub fn new_base(id: impl AsRef, cfg: InstanceConfig) -> Result { + let id = id.as_ref().to_string(); + let instance = InstanceOption::Nop(Nop::new(id, None)?); + Ok(Self { + instance, + cfg, + pid: OnceLock::default(), + state: Arc::new(RwLock::new(TaskState::Created)), + }) + } + + pub fn pid(&self) -> Option { + self.pid.get().copied() + } + + pub fn config(&self) -> &InstanceConfig { + &self.cfg + } + + pub fn start(&self) -> Result { + let mut s = self.state.write().unwrap(); + s.start()?; + + let res = self.instance.start(); + + // These state transitions are always `Ok(())` because + // we hold the lock since `s.start()` + let _ = match res { + Ok(pid) => { + let _ = self.pid.set(pid); + s.started() + } + Err(_) => s.stop(), + }; + + res + } + + pub fn kill(&self, signal: u32) -> Result<()> { + let mut s = self.state.write().unwrap(); + s.kill()?; + + self.instance.kill(signal) + } + + pub fn delete(&self) -> Result<()> { + let mut s = self.state.write().unwrap(); + s.delete()?; + + let res = self.instance.delete(); + + if res.is_err() { + // Always `Ok(())` because we hold the lock since `s.delete()` + let _ = s.stop(); + } + + res + } + + pub fn wait(&self) -> (u32, DateTime) { + let res = self.instance.wait(); + let mut s = self.state.write().unwrap(); + *s = TaskState::Exited; + res + } + + pub fn wait_timeout(&self, t: impl Into>) -> Option<(u32, DateTime)> { + let res = self.instance.wait_timeout(t); + if res.is_some() { + let mut s = self.state.write().unwrap(); + *s = TaskState::Exited; + } + res + } +} diff --git a/crates/containerd-shim-wasm/src/sandbox/shim/instance_option.rs b/crates/containerd-shim-wasm/src/sandbox/shim/instance_option.rs new file mode 100644 index 000000000..46ada3e98 --- /dev/null +++ b/crates/containerd-shim-wasm/src/sandbox/shim/instance_option.rs @@ -0,0 +1,48 @@ +use std::time::Duration; + +use chrono::{DateTime, Utc}; + +use crate::sandbox::instance::Nop; +use crate::sandbox::{Instance, InstanceConfig, Result}; + +pub(super) enum InstanceOption { + Instance(I), + Nop(Nop), +} + +impl Instance for InstanceOption { + type Engine = (); + + fn new(_id: String, _cfg: Option<&InstanceConfig>) -> Result { + // this is never called + unimplemented!(); + } + + fn start(&self) -> Result { + match self { + Self::Instance(i) => i.start(), + Self::Nop(i) => i.start(), + } + } + + fn kill(&self, signal: u32) -> Result<()> { + match self { + Self::Instance(i) => i.kill(signal), + Self::Nop(i) => i.kill(signal), + } + } + + fn delete(&self) -> Result<()> { + match self { + Self::Instance(i) => i.delete(), + Self::Nop(i) => i.delete(), + } + } + + fn wait_timeout(&self, t: impl Into>) -> Option<(u32, DateTime)> { + match self { + Self::Instance(i) => i.wait_timeout(t), + Self::Nop(i) => i.wait_timeout(t), + } + } +} diff --git a/crates/containerd-shim-wasm/src/sandbox/shim/local.rs b/crates/containerd-shim-wasm/src/sandbox/shim/local.rs new file mode 100644 index 000000000..4c486a173 --- /dev/null +++ b/crates/containerd-shim-wasm/src/sandbox/shim/local.rs @@ -0,0 +1,402 @@ +use std::collections::HashMap; +use std::fs::create_dir_all; +use std::ops::Not; +use std::path::Path; +use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::Duration; + +use anyhow::Context as AnyhowContext; +use containerd_shim::api::{ + ConnectRequest, ConnectResponse, CreateTaskRequest, CreateTaskResponse, DeleteRequest, Empty, + KillRequest, ShutdownRequest, StartRequest, StartResponse, StateRequest, StateResponse, + StatsRequest, StatsResponse, WaitRequest, WaitResponse, +}; +use containerd_shim::error::Error as ShimError; +use containerd_shim::protos::events::task::{TaskCreate, TaskDelete, TaskExit, TaskIO, TaskStart}; +use containerd_shim::protos::shim::shim_ttrpc::Task; +use containerd_shim::protos::types::task::Status; +use containerd_shim::publisher::RemotePublisher; +use containerd_shim::util::IntoOption; +use containerd_shim::{DeleteResponse, ExitSignal, TtrpcContext, TtrpcResult}; +use log::debug; +use oci_spec::runtime::Spec; + +use crate::sandbox::instance::{Instance, InstanceConfig}; +use crate::sandbox::shim::events::{EventSender, RemoteEventSender, ToTimestamp}; +use crate::sandbox::shim::instance_data::InstanceData; +use crate::sandbox::{oci, Error, Result, SandboxService}; +use crate::sys::metrics::get_metrics; + +#[cfg(test)] +mod tests; + +type LocalInstances = RwLock>>>; + +/// Local implements the Task service for a containerd shim. +/// It defers all task operations to the `Instance` implementation. +pub struct Local { + pub engine: T::Engine, + pub(super) instances: LocalInstances, + events: E, + exit: Arc, + namespace: String, + containerd_address: String, +} + +impl Local { + /// Creates a new local task service. + pub fn new( + engine: T::Engine, + events: E, + exit: Arc, + namespace: impl AsRef, + containerd_address: impl AsRef, + ) -> Self { + let instances = RwLock::default(); + let namespace = namespace.as_ref().to_string(); + let containerd_address = containerd_address.as_ref().to_string(); + Self { + engine, + instances, + events, + exit, + namespace, + containerd_address, + } + } + + pub(super) fn get_instance(&self, id: &str) -> Result>> { + let instance = self.instances.read().unwrap().get(id).cloned(); + instance.ok_or_else(|| Error::NotFound(id.to_string())) + } + + fn has_instance(&self, id: &str) -> bool { + self.instances.read().unwrap().contains_key(id) + } + + fn is_empty(&self) -> bool { + self.instances.read().unwrap().is_empty() + } + + fn instance_config(&self) -> InstanceConfig { + InstanceConfig::new( + self.engine.clone(), + &self.namespace, + &self.containerd_address, + ) + } +} + +fn is_cri_container(spec: &Spec) -> bool { + spec.annotations() + .as_ref() + .is_some_and(|annotations| annotations.contains_key("io.kubernetes.cri.sandbox-id")) +} + +// These are the same functions as in Task, but without the TtrcpContext, which is useful for testing +impl Local { + fn task_create(&self, req: CreateTaskRequest) -> Result { + if !req.checkpoint().is_empty() || !req.parent_checkpoint().is_empty() { + return Err(ShimError::Unimplemented("checkpoint is not supported".to_string()).into()); + } + + if req.terminal { + return Err(Error::InvalidArgument( + "terminal is not supported".to_string(), + )); + } + + if self.has_instance(&req.id) { + return Err(Error::AlreadyExists(req.id)); + } + + let mut spec = Spec::load(Path::new(&req.bundle).join("config.json")) + .map_err(|err| Error::InvalidArgument(format!("could not load runtime spec: {err}")))?; + + spec.canonicalize_rootfs(req.bundle()).map_err(|err| { + ShimError::InvalidArgument(format!("could not canonicalize rootfs: {}", err)) + })?; + + let rootfs = spec + .root() + .as_ref() + .ok_or_else(|| Error::InvalidArgument("rootfs is not set in runtime spec".to_string()))? + .path(); + + let _ = create_dir_all(rootfs); + let rootfs_mounts = req.rootfs().to_vec(); + if !rootfs_mounts.is_empty() { + for m in rootfs_mounts { + let mount_type = m.type_().none_if(|&x| x.is_empty()); + let source = m.source.as_str().none_if(|&x| x.is_empty()); + + #[cfg(unix)] + containerd_shim::mount::mount_rootfs( + mount_type, + source, + &m.options.to_vec(), + rootfs, + )?; + } + } + + let mut cfg = self.instance_config(); + cfg.set_bundle(&req.bundle) + .set_stdin(&req.stdin) + .set_stdout(&req.stdout) + .set_stderr(&req.stderr); + + // Check if this is a cri container + let instance = if self.is_empty() && is_cri_container(&spec) { + // If it is cri, then this is the "pause" container, which we don't need to deal with. + // TODO: maybe we can just go ahead and execute the actual container with runc? + InstanceData::new_base(req.id(), cfg)? + } else { + InstanceData::new_instance(req.id(), cfg)? + }; + + self.instances + .write() + .unwrap() + .insert(req.id().to_string(), Arc::new(instance)); + + self.events.send(TaskCreate { + container_id: req.id, + bundle: req.bundle, + rootfs: req.rootfs, + io: Some(TaskIO { + stdin: req.stdin, + stdout: req.stdout, + stderr: req.stderr, + ..Default::default() + }) + .into(), + ..Default::default() + }); + + debug!("create done"); + + // Per the spec, the prestart hook must be called as part of the create operation + debug!("call prehook before the start"); + oci::setup_prestart_hooks(spec.hooks())?; + + Ok(CreateTaskResponse { + pid: std::process::id(), + ..Default::default() + }) + } + + fn task_start(&self, req: StartRequest) -> Result { + if req.exec_id().is_empty().not() { + return Err(ShimError::Unimplemented("exec is not supported".to_string()).into()); + } + + let i = self.get_instance(req.id())?; + let pid = i.start()?; + + self.events.send(TaskStart { + container_id: req.id().into(), + pid, + ..Default::default() + }); + + let events = self.events.clone(); + + let id = req.id().to_string(); + + thread::Builder::new() + .name(format!("{id}-wait")) + .spawn(move || { + let (exit_code, timestamp) = i.wait(); + events.send(TaskExit { + container_id: id.clone(), + exit_status: exit_code, + exited_at: Some(timestamp.to_timestamp()).into(), + pid, + id, + ..Default::default() + }); + }) + .context("could not spawn thread to wait exit") + .map_err(Error::from)?; + + debug!("started: {:?}", req); + + Ok(StartResponse { + pid, + ..Default::default() + }) + } + + fn task_kill(&self, req: KillRequest) -> Result { + if !req.exec_id().is_empty() { + return Err(Error::InvalidArgument("exec is not supported".to_string())); + } + self.get_instance(req.id())?.kill(req.signal())?; + Ok(Empty::new()) + } + + fn task_delete(&self, req: DeleteRequest) -> Result { + if !req.exec_id().is_empty() { + return Err(Error::InvalidArgument("exec is not supported".to_string())); + } + + let i = self.get_instance(req.id())?; + + i.delete()?; + + let pid = i.pid().unwrap_or_default(); + let (exit_code, timestamp) = i.wait_timeout(Duration::ZERO).unzip(); + let timestamp = timestamp.map(ToTimestamp::to_timestamp); + + self.instances.write().unwrap().remove(req.id()); + + self.events.send(TaskDelete { + container_id: req.id().into(), + pid, + exit_status: exit_code.unwrap_or_default(), + exited_at: timestamp.clone().into(), + ..Default::default() + }); + + Ok(DeleteResponse { + pid, + exit_status: exit_code.unwrap_or_default(), + exited_at: timestamp.into(), + ..Default::default() + }) + } + + fn task_wait(&self, req: WaitRequest) -> Result { + if !req.exec_id().is_empty() { + return Err(Error::InvalidArgument("exec is not supported".to_string())); + } + + let i = self.get_instance(req.id())?; + let (exit_code, timestamp) = i.wait(); + + Ok(WaitResponse { + exit_status: exit_code, + exited_at: Some(timestamp.to_timestamp()).into(), + ..Default::default() + }) + } + + fn task_state(&self, req: StateRequest) -> Result { + if !req.exec_id().is_empty() { + return Err(Error::InvalidArgument("exec is not supported".to_string())); + } + + let i = self.get_instance(req.id())?; + let pid = i.pid(); + let (exit_code, timestamp) = i.wait_timeout(Duration::ZERO).unzip(); + let timestamp = timestamp.map(ToTimestamp::to_timestamp); + + let status = if pid.is_none() { + Status::CREATED + } else if exit_code.is_none() { + Status::RUNNING + } else { + Status::STOPPED + }; + + Ok(StateResponse { + bundle: i.config().get_bundle().to_string_lossy().to_string(), + stdin: i.config().get_stdin().to_string_lossy().to_string(), + stdout: i.config().get_stdout().to_string_lossy().to_string(), + stderr: i.config().get_stderr().to_string_lossy().to_string(), + pid: pid.unwrap_or_default(), + exit_status: exit_code.unwrap_or_default(), + exited_at: timestamp.into(), + status: status.into(), + ..Default::default() + }) + } + + fn task_stats(&self, req: StatsRequest) -> Result { + let i = self.get_instance(req.id())?; + let pid = i + .pid() + .ok_or_else(|| Error::InvalidArgument("task is not running".to_string()))?; + + let metrics = get_metrics(pid)?; + + Ok(StatsResponse { + stats: Some(metrics).into(), + ..Default::default() + }) + } +} + +impl SandboxService for Local { + type Instance = T; + fn new( + namespace: String, + containerd_address: String, + _id: String, + engine: T::Engine, + publisher: RemotePublisher, + ) -> Self { + let events = RemoteEventSender::new(&namespace, publisher); + let exit = Arc::default(); + Local::::new(engine, events, exit, namespace, containerd_address) + } +} + +impl Task for Local { + fn create(&self, _: &TtrpcContext, req: CreateTaskRequest) -> TtrpcResult { + debug!("create: {:?}", req); + Ok(self.task_create(req)?) + } + + fn start(&self, _: &TtrpcContext, req: StartRequest) -> TtrpcResult { + debug!("start: {:?}", req); + Ok(self.task_start(req)?) + } + + fn kill(&self, _: &TtrpcContext, req: KillRequest) -> TtrpcResult { + debug!("kill: {:?}", req); + Ok(self.task_kill(req)?) + } + + fn delete(&self, _: &TtrpcContext, req: DeleteRequest) -> TtrpcResult { + debug!("delete: {:?}", req); + Ok(self.task_delete(req)?) + } + + fn wait(&self, _: &TtrpcContext, req: WaitRequest) -> TtrpcResult { + debug!("wait: {:?}", req); + Ok(self.task_wait(req)?) + } + + fn connect(&self, _: &TtrpcContext, req: ConnectRequest) -> TtrpcResult { + debug!("connect: {:?}", req); + let i = self.get_instance(req.id())?; + let shim_pid = std::process::id(); + let task_pid = i.pid().unwrap_or_default(); + Ok(ConnectResponse { + shim_pid, + task_pid, + ..Default::default() + }) + } + + fn state(&self, _: &TtrpcContext, req: StateRequest) -> TtrpcResult { + debug!("state: {:?}", req); + Ok(self.task_state(req)?) + } + + fn shutdown(&self, _: &TtrpcContext, _: ShutdownRequest) -> TtrpcResult { + debug!("shutdown"); + if self.is_empty() { + self.exit.signal(); + } + Ok(Empty::new()) + } + + fn stats(&self, _ctx: &TtrpcContext, req: StatsRequest) -> TtrpcResult { + log::info!("stats: {:?}", req); + Ok(self.task_stats(req)?) + } +} diff --git a/crates/containerd-shim-wasm/src/sandbox/shim/local/tests.rs b/crates/containerd-shim-wasm/src/sandbox/shim/local/tests.rs new file mode 100644 index 000000000..d8186a28d --- /dev/null +++ b/crates/containerd-shim-wasm/src/sandbox/shim/local/tests.rs @@ -0,0 +1,394 @@ +use std::fs::{create_dir, File}; +use std::sync::mpsc::{channel, Sender}; +use std::thread; +use std::time::Duration; + +use anyhow::Context; +use containerd_shim::api::Status; +use containerd_shim::event::Event; +use protobuf::MessageDyn; +use serde_json as json; +use tempfile::tempdir; + +use super::*; +use crate::sandbox::instance::Nop; +use crate::sandbox::shim::events::EventSender; +use crate::sandbox::shim::instance_option::InstanceOption; + +struct LocalWithDescrutor { + local: Arc>, +} + +impl LocalWithDescrutor { + fn new(local: Arc>) -> Self { + Self { local } + } +} + +impl EventSender for Sender<(String, Box)> { + fn send(&self, event: impl Event) { + let _ = self.send((event.topic(), Box::new(event))); + } +} + +impl Drop for LocalWithDescrutor { + fn drop(&mut self) { + self.local + .instances + .write() + .unwrap() + .iter() + .for_each(|(_, v)| { + let _ = v.kill(9); + v.delete().unwrap(); + }); + } +} + +fn with_cri_sandbox(spec: Option, id: String) -> Spec { + let mut s = spec.unwrap_or_default(); + let mut annotations = HashMap::new(); + s.annotations().as_ref().map(|a| { + a.iter().map(|(k, v)| { + annotations.insert(k.to_string(), v.to_string()); + }) + }); + annotations.insert("io.kubernetes.cri.sandbox-id".to_string(), id); + + s.set_annotations(Some(annotations)); + s +} + +fn create_bundle(dir: &std::path::Path, spec: Option) -> Result<()> { + create_dir(dir.join("rootfs"))?; + + let s = spec.unwrap_or_default(); + + json::to_writer(File::create(dir.join("config.json"))?, &s) + .context("could not write config.json")?; + Ok(()) +} + +#[test] +fn test_delete_after_create() { + let dir = tempdir().unwrap(); + let id = "test-delete-after-create"; + create_bundle(dir.path(), None).unwrap(); + + let (tx, _rx) = channel(); + let local = Arc::new(Local::::new( + (), + tx, + Arc::new(ExitSignal::default()), + "test_namespace", + "/test/address", + )); + let mut _wrapped = LocalWithDescrutor::new(local.clone()); + + local + .task_create(CreateTaskRequest { + id: id.to_string(), + bundle: dir.path().to_str().unwrap().to_string(), + ..Default::default() + }) + .unwrap(); + + local + .task_delete(DeleteRequest { + id: id.to_string(), + ..Default::default() + }) + .unwrap(); +} + +#[test] +fn test_cri_task() -> Result<()> { + // Currently the relationship between the "base" container and the "instances" are pretty weak. + // When a cri sandbox is specified we just assume it's the sandbox container and treat it as such by not actually running the code (which is going to be wasm). + let (etx, _erx) = channel(); + let exit_signal = Arc::new(ExitSignal::default()); + let local = Arc::new(Local::::new( + (), + etx, + exit_signal, + "test_namespace", + "/test/address", + )); + + let mut _wrapped = LocalWithDescrutor::new(local.clone()); + + let temp = tempdir().unwrap(); + let dir = temp.path(); + let sandbox_id = "test-cri-task".to_string(); + create_bundle(dir, Some(with_cri_sandbox(None, sandbox_id.clone())))?; + + local.task_create(CreateTaskRequest { + id: "testbase".to_string(), + bundle: dir.to_str().unwrap().to_string(), + ..Default::default() + })?; + + let state = local.task_state(StateRequest { + id: "testbase".to_string(), + ..Default::default() + })?; + assert_eq!(state.status(), Status::CREATED); + + // A little janky since this is internal data, but check that this is seen as a sandbox container + let i = local.get_instance("testbase")?; + assert!(matches!(i.instance, InstanceOption::Nop(_))); + + local.task_start(StartRequest { + id: "testbase".to_string(), + ..Default::default() + })?; + + let state = local.task_state(StateRequest { + id: "testbase".to_string(), + ..Default::default() + })?; + assert_eq!(state.status(), Status::RUNNING); + + let ll = local.clone(); + let (base_tx, base_rx) = channel(); + thread::spawn(move || { + let resp = ll.task_wait(WaitRequest { + id: "testbase".to_string(), + ..Default::default() + }); + base_tx.send(resp).unwrap(); + }); + base_rx.try_recv().unwrap_err(); + + let temp2 = tempdir().unwrap(); + let dir2 = temp2.path(); + create_bundle(dir2, Some(with_cri_sandbox(None, sandbox_id)))?; + + local.task_create(CreateTaskRequest { + id: "testinstance".to_string(), + bundle: dir2.to_str().unwrap().to_string(), + ..Default::default() + })?; + + let state = local.task_state(StateRequest { + id: "testinstance".to_string(), + ..Default::default() + })?; + assert_eq!(state.status(), Status::CREATED); + + // again, this is janky since it is internal data, but check that this is seen as a "real" container. + // this is the inverse of the above test case. + let i = local.get_instance("testinstance")?; + assert!(matches!(i.instance, InstanceOption::Instance(_))); + + local.task_start(StartRequest { + id: "testinstance".to_string(), + ..Default::default() + })?; + + let state = local.task_state(StateRequest { + id: "testinstance".to_string(), + ..Default::default() + })?; + assert_eq!(state.status(), Status::RUNNING); + + let stats = local.task_stats(StatsRequest { + id: "testinstance".to_string(), + ..Default::default() + })?; + assert!(stats.has_stats()); + + let ll = local.clone(); + let (instance_tx, instance_rx) = channel(); + std::thread::spawn(move || { + let resp = ll.task_wait(WaitRequest { + id: "testinstance".to_string(), + ..Default::default() + }); + instance_tx.send(resp).unwrap(); + }); + instance_rx.try_recv().unwrap_err(); + + local.task_kill(KillRequest { + id: "testinstance".to_string(), + signal: 9, + ..Default::default() + })?; + + instance_rx.recv_timeout(Duration::from_secs(5)).unwrap()?; + + let state = local.task_state(StateRequest { + id: "testinstance".to_string(), + ..Default::default() + })?; + assert_eq!(state.status(), Status::STOPPED); + local.task_delete(DeleteRequest { + id: "testinstance".to_string(), + ..Default::default() + })?; + + match local + .task_state(StateRequest { + id: "testinstance".to_string(), + ..Default::default() + }) + .unwrap_err() + { + Error::NotFound(_) => {} + e => return Err(e), + } + + base_rx.try_recv().unwrap_err(); + let state = local.task_state(StateRequest { + id: "testbase".to_string(), + ..Default::default() + })?; + assert_eq!(state.status(), Status::RUNNING); + + local.task_kill(KillRequest { + id: "testbase".to_string(), + signal: 9, + ..Default::default() + })?; + + base_rx.recv_timeout(Duration::from_secs(5)).unwrap()?; + let state = local.task_state(StateRequest { + id: "testbase".to_string(), + ..Default::default() + })?; + assert_eq!(state.status(), Status::STOPPED); + + local.task_delete(DeleteRequest { + id: "testbase".to_string(), + ..Default::default() + })?; + match local + .task_state(StateRequest { + id: "testbase".to_string(), + ..Default::default() + }) + .unwrap_err() + { + Error::NotFound(_) => {} + e => return Err(e), + } + + Ok(()) +} + +#[test] +fn test_task_lifecycle() -> Result<()> { + let (etx, _erx) = channel(); // TODO: check events + let exit_signal = Arc::new(ExitSignal::default()); + let local = Arc::new(Local::::new( + (), + etx, + exit_signal, + "test_namespace", + "/test/address", + )); + + let mut _wrapped = LocalWithDescrutor::new(local.clone()); + + let temp = tempdir().unwrap(); + let dir = temp.path(); + create_bundle(dir, None)?; + + match local + .task_state(StateRequest { + id: "test".to_string(), + ..Default::default() + }) + .unwrap_err() + { + Error::NotFound(_) => {} + e => return Err(e), + } + + local.task_create(CreateTaskRequest { + id: "test".to_string(), + bundle: dir.to_str().unwrap().to_string(), + ..Default::default() + })?; + + match local + .task_create(CreateTaskRequest { + id: "test".to_string(), + bundle: dir.to_str().unwrap().to_string(), + ..Default::default() + }) + .unwrap_err() + { + Error::AlreadyExists(_) => {} + e => return Err(e), + } + + let state = local.task_state(StateRequest { + id: "test".to_string(), + ..Default::default() + })?; + + assert_eq!(state.status(), Status::CREATED); + + local.task_start(StartRequest { + id: "test".to_string(), + ..Default::default() + })?; + + let state = local.task_state(StateRequest { + id: "test".to_string(), + ..Default::default() + })?; + + assert_eq!(state.status(), Status::RUNNING); + + let (tx, rx) = channel(); + let ll = local.clone(); + thread::spawn(move || { + let resp = ll.task_wait(WaitRequest { + id: "test".to_string(), + ..Default::default() + }); + tx.send(resp).unwrap(); + }); + + rx.try_recv().unwrap_err(); + + let res = local.task_stats(StatsRequest { + id: "test".to_string(), + ..Default::default() + })?; + assert!(res.has_stats()); + + local.task_kill(KillRequest { + id: "test".to_string(), + signal: 9, + ..Default::default() + })?; + + rx.recv_timeout(Duration::from_secs(5)).unwrap()?; + + let state = local.task_state(StateRequest { + id: "test".to_string(), + ..Default::default() + })?; + assert_eq!(state.status(), Status::STOPPED); + + local.task_delete(DeleteRequest { + id: "test".to_string(), + ..Default::default() + })?; + + match local + .task_state(StateRequest { + id: "test".to_string(), + ..Default::default() + }) + .unwrap_err() + { + Error::NotFound(_) => {} + e => return Err(e), + } + + Ok(()) +} diff --git a/crates/containerd-shim-wasm/src/sandbox/shim/mod.rs b/crates/containerd-shim-wasm/src/sandbox/shim/mod.rs new file mode 100644 index 000000000..225dd8f81 --- /dev/null +++ b/crates/containerd-shim-wasm/src/sandbox/shim/mod.rs @@ -0,0 +1,13 @@ +//! The shim is the entrypoint for the containerd shim API. It is responsible +//! for commmuincating with the containerd daemon and managing the lifecycle of +//! the container/sandbox. + +mod cli; +mod events; +mod instance_data; +mod instance_option; +mod local; +mod task_state; + +pub use cli::Cli; +pub(crate) use local::Local; diff --git a/crates/containerd-shim-wasm/src/sandbox/shim/task_state.rs b/crates/containerd-shim-wasm/src/sandbox/shim/task_state.rs new file mode 100644 index 000000000..e6cc08158 --- /dev/null +++ b/crates/containerd-shim-wasm/src/sandbox/shim/task_state.rs @@ -0,0 +1,61 @@ +use crate::sandbox::Error::FailedPrecondition; +use crate::sandbox::Result; + +#[derive(Debug, Clone, Copy)] +pub(super) enum TaskState { + Created, + Starting, + Started, + Exited, + Deleting, +} + +impl TaskState { + pub fn start(&mut self) -> Result<()> { + *self = match self { + Self::Created => Ok(Self::Starting), + _ => state_transition_error(*self, Self::Starting), + }?; + Ok(()) + } + + pub fn kill(&mut self) -> Result<()> { + *self = match self { + Self::Started => Ok(Self::Started), + _ => state_transition_error(*self, "Killing"), + }?; + Ok(()) + } + + pub fn delete(&mut self) -> Result<()> { + *self = match self { + Self::Created | Self::Exited => Ok(Self::Deleting), + _ => state_transition_error(*self, Self::Deleting), + }?; + Ok(()) + } + + pub fn started(&mut self) -> Result<()> { + *self = match self { + Self::Starting => Ok(Self::Started), + _ => state_transition_error(*self, Self::Started), + }?; + Ok(()) + } + + pub fn stop(&mut self) -> Result<()> { + *self = match self { + Self::Started | Self::Starting => Ok(Self::Exited), + // This is for potential failure cases where we want delete to be able to be retried. + Self::Deleting => Ok(Self::Exited), + _ => state_transition_error(*self, Self::Exited), + }?; + Ok(()) + } +} + +fn state_transition_error(from: impl std::fmt::Debug, to: impl std::fmt::Debug) -> Result { + Err(FailedPrecondition(format!( + "invalid state transition: {from:?} => {to:?}" + ))) +} diff --git a/crates/containerd-shim-wasm/src/sandbox/stdio.rs b/crates/containerd-shim-wasm/src/sandbox/stdio.rs index e58f33a08..85c3411ef 100644 --- a/crates/containerd-shim-wasm/src/sandbox/stdio.rs +++ b/crates/containerd-shim-wasm/src/sandbox/stdio.rs @@ -33,9 +33,9 @@ impl Stdio { pub fn init_from_cfg(cfg: &InstanceConfig) -> Result { Ok(Self { - stdin: cfg.get_stdin().try_into()?, - stdout: cfg.get_stdout().try_into()?, - stderr: cfg.get_stderr().try_into()?, + stdin: StdioStream::try_from_path(cfg.get_stdin())?, + stdout: StdioStream::try_from_path(cfg.get_stdout())?, + stderr: StdioStream::try_from_path(cfg.get_stderr())?, }) } @@ -90,20 +90,18 @@ impl StdioStream { } } -impl, const FD: StdioRawFd> TryFrom> for StdioStream { - type Error = Error; - fn try_from(path: Option

) -> Result { - let fd = path - .and_then(|path| match path.as_ref() { - path if path.as_os_str().is_empty() => None, - path => Some(StdioOwnedFd::try_from_path(path)), - }) - .transpose() - .or_else(|err| match err.kind() { - NotFound => Ok(None), - _ => Err(err), - })? - .unwrap_or_default(); +impl StdioStream { + fn try_from_path(path: impl AsRef) -> Result { + let path = path.as_ref(); + if path.as_os_str().is_empty() { + return Ok(Self(Arc::default())); + } + + let fd = match StdioOwnedFd::try_from_path(path) { + Err(err) if err.kind() == NotFound => Default::default(), + Err(err) => return Err(err), + Ok(fd) => fd, + }; Ok(Self(Arc::new(fd))) } @@ -126,16 +124,12 @@ mod test { /// Any other error is a real error. #[test] fn test_maybe_open_stdio() -> anyhow::Result<()> { - // None - let s = Stdout::try_from(None::<&Path>)?; - assert!(s.0.take().as_raw_fd().is_none()); - // empty path - let s = Stdout::try_from(Some(""))?; + let s = Stdout::try_from_path("")?; assert!(s.0.take().as_raw_fd().is_none()); // nonexistent path - let s = Stdout::try_from(Some("/some/nonexistent/path"))?; + let s = Stdout::try_from_path("/some/nonexistent/path")?; assert!(s.0.take().as_raw_fd().is_none()); // valid path @@ -145,7 +139,7 @@ mod test { drop(temp); // a valid path should not fail - let s = Stdout::try_from(Some(&path))?; + let s = Stdout::try_from_path(path)?; assert!(s.0.take().as_raw_fd().is_some()); Ok(()) } diff --git a/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs b/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs index 247bb39b5..55c6b111c 100644 --- a/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs +++ b/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs @@ -37,7 +37,7 @@ impl SandboxInstance for Instance { fn new(id: String, cfg: Option<&InstanceConfig>) -> Result { let cfg = cfg.context("missing configuration")?; let engine = cfg.get_engine(); - let bundle = cfg.get_bundle().context("missing bundle")?; + let bundle = cfg.get_bundle().to_path_buf(); let namespace = cfg.get_namespace(); let rootdir = Path::new(DEFAULT_CONTAINER_ROOT_DIR).join(E::name()); let rootdir = determine_rootdir(&bundle, &namespace, rootdir)?; diff --git a/crates/containerd-shim-wasm/src/testing.rs b/crates/containerd-shim-wasm/src/testing.rs index 4ca1df28b..348668b26 100644 --- a/crates/containerd-shim-wasm/src/testing.rs +++ b/crates/containerd-shim-wasm/src/testing.rs @@ -122,13 +122,13 @@ where let mut cfg = InstanceConfig::new( WasiInstance::Engine::default(), - "test_namespace".into(), - "/run/containerd/containerd.sock".into(), + "test_namespace", + "/run/containerd/containerd.sock", ); - cfg.set_bundle(dir.to_string_lossy().to_string()) - .set_stdout(dir.join("stdout").to_string_lossy().to_string()) - .set_stderr(dir.join("stderr").to_string_lossy().to_string()) - .set_stdin(dir.join("stdin").to_string_lossy().to_string()); + cfg.set_bundle(dir) + .set_stdout(dir.join("stdout")) + .set_stderr(dir.join("stderr")) + .set_stdin(dir.join("stdin")); let instance = WasiInstance::new("test".to_string(), Some(&cfg))?; Ok(WasiTest { instance, tempdir })