From 98327f1b0c02d76957a6571668353ec903063d66 Mon Sep 17 00:00:00 2001 From: "Heinz N. Gies" Date: Fri, 14 Apr 2023 16:53:02 +0200 Subject: [PATCH] Improve cluster replies to be no longer be stringy typed Signed-off-by: Heinz N. Gies --- src/connectors/impls/cluster_kv.rs | 42 +++++++++++++------- src/raft/api.rs | 3 +- src/raft/api/client.rs | 28 +++++++++++--- src/raft/api/cluster.rs | 6 +-- src/raft/api/kv.rs | 37 +++++++++++------- src/raft/manager.rs | 34 +++++++++++++--- src/raft/node.rs | 13 +------ src/raft/store.rs | 58 ++++++++++++++++++++++++---- src/raft/store/statemachine/apps.rs | 20 +++------- src/raft/store/statemachine/kv.rs | 17 ++------ src/raft/store/statemachine/nodes.rs | 4 +- src/raft/test.rs | 7 +++- tremor-cli/src/cluster.rs | 2 +- 13 files changed, 173 insertions(+), 98 deletions(-) diff --git a/src/connectors/impls/cluster_kv.rs b/src/connectors/impls/cluster_kv.rs index f28d5a69d6..c1f2f99d5a 100644 --- a/src/connectors/impls/cluster_kv.rs +++ b/src/connectors/impls/cluster_kv.rs @@ -13,6 +13,7 @@ // limitations under the License. // #![cfg_attr(coverage, no_coverage)] +use crate::ids::GenericAlias; use crate::{ channel::{bounded, Receiver, Sender}, errors::already_created_error, @@ -37,7 +38,7 @@ enum Command { /// ``` /// /// Response: the value behing "the-key" or `null` - Get { key: String }, //strict: bool + Get { key: String, strict: bool }, /// Format: /// ```json /// {"put": "the-key"} @@ -98,7 +99,7 @@ impl<'v> TryFrom<&'v Value<'v>> for Command { if let Some(key) = v.get_str("get").map(ToString::to_string) { Ok(Command::Get { key, - // strict: v.get_bool("strict").unwrap_or(false), + strict: v.get_bool("strict").unwrap_or(false), }) } else if let Some(key) = v.get_str("put").map(ToString::to_string) { Ok(Command::Put { key }) @@ -231,7 +232,8 @@ impl Connector for Kv { path: vec![], }; let sink = KvSink { - raft_tx: ctx.raft().clone(), + alias: ctx.alias().clone(), + raft: ctx.raft().clone(), tx: self.tx.clone(), codec, origin_uri, @@ -246,7 +248,8 @@ impl Connector for Kv { } struct KvSink { - raft_tx: raft::Manager, + alias: Alias, + raft: raft::Manager, tx: Sender, codec: Json, origin_uri: EventOriginUri, @@ -254,10 +257,9 @@ struct KvSink { } impl KvSink { - fn decode(&mut self, mut v: Option, ingest_ns: u64) -> Result> { + fn decode(&mut self, mut v: Option>, ingest_ns: u64) -> Result> { if let Some(v) = v.as_mut() { - // ALLOW: we no longer need the string afterwards - let data: &mut [u8] = unsafe { v.as_bytes_mut() }; + let data: &mut [u8] = v.as_mut_slice(); // TODO: We could optimize this Ok(self .codec @@ -279,22 +281,36 @@ impl KvSink { ingest_ns: u64, ) -> Result, Value<'static>)>> { match cmd { - Command::Get { key, .. } => { - let key_parts = vec![key.clone()]; + Command::Get { key, strict } => { + let key_parts = vec![ + self.alias.app_id().to_string(), + self.alias.app_instance().to_string(), + self.alias.alias().to_string(), + key.clone(), + ]; let combined_key = key_parts.join("."); self.decode( - dbg!(self.raft_tx.kv_get_local(combined_key).await)?, + if strict { + self.raft.kv_get(combined_key).await? + } else { + self.raft.kv_get_local(combined_key).await? + }, ingest_ns, ) .map(|v| oks(op_name, key, v)) } Command::Put { key } => { // return the new value - let value_str = String::from_utf8(self.encode(value)?)?; - let key_parts = vec![key.clone()]; + let value_vec = self.encode(value)?; + let key_parts = vec![ + self.alias.app_id().to_string(), + self.alias.app_instance().to_string(), + self.alias.alias().to_string(), + key.clone(), + ]; let combined_key = key_parts.join("."); - self.raft_tx.kv_set(combined_key, value_str).await?; + self.raft.kv_set(combined_key, value_vec).await?; Ok(oks(op_name, key, value.clone_static())) } // Command::Swap { key } => { // // return the old value diff --git a/src/raft/api.rs b/src/raft/api.rs index 56277334f1..d16e16eb91 100644 --- a/src/raft/api.rs +++ b/src/raft/api.rs @@ -58,7 +58,7 @@ pub(crate) type ReplySender = OneShotSender; pub(crate) enum APIStoreReq { GetApp(AppId, ReplySender>), GetApps(ReplySender>), - KVGet(String, ReplySender>), + KVGet(String, ReplySender>>), GetNode(NodeId, ReplySender>), GetNodes(ReplySender>), GetNodeId(Addr, ReplySender>), @@ -258,7 +258,6 @@ pub enum APIError { /// fallback error type Other(String), } - impl IntoResponse for APIError { fn into_response(self) -> Response { let status = match &self { diff --git a/src/raft/api/client.rs b/src/raft/api/client.rs index d839956a35..eac60168ac 100644 --- a/src/raft/api/client.rs +++ b/src/raft/api/client.rs @@ -114,8 +114,8 @@ impl Tremor { /// /// # Errors /// if the api call fails - pub async fn write(&self, req: &TremorSet) -> ClientResult { - self.api_req::("api/kv/write", Method::POST, Some(req)) + pub async fn write(&self, req: &TremorSet) -> ClientResult> { + self.api_req::>("api/kv/write", Method::POST, Some(req)) .await } /// Read value by key, in an inconsistent mode. @@ -124,10 +124,10 @@ impl Tremor { /// /// # Errors /// if the api call fails - pub async fn read(&self, req: &str) -> ClientResult> { + pub async fn read(&self, req: &str) -> ClientResult> { let tremor_res: TremorResponse = self.api_req("api/kv/read", Method::POST, Some(req)).await?; - Ok(tremor_res.value) + Ok(tremor_res.into_kv_value()?) } /// Consistent Read value by key. @@ -136,11 +136,11 @@ impl Tremor { /// /// # Errors /// if the api call fails - pub async fn consistent_read(&self, req: &str) -> ClientResult> { + pub async fn consistent_read(&self, req: &str) -> ClientResult> { let tremor_res: TremorResponse = self .api_req("api/kv/consistent_read", Method::POST, Some(req)) .await?; - Ok(tremor_res.value) + Ok(tremor_res.into_kv_value()?) } } @@ -435,8 +435,24 @@ impl From for Error { Self::HTTP(e) } } + +impl From for Error { + fn from(e: crate::Error) -> Self { + Self::Other(e.to_string()) + } +} impl<'s> From<&'s str> for Error { fn from(e: &'s str) -> Self { Self::Other(e.into()) } } + +impl Error { + #[must_use] + pub fn is_not_found(&self) -> bool { + match self { + Self::HTTP(e) => e.status() == Some(reqwest::StatusCode::NOT_FOUND), + Self::Other(_) => false, + } + } +} diff --git a/src/raft/api/cluster.rs b/src/raft/api/cluster.rs index 20e1fa097d..cab2c7c89e 100644 --- a/src/raft/api/cluster.rs +++ b/src/raft/api/cluster.rs @@ -91,11 +91,7 @@ async fn add_node( .await .to_api_result(&uri, &state) .await?; - let node_id = response - .data - .value - .ok_or_else(|| APIError::Other("Invalid node_id".to_string()))? - .parse::()?; + let node_id: NodeId = NodeId::try_from(response.data)?; debug!("node {addr} added to the cluster as node {node_id}"); Ok(Json(node_id)) } diff --git a/src/raft/api/kv.rs b/src/raft/api/kv.rs index f019ac7478..ed04628ec3 100644 --- a/src/raft/api/kv.rs +++ b/src/raft/api/kv.rs @@ -14,9 +14,10 @@ use crate::raft::{ api::{APIError, APIRequest, APIResult, ToAPIResult}, - store::{TremorResponse, TremorSet}, + store::TremorSet, }; use axum::{extract, routing::post, Router}; +use http::StatusCode; use tokio::time::timeout; use super::API_WORKER_TIMEOUT; @@ -32,29 +33,31 @@ async fn write( extract::State(state): extract::State, extract::OriginalUri(uri): extract::OriginalUri, extract::Json(body): extract::Json, -) -> APIResult { +) -> APIResult> { let res = state .raft .client_write(body.into()) .await .to_api_result(&uri, &state) .await?; - if let Some(value) = res.data.value { - Ok(value) - } else { - Err(APIError::Store( - "State machine didn't return the stored value upon write".to_string(), - )) - } + + Ok(res.data.into_kv_value()?) } /// read a value from the current node, not necessarily the leader, thus this value can be stale async fn read( extract::State(state): extract::State, extract::Json(key): extract::Json, -) -> APIResult { +) -> APIResult> { let value = timeout(API_WORKER_TIMEOUT, state.raft_manager.kv_get_local(key)).await??; - Ok(TremorResponse { value }) + if let Some(value) = value { + Ok(value) + } else { + Err(APIError::HTTP { + status: StatusCode::NOT_FOUND, + message: "Key not found".to_string(), + }) + } } /// read a value from the leader. If this request is received by another node, it will return a redirect @@ -62,13 +65,19 @@ async fn consistent_read( extract::State(state): extract::State, extract::OriginalUri(uri): extract::OriginalUri, extract::Json(key): extract::Json, -) -> APIResult { +) -> APIResult> { // this will fail if we are not a leader state.ensure_leader(Some(uri.clone())).await?; // here we are safe to read let value = timeout(API_WORKER_TIMEOUT, state.raft_manager.kv_get_local(key)).await??; - // Ensure that we are still the leader at the end of the read so we can guarantee freshness state.ensure_leader(Some(uri)).await?; - Ok(TremorResponse { value }) + if let Some(value) = value { + Ok(value) + } else { + Err(APIError::HTTP { + status: StatusCode::NOT_FOUND, + message: "Key not found".to_string(), + }) + } } diff --git a/src/raft/manager.rs b/src/raft/manager.rs index 978661163b..4289921ee8 100644 --- a/src/raft/manager.rs +++ b/src/raft/manager.rs @@ -132,7 +132,7 @@ impl Manager { } // kv - pub async fn kv_set(&self, key: String, value: String) -> Result> { + pub async fn kv_set(&self, key: String, value: Vec) -> Result> { match self.is_leader().await { Ok(_) => self.kv_set_local(key, value).await, Err(Error( @@ -145,17 +145,39 @@ impl Manager { _, )) => { let client = crate::raft::api::client::Tremor::new(n.api())?; - Ok(Some(client.write(&TremorSet { key, value }).await?)) + Ok(client.write(&TremorSet { key, value }).await?) } Err(e) => Err(e), } } - pub async fn kv_set_local(&self, key: String, value: String) -> Result> { + pub async fn kv_set_local(&self, key: String, value: Vec) -> Result> { let tremor_res = self.client_write(TremorSet { key, value }).await?; - Ok(tremor_res.data.value) + tremor_res.data.into_kv_value() } - - pub async fn kv_get_local(&self, key: String) -> Result> { + pub async fn kv_get(&self, key: String) -> Result>> { + match self.is_leader().await { + Ok(_) => self.kv_get_local(key).await, + Err(Error( + ErrorKind::CheckIsLeaderError(RaftError::APIError( + CheckIsLeaderError::ForwardToLeader(ForwardToLeader { + leader_node: Some(n), + .. + }), + )), + _, + )) => { + let client = crate::raft::api::client::Tremor::new(n.api())?; + let res = client.read(&key).await; + match res { + Ok(v) => Ok(Some(v)), + Err(e) if e.is_not_found() => Ok(None), + Err(e) => Err(e.into()), + } + } + Err(e) => Err(e), + } + } + pub async fn kv_get_local(&self, key: String) -> Result>> { let (tx, rx) = oneshot(); let command = APIStoreReq::KVGet(key, tx); self.send(command).await?; diff --git a/src/raft/node.rs b/src/raft/node.rs index 751d53e81a..8a1d8e5026 100644 --- a/src/raft/node.rs +++ b/src/raft/node.rs @@ -21,7 +21,7 @@ use crate::{ api::{self, ServerState}, network::{raft, Raft as TarPCRaftService}, store::{NodesRequest, Store, TremorRequest}, - ClusterError, ClusterResult, Manager, Network, + ClusterError, ClusterResult, Manager, Network, NodeId, }, system::{Runtime, ShutdownMode, WorldConfig}, }; @@ -424,16 +424,7 @@ impl Node { .await { Ok(r) => { - let assigned_node_id = r - .data - .value - .ok_or_else(|| { - ClusterError::Other("Invalid Response from raft for AddNode".to_string()) - })? - .parse::() - .map_err(|e| { - ClusterError::Other(format!("Invalid node_id returned from AddNode: {e}")) - })?; + let assigned_node_id = NodeId::try_from(r.data)?; debug_assert_eq!(node_id, assigned_node_id, "Adding initial leader resulted in a differing node_id: {assigned_node_id}, expected: {node_id}"); let (worker_handle, server_state) = api::initialize(node_id, addr, raft.clone(), store, store_tx, store_rx); diff --git a/src/raft/store.rs b/src/raft/store.rs index 0cc014755c..fdf238d74d 100644 --- a/src/raft/store.rs +++ b/src/raft/store.rs @@ -49,7 +49,7 @@ use super::{node::Addr, NodeId, TremorRaftConfig}; #[derive(Serialize, Deserialize, Debug, Clone)] pub enum KvRequest { /// Set a key to the provided value in the cluster state - Set { key: String, value: String }, + Set { key: String, value: Vec }, } /// Operations on the nodes known to the cluster @@ -134,7 +134,7 @@ pub enum TremorInstanceState { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct TremorSet { pub key: String, - pub value: String, + pub value: Vec, } impl From for TremorRequest { @@ -151,12 +151,56 @@ impl From for TremorRequest { * In this example it will return a optional value from a given key in * the `ExampleRequest.Set`. * - * TODO: `SHould` we explain how to create multiple `AppDataResponse`? + * TODO: `Should` we explain how to create multiple `AppDataResponse`? * */ #[derive(Serialize, Deserialize, Debug, Clone, Default)] -pub struct TremorResponse { - pub value: Option, +pub enum TremorResponse { + #[default] + None, + KvValue(Vec), + AppId(AppId), + NodeId(NodeId), + AppFlowInstanceId(AppFlowInstanceId), +} + +impl TremorResponse { + pub(crate) fn into_kv_value(self) -> crate::Result> { + match self { + TremorResponse::KvValue(v) => Ok(v), + _ => Err(RuntimeError::from("Not a kv value")), + } + } +} + +impl TryFrom for AppId { + type Error = RuntimeError; + fn try_from(response: TremorResponse) -> crate::Result { + match response { + TremorResponse::AppId(id) => Ok(id), + _ => Err(RuntimeError::from("Not an app id")), + } + } +} + +impl TryFrom for NodeId { + type Error = RuntimeError; + fn try_from(response: TremorResponse) -> crate::Result { + match response { + TremorResponse::NodeId(id) => Ok(id), + _ => Err(RuntimeError::from("Not a node id")), + } + } +} + +impl TryFrom for AppFlowInstanceId { + type Error = RuntimeError; + fn try_from(response: TremorResponse) -> crate::Result { + match response { + TremorResponse::AppFlowInstanceId(id) => Ok(id), + _ => Err(RuntimeError::from("Not an app flow instance id")), + } + } } #[derive(Serialize, Deserialize, Debug)] @@ -646,7 +690,7 @@ impl RaftStorage for Store { sm.set_last_applied_log(entry.log_id)?; match entry.payload { - EntryPayload::Blank => result.push(TremorResponse { value: None }), + EntryPayload::Blank => result.push(TremorResponse::None), EntryPayload::Normal(ref request) => { result.push(sm.handle_request(entry.log_id, request).await?); } @@ -656,7 +700,7 @@ impl RaftStorage for Store { Some(entry.log_id), mem.clone(), ))?; - result.push(TremorResponse { value: None }); + result.push(TremorResponse::None); } }; } diff --git a/src/raft/store/statemachine/apps.rs b/src/raft/store/statemachine/apps.rs index f57fce9e76..123ab38368 100644 --- a/src/raft/store/statemachine/apps.rs +++ b/src/raft/store/statemachine/apps.rs @@ -233,15 +233,11 @@ impl RaftStateMachine for AppsStateMachine { match cmd { AppsRequest::InstallApp { app, file } => { self.load_archive(file)?; - Ok(TremorResponse { - value: Some(app.name().to_string()), - }) + Ok(TremorResponse::AppId(app.name().clone())) } AppsRequest::UninstallApp { app, force } => { self.uninstall_app(app, *force).await?; - Ok(TremorResponse { - value: Some(app.to_string()), - }) + Ok(TremorResponse::AppId(app.clone())) } AppsRequest::Deploy { app, @@ -252,21 +248,15 @@ impl RaftStateMachine for AppsStateMachine { } => { self.deploy_flow(app, flow.clone(), instance.clone(), config.clone(), *state) .await?; - Ok(TremorResponse { - value: Some(instance.to_string()), - }) + Ok(TremorResponse::AppFlowInstanceId(instance.clone())) } AppsRequest::Undeploy(instance) => { self.stop_and_remove_flow(instance).await?; - Ok(TremorResponse { - value: Some(instance.to_string()), - }) + Ok(TremorResponse::AppFlowInstanceId(instance.clone())) } AppsRequest::InstanceStateChange { instance, state } => { self.change_flow_state(instance, *state).await?; - Ok(TremorResponse { - value: Some(instance.to_string()), - }) + Ok(TremorResponse::AppFlowInstanceId(instance.clone())) } } } diff --git a/src/raft/store/statemachine/kv.rs b/src/raft/store/statemachine/kv.rs index 79fc7cf672..6043d519d8 100644 --- a/src/raft/store/statemachine/kv.rs +++ b/src/raft/store/statemachine/kv.rs @@ -49,25 +49,18 @@ impl KvStateMachine { } /// Store `value` at `key` in the distributed KV store - fn insert(&self, key: &str, value: &str) -> StorageResult<()> { + fn insert(&self, key: &str, value: &[u8]) -> StorageResult<()> { self.db - .put_cf(Self::cf(&self.db)?, key.as_bytes(), value.as_bytes()) + .put_cf(Self::cf(&self.db)?, key.as_bytes(), value) .map_err(store_w_err) } /// try to obtain the value at the given `key`. /// Returns `Ok(None)` if there is no value for that key. - pub(crate) fn get(&self, key: &str) -> StorageResult> { + pub(crate) fn get(&self, key: &str) -> StorageResult>> { let key = key.as_bytes(); self.db .get_cf(Self::cf(&self.db)?, key) - .map(|value| { - if let Some(value) = value { - Some(String::from_utf8(value).ok()?) - } else { - None - } - }) .map_err(store_r_err) } } @@ -114,9 +107,7 @@ impl RaftStateMachine for KvStateMachine { match cmd { KvRequest::Set { key, value } => { self.insert(key, value)?; - Ok(TremorResponse { - value: Some(value.clone()), - }) + Ok(TremorResponse::KvValue(value.clone())) } } } diff --git a/src/raft/store/statemachine/nodes.rs b/src/raft/store/statemachine/nodes.rs index 123bdfd697..53432af9fa 100644 --- a/src/raft/store/statemachine/nodes.rs +++ b/src/raft/store/statemachine/nodes.rs @@ -120,9 +120,7 @@ impl RaftStateMachine for NodesStateMachine { } }; - Ok(TremorResponse { - value: Some(node_id.to_string()), - }) + Ok(TremorResponse::NodeId(node_id)) } fn column_families() -> &'static [&'static str] { &Self::COLUMN_FAMILIES diff --git a/src/raft/test.rs b/src/raft/test.rs index 562fec2f30..a82f4d9ebb 100644 --- a/src/raft/test.rs +++ b/src/raft/test.rs @@ -1,3 +1,5 @@ +use matches::assert_matches; + // Copyright 2022, The Tremor Team // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -187,8 +189,9 @@ async fn kill_and_restart_voter() -> ClusterResult<()> { // check that the leader is available // TODO: solidify to guard against timing issues let client1 = node0.client(); - let k = client1.consistent_read("snot").await?; - assert!(k.is_none()); + let k = client1.consistent_read("snot").await; + // Snot was never set so it should be a 404 + assert_matches!(k, Err(e) if e.is_not_found()); node1.stop().await?; node2.stop().await?; diff --git a/tremor-cli/src/cluster.rs b/tremor-cli/src/cluster.rs index 8d222ef0f9..703f9a5d1b 100644 --- a/tremor-cli/src/cluster.rs +++ b/tremor-cli/src/cluster.rs @@ -146,7 +146,7 @@ impl Cluster { signal_handle.close(); signal_handler_task.abort(); } - // target/debug/tremor cluster start --db-dir temp/test-db2 --api 127.0.0.1:8002 --rpc 127.0.0.1:9002 --join 127.0.0.1:8001 + // target/debug/tremor cluster start --db-dir temp/test-db2 --api 127.0.0.1:8002 --rpc 127.0.0.1:9002 --join 127.0.0.1:8001 // target/debug/tremor cluster start --db-dir temp/test-db3 --api 127.0.0.1:8003 --rpc 127.0.0.1:9003 --join 127.0.0.1:8001 ClusterCommand::Start { db_dir,