Skip to content

Commit

Permalink
feat(wasi-observe): A WASI Observe host component
Browse files Browse the repository at this point in the history
Signed-off-by: Caleb Schoepp <[email protected]>
  • Loading branch information
calebschoepp committed Sep 17, 2024
1 parent caacf55 commit 810d05f
Show file tree
Hide file tree
Showing 54 changed files with 2,004 additions and 103 deletions.
55 changes: 53 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ terminal = { path = "crates/terminal" }

subprocess = "0.2.9"
tempfile = "3.8.0"
tokio = { version = "1.23", features = ["full"] }
tokio = { version = "1.40", features = ["full"] }
toml = "0.8"
tracing = { workspace = true }
url = "2.2.2"
Expand All @@ -84,6 +84,7 @@ openssl = { version = "0.10" }
anyhow = { workspace = true, features = ["backtrace"] }
conformance = { path = "tests/conformance-tests" }
conformance-tests = { workspace = true }
fake-opentelemetry-collector = { git = "https://github.com/davidB/tracing-opentelemetry-instrumentation-sdk", rev = "db4331666b8a2a940d45c8942e2b5b9fed8a87be" }
hex = "0.4.3"
http-body-util = { workspace = true }
hyper = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/factor-key-value/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ anyhow = "1.0"
lru = "0.9.0"
serde = { version = "1.0", features = ["rc"] }
spin-core = { path = "../core" }
spin-factor-observe = { path = "../factor-observe" }
spin-factors = { path = "../factors" }
spin-locked-app = { path = "../locked-app" }
spin-world = { path = "../world" }
Expand Down
29 changes: 28 additions & 1 deletion crates/factor-key-value/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::util::EmptyStoreManager;
use anyhow::{Context, Result};
use spin_core::{async_trait, wasmtime::component::Resource};
use spin_factor_observe::ObserveContext;
use spin_world::v2::key_value;
use std::{collections::HashSet, sync::Arc};
use table::Table;
Expand Down Expand Up @@ -37,6 +38,7 @@ pub struct KeyValueDispatch {
allowed_stores: HashSet<String>,
manager: Arc<dyn StoreManager>,
stores: Table<Arc<dyn Store>>,
observe_context: Option<ObserveContext>,
}

impl KeyValueDispatch {
Expand All @@ -49,12 +51,19 @@ impl KeyValueDispatch {
allowed_stores: HashSet::new(),
manager: Arc::new(EmptyStoreManager),
stores: Table::new(capacity),
observe_context: None,
}
}

pub fn init(&mut self, allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) {
pub fn init(
&mut self,
allowed_stores: HashSet<String>,
manager: Arc<dyn StoreManager>,
observe_context: Option<ObserveContext>,
) {
self.allowed_stores = allowed_stores;
self.manager = manager;
self.observe_context = observe_context;
}

pub fn get_store(&self, store: Resource<key_value::Store>) -> anyhow::Result<&Arc<dyn Store>> {
Expand All @@ -79,6 +88,9 @@ impl key_value::Host for KeyValueDispatch {}
impl key_value::HostStore for KeyValueDispatch {
#[instrument(name = "spin_key_value.open", skip(self), err(level = Level::INFO), fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))]
async fn open(&mut self, name: String) -> Result<Result<Resource<key_value::Store>, Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
Ok(async {
if self.allowed_stores.contains(&name) {
let store = self
Expand All @@ -99,6 +111,9 @@ impl key_value::HostStore for KeyValueDispatch {
store: Resource<key_value::Store>,
key: String,
) -> Result<Result<Option<Vec<u8>>, Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.get(&key).await)
}
Expand All @@ -110,6 +125,9 @@ impl key_value::HostStore for KeyValueDispatch {
key: String,
value: Vec<u8>,
) -> Result<Result<(), Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.set(&key, &value).await)
}
Expand All @@ -120,6 +138,9 @@ impl key_value::HostStore for KeyValueDispatch {
store: Resource<key_value::Store>,
key: String,
) -> Result<Result<(), Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.delete(&key).await)
}
Expand All @@ -130,6 +151,9 @@ impl key_value::HostStore for KeyValueDispatch {
store: Resource<key_value::Store>,
key: String,
) -> Result<Result<bool, Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.exists(&key).await)
}
Expand All @@ -139,6 +163,9 @@ impl key_value::HostStore for KeyValueDispatch {
&mut self,
store: Resource<key_value::Store>,
) -> Result<Result<Vec<String>, Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.get_keys().await)
}
Expand Down
9 changes: 7 additions & 2 deletions crates/factor-key-value/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use anyhow::ensure;
use spin_factor_observe::ObserveContext;
use spin_factors::{
ConfigureAppContext, Factor, FactorInstanceBuilder, InitContext, PrepareContext, RuntimeFactors,
};
Expand Down Expand Up @@ -90,17 +91,19 @@ impl Factor for KeyValueFactor {

fn prepare<T: RuntimeFactors>(
&self,
ctx: PrepareContext<T, Self>,
mut ctx: PrepareContext<T, Self>,
) -> anyhow::Result<InstanceBuilder> {
let app_state = ctx.app_state();
let allowed_stores = app_state
.component_allowed_stores
.get(ctx.app_component().id())
.expect("component should be in component_stores")
.clone();
let observe_context = ObserveContext::from_prepare_context(&mut ctx)?;
Ok(InstanceBuilder {
store_manager: app_state.store_manager.clone(),
allowed_stores,
observe_context,
})
}
}
Expand Down Expand Up @@ -149,6 +152,7 @@ pub struct InstanceBuilder {
store_manager: Arc<AppStoreManager>,
/// The allowed stores for this component instance.
allowed_stores: HashSet<String>,
observe_context: ObserveContext,
}

impl FactorInstanceBuilder for InstanceBuilder {
Expand All @@ -158,9 +162,10 @@ impl FactorInstanceBuilder for InstanceBuilder {
let Self {
store_manager,
allowed_stores,
observe_context,
} = self;
let mut dispatch = KeyValueDispatch::new_with_capacity(u32::MAX);
dispatch.init(allowed_stores, store_manager);
dispatch.init(allowed_stores, store_manager, Some(observe_context));
Ok(dispatch)
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/factor-llm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ llm-cublas = ["llm", "spin-llm-local/cublas"]
anyhow = "1.0"
async-trait = "0.1"
serde = "1.0"
spin-factor-observe = { path = "../factor-observe" }
spin-factors = { path = "../factors" }
spin-llm-local = { path = "../llm-local", optional = true }
spin-llm-remote-http = { path = "../llm-remote-http" }
Expand Down
4 changes: 4 additions & 0 deletions crates/factor-llm/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ impl v2::Host for InstanceState {
prompt: String,
params: Option<v2::InferencingParams>,
) -> Result<v2::InferencingResult, v2::Error> {
self.observe_context.reparent_tracing_span();

if !self.allowed_models.contains(&model) {
return Err(access_denied_error(&model));
}
Expand Down Expand Up @@ -42,6 +44,8 @@ impl v2::Host for InstanceState {
model: v1::EmbeddingModel,
data: Vec<String>,
) -> Result<v2::EmbeddingsResult, v2::Error> {
self.observe_context.reparent_tracing_span();

if !self.allowed_models.contains(&model) {
return Err(access_denied_error(&model));
}
Expand Down
Loading

0 comments on commit 810d05f

Please sign in to comment.