diff --git a/src/raft/api.rs b/src/raft/api.rs index d16e16eb91..5767d82b6c 100644 --- a/src/raft/api.rs +++ b/src/raft/api.rs @@ -484,3 +484,9 @@ impl From for APIError { Self::Timeout } } + +impl From for APIError { + fn from(e: simd_json::Error) -> Self { + Self::Other(e.to_string()) + } +} diff --git a/src/raft/api/kv.rs b/src/raft/api/kv.rs index ed04628ec3..cfbd5a8737 100644 --- a/src/raft/api/kv.rs +++ b/src/raft/api/kv.rs @@ -16,8 +16,9 @@ use crate::raft::{ api::{APIError, APIRequest, APIResult, ToAPIResult}, store::TremorSet, }; -use axum::{extract, routing::post, Router}; +use axum::{extract, routing::post, Json, Router}; use http::StatusCode; +use simd_json::OwnedValue; use tokio::time::timeout; use super::API_WORKER_TIMEOUT; @@ -29,14 +30,24 @@ pub(crate) fn endpoints() -> Router { .route("/consistent_read", post(consistent_read)) } +#[derive(Deserialize)] +struct KVSet { + key: String, + value: OwnedValue, +} + async fn write( extract::State(state): extract::State, extract::OriginalUri(uri): extract::OriginalUri, - extract::Json(body): extract::Json, + extract::Json(body): extract::Json, ) -> APIResult> { + let tremor_set = TremorSet { + key: body.key, + value: simd_json::to_vec(&body.value)?, + }; let res = state .raft - .client_write(body.into()) + .client_write(tremor_set.into()) .await .to_api_result(&uri, &state) .await?; @@ -48,10 +59,10 @@ async fn write( async fn read( extract::State(state): extract::State, extract::Json(key): extract::Json, -) -> APIResult> { +) -> APIResult> { let value = timeout(API_WORKER_TIMEOUT, state.raft_manager.kv_get_local(key)).await??; - if let Some(value) = value { - Ok(value) + if let Some(mut value) = value { + Ok(Json(simd_json::from_slice(&mut value)?)) } else { Err(APIError::HTTP { status: StatusCode::NOT_FOUND, @@ -65,15 +76,15 @@ async fn consistent_read( extract::State(state): extract::State, extract::OriginalUri(uri): extract::OriginalUri, extract::Json(key): extract::Json, -) -> APIResult> { +) -> APIResult> { // this will fail if we are not a leader state.ensure_leader(Some(uri.clone())).await?; // here we are safe to read let value = timeout(API_WORKER_TIMEOUT, state.raft_manager.kv_get_local(key)).await??; // Ensure that we are still the leader at the end of the read so we can guarantee freshness state.ensure_leader(Some(uri)).await?; - if let Some(value) = value { - Ok(value) + if let Some(mut value) = value { + Ok(Json(simd_json::from_slice(&mut value)?)) } else { Err(APIError::HTTP { status: StatusCode::NOT_FOUND,