Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/action/bulk_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ use super::{action_impl, option_setters};
impl Client {
pub fn bulk_write(
&self,
models: impl IntoIterator<Item = WriteModel>,
models: impl IntoIterator<Item = impl Into<WriteModel>>,
) -> BulkWrite<SummaryBulkWriteResult> {
BulkWrite::new(self, models.into_iter().collect())
let mut models_vec = Vec::new();
for model in models.into_iter() {
models_vec.push(model.into());
}
Comment on lines +22 to +25
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think models.into_iter().map(Into::into).collect() will do two passes, did a manual impl here to be safe

BulkWrite::new(self, models_vec)
}
}

Expand Down
299 changes: 220 additions & 79 deletions src/client/options/bulk_write.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
#![allow(missing_docs)]

use std::borrow::Borrow;

use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
use typed_builder::TypedBuilder;

use crate::{
bson::{rawdoc, Array, Bson, Document, RawDocumentBuf},
bson_util::{get_or_prepend_id_field, replacement_document_check, update_document_check},
error::Result,
options::{UpdateModifications, WriteConcern},
serde_util::serialize_bool_or_true,
Collection,
Namespace,
};

Expand All @@ -32,66 +36,205 @@ pub struct BulkWriteOptions {
#[serde(untagged)]
#[non_exhaustive]
pub enum WriteModel {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TypedBuilder doesn't work for enums with directly-embedded fields

#[non_exhaustive]
InsertOne {
#[serde(skip)]
namespace: Namespace,
document: Document,
},
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
UpdateOne {
#[serde(skip)]
namespace: Namespace,
filter: Document,
#[serde(rename = "updateMods")]
update: UpdateModifications,
array_filters: Option<Array>,
collation: Option<Document>,
hint: Option<Bson>,
upsert: Option<bool>,
},
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
UpdateMany {
#[serde(skip)]
namespace: Namespace,
filter: Document,
#[serde(rename = "updateMods")]
update: UpdateModifications,
array_filters: Option<Array>,
collation: Option<Document>,
hint: Option<Bson>,
upsert: Option<bool>,
},
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
ReplaceOne {
#[serde(skip)]
namespace: Namespace,
filter: Document,
#[serde(rename = "updateMods")]
replacement: Document,
collation: Option<Document>,
hint: Option<Bson>,
upsert: Option<bool>,
},
#[non_exhaustive]
DeleteOne {
#[serde(skip)]
namespace: Namespace,
filter: Document,
collation: Option<Document>,
hint: Option<Bson>,
},
#[non_exhaustive]
DeleteMany {
#[serde(skip)]
namespace: Namespace,
InsertOne(InsertOneModel),
UpdateOne(UpdateOneModel),
UpdateMany(UpdateManyModel),
ReplaceOne(ReplaceOneModel),
DeleteOne(DeleteOneModel),
DeleteMany(DeleteManyModel),
}

#[skip_serializing_none]
#[derive(Clone, Debug, Serialize, TypedBuilder)]
#[cfg_attr(test, derive(Deserialize))]
#[serde(rename_all = "camelCase")]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct InsertOneModel {
#[serde(skip_serializing)]
#[builder(!default)]
pub namespace: Namespace,

#[builder(!default)]
pub document: Document,
}

impl From<InsertOneModel> for WriteModel {
fn from(model: InsertOneModel) -> Self {
Self::InsertOne(model)
}
}

#[skip_serializing_none]
#[derive(Clone, Debug, Serialize, TypedBuilder)]
#[cfg_attr(test, derive(Deserialize))]
#[serde(rename_all = "camelCase")]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct UpdateOneModel {
#[serde(skip_serializing)]
#[builder(!default)]
pub namespace: Namespace,

#[builder(!default)]
pub filter: Document,

#[serde(rename(serialize = "updateMods"))]
#[builder(!default)]
pub update: UpdateModifications,

pub array_filters: Option<Array>,

pub collation: Option<Document>,

pub hint: Option<Bson>,

pub upsert: Option<bool>,
}

impl From<UpdateOneModel> for WriteModel {
fn from(model: UpdateOneModel) -> Self {
Self::UpdateOne(model)
}
}

#[skip_serializing_none]
#[derive(Clone, Debug, Serialize, TypedBuilder)]
#[cfg_attr(test, derive(Deserialize))]
#[serde(rename_all = "camelCase")]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct UpdateManyModel {
#[serde(skip_serializing)]
#[builder(!default)]
pub namespace: Namespace,

#[builder(!default)]
pub filter: Document,

#[serde(rename(serialize = "updateMods"))]
#[builder(!default)]
pub update: UpdateModifications,

pub array_filters: Option<Array>,

pub collation: Option<Document>,

pub hint: Option<Bson>,

pub upsert: Option<bool>,
}

impl From<UpdateManyModel> for WriteModel {
fn from(model: UpdateManyModel) -> Self {
Self::UpdateMany(model)
}
}

#[skip_serializing_none]
#[derive(Clone, Debug, Serialize, TypedBuilder)]
#[cfg_attr(test, derive(Deserialize))]
#[serde(rename_all = "camelCase")]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct ReplaceOneModel {
#[serde(skip_serializing)]
#[builder(!default)]
pub namespace: Namespace,

#[builder(!default)]
pub filter: Document,

#[serde(rename(serialize = "updateMods"))]
#[builder(!default)]
pub replacement: Document,

pub collation: Option<Document>,

pub hint: Option<Bson>,

pub upsert: Option<bool>,
}

