From 1458dc306092d900d4b7b4003e105c9babd6f0c5 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Wed, 17 Jul 2024 22:30:05 +0800 Subject: [PATCH] Invalidation research --- Cargo.toml | 2 +- examples/axum/Cargo.toml | 8 +- examples/axum/src/api.rs | 6 +- examples/axum/src/api/chat.rs | 2 +- examples/axum/src/api/invalidation.rs | 113 ++++++++++++++++++++++++++ examples/axum/src/api/library.rs | 1 + examples/axum/src/api/streaming.rs | 3 + examples/axum/src/main.rs | 3 + integrations/tauri/Cargo.toml | 14 ++++ integrations/tauri/src/lib.rs | 8 ++ middleware/invalidation/Cargo.toml | 1 + middleware/invalidation/README.md | 82 +++++++++++++++++++ middleware/invalidation/src/lib.rs | 20 +---- middleware/playground/Cargo.toml | 15 ++++ middleware/playground/README.md | 3 + middleware/playground/src/lib.rs | 8 ++ 16 files changed, 263 insertions(+), 26 deletions(-) create mode 100644 examples/axum/src/api/invalidation.rs create mode 100644 examples/axum/src/api/library.rs create mode 100644 examples/axum/src/api/streaming.rs create mode 100644 integrations/tauri/Cargo.toml create mode 100644 integrations/tauri/src/lib.rs create mode 100644 middleware/invalidation/README.md create mode 100644 middleware/playground/Cargo.toml create mode 100644 middleware/playground/README.md create mode 100644 middleware/playground/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index e79b861f..9b14b6ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,4 +18,4 @@ panic_in_result_fn = { level = "warn", priority = -1 } # TODO: Remove this [patch.crates-io] specta = { git = "https://github.com/oscartbeaumont/specta", rev = "9c41ff0e95a357fd00893f5e2f9d642eac3438ef" } -specta-typescript = { git = "https://github.com/oscartbeaumont/specta", rev = "9c41ff0e95a357fd00893f5e2f9d642eac3438ef" } \ No newline at end of file +specta-typescript = { git = "https://github.com/oscartbeaumont/specta", rev = "9c41ff0e95a357fd00893f5e2f9d642eac3438ef" } diff --git a/examples/axum/Cargo.toml b/examples/axum/Cargo.toml index 71a3d58d..40a2aa90 100644 --- a/examples/axum/Cargo.toml +++ b/examples/axum/Cargo.toml @@ -6,15 +6,15 @@ publish = false [dependencies] axum = "0.7.5" -rspc = { version = "0.3.0", path = "../../rspc" } -rspc-axum = { version = "0.2.0", path = "../../integrations/axum", features = ["ws"] } +rspc = { path = "../../rspc" } +rspc-axum = { path = "../../integrations/axum", features = ["ws"] } tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread"] } thiserror = "1.0.62" async-stream = "0.3.5" tracing = "0.1.40" tracing-subscriber = "0.3.18" -rspc-tracing = { version = "0.0.0", path = "../../middleware/tracing" } -rspc-openapi = { version = "0.0.0", path = "../../middleware/openapi" } +rspc-tracing = { path = "../../middleware/tracing" } +rspc-openapi = { path = "../../middleware/openapi" } serde = { version = "1", features = ["derive"] } specta = { version = "=2.0.0-rc.15", features = ["derive"] } # TODO: Drop requirement on `derive` specta-util = "0.0.2" # TODO: We need this for `TypeCollection` which is cringe diff --git a/examples/axum/src/api.rs b/examples/axum/src/api.rs index 9d84bb26..6d5377f1 100644 --- a/examples/axum/src/api.rs +++ b/examples/axum/src/api.rs @@ -1,4 +1,4 @@ -use std::{error, marker::PhantomData, path::PathBuf}; +use std::{error, marker::PhantomData, path::PathBuf, sync::Arc}; use rspc::{ procedure::{Procedure, ProcedureBuilder, ResolverInput, ResolverOutput}, @@ -9,6 +9,7 @@ use specta_util::TypeCollection; use thiserror::Error; pub(crate) mod chat; +pub(crate) mod invalidation; pub(crate) mod store; #[derive(Debug, Error)] @@ -17,7 +18,10 @@ pub enum Error {} // `Clone` is only required for usage with Websockets #[derive(Clone)] pub struct Context { + // For this example we nest context's for each example. + // In the real-world you don't need to do this, we do this so the examples are more self-contained. pub chat: chat::Ctx, + pub invalidation: Arc, } pub type Router = rspc::Router; diff --git a/examples/axum/src/api/chat.rs b/examples/axum/src/api/chat.rs index 8b0b3cc8..2abc7548 100644 --- a/examples/axum/src/api/chat.rs +++ b/examples/axum/src/api/chat.rs @@ -80,7 +80,7 @@ pub fn mount() -> Router { Ok(stream! { let mut chat = ctx.chat.chat.subscribe(); while let Ok(msg) = chat.recv().await { - yield Ok(msg); // TODO: error handling + yield Ok(msg); } }) }) diff --git a/examples/axum/src/api/invalidation.rs b/examples/axum/src/api/invalidation.rs new file mode 100644 index 00000000..1b7eac93 --- /dev/null +++ b/examples/axum/src/api/invalidation.rs @@ -0,0 +1,113 @@ +//! TODO: Document how this example works. +//! +//! TODO: Expand this example to one which pushes data directly to the frontend. + +use std::{ + collections::HashMap, + future::Future, + sync::{Arc, Mutex, PoisonError}, +}; + +use async_stream::stream; +use rspc::middleware::Middleware; +use serde::{Deserialize, Serialize}; +use specta::Type; +use tokio::sync::broadcast; + +use super::{BaseProcedure, Context, Router}; + +#[derive(Clone)] +pub struct Ctx { + keys: Arc>>, + tx: broadcast::Sender, +} + +impl Ctx { + pub fn new() -> Arc { + Arc::new(Self { + keys: Default::default(), + tx: broadcast::channel(100).0, + }) + } +} + +#[derive(Debug, Clone, Serialize, Type)] +pub enum InvalidateEvent { + InvalidateKey(String), +} + +#[derive(Deserialize, Type)] +struct SetKeyInput { + key: String, + value: String, +} + +pub fn mount() -> Router { + Router::new() + .procedure("get", { + ::builder() + // TODO: Why does `TCtx` need a hardcoded type??? + .with(invalidation(|ctx: Context, key, _result| async move { + ctx.invalidation + .tx + .send(InvalidateEvent::InvalidateKey(key)) + .unwrap(); + })) + .mutation(|ctx, key: String| async move { + let value = ctx + .invalidation + .keys + .lock() + .unwrap_or_else(PoisonError::into_inner) + .get(&key) + .cloned(); + + Ok(value) + }) + }) + .procedure("set", { + ::builder().mutation(|ctx, input: SetKeyInput| async move { + ctx.invalidation + .keys + .lock() + .unwrap_or_else(PoisonError::into_inner) + .insert(input.key, input.value); + + Ok(()) + }) + }) + .procedure("invalidation", { + // The frontend will subscribe to this for when to invalidate queries. + ::builder().subscription(|ctx, _: ()| async move { + Ok(stream! { + let mut tx = ctx.invalidation.tx.subscribe(); + while let Ok(msg) = tx.recv().await { + yield Ok(msg); + } + }) + }) + }) +} + +fn invalidation( + handler: impl Fn(TCtx, TInput, &Result) -> F + Send + Sync + 'static, +) -> Middleware +where + TError: Send + 'static, + TCtx: Clone + Send + 'static, + TInput: Clone + Send + 'static, + TResult: Send + 'static, + F: Future + Send + 'static, +{ + let handler = Arc::new(handler); + Middleware::new(move |ctx: TCtx, input: TInput, next| { + let handler = handler.clone(); + async move { + let ctx2 = ctx.clone(); + let input2 = input.clone(); + let result = next.exec(ctx, input).await; + handler(ctx2, input2, &result).await; + result + } + }) +} diff --git a/examples/axum/src/api/library.rs b/examples/axum/src/api/library.rs new file mode 100644 index 00000000..2ad75493 --- /dev/null +++ b/examples/axum/src/api/library.rs @@ -0,0 +1 @@ +// TODO: Library middleware diff --git a/examples/axum/src/api/streaming.rs b/examples/axum/src/api/streaming.rs new file mode 100644 index 00000000..b7b8fb84 --- /dev/null +++ b/examples/axum/src/api/streaming.rs @@ -0,0 +1,3 @@ +// TODO: Streaming DB data +// TODO: File upload +// TODO: File download diff --git a/examples/axum/src/main.rs b/examples/axum/src/main.rs index 85c5ac6c..6ca69d66 100644 --- a/examples/axum/src/main.rs +++ b/examples/axum/src/main.rs @@ -1,5 +1,6 @@ use std::net::Ipv6Addr; +use api::invalidation; use axum::{routing::get, Router}; use tokio::sync::broadcast; use tracing::info; @@ -13,8 +14,10 @@ async fn main() { let router = api::mount().build().unwrap(); let chat_tx = broadcast::channel(100).0; + let invalidation = invalidation::Ctx::new(); let ctx_fn = move || api::Context { chat: api::chat::Ctx::new(chat_tx.clone()), + invalidation: invalidation.clone(), }; let app = Router::new() diff --git a/integrations/tauri/Cargo.toml b/integrations/tauri/Cargo.toml new file mode 100644 index 00000000..18a9ed70 --- /dev/null +++ b/integrations/tauri/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "rspc-tauri" +version = "0.0.1" +edition = "2021" + +[dependencies] + +# /bin/sh RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features +[package.metadata."docs.rs"] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[lints] +workspace = true diff --git a/integrations/tauri/src/lib.rs b/integrations/tauri/src/lib.rs new file mode 100644 index 00000000..e0d4af98 --- /dev/null +++ b/integrations/tauri/src/lib.rs @@ -0,0 +1,8 @@ +//! rspc-tauri: Support for exposing rspc via Tauri IPC +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc( + html_logo_url = "https://github.com/oscartbeaumont/rspc/raw/main/docs/public/logo.png", + html_favicon_url = "https://github.com/oscartbeaumont/rspc/raw/main/docs/public/logo.png" +)] + +// TODO diff --git a/middleware/invalidation/Cargo.toml b/middleware/invalidation/Cargo.toml index 4e470ac2..9ffdf2a4 100644 --- a/middleware/invalidation/Cargo.toml +++ b/middleware/invalidation/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" publish = false # TODO: Crate metadata & publish [dependencies] +async-stream = "0.3.5" rspc = { path = "../../rspc" } # /bin/sh RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features diff --git a/middleware/invalidation/README.md b/middleware/invalidation/README.md new file mode 100644 index 00000000..bc88cf39 --- /dev/null +++ b/middleware/invalidation/README.md @@ -0,0 +1,82 @@ +# rspc Invalidation + +For now this is not going to be released as we need to work out if their is any value to an official middleware, instead of an example project implementing the same thing user-space? + +## Questions + +For my own future reference: https://discord.com/channels/@me/813276814801764382/1263123489477361828 + +### Pull vs Push based invalidation events + +Pull based is where the middleware is applied to the query. +Push based is where the middleware is applied to the mutation. + +I think we want a pull-based so resources can define their dependencies a-la React dependencies array. + +### Stream or not? + +I'm leaning stream-based because it pushes the type safety concern onto the end user. + +```rust +::builder() + // "Pull"-based. Applied to queries. (I personally a "Pull"-based approach is better) + .with(rspc_invalidation::invalidation( + |input, result, operation| operation.key() == "store.set", + )) + .with(rspc_invalidation::invalidation( + // TODO: how is `input().id` even gonna work lol + |input, result, operation| { + operation.key() == "notes.update" && operation.input().id == input.id + }, + )) + // "Push"-based. Applied to mutations. + .with(rspc_invalidation::invalidation( + |input, result, invalidate| invalidate("store.get", ()), + )) + .with(rspc_invalidation::invalidation( + |input, result, operation| invalidate("notes.get", input.id), + )) + // "Pull"-based but with stream. + .with(rspc_invalidation::invalidation(|input: TArgs| { + stream! { + // If practice subscribe to some central event bus for changes + loop { + tokio::time::sleep(Duration::from_secs(5)).await; + yield Invalidate; // pub struct Invalidate; + } + } + })) + .query(...) +``` + +### Exposing result of procedure to invalidation closure + +If we expose result to the invalidate callback either the `Stream` or the value must be `Clone` which is not great, although the constrain can be applied locally by the middleware. + +If we expose the result and use a stream-based approach do we spawn a new invalidation closure for every result? I think this is something we will wanna leave the user in control of but no idea what that API would look like. + +### How do we get `BuiltRouter` into `Procedure`? + +It kinda has to come in via context or we need some magic system within rspc's core. Otherwise we basically have a recursive dependency. + +### Frontend? + +Will we expose a package or will it be on the user to hook it up? + +## Other concerns + +## User activity + +Really we wanna only push invalidation events that are related to parts of the app the user currently has active. An official system would need to take this into account somehow. Maybe some integration with the frontend router and websocket state using the `TCtx`??? + +## Data or invalidation + +If we can be pretty certain the frontend wants the new data we can safely push it straight to the frontend instead of just asking the frontend to refetch. This will be much faster but if your not tracking user-activity it will be way slower because of the potential volume of data. + +Tracking user activity pretty much requires some level of router integration which might be nice to have an abstraction for but it's also hard. + +## Authorization + +**This is why rspc can't own the subscription!!!** + +We should also have a way to take into account authorization and what invalidation events the user is able to see. For something like Spacedrive we never had this problem because we are a desktop app but any web app would require this. \ No newline at end of file diff --git a/middleware/invalidation/src/lib.rs b/middleware/invalidation/src/lib.rs index 9f4d3688..146c4019 100644 --- a/middleware/invalidation/src/lib.rs +++ b/middleware/invalidation/src/lib.rs @@ -5,22 +5,4 @@ html_favicon_url = "https://github.com/oscartbeaumont/rspc/raw/main/docs/public/logo.png" )] -use rspc::middleware::Middleware; - -/// TODO -pub fn invalidation( - handler: impl Fn(TThisInput) -> bool, -) -> Middleware -where - TError: 'static, - TThisCtx: Send + 'static, - TThisInput: Send + 'static, - TThisResult: Send + 'static, -{ - Middleware::new(|ctx, input, next| async move { next.exec(ctx, input).await }) -} - -// TODO: Attach the subscription for updates -// TODO: How do we invoke the procedures from the `invalidation.subscribe` procedure? - -// TODO: What will the frontend component look like??? +// TODO: Refer to `../README.md` to see status diff --git a/middleware/playground/Cargo.toml b/middleware/playground/Cargo.toml new file mode 100644 index 00000000..d19277c4 --- /dev/null +++ b/middleware/playground/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "rspc-playground" +version = "0.0.0" +edition = "2021" +publish = false + +[dependencies] + +# /bin/sh RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features +[package.metadata."docs.rs"] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[lints] +workspace = true diff --git a/middleware/playground/README.md b/middleware/playground/README.md new file mode 100644 index 00000000..5bdc3658 --- /dev/null +++ b/middleware/playground/README.md @@ -0,0 +1,3 @@ +# rspc Playground + +Coming soon... \ No newline at end of file diff --git a/middleware/playground/src/lib.rs b/middleware/playground/src/lib.rs new file mode 100644 index 00000000..4f90e3f6 --- /dev/null +++ b/middleware/playground/src/lib.rs @@ -0,0 +1,8 @@ +//! rspc-playground: Playground for testing rspc procedures +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc( + html_logo_url = "https://github.com/oscartbeaumont/rspc/raw/main/docs/public/logo.png", + html_favicon_url = "https://github.com/oscartbeaumont/rspc/raw/main/docs/public/logo.png" +)] + +// TODO: A HTML playground for testing rspc procedures kinda like SwaggerUI.