Skip to content

Commit bd2c415

Browse files
RUST-1875 Define separate types for summary and verbose bulk write results
1 parent 6adb8c1 commit bd2c415

File tree

11 files changed

+374
-253
lines changed

11 files changed

+374
-253
lines changed

src/action/bulk_write.rs

+59-21
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,62 @@
11
#![allow(missing_docs)]
22

3-
use std::collections::HashMap;
3+
use std::{collections::HashMap, marker::PhantomData};
44

55
use crate::{
66
bson::{Bson, Document},
77
error::{BulkWriteError, Error, ErrorKind, Result},
88
operation::bulk_write::BulkWrite as BulkWriteOperation,
99
options::{BulkWriteOptions, WriteConcern, WriteModel},
10-
results::BulkWriteResult,
10+
results::{BulkWriteResult, SummaryBulkWriteResult, VerboseBulkWriteResult},
1111
Client,
1212
ClientSession,
1313
};
1414

1515
use super::{action_impl, option_setters};
1616

1717
impl Client {
18-
pub fn bulk_write(&self, models: impl IntoIterator<Item = WriteModel>) -> BulkWrite {
18+
pub fn bulk_write(
19+
&self,
20+
models: impl IntoIterator<Item = WriteModel>,
21+
) -> BulkWrite<SummaryBulkWriteResult> {
1922
BulkWrite::new(self, models.into_iter().collect())
2023
}
2124
}
2225

2326
#[must_use]
24-
pub struct BulkWrite<'a> {
27+
pub struct BulkWrite<'a, R> {
2528
client: &'a Client,
2629
models: Vec<WriteModel>,
2730
options: Option<BulkWriteOptions>,
2831
session: Option<&'a mut ClientSession>,
32+
_phantom: PhantomData<R>,
2933
}
3034

