Skip to content

Commit

Permalink
Invalidation research
Browse files Browse the repository at this point in the history
  • Loading branch information
oscartbeaumont committed Jul 17, 2024
1 parent 834084e commit 1458dc3
Show file tree
Hide file tree
Showing 16 changed files with 263 additions and 26 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ panic_in_result_fn = { level = "warn", priority = -1 }
# TODO: Remove this
[patch.crates-io]
specta = { git = "https://github.com/oscartbeaumont/specta", rev = "9c41ff0e95a357fd00893f5e2f9d642eac3438ef" }
specta-typescript = { git = "https://github.com/oscartbeaumont/specta", rev = "9c41ff0e95a357fd00893f5e2f9d642eac3438ef" }
specta-typescript = { git = "https://github.com/oscartbeaumont/specta", rev = "9c41ff0e95a357fd00893f5e2f9d642eac3438ef" }
8 changes: 4 additions & 4 deletions examples/axum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ publish = false

[dependencies]
axum = "0.7.5"
rspc = { version = "0.3.0", path = "../../rspc" }
rspc-axum = { version = "0.2.0", path = "../../integrations/axum", features = ["ws"] }
rspc = { path = "../../rspc" }
rspc-axum = { path = "../../integrations/axum", features = ["ws"] }
tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread"] }
thiserror = "1.0.62"
async-stream = "0.3.5"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
rspc-tracing = { version = "0.0.0", path = "../../middleware/tracing" }
rspc-openapi = { version = "0.0.0", path = "../../middleware/openapi" }
rspc-tracing = { path = "../../middleware/tracing" }
rspc-openapi = { path = "../../middleware/openapi" }
serde = { version = "1", features = ["derive"] }
specta = { version = "=2.0.0-rc.15", features = ["derive"] } # TODO: Drop requirement on `derive`
specta-util = "0.0.2" # TODO: We need this for `TypeCollection` which is cringe
Expand Down
6 changes: 5 additions & 1 deletion examples/axum/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{error, marker::PhantomData, path::PathBuf};
use std::{error, marker::PhantomData, path::PathBuf, sync::Arc};

use rspc::{
procedure::{Procedure, ProcedureBuilder, ResolverInput, ResolverOutput},
Expand All @@ -9,6 +9,7 @@ use specta_util::TypeCollection;
use thiserror::Error;

pub(crate) mod chat;
pub(crate) mod invalidation;
pub(crate) mod store;

#[derive(Debug, Error)]
Expand All @@ -17,7 +18,10 @@ pub enum Error {}
// `Clone` is only required for usage with Websockets
#[derive(Clone)]
pub struct Context {
// For this example we nest context's for each example.
// In the real-world you don't need to do this, we do this so the examples are more self-contained.
pub chat: chat::Ctx,
pub invalidation: Arc<invalidation::Ctx>,
}

pub type Router = rspc::Router<Context>;
Expand Down
2 changes: 1 addition & 1 deletion examples/axum/src/api/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub fn mount() -> Router {
Ok(stream! {
let mut chat = ctx.chat.chat.subscribe();
while let Ok(msg) = chat.recv().await {
yield Ok(msg); // TODO: error handling
yield Ok(msg);
}
})
})
Expand Down
113 changes: 113 additions & 0 deletions examples/axum/src/api/invalidation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! TODO: Document how this example works.
//!
//! TODO: Expand this example to one which pushes data directly to the frontend.

use std::{
collections::HashMap,
future::Future,
sync::{Arc, Mutex, PoisonError},
};

use async_stream::stream;
use rspc::middleware::Middleware;
use serde::{Deserialize, Serialize};
use specta::Type;
use tokio::sync::broadcast;

use super::{BaseProcedure, Context, Router};

#[derive(Clone)]
pub struct Ctx {
keys: Arc<Mutex<HashMap<String, String>>>,
tx: broadcast::Sender<InvalidateEvent>,
}

impl Ctx {
pub fn new() -> Arc<Self> {
Arc::new(Self {
keys: Default::default(),
tx: broadcast::channel(100).0,
})
}
}

#[derive(Debug, Clone, Serialize, Type)]
pub enum InvalidateEvent {
InvalidateKey(String),
}

#[derive(Deserialize, Type)]
struct SetKeyInput {
key: String,
value: String,
}

pub fn mount() -> Router {
Router::new()
.procedure("get", {
<BaseProcedure>::builder()
// TODO: Why does `TCtx` need a hardcoded type???
.with(invalidation(|ctx: Context, key, _result| async move {
ctx.invalidation
.tx
.send(InvalidateEvent::InvalidateKey(key))
.unwrap();
}))
.mutation(|ctx, key: String| async move {
let value = ctx
.invalidation
.keys
.lock()
.unwrap_or_else(PoisonError::into_inner)
.get(&key)
.cloned();

Ok(value)
})
})
.procedure("set", {
<BaseProcedure>::builder().mutation(|ctx, input: SetKeyInput| async move {
ctx.invalidation
.keys
.lock()
.unwrap_or_else(PoisonError::into_inner)
.insert(input.key, input.value);

Ok(())
})
})
.procedure("invalidation", {
// The frontend will subscribe to this for when to invalidate queries.
<BaseProcedure>::builder().subscription(|ctx, _: ()| async move {
Ok(stream! {
let mut tx = ctx.invalidation.tx.subscribe();
while let Ok(msg) = tx.recv().await {
yield Ok(msg);
}
})
})
})
}