impl From<ReplaceOneModel> for WriteModel {
fn from(model: ReplaceOneModel) -> Self {
Self::ReplaceOne(model)
}
}

#[skip_serializing_none]
#[derive(Clone, Debug, Serialize, TypedBuilder)]
#[cfg_attr(test, derive(Deserialize))]
#[serde(rename_all = "camelCase")]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct DeleteOneModel {
#[serde(skip_serializing)]
#[builder(!default)]
pub namespace: Namespace,

#[builder(!default)]
pub filter: Document,

pub collation: Option<Document>,

pub hint: Option<Bson>,
}

impl From<DeleteOneModel> for WriteModel {
fn from(model: DeleteOneModel) -> Self {
Self::DeleteOne(model)
}
}

#[skip_serializing_none]
#[derive(Clone, Debug, Serialize, TypedBuilder)]
#[cfg_attr(test, derive(Deserialize))]
#[serde(rename_all = "camelCase")]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct DeleteManyModel {
#[serde(skip_serializing)]
#[builder(!default)]
pub namespace: Namespace,

pub filter: Document,

pub collation: Option<Document>,

pub hint: Option<Bson>,
}

impl From<DeleteManyModel> for WriteModel {
fn from(model: DeleteManyModel) -> Self {
Self::DeleteMany(model)
}
}

impl<T> Collection<T>
where
T: Send + Sync + Serialize,
{
pub fn insert_one_model(&self, document: impl Borrow<T>) -> Result<InsertOneModel> {
let document = bson::to_document(document.borrow())?;
Ok(InsertOneModel::builder()
.namespace(self.namespace())
.document(document)
.build())
}

pub fn replace_one_model(
&self,
filter: Document,
collation: Option<Document>,
hint: Option<Bson>,
},
replacement: impl Borrow<T>,
) -> Result<ReplaceOneModel> {
let replacement = bson::to_document(replacement.borrow())?;
Ok(ReplaceOneModel::builder()
.namespace(self.namespace())
.filter(filter)
.replacement(replacement)
.build())
}
}

pub(crate) enum OperationType {
Expand All @@ -103,34 +246,30 @@ pub(crate) enum OperationType {
impl WriteModel {
pub(crate) fn namespace(&self) -> &Namespace {
match self {
Self::InsertOne { namespace, .. } => namespace,
Self::UpdateOne { namespace, .. } => namespace,
Self::UpdateMany { namespace, .. } => namespace,
Self::ReplaceOne { namespace, .. } => namespace,
Self::DeleteOne { namespace, .. } => namespace,
Self::DeleteMany { namespace, .. } => namespace,
Self::InsertOne(model) => &model.namespace,
Self::UpdateOne(model) => &model.namespace,
Self::UpdateMany(model) => &model.namespace,
Self::ReplaceOne(model) => &model.namespace,
Self::DeleteOne(model) => &model.namespace,
Self::DeleteMany(model) => &model.namespace,
}
}

pub(crate) fn operation_type(&self) -> OperationType {
match self {
Self::InsertOne { .. } => OperationType::Insert,
Self::UpdateOne { .. } | Self::UpdateMany { .. } | Self::ReplaceOne { .. } => {
OperationType::Update
}
Self::DeleteOne { .. } | Self::DeleteMany { .. } => OperationType::Delete,
Self::InsertOne(_) => OperationType::Insert,
Self::UpdateOne(_) | Self::UpdateMany(_) | Self::ReplaceOne(_) => OperationType::Update,
Self::DeleteOne(_) | Self::DeleteMany(_) => OperationType::Delete,
}
}

/// Whether this operation should apply to all documents that match the filter. Returns None if
/// the operation does not use a filter.
pub(crate) fn multi(&self) -> Option<bool> {
match self {
Self::UpdateMany { .. } | Self::DeleteMany { .. } => Some(true),
Self::UpdateOne { .. } | Self::ReplaceOne { .. } | Self::DeleteOne { .. } => {
Some(false)
}
Self::InsertOne { .. } => None,
Self::UpdateMany(_) | Self::DeleteMany(_) => Some(true),
Self::UpdateOne(_) | Self::ReplaceOne(_) | Self::DeleteOne(_) => Some(false),
Self::InsertOne(_) => None,
}
}

Expand All @@ -145,17 +284,19 @@ impl WriteModel {
/// Returns the operation-specific fields that should be included in this model's entry in the
/// ops array. Also returns an inserted ID if this is an insert operation.
pub(crate) fn get_ops_document_contents(&self) -> Result<(RawDocumentBuf, Option<Bson>)> {
if let Self::UpdateOne { update, .. } | Self::UpdateMany { update, .. } = self {
if let Self::UpdateOne(UpdateOneModel { update, .. })
| Self::UpdateMany(UpdateManyModel { update, .. }) = self
{
if let UpdateModifications::Document(update_document) = update {
update_document_check(update_document)?;
}
} else if let Self::ReplaceOne { replacement, .. } = self {
} else if let Self::ReplaceOne(ReplaceOneModel { replacement, .. }) = self {
replacement_document_check(replacement)?;
}

let (mut model_document, inserted_id) = match self {
Self::InsertOne { document, .. } => {
let mut insert_document = RawDocumentBuf::from_document(document)?;
Self::InsertOne(model) => {
let mut insert_document = RawDocumentBuf::from_document(&model.document)?;
let inserted_id = get_or_prepend_id_field(&mut insert_document)?;
(rawdoc! { "document": insert_document }, Some(inserted_id))
}
Expand Down
Loading