31-
impl<'a> BulkWrite<'a> {
35+
impl<'a> BulkWrite<'a, SummaryBulkWriteResult> {
36+
pub fn verbose_results(self) -> BulkWrite<'a, VerboseBulkWriteResult> {
37+
BulkWrite {
38+
client: self.client,
39+
models: self.models,
40+
options: self.options,
41+
session: self.session,
42+
_phantom: PhantomData,
43+
}
44+
}
45+
}
46+
47+
impl<'a, R> BulkWrite<'a, R>
48+
where
49+
R: BulkWriteResult,
50+
{
3251
option_setters!(options: BulkWriteOptions;
3352
ordered: bool,
3453
bypass_document_validation: bool,
3554
comment: Bson,
3655
let_vars: Document,
37-
verbose_results: bool,
3856
write_concern: WriteConcern,
3957
);
4058

41-
pub fn session(mut self, session: &'a mut ClientSession) -> BulkWrite<'a> {
59+
pub fn session(mut self, session: &'a mut ClientSession) -> Self {
4260
self.session = Some(session);
4361
self
4462
}
@@ -49,6 +67,7 @@ impl<'a> BulkWrite<'a> {
4967
models,
5068
options: None,
5169
session: None,
70+
_phantom: PhantomData,
5271
}
5372
}
5473

@@ -58,13 +77,8 @@ impl<'a> BulkWrite<'a> {
5877
.and_then(|options| options.ordered)
5978
.unwrap_or(true)
6079
}
61-
}
6280

63-
#[action_impl]
64-
impl<'a> Action for BulkWrite<'a> {
65-
type Future = BulkWriteFuture;
66-
67-
async fn execute(mut self) -> Result<BulkWriteResult> {
81+
async fn execute_inner(mut self) -> Result<R> {
6882
#[cfg(feature = "in-use-encryption-unstable")]
6983
if self.client.should_auto_encrypt().await {
7084
use mongocrypt::error::{Error as EncryptionError, ErrorKind as EncryptionErrorKind};
@@ -100,7 +114,7 @@ impl<'a> Action for BulkWrite<'a> {
100114
.await;
101115
let result = self
102116
.client
103-
.execute_operation::<BulkWriteOperation>(
117+
.execute_operation::<BulkWriteOperation<R>>(
104118
&mut operation,
105119
self.session.as_deref_mut(),
106120
)
@@ -128,18 +142,42 @@ impl<'a> Action for BulkWrite<'a> {
128142
}
129143
}
130144

145+
#[action_impl]
146+
impl<'a> Action for BulkWrite<'a, SummaryBulkWriteResult> {
147+
type Future = SummaryBulkWriteFuture;
148+
149+
async fn execute(mut self) -> Result<SummaryBulkWriteResult> {
150+
self.execute_inner().await
151+
}
152+
}
153+
154+
#[action_impl]
155+
impl<'a> Action for BulkWrite<'a, VerboseBulkWriteResult> {
156+
type Future = VerboseBulkWriteFuture;
157+
158+
async fn execute(mut self) -> Result<VerboseBulkWriteResult> {
159+
self.execute_inner().await
160+
}
161+
}
162+
131163
/// Represents the execution status of a bulk write. The status starts at `None`, indicating that no
132164
/// writes have been attempted yet, and transitions to either `Success` or `Error` as batches are
133165
/// executed. The contents of `Error` can be inspected to determine whether a bulk write can
134166
/// continue with further batches or should be terminated.
135-
enum ExecutionStatus {
136-
Success(BulkWriteResult),
167+
enum ExecutionStatus<R>
168+
where
169+
R: BulkWriteResult,
170+
{
171+
Success(R),
137172
Error(Error),
138173
None,
139174
}
140175

141-
impl ExecutionStatus {
142-
fn with_success(mut self, result: BulkWriteResult) -> Self {
176+
impl<R> ExecutionStatus<R>
177+
where
178+
R: BulkWriteResult,
179+
{
180+
fn with_success(mut self, result: R) -> Self {
143181
match self {
144182
// Merge two successful sets of results together.
145183
Self::Success(ref mut current_result) => {
@@ -149,7 +187,7 @@ impl ExecutionStatus {
149187
// Merge the results of the new batch into the existing bulk write error.
150188
Self::Error(ref mut current_error) => {
151189
let bulk_write_error = Self::get_current_bulk_write_error(current_error);
152-
bulk_write_error.merge_partial_results(result);
190+
bulk_write_error.merge_partial_results(result.into_partial_result());
153191
self
154192
}
155193
Self::None => Self::Success(result),
@@ -163,14 +201,14 @@ impl ExecutionStatus {
163201
// set its source as the error that just occurred.
164202
Self::Success(current_result) => match *error.kind {
165203
ErrorKind::BulkWrite(ref mut bulk_write_error) => {
166-
bulk_write_error.merge_partial_results(current_result);
204+
bulk_write_error.merge_partial_results(current_result.into_partial_result());
167205
Self::Error(error)
168206
}
169207
_ => {
170208
let bulk_write_error: Error = ErrorKind::BulkWrite(BulkWriteError {
171209
write_errors: HashMap::new(),
172210
write_concern_errors: Vec::new(),
173-
partial_result: Some(current_result),
211+
partial_result: Some(current_result.into_partial_result()),
174212
})
175213
.into();
176214
Self::Error(bulk_write_error.with_source(error))

src/client/options/bulk_write.rs

+5-46
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,32 @@
11
#![allow(missing_docs)]
22

3-
use serde::{ser::SerializeMap, Deserialize, Serialize};
3+
use serde::{Deserialize, Serialize};
44
use serde_with::skip_serializing_none;
55

66
use crate::{
77
bson::{rawdoc, Array, Bson, Document, RawDocumentBuf},
88
bson_util::{get_or_prepend_id_field, replacement_document_check, update_document_check},
99
error::Result,
1010
options::{UpdateModifications, WriteConcern},
11+
serde_util::serialize_bool_or_true,
1112
Namespace,
1213
};
1314

1415
#[skip_serializing_none]
15-
#[derive(Clone, Debug, Default, Deserialize)]
16+
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
1617
#[serde(rename_all = "camelCase")]
1718
#[non_exhaustive]
1819
pub struct BulkWriteOptions {
20+
#[serialize_always]
21+
#[serde(serialize_with = "serialize_bool_or_true")]
1922
pub ordered: Option<bool>,
2023
pub bypass_document_validation: Option<bool>,
2124
pub comment: Option<Bson>,
2225
#[serde(rename = "let")]
2326
pub let_vars: Option<Document>,
24-
pub verbose_results: Option<bool>,
2527
pub write_concern: Option<WriteConcern>,
2628
}
2729

28-
impl Serialize for BulkWriteOptions {
29-
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
30-
where
31-
S: serde::Serializer,
32-
{
33-
let BulkWriteOptions {
34-
ordered,
35-
bypass_document_validation,
36-
comment,
37-
let_vars,
38-
verbose_results,
39-
write_concern,
40-
} = self;
41-
42-
let mut map_serializer = serializer.serialize_map(None)?;
43-
44-
let ordered = ordered.unwrap_or(true);
45-
map_serializer.serialize_entry("ordered", &ordered)?;
46-
47-
if let Some(bypass_document_validation) = bypass_document_validation {
48-
map_serializer
49-
.serialize_entry("bypassDocumentValidation", bypass_document_validation)?;
50-
}
51-
52-
if let Some(ref comment) = comment {
53-
map_serializer.serialize_entry("comment", comment)?;
54-
}
55-
56-
if let Some(ref let_vars) = let_vars {
57-
map_serializer.serialize_entry("let", let_vars)?;
58-
}
59-
60-
let errors_only = verbose_results.map(|b| !b).unwrap_or(true);
61-
map_serializer.serialize_entry("errorsOnly", &errors_only)?;
62-
63-
if let Some(ref write_concern) = write_concern {
64-
map_serializer.serialize_entry("writeConcern", write_concern)?;
65-
}
66-
67-
map_serializer.end()
68-
}
69-
}
70-
7130
#[skip_serializing_none]
7231
#[derive(Clone, Debug, Serialize)]
7332
#[serde(untagged)]

src/error.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Contains the `Error` and `Result` types that `mongodb` uses.
22
3-
mod bulk_write;
3+
pub(crate) mod bulk_write;
44

55
use std::{
66
any::Any,

src/error/bulk_write.rs

+23-4
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,46 @@ use std::collections::HashMap;
44

55
use crate::{
66
error::{WriteConcernError, WriteError},
7-
results::BulkWriteResult,
7+
results::{BulkWriteResult, SummaryBulkWriteResult, VerboseBulkWriteResult},
88
};
99

1010
#[derive(Clone, Debug, Default)]
1111
#[non_exhaustive]
1212
pub struct BulkWriteError {
1313
pub write_concern_errors: Vec<WriteConcernError>,
1414
pub write_errors: HashMap<usize, WriteError>,
15-
pub partial_result: Option<BulkWriteResult>,
15+
pub partial_result: Option<PartialBulkWriteResult>,
16+
}
17+
18+
#[derive(Clone, Debug)]
19+
#[cfg_attr(test, derive(serde::Serialize))]
20+
#[cfg_attr(test, serde(untagged))]
21+
pub enum PartialBulkWriteResult {
22+
Summary(SummaryBulkWriteResult),
23+
Verbose(VerboseBulkWriteResult),
24+
}
25+
26+
impl PartialBulkWriteResult {
27+
pub(crate) fn merge(&mut self, other: Self) {
28+
match (self, other) {
29+
(Self::Summary(this), Self::Summary(other)) => this.merge(other),
30+
(Self::Verbose(this), Self::Verbose(other)) => this.merge(other),
31+
// The operation execution path makes this an unreachable state
32+
_ => {}
33+
}
34+
}
1635
}
1736

1837
impl BulkWriteError {
19-
pub(crate) fn merge(&mut self, other: BulkWriteError) {
38+
pub(crate) fn merge(&mut self, other: Self) {
2039
self.write_concern_errors.extend(other.write_concern_errors);
2140
self.write_errors.extend(other.write_errors);
2241
if let Some(other_partial_result) = other.partial_result {
2342
self.merge_partial_results(other_partial_result);
2443
}
2544
}
2645

27-
pub(crate) fn merge_partial_results(&mut self, other_partial_result: BulkWriteResult) {
46+
pub(crate) fn merge_partial_results(&mut self, other_partial_result: PartialBulkWriteResult) {
2847
if let Some(ref mut partial_result) = self.partial_result {
2948
partial_result.merge(other_partial_result);
3049
} else {

0 commit comments

Comments
 (0)