fn invalidation<TError, TCtx, TInput, TResult, F>(
handler: impl Fn(TCtx, TInput, &Result<TResult, TError>) -> F + Send + Sync + 'static,
) -> Middleware<TError, TCtx, TInput, TResult>
where
TError: Send + 'static,
TCtx: Clone + Send + 'static,
TInput: Clone + Send + 'static,
TResult: Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
let handler = Arc::new(handler);
Middleware::new(move |ctx: TCtx, input: TInput, next| {
let handler = handler.clone();
async move {
let ctx2 = ctx.clone();
let input2 = input.clone();
let result = next.exec(ctx, input).await;
handler(ctx2, input2, &result).await;
result
}
})
}
1 change: 1 addition & 0 deletions examples/axum/src/api/library.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
// TODO: Library middleware
3 changes: 3 additions & 0 deletions examples/axum/src/api/streaming.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// TODO: Streaming DB data
// TODO: File upload
// TODO: File download
3 changes: 3 additions & 0 deletions examples/axum/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::net::Ipv6Addr;

use api::invalidation;
use axum::{routing::get, Router};
use tokio::sync::broadcast;
use tracing::info;
Expand All @@ -13,8 +14,10 @@ async fn main() {
let router = api::mount().build().unwrap();

let chat_tx = broadcast::channel(100).0;
let invalidation = invalidation::Ctx::new();
let ctx_fn = move || api::Context {
chat: api::chat::Ctx::new(chat_tx.clone()),
invalidation: invalidation.clone(),
};

let app = Router::new()
Expand Down
14 changes: 14 additions & 0 deletions integrations/tauri/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "rspc-tauri"
version = "0.0.1"
edition = "2021"

[dependencies]

# /bin/sh RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features
[package.metadata."docs.rs"]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[lints]
workspace = true
8 changes: 8 additions & 0 deletions integrations/tauri/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//! rspc-tauri: Support for exposing rspc via Tauri IPC

Check warning on line 1 in integrations/tauri/src/lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

package `rspc` is missing `package.readme` metadata

warning: package `rspc` is missing `package.readme` metadata | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cargo_common_metadata = note: `-W clippy::cargo-common-metadata` implied by `-W clippy::cargo` = help: to override `-W clippy::cargo` add `#[allow(clippy::cargo_common_metadata)]`

Check warning on line 1 in integrations/tauri/src/lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

package `rspc-axum` is missing `package.readme` metadata

warning: package `rspc-axum` is missing `package.readme` metadata | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cargo_common_metadata

Check warning on line 1 in integrations/tauri/src/lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

package `rspc-tauri` is missing `package.description` metadata

warning: package `rspc-tauri` is missing `package.description` metadata | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cargo_common_metadata
#![cfg_attr(docsrs, feature(doc_cfg))]
#![doc(
html_logo_url = "https://github.com/oscartbeaumont/rspc/raw/main/docs/public/logo.png",
html_favicon_url = "https://github.com/oscartbeaumont/rspc/raw/main/docs/public/logo.png"
)]

// TODO
1 change: 1 addition & 0 deletions middleware/invalidation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2021"
publish = false # TODO: Crate metadata & publish

[dependencies]
async-stream = "0.3.5"
rspc = { path = "../../rspc" }

# /bin/sh RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features
Expand Down
82 changes: 82 additions & 0 deletions middleware/invalidation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# rspc Invalidation

For now this is not going to be released as we need to work out if their is any value to an official middleware, instead of an example project implementing the same thing user-space?

## Questions

For my own future reference: https://discord.com/channels/@me/813276814801764382/1263123489477361828

### Pull vs Push based invalidation events

Pull based is where the middleware is applied to the query.
Push based is where the middleware is applied to the mutation.

I think we want a pull-based so resources can define their dependencies a-la React dependencies array.

### Stream or not?

I'm leaning stream-based because it pushes the type safety concern onto the end user.

```rust
<BaseProcedure>::builder()
// "Pull"-based. Applied to queries. (I personally a "Pull"-based approach is better)
.with(rspc_invalidation::invalidation(
|input, result, operation| operation.key() == "store.set",
))
.with(rspc_invalidation::invalidation(
// TODO: how is `input().id` even gonna work lol
|input, result, operation| {
operation.key() == "notes.update" && operation.input().id == input.id
},
))
// "Push"-based. Applied to mutations.
.with(rspc_invalidation::invalidation(
|input, result, invalidate| invalidate("store.get", ()),
))
.with(rspc_invalidation::invalidation(
|input, result, operation| invalidate("notes.get", input.id),
))
// "Pull"-based but with stream.
.with(rspc_invalidation::invalidation(|input: TArgs| {
stream! {
// If practice subscribe to some central event bus for changes
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
yield Invalidate; // pub struct Invalidate;
}
}
}))
.query(...)
```

