Skip to content

Commit

Permalink
RUST-877 Delay replacement document serialization until `Operation::b…
Browse files Browse the repository at this point in the history
…uild` (#942)
  • Loading branch information
isabelatkinson authored Aug 31, 2023
1 parent 289443e commit 5b54bb6
Show file tree
Hide file tree
Showing 13 changed files with 347 additions and 909 deletions.
31 changes: 24 additions & 7 deletions src/bson_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ use std::{
io::{Read, Write},
};

use bson::RawBsonRef;

use crate::{
bson::{Bson, Document},
bson::{Bson, Document, RawArrayBuf, RawBson, RawBsonRef, RawDocumentBuf},
error::{ErrorKind, Result},
runtime::SyncLittleEndianRead,
};
Expand Down Expand Up @@ -48,6 +46,14 @@ pub(crate) fn to_bson_array(docs: &[Document]) -> Bson {
Bson::Array(docs.iter().map(|doc| Bson::Document(doc.clone())).collect())
}

pub(crate) fn to_raw_bson_array(docs: &[Document]) -> Result<RawBson> {
let mut array = RawArrayBuf::new();
for doc in docs {
array.push(RawDocumentBuf::from_document(doc)?);
}
Ok(RawBson::Array(array))
}

#[cfg(test)]
pub(crate) fn sort_document(document: &mut Document) {
let temp = std::mem::take(document);
Expand All @@ -62,11 +68,11 @@ pub(crate) fn first_key(document: &Document) -> Option<&str> {
document.keys().next().map(String::as_str)
}

pub(crate) fn replacement_document_check(replacement: &Document) -> Result<()> {
match first_key(replacement) {
Some(s) if !s.starts_with('$') => Ok(()),
pub(crate) fn replacement_raw_document_check(replacement: &RawDocumentBuf) -> Result<()> {
match replacement.iter().next().transpose()? {
Some((key, _)) if !key.starts_with('$') => Ok(()),
_ => Err(ErrorKind::InvalidArgument {
message: "replace document must have first key not starting with '$".to_string(),
message: "replace document must have first key not starting with '$'".to_string(),
}
.into()),
}
Expand Down Expand Up @@ -119,6 +125,17 @@ pub(crate) fn read_document_bytes<R: Read>(mut reader: R) -> Result<Vec<u8>> {
Ok(bytes)
}

pub(crate) fn extend_raw_document_buf(
this: &mut RawDocumentBuf,
other: RawDocumentBuf,
) -> Result<()> {
for result in other.iter() {
let (k, v) = result?;
this.append(k, v.to_raw_bson());
}
Ok(())
}

#[cfg(test)]
mod test {
use crate::bson_util::num_decimal_digits;
Expand Down
16 changes: 15 additions & 1 deletion src/cmap/conn/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};

use super::wire::Message;
use crate::{
bson::Document,
bson::{rawdoc, Document},
bson_util::extend_raw_document_buf,
client::{options::ServerApi, ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
error::{Error, ErrorKind, Result},
hello::{HelloCommandResponse, HelloReply},
Expand Down Expand Up @@ -177,6 +178,19 @@ impl<T> Command<T> {
}
}

impl Command<RawDocumentBuf> {
pub(crate) fn into_bson_bytes(mut self) -> Result<Vec<u8>> {
let mut command = self.body;

// Clear the body of the command to avoid re-serializing.
self.body = rawdoc! {};
let rest_of_command = bson::to_raw_document_buf(&self)?;

extend_raw_document_buf(&mut command, rest_of_command)?;
Ok(command.into_bytes())
}
}

#[derive(Debug, Clone)]
pub(crate) struct RawCommandResponse {
pub(crate) source: ServerAddress,
Expand Down
53 changes: 29 additions & 24 deletions src/coll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use serde::{

use self::options::*;
use crate::{
bson::{doc, to_document_with_options, Bson, Document, SerializerOptions},
bson::{doc, Bson, Document},
bson_util,
change_stream::{
event::ChangeStreamEvent,
Expand Down Expand Up @@ -753,7 +753,14 @@ impl<T> Collection<T> {
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;

let update = Update::new(self.namespace(), query, update, true, options);
let update = Update::with_update(
self.namespace(),
query,
update,
true,
options,
self.inner.human_readable_serialization,
);
self.client().execute_operation(update, session).await
}

Expand Down Expand Up @@ -806,7 +813,14 @@ impl<T> Collection<T> {
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;

let update = Update::new(self.namespace(), query, update, false, options);
let update = Update::with_update(
self.namespace(),
query,
update,
false,
options,
self.inner.human_readable_serialization,
);
self.client().execute_operation(update, session).await
}

Expand Down Expand Up @@ -1018,7 +1032,7 @@ where
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;

let op = FindAndModify::<T>::with_delete(self.namespace(), filter, options);
let op = FindAndModify::with_delete(self.namespace(), filter, options);
self.client().execute_operation(op, session).await
}

Expand Down Expand Up @@ -1067,7 +1081,7 @@ where
let mut options = options.into();
resolve_write_concern_with_session!(self, options, session.as_ref())?;

let op = FindAndModify::<T>::with_update(self.namespace(), filter, update, options)?;
let op = FindAndModify::with_update(self.namespace(), filter, update, options)?;
self.client().execute_operation(op, session).await
}

Expand Down Expand Up @@ -1123,18 +1137,16 @@ where
session: impl Into<Option<&mut ClientSession>>,
) -> Result<Option<T>> {
let mut options = options.into();
let replacement = to_document_with_options(
replacement.borrow(),
SerializerOptions::builder()
.human_readable(self.inner.human_readable_serialization)
.build(),
)?;

let session = session.into();

resolve_write_concern_with_session!(self, options, session.as_ref())?;

let op = FindAndModify::<T>::with_replace(self.namespace(), filter, replacement, options)?;
let op = FindAndModify::with_replace(
self.namespace(),
filter,
replacement.borrow(),
options,
self.inner.human_readable_serialization,
)?;
self.client().execute_operation(op, session).await
}

Expand Down Expand Up @@ -1395,25 +1407,18 @@ where
session: impl Into<Option<&mut ClientSession>>,
) -> Result<UpdateResult> {
let mut options = options.into();
let replacement = to_document_with_options(
replacement.borrow(),
SerializerOptions::builder()
.human_readable(self.inner.human_readable_serialization)
.build(),
)?;

bson_util::replacement_document_check(&replacement)?;

let session = session.into();

resolve_write_concern_with_session!(self, options, session.as_ref())?;

let update = Update::new(
let update = Update::with_replace(
self.namespace(),
query,
UpdateModifications::Document(replacement),
replacement.borrow(),
false,
options.map(UpdateOptions::from_replace_options),
self.inner.human_readable_serialization,
);
self.client().execute_operation(update, session).await
}
Expand Down
27 changes: 8 additions & 19 deletions src/coll/options.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::time::Duration;

use bson::serde_helpers;
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
use serde_with::skip_serializing_none;
use typed_builder::TypedBuilder;

use crate::{
bson::{doc, Bson, Document},
bson::{doc, serde_helpers, Bson, Document, RawBson, RawDocumentBuf},
concern::{ReadConcern, WriteConcern},
error::Result,
options::Collation,
selection_criteria::SelectionCriteria,
serde_util,
Expand Down Expand Up @@ -63,7 +63,7 @@ impl<'de> Deserialize<'de> for ReturnDocument {
}

/// Specifies the index to use for an operation.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(untagged)]
#[non_exhaustive]
pub enum Hint {
Expand All @@ -74,11 +74,11 @@ pub enum Hint {
}

impl Hint {
pub(crate) fn to_bson(&self) -> Bson {
match self {
Hint::Keys(ref d) => Bson::Document(d.clone()),
Hint::Name(ref s) => Bson::String(s.clone()),
}
pub(crate) fn to_raw_bson(&self) -> Result<RawBson> {
Ok(match self {
Hint::Keys(ref d) => RawBson::Document(RawDocumentBuf::from_document(d)?),
Hint::Name(ref s) => RawBson::String(s.clone()),
})
}
}

Expand Down Expand Up @@ -174,17 +174,6 @@ pub enum UpdateModifications {
Pipeline(Vec<Document>),
}

impl UpdateModifications {
pub(crate) fn to_bson(&self) -> Bson {
match self {
UpdateModifications::Document(ref d) => Bson::Document(d.clone()),
UpdateModifications::Pipeline(ref p) => {
Bson::Array(p.iter().map(|d| Bson::Document(d.clone())).collect())
}
}
}
}

impl From<Document> for UpdateModifications {
fn from(item: Document) -> Self {
UpdateModifications::Document(item)
Expand Down
38 changes: 19 additions & 19 deletions src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};

use crate::{
bson::{self, Bson, Document},
bson_util,
bson_util::{self, extend_raw_document_buf},
client::{ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription},
error::{
Expand Down Expand Up @@ -73,7 +73,7 @@ pub(crate) use list_indexes::ListIndexes;
pub(crate) use raw_output::RawOutput;
pub(crate) use run_command::RunCommand;
pub(crate) use run_cursor_command::RunCursorCommand;
pub(crate) use update::Update;
pub(crate) use update::{Update, UpdateOrReplace};

const SERVER_4_2_0_WIRE_VERSION: i32 = 8;
const SERVER_4_4_0_WIRE_VERSION: i32 = 9;
Expand Down Expand Up @@ -224,28 +224,28 @@ impl From<CommandErrorBody> for Error {
}
}

/// Appends a serializable struct to the input document.
/// The serializable struct MUST serialize to a Document, otherwise an error will be thrown.
/// Appends a serializable struct to the input document. The serializable struct MUST serialize to a
/// Document; otherwise, an error will be thrown.
pub(crate) fn append_options<T: Serialize + Debug>(
doc: &mut Document,
options: Option<&T>,
) -> Result<()> {
match options {
Some(options) => {
let temp_doc = bson::to_bson(options)?;
match temp_doc {
Bson::Document(d) => {
doc.extend(d);
Ok(())
}
_ => Err(ErrorKind::Internal {
message: format!("options did not serialize to a Document: {:?}", options),
}
.into()),
}
}
None => Ok(()),
if let Some(options) = options {
let options_doc = bson::to_document(options)?;
doc.extend(options_doc);
}
Ok(())
}

pub(crate) fn append_options_to_raw_document<T: Serialize>(
doc: &mut RawDocumentBuf,
options: Option<&T>,
) -> Result<()> {
if let Some(options) = options {
let options_raw_doc = bson::to_raw_document_buf(options)?;
extend_raw_document_buf(doc, options_raw_doc)?;
}
Ok(())
}

#[derive(Deserialize, Debug)]
Expand Down
Loading

0 comments on commit 5b54bb6

Please sign in to comment.