Skip to content

Commit

Permalink
RUST-1588: Add RunCursorCommand (#912)
Browse files Browse the repository at this point in the history
* RUST-1636: Add RunCursorCommand
  • Loading branch information
drshika authored Jul 19, 2023
1 parent af30b76 commit c242539
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 3 deletions.
47 changes: 46 additions & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,23 @@ use crate::{
cursor::Cursor,
error::{Error, ErrorKind, Result},
gridfs::{options::GridFsBucketOptions, GridFsBucket},
operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand},
operation::{
Aggregate,
AggregateTarget,
Create,
DropDatabase,
ListCollections,
RunCommand,
RunCursorCommand,
},
options::{
AggregateOptions,
CollectionOptions,
CreateCollectionOptions,
DatabaseOptions,
DropDatabaseOptions,
ListCollectionsOptions,
RunCursorCommandOptions,
},
results::CollectionSpecification,
selection_criteria::SelectionCriteria,
Expand Down Expand Up @@ -469,6 +478,42 @@ impl Database {
.await
}

/// Runs a database-level command and returns a cursor to the response.
pub async fn run_cursor_command(
&self,
command: Document,
options: impl Into<Option<RunCursorCommandOptions>>,
) -> Result<Cursor<Document>> {
let options: Option<RunCursorCommandOptions> = options.into();
let selection_criteria = options
.as_ref()
.and_then(|options| options.selection_criteria.clone());
let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?;
let rc_command = RunCursorCommand::new(rcc, options)?;
let client = self.client();
client.execute_cursor_operation(rc_command).await
}

/// Runs a database-level command and returns a cursor to the response.
pub async fn run_cursor_command_with_session(
&self,
command: Document,
options: impl Into<Option<RunCursorCommandOptions>>,
session: &mut ClientSession,
) -> Result<SessionCursor<Document>> {
let mut options: Option<RunCursorCommandOptions> = options.into();
resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?;
let selection_criteria = options
.as_ref()
.and_then(|options| options.selection_criteria.clone());
let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?;
let rc_command = RunCursorCommand::new(rcc, options)?;
let client = self.client();
client
.execute_session_cursor_operation(rc_command, session)
.await
}

/// Runs a database-level command using the provided `ClientSession`.
///
/// If the `ClientSession` provided is currently in a transaction, `command` must not specify a
Expand Down
22 changes: 21 additions & 1 deletion src/db/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use typed_builder::TypedBuilder;
use crate::{
bson::{Bson, Document},
concern::{ReadConcern, WriteConcern},
options::Collation,
options::{Collation, CursorType},
selection_criteria::SelectionCriteria,
serde_util,
};
Expand Down Expand Up @@ -312,3 +312,23 @@ pub struct ChangeStreamPreAndPostImages {
/// If `true`, change streams will be able to include pre- and post-images.
pub enabled: bool,
}

/// Specifies the options to a
/// [`Database::RunCursorCommand`](../struct.Database.html#method.run_cursor_command) operation.
#[derive(Clone, Debug, Default, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct RunCursorCommandOptions {
/// The default read preference for operations.
pub selection_criteria: Option<SelectionCriteria>,
/// The type of cursor to return.
pub cursor_type: Option<CursorType>,
/// Number of documents to return per batch.
pub batch_size: Option<u32>,
/// Optional non-negative integer value. Use this value to configure the maxTimeMS option sent
/// on subsequent getMore commands.
pub max_time: Option<Duration>,
/// Optional BSON value. Use this value to configure the comment option sent on subsequent
/// getMore commands.
pub comment: Option<Bson>,
}
2 changes: 2 additions & 0 deletions src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod list_databases;
mod list_indexes;
mod raw_output;
mod run_command;
mod run_cursor_command;
mod update;

#[cfg(test)]
Expand Down Expand Up @@ -71,6 +72,7 @@ pub(crate) use list_indexes::ListIndexes;
#[cfg(feature = "in-use-encryption-unstable")]
pub(crate) use raw_output::RawOutput;
pub(crate) use run_command::RunCommand;
pub(crate) use run_cursor_command::RunCursorCommand;
pub(crate) use update::Update;

const SERVER_4_2_0_WIRE_VERSION: i32 = 8;
Expand Down
121 changes: 121 additions & 0 deletions src/operation/run_cursor_command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#[cfg(feature = "in-use-encryption-unstable")]
use bson::doc;
use bson::RawDocumentBuf;

use crate::{
cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription},
concern::WriteConcern,
cursor::CursorSpecification,
error::{Error, Result},
operation::{Operation, RunCommand},
options::RunCursorCommandOptions,
selection_criteria::SelectionCriteria,
};

#[derive(Debug, Clone)]
pub(crate) struct RunCursorCommand<'conn> {
run_command: RunCommand<'conn>,
options: Option<RunCursorCommandOptions>,
}

impl<'conn> RunCursorCommand<'conn> {
pub(crate) fn new(
run_command: RunCommand<'conn>,
options: Option<RunCursorCommandOptions>,
) -> Result<Self> {
Ok(Self {
run_command,
options,
})
}
}

impl<'conn> Operation for RunCursorCommand<'conn> {
type O = CursorSpecification;
type Command = RawDocumentBuf;

const NAME: &'static str = "run_cursor_command";

fn build(&mut self, description: &StreamDescription) -> Result<Command<Self::Command>> {
self.run_command.build(description)
}

fn serialize_command(&mut self, cmd: Command<Self::Command>) -> Result<Vec<u8>> {
self.run_command.serialize_command(cmd)
}

fn extract_at_cluster_time(
&self,
response: &bson::RawDocument,
) -> Result<Option<bson::Timestamp>> {
self.run_command.extract_at_cluster_time(response)
}

fn handle_error(&self, error: Error) -> Result<Self::O> {
Err(error)
}

fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.run_command.selection_criteria()
}

fn is_acknowledged(&self) -> bool {
self.run_command.is_acknowledged()
}

fn write_concern(&self) -> Option<&WriteConcern> {
self.run_command.write_concern()
}

fn supports_read_concern(&self, description: &StreamDescription) -> bool {
self.run_command.supports_read_concern(description)
}

fn supports_sessions(&self) -> bool {
self.run_command.supports_sessions()
}

fn retryability(&self) -> crate::operation::Retryability {
self.run_command.retryability()
}

fn update_for_retry(&mut self) {
self.run_command.update_for_retry()
}

fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
self.run_command.pinned_connection()
}

fn name(&self) -> &str {
self.run_command.name()
}

fn handle_response(
&self,
response: RawCommandResponse,
description: &StreamDescription,
) -> Result<Self::O> {
let doc = Operation::handle_response(&self.run_command, response, description)?;
let cursor_info = bson::from_document(doc)?;
let batch_size = match &self.options {
Some(options) => options.batch_size.clone(),
None => None,
};
let max_time = match &self.options {
Some(options) => options.max_time.clone(),
None => None,
};
let comment = match &self.options {
Some(options) => options.comment.clone(),
None => None,
};
Ok(CursorSpecification::new(
cursor_info,
description.server_address.clone(),
batch_size,
max_time,
comment,
))
}
}
2 changes: 1 addition & 1 deletion src/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::{
bson::{serde_helpers, Bson, Document},
change_stream::event::ResumeToken,
db::options::CreateCollectionOptions,
Namespace,
serde_util,
Namespace,
};

use bson::{Binary, RawDocumentBuf};
Expand Down

0 comments on commit c242539

Please sign in to comment.