### Exposing result of procedure to invalidation closure

If we expose result to the invalidate callback either the `Stream` or the value must be `Clone` which is not great, although the constrain can be applied locally by the middleware.

If we expose the result and use a stream-based approach do we spawn a new invalidation closure for every result? I think this is something we will wanna leave the user in control of but no idea what that API would look like.

### How do we get `BuiltRouter` into `Procedure`?

It kinda has to come in via context or we need some magic system within rspc's core. Otherwise we basically have a recursive dependency.

### Frontend?

Will we expose a package or will it be on the user to hook it up?

## Other concerns

## User activity

Really we wanna only push invalidation events that are related to parts of the app the user currently has active. An official system would need to take this into account somehow. Maybe some integration with the frontend router and websocket state using the `TCtx`???

## Data or invalidation

If we can be pretty certain the frontend wants the new data we can safely push it straight to the frontend instead of just asking the frontend to refetch. This will be much faster but if your not tracking user-activity it will be way slower because of the potential volume of data.

Tracking user activity pretty much requires some level of router integration which might be nice to have an abstraction for but it's also hard.

## Authorization

**This is why rspc can't own the subscription!!!**

We should also have a way to take into account authorization and what invalidation events the user is able to see. For something like Spacedrive we never had this problem because we are a desktop app but any web app would require this.
20 changes: 1 addition & 19 deletions middleware/invalidation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,4 @@
html_favicon_url = "https://github.com/oscartbeaumont/rspc/raw/main/docs/public/logo.png"
)]

use rspc::middleware::Middleware;

/// TODO
pub fn invalidation<TError, TThisCtx, TThisInput, TThisResult>(
handler: impl Fn(TThisInput) -> bool,
) -> Middleware<TError, TThisCtx, TThisInput, TThisResult>
where
TError: 'static,
TThisCtx: Send + 'static,
TThisInput: Send + 'static,
TThisResult: Send + 'static,
{
Middleware::new(|ctx, input, next| async move { next.exec(ctx, input).await })
}

// TODO: Attach the subscription for updates
// TODO: How do we invoke the procedures from the `invalidation.subscribe` procedure?

// TODO: What will the frontend component look like???
// TODO: Refer to `../README.md` to see status
15 changes: 15 additions & 0 deletions middleware/playground/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "rspc-playground"
version = "0.0.0"
edition = "2021"
publish = false

[dependencies]

# /bin/sh RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features
[package.metadata."docs.rs"]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[lints]
workspace = true
3 changes: 3 additions & 0 deletions middleware/playground/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# rspc Playground

Coming soon...
8 changes: 8 additions & 0 deletions middleware/playground/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//! rspc-playground: Playground for testing rspc procedures

Check warning on line 1 in middleware/playground/src/lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

package `rspc` is missing `package.readme` metadata

warning: package `rspc` is missing `package.readme` metadata | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cargo_common_metadata = note: `-W clippy::cargo-common-metadata` implied by `-W clippy::cargo` = help: to override `-W clippy::cargo` add `#[allow(clippy::cargo_common_metadata)]`

Check warning on line 1 in middleware/playground/src/lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

package `rspc-axum` is missing `package.readme` metadata

warning: package `rspc-axum` is missing `package.readme` metadata | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cargo_common_metadata

Check warning on line 1 in middleware/playground/src/lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

package `rspc-tauri` is missing `package.description` metadata

warning: package `rspc-tauri` is missing `package.description` metadata | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cargo_common_metadata

Check warning on line 1 in middleware/playground/src/lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

package `rspc-tauri` is missing `either package.license or package.license_file` metadata

warning: package `rspc-tauri` is missing `either package.license or package.license_file` metadata | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cargo_common_metadata

Check warning on line 1 in middleware/playground/src/lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

package `rspc-tauri` is missing `package.repository` metadata

warning: package `rspc-tauri` is missing `package.repository` metadata | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cargo_common_metadata

Check warning on line 1 in middleware/playground/src/lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

package `rspc-tauri` is missing `package.readme` metadata

warning: package `rspc-tauri` is missing `package.readme` metadata | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cargo_common_metadata

Check warning on line 1 in middleware/playground/src/lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

package `rspc-tauri` is missing `package.keywords` metadata

warning: package `rspc-tauri` is missing `package.keywords` metadata | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#cargo_common_metadata
#![cfg_attr(docsrs, feature(doc_cfg))]
#![doc(
html_logo_url = "https://github.com/oscartbeaumont/rspc/raw/main/docs/public/logo.png",
html_favicon_url = "https://github.com/oscartbeaumont/rspc/raw/main/docs/public/logo.png"
)]

// TODO: A HTML playground for testing rspc procedures kinda like SwaggerUI.

0 comments on commit 1458dc3

Please sign in to comment.