Skip to content

Commit f258c21

Browse files
authored
RUST-2198 Add run_raw_command method (#1356)
1 parent dbff9ef commit f258c21

File tree

5 files changed

+134
-39
lines changed

5 files changed

+134
-39
lines changed

src/action/run_command.rs

+73-16
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::time::Duration;
22

3-
use bson::{Bson, Document};
3+
use bson::{Bson, Document, RawDocumentBuf};
44

55
use crate::{
66
client::session::TransactionState,
@@ -40,7 +40,27 @@ impl Database {
4040
pub fn run_command(&self, command: Document) -> RunCommand {
4141
RunCommand {
4242
db: self,
43-
command,
43+
command: RawDocumentBuf::from_document(&command),
44+
options: None,
45+
session: None,
46+
}
47+
}
48+
49+
/// Runs a database-level command.
50+
///
51+
/// Note that no inspection is done on `doc`, so the command will not use the database's default
52+
/// read concern or write concern. If specific read concern or write concern is desired, it must
53+
/// be specified manually.
54+
/// Please note that run_raw_command doesn't validate WriteConcerns passed into the body of the
55+
/// command document.
56+
///
57+
/// `await` will return d[`Result<Document>`].
58+
#[deeplink]
59+
#[options_doc(run_command)]
60+
pub fn run_raw_command(&self, command: RawDocumentBuf) -> RunCommand {
61+
RunCommand {
62+
db: self,
63+
command: Ok(command),
4464
options: None,
4565
session: None,
4666
}
@@ -55,7 +75,22 @@ impl Database {
5575
pub fn run_cursor_command(&self, command: Document) -> RunCursorCommand {
5676
RunCursorCommand {
5777
db: self,
58-
command,
78+
command: RawDocumentBuf::from_document(&command),
79+
options: None,
80+
session: ImplicitSession,
81+
}
82+
}
83+
84+
/// Runs a database-level command and returns a cursor to the response.
85+
///
86+
/// `await` will return d[`Result<Cursor<Document>>`] or a
87+
/// d[`Result<SessionCursor<Document>>`] if a [`ClientSession`] is provided.
88+
#[deeplink]
89+
#[options_doc(run_cursor_command)]
90+
pub fn run_raw_cursor_command(&self, command: RawDocumentBuf) -> RunCursorCommand {
91+
RunCursorCommand {
92+
db: self,
93+
command: Ok(command),
5994
options: None,
6095
session: ImplicitSession,
6196
}
@@ -79,6 +114,21 @@ impl crate::sync::Database {
79114
self.async_database.run_command(command)
80115
}
81116

117+
/// Runs a database-level command.
118+
///
119+
/// Note that no inspection is done on `doc`, so the command will not use the database's default
120+
/// read concern or write concern. If specific read concern or write concern is desired, it must
121+
/// be specified manually.
122+
/// Please note that run_raw_command doesn't validate WriteConcerns passed into the body of the
123+
/// command document.
124+
///
125+
/// [`run`](RunCommand::run) will return d[`Result<Document>`].
126+
#[deeplink]
127+
#[options_doc(run_command, sync)]
128+
pub fn run_raw_command(&self, command: RawDocumentBuf) -> RunCommand {
129+
self.async_database.run_raw_command(command)
130+
}
131+
82132
/// Runs a database-level command and returns a cursor to the response.
83133
///
84134
/// [`run`](RunCursorCommand::run) will return d[`Result<crate::sync::Cursor<Document>>`] or a
@@ -88,13 +138,23 @@ impl crate::sync::Database {
88138
pub fn run_cursor_command(&self, command: Document) -> RunCursorCommand {
89139
self.async_database.run_cursor_command(command)
90140
}
141+
142+
/// Runs a database-level command and returns a cursor to the response.
143+
///
144+
/// [`run`](RunCursorCommand::run) will return d[`Result<crate::sync::Cursor<Document>>`] or a
145+
/// d[`Result<crate::sync::SessionCursor<Document>>`] if a [`ClientSession`] is provided.
146+
#[deeplink]
147+
#[options_doc(run_cursor_command, sync)]
148+
pub fn run_raw_cursor_command(&self, command: RawDocumentBuf) -> RunCursorCommand {
149+
self.async_database.run_raw_cursor_command(command)
150+
}
91151
}
92152

93153
/// Run a database-level command. Create with [`Database::run_command`].
94154
#[must_use]
95155
pub struct RunCommand<'a> {
96156
db: &'a Database,
97-
command: Document,
157+
command: bson::raw::Result<RawDocumentBuf>,
98158
options: Option<RunCommandOptions>,
99159
session: Option<&'a mut ClientSession>,
100160
}
@@ -115,10 +175,11 @@ impl<'a> Action for RunCommand<'a> {
115175

116176
async fn execute(self) -> Result<Document> {
117177
let mut selection_criteria = self.options.and_then(|o| o.selection_criteria);
178+
let command = self.command?;
118179
if let Some(session) = &self.session {
119180
match session.transaction.state {
120181
TransactionState::Starting | TransactionState::InProgress => {
121-
if self.command.contains_key("readConcern") {
182+
if command.get("readConcern").is_ok_and(|rc| rc.is_some()) {
122183
return Err(ErrorKind::InvalidArgument {
123184
message: "Cannot set read concern after starting a transaction".into(),
124185
}
@@ -139,12 +200,8 @@ impl<'a> Action for RunCommand<'a> {
139200
}
140201
}
141202

142-
let operation = run_command::RunCommand::new(
143-
self.db.name().into(),
144-
self.command,
145-
selection_criteria,
146-
None,
147-
)?;
203+
let operation =
204+
run_command::RunCommand::new(self.db.name().into(), command, selection_criteria, None);
148205
self.db
149206
.client()
150207
.execute_operation(operation, self.session)
@@ -157,7 +214,7 @@ impl<'a> Action for RunCommand<'a> {
157214
#[must_use]
158215
pub struct RunCursorCommand<'a, Session = ImplicitSession> {
159216
db: &'a Database,
160-
command: Document,
217+
command: bson::raw::Result<RawDocumentBuf>,
161218
options: Option<RunCursorCommandOptions>,
162219
session: Session,
163220
}
@@ -192,10 +249,10 @@ impl<'a> Action for RunCursorCommand<'a, ImplicitSession> {
192249
.and_then(|options| options.selection_criteria.clone());
193250
let rcc = run_command::RunCommand::new(
194251
self.db.name().to_string(),
195-
self.command,
252+
self.command?,
196253
selection_criteria,
197254
None,
198-
)?;
255+
);
199256
let rc_command = run_cursor_command::RunCursorCommand::new(rcc, self.options)?;
200257
let client = self.db.client();
201258
client.execute_cursor_operation(rc_command).await
@@ -218,10 +275,10 @@ impl<'a> Action for RunCursorCommand<'a, ExplicitSession<'a>> {
218275
.and_then(|options| options.selection_criteria.clone());
219276
let rcc = run_command::RunCommand::new(
220277
self.db.name().to_string(),
221-
self.command,
278+
self.command?,
222279
selection_criteria,
223280
None,
224-
)?;
281+
);
225282
let rc_command = run_cursor_command::RunCursorCommand::new(rcc, self.options)?;
226283
let client = self.db.client();
227284
client

src/client/csfle/state_machine.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl CryptExecutor {
126126
let db = db.as_ref().ok_or_else(|| {
127127
Error::internal("db required for NeedMongoMarkings state")
128128
})?;
129-
let op = RawOutput(RunCommand::new_raw(db.to_string(), command, None, None)?);
129+
let op = RawOutput(RunCommand::new(db.to_string(), command, None, None));
130130
let mongocryptd_client = self.mongocryptd_client.as_ref().ok_or_else(|| {
131131
Error::invalid_argument("this operation requires mongocryptd")
132132
})?;

src/coll.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ pub mod options;
33

44
use std::{fmt, fmt::Debug, str::FromStr, sync::Arc};
55

6+
use bson::rawdoc;
67
use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize};
78

89
use self::options::*;
910
use crate::{
10-
bson::doc,
1111
client::options::ServerAddress,
1212
cmap::conn::PinnedConnectionHandle,
1313
concern::{ReadConcern, WriteConcern},
@@ -199,13 +199,13 @@ where
199199

200200
let op = crate::operation::run_command::RunCommand::new(
201201
ns.db,
202-
doc! {
202+
rawdoc! {
203203
"killCursors": ns.coll.as_str(),
204204
"cursors": [cursor_id]
205205
},
206206
drop_address.map(SelectionCriteria::from_address),
207207
pinned_connection,
208-
)?;
208+
);
209209
self.client().execute_operation(op, None).await?;
210210
Ok(())
211211
}

src/operation/run_command.rs

+3-18
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,17 @@ pub(crate) struct RunCommand<'conn> {
2020

2121
impl<'conn> RunCommand<'conn> {
2222
pub(crate) fn new(
23-
db: String,
24-
command: Document,
25-
selection_criteria: Option<SelectionCriteria>,
26-
pinned_connection: Option<&'conn PinnedConnectionHandle>,
27-
) -> Result<Self> {
28-
Ok(Self {
29-
db,
30-
command: RawDocumentBuf::from_document(&command)?,
31-
selection_criteria,
32-
pinned_connection,
33-
})
34-
}
35-
36-
#[cfg(feature = "in-use-encryption")]
37-
pub(crate) fn new_raw(
3823
db: String,
3924
command: RawDocumentBuf,
4025
selection_criteria: Option<SelectionCriteria>,
4126
pinned_connection: Option<&'conn PinnedConnectionHandle>,
42-
) -> Result<Self> {
43-
Ok(Self {
27+
) -> Self {
28+
Self {
4429
db,
4530
command,
4631
selection_criteria,
4732
pinned_connection,
48-
})
33+
}
4934
}
5035

5136
fn command_name(&self) -> Option<&str> {

src/test/db.rs

+54-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::cmp::Ord;
22

3-
use futures::stream::TryStreamExt;
3+
use bson::RawDocumentBuf;
4+
use futures::{stream::TryStreamExt, StreamExt};
45
use serde::Deserialize;
56

67
use crate::{
@@ -413,3 +414,55 @@ async fn aggregate_with_generics() {
413414
.await
414415
.unwrap();
415416
}
417+
418+
#[tokio::test]
419+
async fn test_run_command() {
420+
let client = Client::for_test().await;
421+
let database = client.database("db");
422+
423+
// Test run_command
424+
{
425+
let got = database.run_command(doc! {"ping": 1}).await.unwrap();
426+
assert_eq!(crate::bson_util::get_int(got.get("ok").unwrap()), Some(1));
427+
}
428+
429+
// Test run_raw_command
430+
{
431+
let mut cmd = RawDocumentBuf::new();
432+
cmd.append("ping", 1);
433+
let got = database.run_raw_command(cmd).await.unwrap();
434+
assert_eq!(crate::bson_util::get_int(got.get("ok").unwrap()), Some(1));
435+
}
436+
437+
// Create a collection with a single document
438+
{
439+
let coll = database.collection("coll");
440+
coll.drop().await.expect("should drop");
441+
coll.insert_one(doc! {"foo": "bar"})
442+
.await
443+
.expect("should insert");
444+
}
445+
446+
// Test run_cursor_command
447+
{
448+
let cursor = database
449+
.run_cursor_command(doc! {"find": "coll", "filter": {}})
450+
.await
451+
.unwrap();
452+
let v: Vec<Result<Document>> = cursor.collect().await;
453+
assert_eq!(v.len(), 1);
454+
assert_eq!(v[0].as_ref().unwrap().get_str("foo"), Ok("bar"));
455+
}
456+
457+
// Test run_raw_cursor_command
458+
{
459+
let mut cmd = RawDocumentBuf::new();
460+
cmd.append("find", "coll");
461+
cmd.append("filter", RawDocumentBuf::new());
462+
463+
let cursor = database.run_raw_cursor_command(cmd).await.unwrap();
464+
let v: Vec<Result<Document>> = cursor.collect().await;
465+
assert_eq!(v.len(), 1);
466+
assert_eq!(v[0].as_ref().unwrap().get_str("foo"), Ok("bar"));
467+
}
468+
}

0 commit comments

Comments
 (0)