Skip to content

Commit

Permalink
Improve cluster replies to be no longer be stringy typed
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Apr 17, 2023
1 parent bdab963 commit 98327f1
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 98 deletions.
42 changes: 29 additions & 13 deletions src/connectors/impls/cluster_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

// #![cfg_attr(coverage, no_coverage)]
use crate::ids::GenericAlias;
use crate::{
channel::{bounded, Receiver, Sender},
errors::already_created_error,
Expand All @@ -37,7 +38,7 @@ enum Command {
/// ```
///
/// Response: the value behing "the-key" or `null`
Get { key: String }, //strict: bool
Get { key: String, strict: bool },
/// Format:
/// ```json
/// {"put": "the-key"}
Expand Down Expand Up @@ -98,7 +99,7 @@ impl<'v> TryFrom<&'v Value<'v>> for Command {
if let Some(key) = v.get_str("get").map(ToString::to_string) {
Ok(Command::Get {
key,
// strict: v.get_bool("strict").unwrap_or(false),
strict: v.get_bool("strict").unwrap_or(false),
})
} else if let Some(key) = v.get_str("put").map(ToString::to_string) {
Ok(Command::Put { key })
Expand Down Expand Up @@ -231,7 +232,8 @@ impl Connector for Kv {
path: vec![],
};
let sink = KvSink {
raft_tx: ctx.raft().clone(),
alias: ctx.alias().clone(),
raft: ctx.raft().clone(),
tx: self.tx.clone(),
codec,
origin_uri,
Expand All @@ -246,18 +248,18 @@ impl Connector for Kv {
}

struct KvSink {
raft_tx: raft::Manager,
alias: Alias,
raft: raft::Manager,
tx: Sender<SourceReply>,
codec: Json<Sorted>,
origin_uri: EventOriginUri,
source_is_connected: Arc<AtomicBool>,
}

impl KvSink {
fn decode(&mut self, mut v: Option<String>, ingest_ns: u64) -> Result<Value<'static>> {
fn decode(&mut self, mut v: Option<Vec<u8>>, ingest_ns: u64) -> Result<Value<'static>> {
if let Some(v) = v.as_mut() {
// ALLOW: we no longer need the string afterwards
let data: &mut [u8] = unsafe { v.as_bytes_mut() };
let data: &mut [u8] = v.as_mut_slice();
// TODO: We could optimize this
Ok(self
.codec
Expand All @@ -279,22 +281,36 @@ impl KvSink {
ingest_ns: u64,
) -> Result<Vec<(Value<'static>, Value<'static>)>> {
match cmd {
Command::Get { key, .. } => {
let key_parts = vec![key.clone()];
Command::Get { key, strict } => {
let key_parts = vec![
self.alias.app_id().to_string(),
self.alias.app_instance().to_string(),
self.alias.alias().to_string(),
key.clone(),
];
let combined_key = key_parts.join(".");

self.decode(
dbg!(self.raft_tx.kv_get_local(combined_key).await)?,
if strict {
self.raft.kv_get(combined_key).await?
} else {
self.raft.kv_get_local(combined_key).await?
},
ingest_ns,
)
.map(|v| oks(op_name, key, v))
}
Command::Put { key } => {
// return the new value
let value_str = String::from_utf8(self.encode(value)?)?;
let key_parts = vec![key.clone()];
let value_vec = self.encode(value)?;
let key_parts = vec![
self.alias.app_id().to_string(),
self.alias.app_instance().to_string(),
self.alias.alias().to_string(),
key.clone(),
];
let combined_key = key_parts.join(".");
self.raft_tx.kv_set(combined_key, value_str).await?;
self.raft.kv_set(combined_key, value_vec).await?;
Ok(oks(op_name, key, value.clone_static()))
} // Command::Swap { key } => {
// // return the old value
Expand Down
3 changes: 1 addition & 2 deletions src/raft/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub(crate) type ReplySender<T> = OneShotSender<T>;
pub(crate) enum APIStoreReq {
GetApp(AppId, ReplySender<Option<StateApp>>),
GetApps(ReplySender<HashMap<AppId, AppState>>),
KVGet(String, ReplySender<Option<String>>),
KVGet(String, ReplySender<Option<Vec<u8>>>),
GetNode(NodeId, ReplySender<Option<Addr>>),
GetNodes(ReplySender<HashMap<NodeId, Addr>>),
GetNodeId(Addr, ReplySender<Option<NodeId>>),
Expand Down Expand Up @@ -258,7 +258,6 @@ pub enum APIError {
/// fallback error type
Other(String),
}

impl IntoResponse for APIError {
fn into_response(self) -> Response {
let status = match &self {
Expand Down
28 changes: 22 additions & 6 deletions src/raft/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ impl Tremor {
///
/// # Errors
/// if the api call fails
pub async fn write(&self, req: &TremorSet) -> ClientResult<String> {
self.api_req::<TremorSet, String>("api/kv/write", Method::POST, Some(req))
pub async fn write(&self, req: &TremorSet) -> ClientResult<Vec<u8>> {
self.api_req::<TremorSet, Vec<u8>>("api/kv/write", Method::POST, Some(req))
.await
}
/// Read value by key, in an inconsistent mode.
Expand All @@ -124,10 +124,10 @@ impl Tremor {
///
/// # Errors
/// if the api call fails
pub async fn read(&self, req: &str) -> ClientResult<Option<String>> {
pub async fn read(&self, req: &str) -> ClientResult<Vec<u8>> {
let tremor_res: TremorResponse =
self.api_req("api/kv/read", Method::POST, Some(req)).await?;
Ok(tremor_res.value)
Ok(tremor_res.into_kv_value()?)
}

/// Consistent Read value by key.
Expand All @@ -136,11 +136,11 @@ impl Tremor {
///
/// # Errors
/// if the api call fails
pub async fn consistent_read(&self, req: &str) -> ClientResult<Option<String>> {
pub async fn consistent_read(&self, req: &str) -> ClientResult<Vec<u8>> {
let tremor_res: TremorResponse = self
.api_req("api/kv/consistent_read", Method::POST, Some(req))
.await?;
Ok(tremor_res.value)
Ok(tremor_res.into_kv_value()?)
}
}

Expand Down Expand Up @@ -435,8 +435,24 @@ impl From<reqwest::Error> for Error {
Self::HTTP(e)
}
}

impl From<crate::Error> for Error {
fn from(e: crate::Error) -> Self {
Self::Other(e.to_string())
}
}
impl<'s> From<&'s str> for Error {
fn from(e: &'s str) -> Self {
Self::Other(e.into())
}
}

impl Error {
#[must_use]
pub fn is_not_found(&self) -> bool {
match self {
Self::HTTP(e) => e.status() == Some(reqwest::StatusCode::NOT_FOUND),
Self::Other(_) => false,
}
}
}
6 changes: 1 addition & 5 deletions src/raft/api/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,7 @@ async fn add_node(
.await
.to_api_result(&uri, &state)
.await?;
let node_id = response
.data
.value
.ok_or_else(|| APIError::Other("Invalid node_id".to_string()))?
.parse::<NodeId>()?;
let node_id: NodeId = NodeId::try_from(response.data)?;
debug!("node {addr} added to the cluster as node {node_id}");
Ok(Json(node_id))
}
Expand Down
37 changes: 23 additions & 14 deletions src/raft/api/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

use crate::raft::{
api::{APIError, APIRequest, APIResult, ToAPIResult},
store::{TremorResponse, TremorSet},
store::TremorSet,
};
use axum::{extract, routing::post, Router};
use http::StatusCode;
use tokio::time::timeout;

use super::API_WORKER_TIMEOUT;
Expand All @@ -32,43 +33,51 @@ async fn write(
extract::State(state): extract::State<APIRequest>,
extract::OriginalUri(uri): extract::OriginalUri,
extract::Json(body): extract::Json<TremorSet>,
) -> APIResult<String> {
) -> APIResult<Vec<u8>> {
let res = state
.raft
.client_write(body.into())
.await
.to_api_result(&uri, &state)
.await?;
if let Some(value) = res.data.value {
Ok(value)
} else {
Err(APIError::Store(
"State machine didn't return the stored value upon write".to_string(),
))
}

Ok(res.data.into_kv_value()?)
}

/// read a value from the current node, not necessarily the leader, thus this value can be stale
async fn read(
extract::State(state): extract::State<APIRequest>,
extract::Json(key): extract::Json<String>,
) -> APIResult<TremorResponse> {
) -> APIResult<Vec<u8>> {
let value = timeout(API_WORKER_TIMEOUT, state.raft_manager.kv_get_local(key)).await??;
Ok(TremorResponse { value })
if let Some(value) = value {
Ok(value)
} else {
Err(APIError::HTTP {
status: StatusCode::NOT_FOUND,
message: "Key not found".to_string(),
})
}
}

/// read a value from the leader. If this request is received by another node, it will return a redirect
async fn consistent_read(
extract::State(state): extract::State<APIRequest>,
extract::OriginalUri(uri): extract::OriginalUri,
extract::Json(key): extract::Json<String>,
) -> APIResult<TremorResponse> {
) -> APIResult<Vec<u8>> {
// this will fail if we are not a leader
state.ensure_leader(Some(uri.clone())).await?;
// here we are safe to read
let value = timeout(API_WORKER_TIMEOUT, state.raft_manager.kv_get_local(key)).await??;

// Ensure that we are still the leader at the end of the read so we can guarantee freshness
state.ensure_leader(Some(uri)).await?;
Ok(TremorResponse { value })
if let Some(value) = value {
Ok(value)
} else {
Err(APIError::HTTP {
status: StatusCode::NOT_FOUND,
message: "Key not found".to_string(),
})
}
}
34 changes: 28 additions & 6 deletions src/raft/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl Manager {
}

// kv
pub async fn kv_set(&self, key: String, value: String) -> Result<Option<String>> {
pub async fn kv_set(&self, key: String, value: Vec<u8>) -> Result<Vec<u8>> {
match self.is_leader().await {
Ok(_) => self.kv_set_local(key, value).await,
Err(Error(
Expand All @@ -145,17 +145,39 @@ impl Manager {
_,
)) => {
let client = crate::raft::api::client::Tremor::new(n.api())?;
Ok(Some(client.write(&TremorSet { key, value }).await?))
Ok(client.write(&TremorSet { key, value }).await?)
}
Err(e) => Err(e),
}
}
pub async fn kv_set_local(&self, key: String, value: String) -> Result<Option<String>> {
pub async fn kv_set_local(&self, key: String, value: Vec<u8>) -> Result<Vec<u8>> {
let tremor_res = self.client_write(TremorSet { key, value }).await?;
Ok(tremor_res.data.value)
tremor_res.data.into_kv_value()
}

pub async fn kv_get_local(&self, key: String) -> Result<Option<String>> {
pub async fn kv_get(&self, key: String) -> Result<Option<Vec<u8>>> {
match self.is_leader().await {
Ok(_) => self.kv_get_local(key).await,
Err(Error(
ErrorKind::CheckIsLeaderError(RaftError::APIError(
CheckIsLeaderError::ForwardToLeader(ForwardToLeader {
leader_node: Some(n),
..
}),
)),
_,
)) => {
let client = crate::raft::api::client::Tremor::new(n.api())?;
let res = client.read(&key).await;
match res {
Ok(v) => Ok(Some(v)),
Err(e) if e.is_not_found() => Ok(None),
Err(e) => Err(e.into()),
}
}
Err(e) => Err(e),
}
}
pub async fn kv_get_local(&self, key: String) -> Result<Option<Vec<u8>>> {
let (tx, rx) = oneshot();
let command = APIStoreReq::KVGet(key, tx);
self.send(command).await?;
Expand Down
13 changes: 2 additions & 11 deletions src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
api::{self, ServerState},
network::{raft, Raft as TarPCRaftService},
store::{NodesRequest, Store, TremorRequest},
ClusterError, ClusterResult, Manager, Network,
ClusterError, ClusterResult, Manager, Network, NodeId,
},
system::{Runtime, ShutdownMode, WorldConfig},
};
Expand Down Expand Up @@ -424,16 +424,7 @@ impl Node {
.await
{
Ok(r) => {
let assigned_node_id = r
.data
.value
.ok_or_else(|| {
ClusterError::Other("Invalid Response from raft for AddNode".to_string())
})?
.parse::<crate::raft::NodeId>()
.map_err(|e| {
ClusterError::Other(format!("Invalid node_id returned from AddNode: {e}"))
})?;
let assigned_node_id = NodeId::try_from(r.data)?;
debug_assert_eq!(node_id, assigned_node_id, "Adding initial leader resulted in a differing node_id: {assigned_node_id}, expected: {node_id}");
let (worker_handle, server_state) =
api::initialize(node_id, addr, raft.clone(), store, store_tx, store_rx);
Expand Down
Loading

0 comments on commit 98327f1

Please sign in to comment.