diff --git a/src/bson_util.rs b/src/bson_util.rs index 8a9b255f4..d5f4b5047 100644 --- a/src/bson_util.rs +++ b/src/bson_util.rs @@ -3,10 +3,8 @@ use std::{ io::{Read, Write}, }; -use bson::RawBsonRef; - use crate::{ - bson::{Bson, Document}, + bson::{Bson, Document, RawArrayBuf, RawBson, RawBsonRef, RawDocumentBuf}, error::{ErrorKind, Result}, runtime::SyncLittleEndianRead, }; @@ -48,6 +46,14 @@ pub(crate) fn to_bson_array(docs: &[Document]) -> Bson { Bson::Array(docs.iter().map(|doc| Bson::Document(doc.clone())).collect()) } +pub(crate) fn to_raw_bson_array(docs: &[Document]) -> Result { + let mut array = RawArrayBuf::new(); + for doc in docs { + array.push(RawDocumentBuf::from_document(doc)?); + } + Ok(RawBson::Array(array)) +} + #[cfg(test)] pub(crate) fn sort_document(document: &mut Document) { let temp = std::mem::take(document); @@ -62,11 +68,11 @@ pub(crate) fn first_key(document: &Document) -> Option<&str> { document.keys().next().map(String::as_str) } -pub(crate) fn replacement_document_check(replacement: &Document) -> Result<()> { - match first_key(replacement) { - Some(s) if !s.starts_with('$') => Ok(()), +pub(crate) fn replacement_raw_document_check(replacement: &RawDocumentBuf) -> Result<()> { + match replacement.iter().next().transpose()? { + Some((key, _)) if !key.starts_with('$') => Ok(()), _ => Err(ErrorKind::InvalidArgument { - message: "replace document must have first key not starting with '$".to_string(), + message: "replace document must have first key not starting with '$'".to_string(), } .into()), } @@ -119,6 +125,17 @@ pub(crate) fn read_document_bytes(mut reader: R) -> Result> { Ok(bytes) } +pub(crate) fn extend_raw_document_buf( + this: &mut RawDocumentBuf, + other: RawDocumentBuf, +) -> Result<()> { + for result in other.iter() { + let (k, v) = result?; + this.append(k, v.to_raw_bson()); + } + Ok(()) +} + #[cfg(test)] mod test { use crate::bson_util::num_decimal_digits; diff --git a/src/cmap/conn/command.rs b/src/cmap/conn/command.rs index fa6ee8a25..311d7d42c 100644 --- a/src/cmap/conn/command.rs +++ b/src/cmap/conn/command.rs @@ -3,7 +3,8 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use super::wire::Message; use crate::{ - bson::Document, + bson::{rawdoc, Document}, + bson_util::extend_raw_document_buf, client::{options::ServerApi, ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS}, error::{Error, ErrorKind, Result}, hello::{HelloCommandResponse, HelloReply}, @@ -177,6 +178,19 @@ impl Command { } } +impl Command { + pub(crate) fn into_bson_bytes(mut self) -> Result> { + let mut command = self.body; + + // Clear the body of the command to avoid re-serializing. + self.body = rawdoc! {}; + let rest_of_command = bson::to_raw_document_buf(&self)?; + + extend_raw_document_buf(&mut command, rest_of_command)?; + Ok(command.into_bytes()) + } +} + #[derive(Debug, Clone)] pub(crate) struct RawCommandResponse { pub(crate) source: ServerAddress, diff --git a/src/coll.rs b/src/coll.rs index 7378015aa..77e193917 100644 --- a/src/coll.rs +++ b/src/coll.rs @@ -15,7 +15,7 @@ use serde::{ use self::options::*; use crate::{ - bson::{doc, to_document_with_options, Bson, Document, SerializerOptions}, + bson::{doc, Bson, Document}, bson_util, change_stream::{ event::ChangeStreamEvent, @@ -753,7 +753,14 @@ impl Collection { let mut options = options.into(); resolve_write_concern_with_session!(self, options, session.as_ref())?; - let update = Update::new(self.namespace(), query, update, true, options); + let update = Update::with_update( + self.namespace(), + query, + update, + true, + options, + self.inner.human_readable_serialization, + ); self.client().execute_operation(update, session).await } @@ -806,7 +813,14 @@ impl Collection { let mut options = options.into(); resolve_write_concern_with_session!(self, options, session.as_ref())?; - let update = Update::new(self.namespace(), query, update, false, options); + let update = Update::with_update( + self.namespace(), + query, + update, + false, + options, + self.inner.human_readable_serialization, + ); self.client().execute_operation(update, session).await } @@ -1018,7 +1032,7 @@ where let mut options = options.into(); resolve_write_concern_with_session!(self, options, session.as_ref())?; - let op = FindAndModify::::with_delete(self.namespace(), filter, options); + let op = FindAndModify::with_delete(self.namespace(), filter, options); self.client().execute_operation(op, session).await } @@ -1067,7 +1081,7 @@ where let mut options = options.into(); resolve_write_concern_with_session!(self, options, session.as_ref())?; - let op = FindAndModify::::with_update(self.namespace(), filter, update, options)?; + let op = FindAndModify::with_update(self.namespace(), filter, update, options)?; self.client().execute_operation(op, session).await } @@ -1123,18 +1137,16 @@ where session: impl Into>, ) -> Result> { let mut options = options.into(); - let replacement = to_document_with_options( - replacement.borrow(), - SerializerOptions::builder() - .human_readable(self.inner.human_readable_serialization) - .build(), - )?; - let session = session.into(); - resolve_write_concern_with_session!(self, options, session.as_ref())?; - let op = FindAndModify::::with_replace(self.namespace(), filter, replacement, options)?; + let op = FindAndModify::with_replace( + self.namespace(), + filter, + replacement.borrow(), + options, + self.inner.human_readable_serialization, + )?; self.client().execute_operation(op, session).await } @@ -1395,25 +1407,18 @@ where session: impl Into>, ) -> Result { let mut options = options.into(); - let replacement = to_document_with_options( - replacement.borrow(), - SerializerOptions::builder() - .human_readable(self.inner.human_readable_serialization) - .build(), - )?; - - bson_util::replacement_document_check(&replacement)?; let session = session.into(); resolve_write_concern_with_session!(self, options, session.as_ref())?; - let update = Update::new( + let update = Update::with_replace( self.namespace(), query, - UpdateModifications::Document(replacement), + replacement.borrow(), false, options.map(UpdateOptions::from_replace_options), + self.inner.human_readable_serialization, ); self.client().execute_operation(update, session).await } diff --git a/src/coll/options.rs b/src/coll/options.rs index eeb53c21c..861f310cf 100644 --- a/src/coll/options.rs +++ b/src/coll/options.rs @@ -1,13 +1,13 @@ use std::time::Duration; -use bson::serde_helpers; use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; use serde_with::skip_serializing_none; use typed_builder::TypedBuilder; use crate::{ - bson::{doc, Bson, Document}, + bson::{doc, serde_helpers, Bson, Document, RawBson, RawDocumentBuf}, concern::{ReadConcern, WriteConcern}, + error::Result, options::Collation, selection_criteria::SelectionCriteria, serde_util, @@ -63,7 +63,7 @@ impl<'de> Deserialize<'de> for ReturnDocument { } /// Specifies the index to use for an operation. -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] #[serde(untagged)] #[non_exhaustive] pub enum Hint { @@ -74,11 +74,11 @@ pub enum Hint { } impl Hint { - pub(crate) fn to_bson(&self) -> Bson { - match self { - Hint::Keys(ref d) => Bson::Document(d.clone()), - Hint::Name(ref s) => Bson::String(s.clone()), - } + pub(crate) fn to_raw_bson(&self) -> Result { + Ok(match self { + Hint::Keys(ref d) => RawBson::Document(RawDocumentBuf::from_document(d)?), + Hint::Name(ref s) => RawBson::String(s.clone()), + }) } } @@ -174,17 +174,6 @@ pub enum UpdateModifications { Pipeline(Vec), } -impl UpdateModifications { - pub(crate) fn to_bson(&self) -> Bson { - match self { - UpdateModifications::Document(ref d) => Bson::Document(d.clone()), - UpdateModifications::Pipeline(ref p) => { - Bson::Array(p.iter().map(|d| Bson::Document(d.clone())).collect()) - } - } - } -} - impl From for UpdateModifications { fn from(item: Document) -> Self { UpdateModifications::Document(item) diff --git a/src/operation.rs b/src/operation.rs index 22d1e6a9f..67358171c 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -32,7 +32,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use crate::{ bson::{self, Bson, Document}, - bson_util, + bson_util::{self, extend_raw_document_buf}, client::{ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS}, cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription}, error::{ @@ -73,7 +73,7 @@ pub(crate) use list_indexes::ListIndexes; pub(crate) use raw_output::RawOutput; pub(crate) use run_command::RunCommand; pub(crate) use run_cursor_command::RunCursorCommand; -pub(crate) use update::Update; +pub(crate) use update::{Update, UpdateOrReplace}; const SERVER_4_2_0_WIRE_VERSION: i32 = 8; const SERVER_4_4_0_WIRE_VERSION: i32 = 9; @@ -224,28 +224,28 @@ impl From for Error { } } -/// Appends a serializable struct to the input document. -/// The serializable struct MUST serialize to a Document, otherwise an error will be thrown. +/// Appends a serializable struct to the input document. The serializable struct MUST serialize to a +/// Document; otherwise, an error will be thrown. pub(crate) fn append_options( doc: &mut Document, options: Option<&T>, ) -> Result<()> { - match options { - Some(options) => { - let temp_doc = bson::to_bson(options)?; - match temp_doc { - Bson::Document(d) => { - doc.extend(d); - Ok(()) - } - _ => Err(ErrorKind::Internal { - message: format!("options did not serialize to a Document: {:?}", options), - } - .into()), - } - } - None => Ok(()), + if let Some(options) = options { + let options_doc = bson::to_document(options)?; + doc.extend(options_doc); + } + Ok(()) +} + +pub(crate) fn append_options_to_raw_document( + doc: &mut RawDocumentBuf, + options: Option<&T>, +) -> Result<()> { + if let Some(options) = options { + let options_raw_doc = bson::to_raw_document_buf(options)?; + extend_raw_document_buf(doc, options_raw_doc)?; } + Ok(()) } #[derive(Deserialize, Debug)] diff --git a/src/operation/find_and_modify.rs b/src/operation/find_and_modify.rs index bdc4815e0..7e962d89d 100644 --- a/src/operation/find_and_modify.rs +++ b/src/operation/find_and_modify.rs @@ -1,14 +1,12 @@ mod options; -#[cfg(test)] -mod test; use std::fmt::Debug; -use serde::{de::DeserializeOwned, Deserialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use self::options::FindAndModifyOptions; use crate::{ - bson::{doc, from_document, Bson, Document}, + bson::{doc, from_document, rawdoc, Bson, Document, RawDocumentBuf}, bson_util, cmap::{Command, RawCommandResponse, StreamDescription}, coll::{ @@ -21,105 +19,116 @@ use crate::{ Namespace, }, error::{ErrorKind, Result}, - operation::{append_options, remove_empty_write_concern, OperationWithDefaults, Retryability}, + operation::{ + append_options_to_raw_document, + find_and_modify::options::Modification, + remove_empty_write_concern, + OperationWithDefaults, + Retryability, + }, options::WriteConcern, }; -pub(crate) struct FindAndModify -where - T: DeserializeOwned, -{ +pub(crate) struct FindAndModify<'a, R, T: DeserializeOwned> { ns: Namespace, query: Document, - options: FindAndModifyOptions, + modification: Modification<'a, R>, + human_readable_serialization: Option, + options: Option, _phantom: std::marker::PhantomData, } -impl FindAndModify -where - T: DeserializeOwned, -{ +impl FindAndModify<'_, (), T> { pub fn with_delete( ns: Namespace, query: Document, options: Option, ) -> Self { - let options = - FindAndModifyOptions::from_find_one_and_delete_options(options.unwrap_or_default()); FindAndModify { ns, query, - options, + modification: Modification::Delete, + human_readable_serialization: None, + options: options.map(Into::into), _phantom: Default::default(), } } - pub fn with_replace( + pub fn with_update( ns: Namespace, query: Document, - replacement: Document, - options: Option, + update: UpdateModifications, + options: Option, ) -> Result { - bson_util::replacement_document_check(&replacement)?; - let options = FindAndModifyOptions::from_find_one_and_replace_options( - replacement, - options.unwrap_or_default(), - ); + if let UpdateModifications::Document(ref d) = update { + bson_util::update_document_check(d)?; + }; Ok(FindAndModify { ns, query, - options, + modification: Modification::Update(update.into()), + human_readable_serialization: None, + options: options.map(Into::into), _phantom: Default::default(), }) } +} - pub fn with_update( +impl<'a, R: Serialize, T: DeserializeOwned> FindAndModify<'a, R, T> { + pub fn with_replace( ns: Namespace, query: Document, - update: UpdateModifications, - options: Option, + replacement: &'a R, + options: Option, + human_readable_serialization: bool, ) -> Result { - if let UpdateModifications::Document(ref d) = update { - bson_util::update_document_check(d)?; - }; - let options = FindAndModifyOptions::from_find_one_and_update_options( - update, - options.unwrap_or_default(), - ); Ok(FindAndModify { ns, query, - options, + modification: Modification::Update(replacement.into()), + human_readable_serialization: Some(human_readable_serialization), + options: options.map(Into::into), _phantom: Default::default(), }) } } -impl OperationWithDefaults for FindAndModify -where - T: DeserializeOwned, -{ +impl<'a, R: Serialize, T: DeserializeOwned> OperationWithDefaults for FindAndModify<'a, R, T> { type O = Option; - type Command = Document; + type Command = RawDocumentBuf; const NAME: &'static str = "findAndModify"; - fn build(&mut self, description: &StreamDescription) -> Result { - if self.options.hint.is_some() && description.max_wire_version.unwrap_or(0) < 8 { - return Err(ErrorKind::InvalidArgument { - message: "Specifying a hint to find_one_and_x is not supported on server versions \ - < 4.4" - .to_string(), + fn build(&mut self, description: &StreamDescription) -> Result> { + if let Some(ref options) = self.options { + if options.hint.is_some() && description.max_wire_version.unwrap_or(0) < 8 { + return Err(ErrorKind::InvalidArgument { + message: "Specifying a hint to find_one_and_x is not supported on server \ + versions < 4.4" + .to_string(), + } + .into()); } - .into()); } - let mut body: Document = doc! { + let mut body = rawdoc! { Self::NAME: self.ns.coll.clone(), - "query": self.query.clone(), + "query": RawDocumentBuf::from_document(&self.query)?, }; - remove_empty_write_concern!(Some(&mut self.options)); - append_options(&mut body, Some(&self.options).as_ref())?; + let (key, modification) = match &self.modification { + Modification::Delete => ("remove", true.into()), + Modification::Update(update_or_replace) => ( + "update", + update_or_replace + .to_raw_bson(self.human_readable_serialization.unwrap_or_default())?, + ), + }; + body.append(key, modification); + + if let Some(ref mut options) = self.options { + remove_empty_write_concern!(Some(options)); + } + append_options_to_raw_document(&mut body, self.options.as_ref())?; Ok(Command::new( Self::NAME.to_string(), @@ -128,6 +137,10 @@ where )) } + fn serialize_command(&mut self, cmd: Command) -> Result> { + cmd.into_bson_bytes() + } + fn handle_response( &self, response: RawCommandResponse, @@ -150,7 +163,7 @@ where } fn write_concern(&self) -> Option<&WriteConcern> { - self.options.write_concern.as_ref() + self.options.as_ref().and_then(|o| o.write_concern.as_ref()) } fn retryability(&self) -> Retryability { diff --git a/src/operation/find_and_modify/options.rs b/src/operation/find_and_modify/options.rs index 6c46cf0f2..fcdb90af0 100644 --- a/src/operation/find_and_modify/options.rs +++ b/src/operation/find_and_modify/options.rs @@ -11,19 +11,17 @@ use crate::{ FindOneAndUpdateOptions, Hint, ReturnDocument, - UpdateModifications, }, collation::Collation, concern::WriteConcern, + operation::UpdateOrReplace, serde_util, }; -#[derive(Clone, Debug, Serialize)] -pub(super) enum Modification { - #[serde(rename = "remove", serialize_with = "serde_util::serialize_true")] +#[derive(Clone, Debug)] +pub(super) enum Modification<'a, T> { Delete, - #[serde(rename = "update")] - Update(UpdateModifications), + Update(UpdateOrReplace<'a, T>), } #[serde_with::skip_serializing_none] @@ -31,9 +29,6 @@ pub(super) enum Modification { #[builder(field_defaults(setter(into)))] #[serde(rename_all = "camelCase")] pub(super) struct FindAndModifyOptions { - #[serde(flatten)] - pub(crate) modification: Modification, - #[builder(default)] pub(crate) sort: Option, @@ -77,71 +72,60 @@ pub(super) struct FindAndModifyOptions { pub(crate) comment: Option, } -impl FindAndModifyOptions { - pub(super) fn from_find_one_and_delete_options( - opts: FindOneAndDeleteOptions, - ) -> FindAndModifyOptions { - let mut modify_opts = FindAndModifyOptions::builder() - .modification(Modification::Delete) - .build(); - - modify_opts.collation = opts.collation; - modify_opts.max_time = opts.max_time; - modify_opts.projection = opts.projection; - modify_opts.sort = opts.sort; - modify_opts.write_concern = opts.write_concern; - modify_opts.hint = opts.hint; - modify_opts.let_vars = opts.let_vars; - modify_opts.comment = opts.comment; - modify_opts +impl From for FindAndModifyOptions { + fn from(options: FindOneAndDeleteOptions) -> Self { + Self { + sort: options.sort, + new: None, + upsert: None, + bypass_document_validation: None, + write_concern: options.write_concern, + array_filters: None, + max_time: options.max_time, + projection: options.projection, + collation: options.collation, + hint: options.hint, + let_vars: options.let_vars, + comment: options.comment, + } } +} - pub(super) fn from_find_one_and_replace_options( - replacement: Document, - opts: FindOneAndReplaceOptions, - ) -> FindAndModifyOptions { - let replacement = UpdateModifications::Document(replacement); - let mut modify_opts = FindAndModifyOptions::builder() - .modification(Modification::Update(replacement)) - .build(); - - modify_opts.collation = opts.collation; - modify_opts.bypass_document_validation = opts.bypass_document_validation; - modify_opts.max_time = opts.max_time; - modify_opts.projection = opts.projection; - modify_opts.new = return_document_to_bool(opts.return_document); - modify_opts.sort = opts.sort; - modify_opts.upsert = opts.upsert; - modify_opts.write_concern = opts.write_concern; - modify_opts.hint = opts.hint; - modify_opts.let_vars = opts.let_vars; - modify_opts.comment = opts.comment; - - modify_opts +impl From for FindAndModifyOptions { + fn from(options: FindOneAndUpdateOptions) -> Self { + Self { + sort: options.sort, + new: return_document_to_bool(options.return_document), + upsert: options.upsert, + bypass_document_validation: options.bypass_document_validation, + write_concern: options.write_concern, + array_filters: options.array_filters, + max_time: options.max_time, + projection: options.projection, + collation: options.collation, + hint: options.hint, + let_vars: options.let_vars, + comment: options.comment, + } } +} - pub(super) fn from_find_one_and_update_options( - update: UpdateModifications, - opts: FindOneAndUpdateOptions, - ) -> FindAndModifyOptions { - let mut modify_opts = FindAndModifyOptions::builder() - .modification(Modification::Update(update)) - .build(); - - modify_opts.collation = opts.collation; - modify_opts.array_filters = opts.array_filters; - modify_opts.bypass_document_validation = opts.bypass_document_validation; - modify_opts.max_time = opts.max_time; - modify_opts.projection = opts.projection; - modify_opts.new = return_document_to_bool(opts.return_document); - modify_opts.sort = opts.sort; - modify_opts.upsert = opts.upsert; - modify_opts.write_concern = opts.write_concern; - modify_opts.hint = opts.hint; - modify_opts.let_vars = opts.let_vars; - modify_opts.comment = opts.comment; - - modify_opts +impl From for FindAndModifyOptions { + fn from(options: FindOneAndReplaceOptions) -> Self { + Self { + sort: options.sort, + new: return_document_to_bool(options.return_document), + upsert: options.upsert, + bypass_document_validation: options.bypass_document_validation, + write_concern: options.write_concern, + array_filters: None, + max_time: options.max_time, + projection: options.projection, + collation: options.collation, + hint: options.hint, + let_vars: options.let_vars, + comment: options.comment, + } } } diff --git a/src/operation/find_and_modify/test.rs b/src/operation/find_and_modify/test.rs deleted file mode 100644 index 3c95ab9f6..000000000 --- a/src/operation/find_and_modify/test.rs +++ /dev/null @@ -1,513 +0,0 @@ -use std::time::Duration; - -use crate::{ - bson::{doc, oid::ObjectId, Bson, Document}, - bson_util, - cmap::StreamDescription, - coll::options::ReturnDocument, - operation::{test::handle_response_test, FindAndModify, Operation}, - options::{ - FindOneAndDeleteOptions, - FindOneAndReplaceOptions, - FindOneAndUpdateOptions, - Hint, - UpdateModifications, - }, - Namespace, -}; - -// delete tests - -fn empty_delete() -> FindAndModify { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! {}; - FindAndModify::with_delete(ns, filter, None) -} - -#[test] -fn build_with_delete_hint() { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! { - "x": 2, - "y": { "$gt": 1 }, - }; - - let options = FindOneAndDeleteOptions { - hint: Some(Hint::Keys(doc! { "x": 1, "y": -1 })), - ..Default::default() - }; - - let mut op = FindAndModify::::with_delete(ns, filter.clone(), Some(options)); - - let description = StreamDescription::new_testing(); - let mut cmd = op.build(&description).unwrap(); - - assert_eq!(cmd.name.as_str(), "findAndModify"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - - let mut expected_body = doc! { - "findAndModify": "test_coll", - "query": filter, - "hint": { - "x": 1, - "y": -1, - }, - "remove": true - }; - - bson_util::sort_document(&mut cmd.body); - bson_util::sort_document(&mut expected_body); - - assert_eq!(cmd.body, expected_body); -} - -#[test] -fn build_with_delete_no_options() { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! { "x": { "$gt": 1 } }; - - let mut op = FindAndModify::::with_delete(ns, filter.clone(), None); - - let description = StreamDescription::new_testing(); - let mut cmd = op.build(&description).unwrap(); - - assert_eq!(cmd.name.as_str(), "findAndModify"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - - let mut expected_body = doc! { - "findAndModify": "test_coll", - "query": filter, - "remove": true - }; - - bson_util::sort_document(&mut cmd.body); - bson_util::sort_document(&mut expected_body); - - assert_eq!(cmd.body, expected_body); -} - -#[test] -fn build_with_delete() { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! { "x": { "$gt": 1 } }; - let max_time = Duration::from_millis(2u64); - let options = FindOneAndDeleteOptions { - max_time: Some(max_time), - ..Default::default() - }; - - let mut op = FindAndModify::::with_delete(ns, filter.clone(), Some(options)); - - let description = StreamDescription::new_testing(); - let mut cmd = op.build(&description).unwrap(); - - assert_eq!(cmd.name.as_str(), "findAndModify"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - - let mut expected_body = doc! { - "findAndModify": "test_coll", - "query": filter, - "maxTimeMS": max_time.as_millis() as i32, - "remove": true - }; - - bson_util::sort_document(&mut cmd.body); - bson_util::sort_document(&mut expected_body); - - assert_eq!(cmd.body, expected_body); -} - -#[test] -fn handle_success_delete() { - let op = empty_delete(); - let value = doc! { - "_id" : Bson::ObjectId(ObjectId::new()), - "name" : "Tom", - "state" : "active", - "rating" : 100, - "score" : 5 - }; - let ok_response = doc! { - "lastErrorObject" : { - "connectionId" : 1, - "updatedExisting" : true, - "n" : 1, - "syncMillis" : 0, - "writtenTo" : null, - "err" : null, - "ok" : 1 - }, - "value" : value.clone(), - "ok" : 1 - }; - - let result = handle_response_test(&op, ok_response).unwrap(); - assert_eq!(result.unwrap(), value); -} - -#[test] -fn handle_null_value_delete() { - let op = empty_delete(); - - let result = handle_response_test(&op, doc! { "ok": 1.0, "value": Bson::Null }).unwrap(); - assert_eq!(result, None); -} - -#[test] -fn handle_no_value_delete() { - let op = empty_delete(); - - handle_response_test(&op, doc! { "ok": 1.0 }).unwrap_err(); -} - -// replace tests - -fn empty_replace() -> FindAndModify { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! {}; - let replacement = doc! { "x": { "inc": 1 } }; - FindAndModify::with_replace(ns, filter, replacement, None).unwrap() -} - -#[test] -fn build_with_replace_hint() { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! { "x": { "$gt": 1 } }; - let replacement = doc! { "x": { "inc": 1 } }; - let options = FindOneAndReplaceOptions { - hint: Some(Hint::Keys(doc! { "x": 1, "y": -1 })), - upsert: Some(false), - bypass_document_validation: Some(true), - return_document: Some(ReturnDocument::After), - ..Default::default() - }; - - let mut op = FindAndModify::::with_replace( - ns, - filter.clone(), - replacement.clone(), - Some(options), - ) - .unwrap(); - - let description = StreamDescription::new_testing(); - let mut cmd = op.build(&description).unwrap(); - - assert_eq!(cmd.name.as_str(), "findAndModify"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - - let mut expected_body = doc! { - "findAndModify": "test_coll", - "query": filter, - "update": replacement, - "upsert": false, - "bypassDocumentValidation": true, - "new": true, - "hint": { - "x": 1, - "y": -1, - }, - }; - - bson_util::sort_document(&mut cmd.body); - bson_util::sort_document(&mut expected_body); - - assert_eq!(cmd.body, expected_body); -} - -#[test] -fn build_with_replace_no_options() { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! { "x": { "$gt": 1 } }; - let replacement = doc! { "x": { "inc": 1 } }; - - let mut op = - FindAndModify::::with_replace(ns, filter.clone(), replacement.clone(), None) - .unwrap(); - - let description = StreamDescription::new_testing(); - let mut cmd = op.build(&description).unwrap(); - - assert_eq!(cmd.name.as_str(), "findAndModify"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - - let mut expected_body = doc! { - "findAndModify": "test_coll", - "query": filter, - "update": replacement, - }; - - bson_util::sort_document(&mut cmd.body); - bson_util::sort_document(&mut expected_body); - - assert_eq!(cmd.body, expected_body); -} - -#[test] -fn build_with_replace() { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! { "x": { "$gt": 1 } }; - let replacement = doc! { "x": { "inc": 1 } }; - let options = FindOneAndReplaceOptions { - upsert: Some(false), - bypass_document_validation: Some(true), - return_document: Some(ReturnDocument::After), - ..Default::default() - }; - - let mut op = FindAndModify::::with_replace( - ns, - filter.clone(), - replacement.clone(), - Some(options), - ) - .unwrap(); - - let description = StreamDescription::new_testing(); - let mut cmd = op.build(&description).unwrap(); - - assert_eq!(cmd.name.as_str(), "findAndModify"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - - let mut expected_body = doc! { - "findAndModify": "test_coll", - "query": filter, - "update": replacement, - "upsert": false, - "bypassDocumentValidation": true, - "new": true - }; - - bson_util::sort_document(&mut cmd.body); - bson_util::sort_document(&mut expected_body); - - assert_eq!(cmd.body, expected_body); -} - -#[test] -fn handle_success_replace() { - let op = empty_replace(); - let value = doc! { - "_id" : Bson::ObjectId(ObjectId::new()), - "name" : "Tom", - "state" : "active", - "rating" : 100, - "score" : 5 - }; - let ok_response = doc! { - "lastErrorObject" : { - "connectionId" : 1, - "updatedExisting" : true, - "n" : 1, - "syncMillis" : 0, - "writtenTo" : null, - "err" : null, - "ok" : 1 - }, - "value" : value.clone(), - "ok" : 1 - }; - - let result = handle_response_test(&op, ok_response).unwrap(); - assert_eq!(result.unwrap(), value); -} - -#[test] -fn handle_null_value_replace() { - let op = empty_replace(); - let result = handle_response_test(&op, doc! { "ok": 1.0, "value": Bson::Null }).unwrap(); - assert_eq!(result, None); -} - -#[test] -fn handle_no_value_replace() { - let op = empty_replace(); - handle_response_test(&op, doc! { "ok": 1.0 }).unwrap_err(); -} - -// update tests - -fn empty_update() -> FindAndModify { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! {}; - let update = UpdateModifications::Document(doc! { "$x": { "$inc": 1 } }); - FindAndModify::with_update(ns, filter, update, None).unwrap() -} - -#[test] -fn build_with_update_hint() { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! { "x": { "$gt": 1 } }; - let update = UpdateModifications::Document(doc! { "$x": { "$inc": 1 } }); - let options = FindOneAndUpdateOptions { - hint: Some(Hint::Keys(doc! { "x": 1, "y": -1 })), - upsert: Some(false), - bypass_document_validation: Some(true), - ..Default::default() - }; - - let mut op = - FindAndModify::::with_update(ns, filter.clone(), update.clone(), Some(options)) - .unwrap(); - - let description = StreamDescription::new_testing(); - let mut cmd = op.build(&description).unwrap(); - - assert_eq!(cmd.name.as_str(), "findAndModify"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - - let mut expected_body = doc! { - "findAndModify": "test_coll", - "query": filter, - "update": update.to_bson(), - "upsert": false, - "bypassDocumentValidation": true, - "hint": { - "x": 1, - "y": -1, - }, - }; - - bson_util::sort_document(&mut cmd.body); - bson_util::sort_document(&mut expected_body); - - assert_eq!(cmd.body, expected_body); -} - -#[test] -fn build_with_update_no_options() { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! { "x": { "$gt": 1 } }; - let update = UpdateModifications::Document(doc! { "$x": { "$inc": 1 } }); - let mut op = - FindAndModify::::with_update(ns, filter.clone(), update.clone(), None).unwrap(); - - let description = StreamDescription::new_testing(); - let mut cmd = op.build(&description).unwrap(); - - assert_eq!(cmd.name.as_str(), "findAndModify"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - - let mut expected_body = doc! { - "findAndModify": "test_coll", - "query": filter, - "update": update.to_bson(), - }; - - bson_util::sort_document(&mut cmd.body); - bson_util::sort_document(&mut expected_body); - - assert_eq!(cmd.body, expected_body); -} - -#[test] -fn build_with_update() { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! { "x": { "$gt": 1 } }; - let update = UpdateModifications::Document(doc! { "$x": { "$inc": 1 } }); - let options = FindOneAndUpdateOptions { - upsert: Some(false), - bypass_document_validation: Some(true), - ..Default::default() - }; - - let mut op = - FindAndModify::::with_update(ns, filter.clone(), update.clone(), Some(options)) - .unwrap(); - - let description = StreamDescription::new_testing(); - let mut cmd = op.build(&description).unwrap(); - - assert_eq!(cmd.name.as_str(), "findAndModify"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - - let mut expected_body = doc! { - "findAndModify": "test_coll", - "query": filter, - "update": update.to_bson(), - "upsert": false, - "bypassDocumentValidation": true - }; - - bson_util::sort_document(&mut cmd.body); - bson_util::sort_document(&mut expected_body); - - assert_eq!(cmd.body, expected_body); -} - -#[test] -fn handle_success_update() { - let op = empty_update(); - let value = doc! { - "_id" : Bson::ObjectId(ObjectId::new()), - "name" : "Tom", - "state" : "active", - "rating" : 100, - "score" : 5 - }; - let ok_response = doc! { - "lastErrorObject" : { - "connectionId" : 1, - "updatedExisting" : true, - "n" : 1, - "syncMillis" : 0, - "writtenTo" : null, - "err" : null, - "ok" : 1 - }, - "value" : value.clone(), - "ok" : 1 - }; - - let result = handle_response_test(&op, ok_response).unwrap(); - assert_eq!(result.unwrap(), value); -} - -#[test] -fn handle_null_value_update() { - let op = empty_update(); - let result = handle_response_test(&op, doc! { "ok": 1.0, "value": Bson::Null }).unwrap(); - assert_eq!(result, None); -} - -#[test] -fn handle_no_value_update() { - let op = empty_update(); - handle_response_test(&op, doc! { "ok": 1.0 }).unwrap_err(); -} diff --git a/src/operation/insert.rs b/src/operation/insert.rs index f9f1e3254..4c51b1c59 100644 --- a/src/operation/insert.rs +++ b/src/operation/insert.rs @@ -19,6 +19,7 @@ use crate::{ }, options::{InsertManyOptions, WriteConcern}, results::InsertManyResult, + serde_util, Namespace, }; @@ -86,17 +87,8 @@ impl<'a, T: Serialize> OperationWithDefaults for Insert<'a, T> { .take(description.max_write_batch_size as usize) .enumerate() { - let mut doc = if self.human_readable_serialization { - let serializer_options = bson::SerializerOptions::builder() - .human_readable(true) - .build(); - bson::RawDocumentBuf::from_document(&bson::to_document_with_options( - d, - serializer_options, - )?)? - } else { - bson::to_raw_document_buf(d)? - }; + let mut doc = + serde_util::to_raw_document_buf_with_options(d, self.human_readable_serialization)?; let id = match doc.get("_id")? { Some(b) => b.try_into()?, None => { diff --git a/src/operation/update.rs b/src/operation/update.rs index d9442aee3..8c524f9e5 100644 --- a/src/operation/update.rs +++ b/src/operation/update.rs @@ -1,118 +1,179 @@ #[cfg(test)] mod test; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use crate::{ - bson::{doc, Bson, Document}, + bson::{doc, rawdoc, Document, RawArrayBuf, RawBson, RawDocumentBuf}, bson_util, cmap::{Command, RawCommandResponse, StreamDescription}, error::{convert_bulk_errors, Result}, operation::{OperationWithDefaults, Retryability, WriteResponseBody}, options::{UpdateModifications, UpdateOptions, WriteConcern}, results::UpdateResult, + serde_util::to_raw_document_buf_with_options, Namespace, }; +#[derive(Clone, Debug)] +pub(crate) enum UpdateOrReplace<'a, T = ()> { + UpdateModifications(UpdateModifications), + Replacement(&'a T), +} + +impl<'a, T: Serialize> UpdateOrReplace<'a, T> { + pub(crate) fn to_raw_bson(&self, human_readable_serialization: bool) -> Result { + match self { + Self::UpdateModifications(update_modifications) => match update_modifications { + UpdateModifications::Document(document) => { + Ok(RawDocumentBuf::from_document(document)?.into()) + } + UpdateModifications::Pipeline(pipeline) => bson_util::to_raw_bson_array(pipeline), + }, + Self::Replacement(replacement) => { + let replacement_doc = + to_raw_document_buf_with_options(replacement, human_readable_serialization)?; + bson_util::replacement_raw_document_check(&replacement_doc)?; + Ok(replacement_doc.into()) + } + } + } +} + +impl From for UpdateOrReplace<'_> { + fn from(update_modifications: UpdateModifications) -> Self { + Self::UpdateModifications(update_modifications) + } +} + +impl<'a, T: Serialize> From<&'a T> for UpdateOrReplace<'a, T> { + fn from(t: &'a T) -> Self { + Self::Replacement(t) + } +} + #[derive(Debug)] -pub(crate) struct Update { +pub(crate) struct Update<'a, T = ()> { ns: Namespace, filter: Document, - update: UpdateModifications, + update: UpdateOrReplace<'a, T>, multi: Option, options: Option, + human_readable_serialization: bool, } -impl Update { +impl Update<'_> { #[cfg(test)] fn empty() -> Self { - Update::new( - Namespace { - db: String::new(), - coll: String::new(), - }, - Document::new(), - UpdateModifications::Document(Document::new()), + Self::with_update( + Namespace::new("db", "coll"), + doc! {}, + UpdateModifications::Document(doc! {}), false, None, + false, ) } - pub(crate) fn new( + pub(crate) fn with_update( ns: Namespace, filter: Document, update: UpdateModifications, multi: bool, options: Option, + human_readable_serialization: bool, + ) -> Self { + Self { + ns, + filter, + update: update.into(), + multi: multi.then(|| true), + options, + human_readable_serialization, + } + } +} + +impl<'a, T: Serialize> Update<'a, T> { + pub(crate) fn with_replace( + ns: Namespace, + filter: Document, + update: &'a T, + multi: bool, + options: Option, + human_readable_serialization: bool, ) -> Self { Self { ns, filter, - update, - multi: if multi { Some(true) } else { None }, + update: update.into(), + multi: multi.then(|| true), options, + human_readable_serialization, } } } -impl OperationWithDefaults for Update { +impl<'a, T: Serialize> OperationWithDefaults for Update<'a, T> { type O = UpdateResult; - type Command = Document; + type Command = RawDocumentBuf; const NAME: &'static str = "update"; - fn build(&mut self, _description: &StreamDescription) -> Result { - let mut body = doc! { + fn build(&mut self, _description: &StreamDescription) -> Result> { + let mut body = rawdoc! { Self::NAME: self.ns.coll.clone(), }; - let mut update = doc! { - "q": self.filter.clone(), - "u": self.update.to_bson(), + let mut update = rawdoc! { + "q": RawDocumentBuf::from_document(&self.filter)?, + "u": self.update.to_raw_bson(self.human_readable_serialization)?, }; if let Some(ref options) = self.options { if let Some(upsert) = options.upsert { - update.insert("upsert", upsert); + update.append("upsert", upsert); } if let Some(ref array_filters) = options.array_filters { - update.insert("arrayFilters", bson_util::to_bson_array(array_filters)); + update.append("arrayFilters", bson_util::to_raw_bson_array(array_filters)?); } if let Some(ref hint) = options.hint { - update.insert("hint", hint.to_bson()); + update.append("hint", hint.to_raw_bson()?); } if let Some(ref collation) = options.collation { - update.insert("collation", bson::to_bson(collation)?); + update.append("collation", bson::to_raw_document_buf(&collation)?); } if let Some(bypass_doc_validation) = options.bypass_document_validation { - body.insert("bypassDocumentValidation", bypass_doc_validation); + body.append("bypassDocumentValidation", bypass_doc_validation); } if let Some(ref write_concern) = options.write_concern { if !write_concern.is_empty() { - body.insert("writeConcern", bson::to_bson(write_concern)?); + body.append("writeConcern", bson::to_raw_document_buf(write_concern)?); } } if let Some(ref let_vars) = options.let_vars { - body.insert("let", let_vars); + body.append("let", bson::to_raw_document_buf(&let_vars)?); } if let Some(ref comment) = options.comment { - body.insert("comment", comment); + body.append("comment", RawBson::try_from(comment.clone())?); } }; if let Some(multi) = self.multi { - update.insert("multi", multi); + update.append("multi", multi); } - body.insert("updates", vec![Bson::Document(update)]); - body.insert("ordered", true); // command monitoring tests expect this (SPEC-1130) + let mut updates = RawArrayBuf::new(); + updates.push(update); + body.append("updates", updates); + body.append("ordered", true); // command monitoring tests expect this (SPEC-1130) Ok(Command::new( Self::NAME.to_string(), @@ -121,6 +182,10 @@ impl OperationWithDefaults for Update { )) } + fn serialize_command(&mut self, cmd: Command) -> Result> { + cmd.into_bson_bytes() + } + fn handle_response( &self, raw_response: RawCommandResponse, diff --git a/src/operation/update/test.rs b/src/operation/update/test.rs index 25e1c0ce4..a4e4c4cad 100644 --- a/src/operation/update/test.rs +++ b/src/operation/update/test.rs @@ -2,154 +2,10 @@ use pretty_assertions::assert_eq; use crate::{ bson::{doc, Bson}, - bson_util, - cmap::StreamDescription, - coll::options::Hint, - concern::{Acknowledgment, WriteConcern}, error::{ErrorKind, WriteConcernError, WriteError, WriteFailure}, - operation::{test::handle_response_test, Operation, Update}, - options::{UpdateModifications, UpdateOptions}, - Namespace, + operation::{test::handle_response_test, Update}, }; -#[test] -fn build() { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! { "x": { "$gt": 1 } }; - let update = UpdateModifications::Document(doc! { "x": { "$inc": 1 } }); - let wc = WriteConcern { - w: Some(Acknowledgment::Majority), - ..Default::default() - }; - let options = UpdateOptions { - upsert: Some(false), - bypass_document_validation: Some(true), - write_concern: Some(wc), - ..Default::default() - }; - - let mut op = Update::new(ns, filter.clone(), update.clone(), false, Some(options)); - - let description = StreamDescription::new_testing(); - let mut cmd = op.build(&description).unwrap(); - - assert_eq!(cmd.name.as_str(), "update"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - - let mut expected_body = doc! { - "update": "test_coll", - "updates": [ - { - "q": filter, - "u": update.to_bson(), - "upsert": false, - } - ], - "writeConcern": { - "w": "majority" - }, - "bypassDocumentValidation": true, - "ordered": true, - }; - - bson_util::sort_document(&mut cmd.body); - bson_util::sort_document(&mut expected_body); - - assert_eq!(cmd.body, expected_body); -} - -#[test] -fn build_hint() { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! { "x": { "$gt": 1 } }; - let update = UpdateModifications::Document(doc! { "x": { "$inc": 1 } }); - let wc = WriteConcern { - w: Some(Acknowledgment::Majority), - ..Default::default() - }; - let options = UpdateOptions { - upsert: Some(false), - bypass_document_validation: Some(true), - write_concern: Some(wc), - hint: Some(Hint::Keys(doc! { "x": 1, "y": -1 })), - ..Default::default() - }; - - let mut op = Update::new(ns, filter.clone(), update.clone(), false, Some(options)); - - let description = StreamDescription::new_testing(); - let mut cmd = op.build(&description).unwrap(); - - assert_eq!(cmd.name.as_str(), "update"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - - let mut expected_body = doc! { - "update": "test_coll", - "updates": [ - { - "q": filter, - "u": update.to_bson(), - "upsert": false, - "hint": { - "x": 1, - "y": -1, - }, - } - ], - "writeConcern": { - "w": "majority" - }, - "bypassDocumentValidation": true, - "ordered": true, - }; - - bson_util::sort_document(&mut cmd.body); - bson_util::sort_document(&mut expected_body); - - assert_eq!(cmd.body, expected_body); -} - -#[test] -fn build_many() { - let ns = Namespace { - db: "test_db".to_string(), - coll: "test_coll".to_string(), - }; - let filter = doc! { "x": { "$gt": 1 } }; - let update = UpdateModifications::Document(doc! { "x": { "$inc": 1 } }); - - let mut op = Update::new(ns, filter.clone(), update.clone(), true, None); - - let description = StreamDescription::new_testing(); - let mut cmd = op.build(&description).unwrap(); - - assert_eq!(cmd.name.as_str(), "update"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - - let mut expected_body = doc! { - "update": "test_coll", - "updates": [ - { - "q": filter, - "u": update.to_bson(), - "multi": true, - } - ], - "ordered": true, - }; - - bson_util::sort_document(&mut cmd.body); - bson_util::sort_document(&mut expected_body); - - assert_eq!(cmd.body, expected_body); -} - #[test] fn handle_success() { let op = Update::empty(); diff --git a/src/serde_util.rs b/src/serde_util.rs index 80ac8014e..a25443dbd 100644 --- a/src/serde_util.rs +++ b/src/serde_util.rs @@ -1,9 +1,10 @@ use std::time::Duration; +use bson::SerializerOptions; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::{ - bson::{doc, Bson, Document}, + bson::{doc, Bson, Document, RawDocumentBuf}, bson_util::get_u64, error::{Error, Result}, }; @@ -124,10 +125,6 @@ pub(crate) fn serialize_result_error_as_string( .serialize(serializer) } -pub(crate) fn serialize_true(s: S) -> std::result::Result { - s.serialize_bool(true) -} - #[cfg(feature = "aws-auth")] pub(crate) fn deserialize_datetime_option_from_double_or_string<'de, D>( deserializer: D, @@ -154,6 +151,22 @@ where Ok(Some(date_time)) } +pub(crate) fn to_raw_document_buf_with_options( + doc: &T, + human_readable_serialization: bool, +) -> Result { + let raw_doc = if human_readable_serialization { + let doc = bson::to_document_with_options( + doc, + SerializerOptions::builder().human_readable(true).build(), + )?; + RawDocumentBuf::from_document(&doc)? + } else { + bson::to_raw_document_buf(doc)? + }; + Ok(raw_doc) +} + #[cfg(test)] pub(crate) fn deserialize_nonempty_vec<'de, D, T>( deserializer: D, diff --git a/src/test/coll.rs b/src/test/coll.rs index 47a2f584e..fda319b00 100644 --- a/src/test/coll.rs +++ b/src/test/coll.rs @@ -591,12 +591,11 @@ async fn delete_hint_test(options: Option, name: &str) { let event_hint = events[0].command.get_array("deletes").unwrap()[0] .as_document() .unwrap() - .get("hint"); - let expected_hint = match options { - Some(options) => options.hint.map(|hint| hint.to_bson()), - None => None, - }; - assert_eq!(event_hint, expected_hint.as_ref()); + .get("hint") + .cloned() + .map(|bson| bson::from_bson(bson).unwrap()); + let expected_hint = options.and_then(|options| options.hint); + assert_eq!(event_hint, expected_hint); } #[cfg_attr(feature = "tokio-runtime", tokio::test)] @@ -639,8 +638,12 @@ async fn find_one_and_delete_hint_test(options: Option, let events = client.get_command_started_events(&["findAndModify"]); assert_eq!(events.len(), 1); - let event_hint = events[0].command.get("hint").cloned(); - let expected_hint = options.and_then(|options| options.hint.map(|hint| hint.to_bson())); + let event_hint = events[0] + .command + .get("hint") + .cloned() + .map(|bson| bson::from_bson(bson).unwrap()); + let expected_hint = options.and_then(|options| options.hint); assert_eq!(event_hint, expected_hint); }