Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WriteTo trait and implement it for Message struct #666

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions sdk/src/binary/binary_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::client::Client;
use crate::error::IggyError;
use crate::write_to::WriteTo;
use async_trait::async_trait;
use bytes::Bytes;

Expand All @@ -16,11 +17,25 @@ pub enum ClientState {

/// A client that can send and receive binary messages.
#[async_trait]
pub trait BinaryClient: Client {
pub(crate) trait BinaryClient: Client {
/// Gets the state of the client.
async fn get_state(&self) -> ClientState;
/// Sets the state of the client.
async fn set_state(&self, state: ClientState);
/// Sends a command and returns the response.
async fn send_with_response(&self, command: u32, payload: Bytes) -> Result<Bytes, IggyError>;
async fn send_with_response(
&self,
command_code: u32,
payload: Bytes,
) -> Result<Bytes, IggyError>;
/// Sends a command and returns the response.
///
/// No additional memory allocation is required, and the Command is serialized directly into the transmission stream.
async fn send_with_response_v2(
&self,
_command_code: u32,
_command: &(dyn WriteTo + Send + Sync),
) -> Result<Bytes, IggyError> {
unimplemented!("send_with_response_v2 is not implemented!")
}
}
2 changes: 1 addition & 1 deletion sdk/src/binary/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl<B: BinaryClient> MessageClient for B {

async fn send_messages(&self, command: &mut SendMessages) -> Result<(), IggyError> {
fail_if_not_authenticated(self).await?;
self.send_with_response(SEND_MESSAGES_CODE, command.as_bytes())
self.send_with_response_v2(SEND_MESSAGES_CODE, command)
.await?;
Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions sdk/src/bytes_serializable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,9 @@ pub trait BytesSerializable {
fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
where
Self: Sized;

/// Computes the size of the struct in bytes.
fn size(&self) -> usize {
unimplemented!("size")
}
}
4 changes: 4 additions & 0 deletions sdk/src/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ impl Identifier {
}

impl BytesSerializable for Identifier {
fn size(&self) -> usize {
2 + self.length as usize
}

fn as_bytes(&self) -> Bytes {
let mut bytes = BytesMut::with_capacity(2 + self.length as usize);
bytes.put_u8(self.kind.as_code());
Expand Down
1 change: 1 addition & 0 deletions sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ pub mod topics;
pub mod users;
pub mod utils;
pub mod validatable;
pub mod write_to;
51 changes: 51 additions & 0 deletions sdk/src/messages/send_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use crate::identifier::Identifier;
use crate::messages::{MAX_HEADERS_SIZE, MAX_PAYLOAD_SIZE};
use crate::models::header;
use crate::models::header::{HeaderKey, HeaderValue};
use crate::tcp::client::ConnectionStream;
use crate::validatable::Validatable;
use crate::write_to::WriteTo;
use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use serde_with::base64::Base64;
Expand Down Expand Up @@ -293,6 +296,10 @@ impl Display for Message {
}

impl BytesSerializable for Partitioning {
fn size(&self) -> usize {
2 + self.length as usize
}

fn as_bytes(&self) -> Bytes {
let mut bytes = BytesMut::with_capacity(2 + self.length as usize);
bytes.put_u8(self.kind.as_code());
Expand Down Expand Up @@ -324,6 +331,25 @@ impl BytesSerializable for Partitioning {
}
}

#[async_trait]
impl WriteTo for Message {
async fn write_to(&self, stream: &mut dyn ConnectionStream) -> Result<(), IggyError> {
stream.write(self.id.to_le_bytes().as_ref()).await?;
if let Some(headers) = &self.headers {
let headers_bytes = headers.as_bytes();
stream
.write((headers_bytes.len() as u32).to_le_bytes().as_ref())
.await?;
stream.write(&headers_bytes).await?;
} else {
stream.write(0u32.to_le_bytes().as_ref()).await?;
}
stream.write(self.length.to_le_bytes().as_ref()).await?;
stream.write(self.payload.as_ref()).await?;
Ok(())
}
}

impl BytesSerializable for Message {
fn as_bytes(&self) -> Bytes {
let mut bytes = BytesMut::with_capacity(self.get_size_bytes() as usize);
Expand Down Expand Up @@ -397,7 +423,32 @@ impl FromStr for Message {
}
}

#[async_trait]
impl WriteTo for SendMessages {
async fn write_to(&self, stream: &mut dyn ConnectionStream) -> Result<(), IggyError> {
stream.write(self.stream_id.as_bytes().as_ref()).await?;
stream.write(self.topic_id.as_bytes().as_ref()).await?;
stream.write(self.partitioning.as_bytes().as_ref()).await?;
for message in &self.messages {
message.write_to(stream).await?;
}
Ok(())
}
}

impl BytesSerializable for SendMessages {
fn size(&self) -> usize {
let messages_size = self
.messages
.iter()
.map(Message::get_size_bytes)
.sum::<u32>();
messages_size as usize
+ self.partitioning.size()
+ self.stream_id.size()
+ self.topic_id.size()
}

fn as_bytes(&self) -> Bytes {
let messages_size = self
.messages
Expand Down
38 changes: 37 additions & 1 deletion sdk/src/tcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::binary::binary_client::{BinaryClient, ClientState};
use crate::client::Client;
use crate::error::{IggyError, IggyErrorDiscriminants};
use crate::tcp::config::TcpClientConfig;
use crate::write_to::WriteTo;
use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
use std::fmt::Debug;
Expand Down Expand Up @@ -36,7 +37,7 @@ unsafe impl Send for TcpClient {}
unsafe impl Sync for TcpClient {}

#[async_trait]
pub(crate) trait ConnectionStream: Debug + Sync + Send {
pub(crate) trait ConnectionStream: Debug + Sync + Send + Unpin {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, IggyError>;
async fn write(&mut self, buf: &[u8]) -> Result<(), IggyError>;
async fn flush(&mut self) -> Result<(), IggyError>;
Expand Down Expand Up @@ -214,6 +215,41 @@ impl BinaryClient for TcpClient {
*self.state.lock().await = state;
}

async fn send_with_response_v2(
&self,
command_code: u32,
command: &(dyn WriteTo + Send + Sync),
) -> Result<Bytes, IggyError> {
if self.get_state().await == ClientState::Disconnected {
return Err(IggyError::NotConnected);
}

let mut stream = self.stream.lock().await;
if let Some(stream) = stream.as_mut() {
let payload_length = command.size() + REQUEST_INITIAL_BYTES_LENGTH;
trace!("Sending a TCP request...");
stream.write(&(payload_length as u32).to_le_bytes()).await?;
stream.write(&command_code.to_le_bytes()).await?;
command.write_to(stream.as_mut()).await?;
stream.flush().await?;
trace!("Sent a TCP request, waiting for a response...");

let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH];
let read_bytes = stream.read(&mut response_buffer).await?;
if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH {
error!("Received an invalid or empty response.");
return Err(IggyError::EmptyResponse);
}

let status = u32::from_le_bytes(response_buffer[..4].try_into().unwrap());
let length = u32::from_le_bytes(response_buffer[4..].try_into().unwrap());
return self.handle_response(status, length, stream.as_mut()).await;
}

error!("Cannot send data. Client is not connected.");
Err(IggyError::NotConnected)
}

async fn send_with_response(&self, command: u32, payload: Bytes) -> Result<Bytes, IggyError> {
if self.get_state().await == ClientState::Disconnected {
return Err(IggyError::NotConnected);
Expand Down
12 changes: 12 additions & 0 deletions sdk/src/write_to.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use async_trait::async_trait;

use crate::{
bytes_serializable::BytesSerializable, error::IggyError, tcp::client::ConnectionStream,
};

/// The trait for serializing a struct into a stream.
#[async_trait]
pub(crate) trait WriteTo: BytesSerializable {
/// Serialize the struct into the stream.
async fn write_to(&self, stream: &mut dyn ConnectionStream) -> Result<(), IggyError>;
}
Loading