Skip to content

Commit

Permalink
Merge pull request #881 from piodul/new-serialize-api-batches
Browse files Browse the repository at this point in the history
serialization: implement new, type-safe variants of BatchValues
  • Loading branch information
Lorak-mmk authored Dec 15, 2023
2 parents 83d78b0 + fbeafd7 commit 99330c8
Show file tree
Hide file tree
Showing 21 changed files with 994 additions and 168 deletions.
3 changes: 1 addition & 2 deletions scylla-cql/src/frame/request/auth_response.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::frame::frame_errors::ParseError;
use bytes::BufMut;

use crate::frame::request::{RequestOpcode, SerializableRequest};
use crate::frame::types::write_bytes_opt;
Expand All @@ -12,7 +11,7 @@ pub struct AuthResponse {
impl SerializableRequest for AuthResponse {
const OPCODE: RequestOpcode = RequestOpcode::AuthResponse;

fn serialize(&self, buf: &mut impl BufMut) -> Result<(), ParseError> {
fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), ParseError> {
write_bytes_opt(self.response.as_ref(), buf)
}
}
45 changes: 33 additions & 12 deletions scylla-cql/src/frame/request/batch.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use bytes::{Buf, BufMut};
use std::{borrow::Cow, convert::TryInto};

use crate::frame::{
frame_errors::ParseError,
request::{RequestOpcode, SerializableRequest},
types::{self, SerialConsistency},
value::{BatchValues, BatchValuesIterator, LegacySerializedValues},
use crate::{
frame::{
frame_errors::ParseError,
request::{RequestOpcode, SerializableRequest},
types::{self, SerialConsistency},
value::SerializeValuesError,
},
types::serialize::{
raw_batch::{RawBatchValues, RawBatchValuesIterator},
row::SerializedValues,
RowWriter, SerializationError,
},
};

use super::DeserializableRequest;
Expand All @@ -20,7 +27,7 @@ pub struct Batch<'b, Statement, Values>
where
BatchStatement<'b>: From<&'b Statement>,
Statement: Clone,
Values: BatchValues,
Values: RawBatchValues,
{
pub statements: Cow<'b, [Statement]>,
pub batch_type: BatchType,
Expand Down Expand Up @@ -72,11 +79,11 @@ impl<Statement, Values> SerializableRequest for Batch<'_, Statement, Values>
where
for<'s> BatchStatement<'s>: From<&'s Statement>,
Statement: Clone,
Values: BatchValues,
Values: RawBatchValues,
{
const OPCODE: RequestOpcode = RequestOpcode::Batch;

fn serialize(&self, buf: &mut impl BufMut) -> Result<(), ParseError> {
fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), ParseError> {
// Serializing type of batch
buf.put_u8(self.batch_type as u8);

Expand All @@ -93,9 +100,23 @@ where
let mut value_lists = self.values.batch_values_iter();
for (idx, statement) in self.statements.iter().enumerate() {
BatchStatement::from(statement).serialize(buf)?;

// Reserve two bytes for length
let length_pos = buf.len();
buf.extend_from_slice(&[0, 0]);
let mut row_writer = RowWriter::new(buf);
value_lists
.write_next_to_request(buf)
.serialize_next(&mut row_writer)
.ok_or_else(|| counts_mismatch_err(idx, self.statements.len()))??;
// Go back and put the length
let count: u16 = match row_writer.value_count().try_into() {
Ok(n) => n,
Err(_) => {
return Err(SerializationError::new(SerializeValuesError::TooManyValues).into())
}
};
buf[length_pos..length_pos + 2].copy_from_slice(&count.to_be_bytes());

n_serialized_statements += 1;
}
// At this point, we have all statements serialized. If any values are still left, we have a mismatch.
Expand Down Expand Up @@ -186,7 +207,7 @@ impl<'s, 'b> From<&'s BatchStatement<'b>> for BatchStatement<'s> {
}
}

impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec<LegacySerializedValues>> {
impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec<SerializedValues>> {
fn deserialize(buf: &mut &[u8]) -> Result<Self, ParseError> {
let batch_type = buf.get_u8().try_into()?;

Expand All @@ -196,7 +217,7 @@ impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec<LegacySeria
let batch_statement = BatchStatement::deserialize(buf)?;

// As stated in CQL protocol v4 specification, values names in Batch are broken and should be never used.
let values = LegacySerializedValues::new_from_frame(buf, false)?;
let values = SerializedValues::new_from_frame(buf)?;

Ok((batch_statement, values))
})
Expand Down Expand Up @@ -233,7 +254,7 @@ impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec<LegacySeria
.then(|| types::read_long(buf))
.transpose()?;

let (statements, values): (Vec<BatchStatement>, Vec<LegacySerializedValues>) =
let (statements, values): (Vec<BatchStatement>, Vec<SerializedValues>) =
statements_with_values.into_iter().unzip();

Ok(Self {
Expand Down
4 changes: 2 additions & 2 deletions scylla-cql/src/frame/request/execute.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::frame::frame_errors::ParseError;
use bytes::{BufMut, Bytes};
use bytes::Bytes;

use crate::{
frame::request::{query, RequestOpcode, SerializableRequest},
Expand All @@ -17,7 +17,7 @@ pub struct Execute<'a> {
impl SerializableRequest for Execute<'_> {
const OPCODE: RequestOpcode = RequestOpcode::Execute;

fn serialize(&self, buf: &mut impl BufMut) -> Result<(), ParseError> {
fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), ParseError> {
// Serializing statement id
types::write_short_bytes(&self.id[..], buf)?;

Expand Down
14 changes: 7 additions & 7 deletions scylla-cql/src/frame/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ pub mod query;
pub mod register;
pub mod startup;

use crate::types::serialize::row::SerializedValues;
use crate::{frame::frame_errors::ParseError, Consistency};
use bytes::{BufMut, Bytes};
use bytes::Bytes;
use num_enum::TryFromPrimitive;

pub use auth_response::AuthResponse;
Expand All @@ -22,7 +23,6 @@ pub use startup::Startup;
use self::batch::BatchStatement;

use super::types::SerialConsistency;
use super::value::LegacySerializedValues;

#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, TryFromPrimitive)]
#[repr(u8)]
Expand All @@ -40,7 +40,7 @@ pub enum RequestOpcode {
pub trait SerializableRequest {
const OPCODE: RequestOpcode;

fn serialize(&self, buf: &mut impl BufMut) -> Result<(), ParseError>;
fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), ParseError>;

fn to_bytes(&self) -> Result<Bytes, ParseError> {
let mut v = Vec::new();
Expand All @@ -59,7 +59,7 @@ pub trait DeserializableRequest: SerializableRequest + Sized {
pub enum Request<'r> {
Query(Query<'r>),
Execute(Execute<'r>),
Batch(Batch<'r, BatchStatement<'r>, Vec<LegacySerializedValues>>),
Batch(Batch<'r, BatchStatement<'r>, Vec<SerializedValues>>),
}

impl<'r> Request<'r> {
Expand Down Expand Up @@ -190,8 +190,8 @@ mod tests {

// Not execute's values, because named values are not supported in batches.
values: vec![
query.parameters.values.deref().to_old_serialized_values(),
query.parameters.values.deref().to_old_serialized_values(),
query.parameters.values.deref().clone(),
query.parameters.values.deref().clone(),
],
};
{
Expand Down Expand Up @@ -262,7 +262,7 @@ mod tests {
serial_consistency: None,
timestamp: None,

values: vec![query.parameters.values.deref().to_old_serialized_values()],
values: vec![query.parameters.values.deref().clone()],
};
{
let mut buf = Vec::new();
Expand Down
3 changes: 1 addition & 2 deletions scylla-cql/src/frame/request/options.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::frame::frame_errors::ParseError;
use bytes::BufMut;

use crate::frame::request::{RequestOpcode, SerializableRequest};

Expand All @@ -8,7 +7,7 @@ pub struct Options;
impl SerializableRequest for Options {
const OPCODE: RequestOpcode = RequestOpcode::Options;

fn serialize(&self, _buf: &mut impl BufMut) -> Result<(), ParseError> {
fn serialize(&self, _buf: &mut Vec<u8>) -> Result<(), ParseError> {
Ok(())
}
}
3 changes: 1 addition & 2 deletions scylla-cql/src/frame/request/prepare.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::frame::frame_errors::ParseError;
use bytes::BufMut;

use crate::{
frame::request::{RequestOpcode, SerializableRequest},
Expand All @@ -13,7 +12,7 @@ pub struct Prepare<'a> {
impl<'a> SerializableRequest for Prepare<'a> {
const OPCODE: RequestOpcode = RequestOpcode::Prepare;

fn serialize(&self, buf: &mut impl BufMut) -> Result<(), ParseError> {
fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), ParseError> {
types::write_long_string(self.query, buf)?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion scylla-cql/src/frame/request/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct Query<'q> {
impl SerializableRequest for Query<'_> {
const OPCODE: RequestOpcode = RequestOpcode::Query;

fn serialize(&self, buf: &mut impl BufMut) -> Result<(), ParseError> {
fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), ParseError> {
types::write_long_string(&self.contents, buf)?;
self.parameters.serialize(buf)?;
Ok(())
Expand Down
4 changes: 1 addition & 3 deletions scylla-cql/src/frame/request/register.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use bytes::BufMut;

use crate::frame::{
frame_errors::ParseError,
request::{RequestOpcode, SerializableRequest},
Expand All @@ -14,7 +12,7 @@ pub struct Register {
impl SerializableRequest for Register {
const OPCODE: RequestOpcode = RequestOpcode::Register;

fn serialize(&self, buf: &mut impl BufMut) -> Result<(), ParseError> {
fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), ParseError> {
let event_types_list = self
.event_types_to_register_for
.iter()
Expand Down
3 changes: 1 addition & 2 deletions scylla-cql/src/frame/request/startup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::frame::frame_errors::ParseError;
use bytes::BufMut;

use std::collections::HashMap;

Expand All @@ -15,7 +14,7 @@ pub struct Startup {
impl SerializableRequest for Startup {
const OPCODE: RequestOpcode = RequestOpcode::Startup;

fn serialize(&self, buf: &mut impl BufMut) -> Result<(), ParseError> {
fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), ParseError> {
types::write_string_map(&self.options, buf)?;
Ok(())
}
Expand Down
Loading

0 comments on commit 99330c8

Please sign in to comment.