diff --git a/src/libmongoc/CMakeLists.txt b/src/libmongoc/CMakeLists.txt index 3070c43ace8..31567af6e73 100644 --- a/src/libmongoc/CMakeLists.txt +++ b/src/libmongoc/CMakeLists.txt @@ -657,6 +657,7 @@ set (SOURCES ${SOURCES} ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-stream-tls-openssl-bio.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-openssl.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-ocsp-cache.c + ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-bulkwrite.c ) set (HEADERS @@ -665,6 +666,7 @@ set (HEADERS ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc.h ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-apm.h ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-bulk-operation.h + ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-bulkwrite.h ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-change-stream.h ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-client.h ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-client-pool.h @@ -1216,6 +1218,7 @@ if (ENABLE_EXAMPLES AND ENABLE_SHARED) mongoc_add_example (mongoc-ping ${PROJECT_SOURCE_DIR}/examples/mongoc-ping.c) mongoc_add_example (mongoc-tail ${PROJECT_SOURCE_DIR}/examples/mongoc-tail.c) mongoc_add_example (example-collection-command ${PROJECT_SOURCE_DIR}/examples/example-collection-command.c) + mongoc_add_example (example-bulkwrite ${PROJECT_SOURCE_DIR}/examples/example-bulkwrite.c) # examples/aggregation/ mongoc_add_example (aggregation1 ${PROJECT_SOURCE_DIR}/examples/aggregation/aggregation1.c) diff --git a/src/libmongoc/examples/example-bulkwrite.c b/src/libmongoc/examples/example-bulkwrite.c new file mode 100644 index 00000000000..4860b04ad36 --- /dev/null +++ b/src/libmongoc/examples/example-bulkwrite.c @@ -0,0 +1,79 @@ +// example-bulkwrite shows use of `mongoc_client_bulkwrite`. + +#include + +#define HANDLE_ERROR(...) \ + if (1) { \ + fprintf (stderr, __VA_ARGS__); \ + fprintf (stderr, "\n"); \ + goto fail; \ + } else \ + (void) 0 + +int +main (int argc, char *argv[]) +{ + bool ok = false; + + mongoc_init (); + + bson_error_t error; + mongoc_client_t *client = mongoc_client_new ("mongodb://localhost:27017"); + mongoc_bulkwriteoptions_t *bwo = mongoc_bulkwriteoptions_new (); + mongoc_bulkwriteoptions_set_verboseresults (bwo, true); + mongoc_bulkwrite_t *bw = mongoc_client_bulkwrite_new (client, bwo); + + // Insert a document to `db.coll1` + { + bson_t *doc = BCON_NEW ("foo", "1"); + if (!mongoc_client_bulkwrite_append_insertone (bw, "db.coll1", -1, doc, NULL, &error)) { + HANDLE_ERROR ("error appending insert one: %s", error.message); + } + bson_destroy (doc); + } + // Insert a document to `db.coll2` + { + bson_t *doc = BCON_NEW ("foo", "2"); + if (!mongoc_client_bulkwrite_append_insertone (bw, "db.coll2", -1, doc, NULL, &error)) { + HANDLE_ERROR ("error appending insert one: %s", error.message); + } + bson_destroy (doc); + } + + mongoc_bulkwritereturn_t bwr = mongoc_bulkwrite_execute (bw); + + printf ("insert count: %" PRId64 "\n", mongoc_bulkwriteresult_insertedcount (bwr.res)); + + // Print verbose results. + { + const bson_t *vr = mongoc_bulkwriteresult_verboseresults (bwr.res); + BSON_ASSERT (vr); + char *vr_str = bson_as_relaxed_extended_json (vr, NULL); + printf ("verbose results: %s\n", vr_str); + bson_free (vr_str); + } + + // Print exception. + if (bwr.exc) { + const bson_t *error_doc; + mongoc_bulkwriteexception_error (bwr.exc, &error, &error_doc); + if (mongoc_error_has_label (error_doc, "RetryableWriteError")) { + printf ("error has label: RetryableWriteError\n"); + } + printf ("error: %s\n", error.message); + char *error_doc_str = bson_as_relaxed_extended_json (error_doc, NULL); + printf ("exception: %s\n", error_doc_str); + bson_free (error_doc_str); + } + + mongoc_bulkwriteresult_destroy (bwr.res); + mongoc_bulkwriteexception_destroy (bwr.exc); + mongoc_bulkwrite_destroy (bw); + + ok = true; +fail: + mongoc_client_destroy (client); + mongoc_bulkwriteoptions_destroy (bwo); + mongoc_cleanup (); + return ok ? EXIT_SUCCESS : EXIT_FAILURE; +} diff --git a/src/libmongoc/src/mongoc/mongoc-bulkwrite.c b/src/libmongoc/src/mongoc/mongoc-bulkwrite.c new file mode 100644 index 00000000000..2b61026cdf8 --- /dev/null +++ b/src/libmongoc/src/mongoc/mongoc-bulkwrite.c @@ -0,0 +1,303 @@ +/* + * Copyright 2024-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +struct _mongoc_bulkwriteoptions_t { + bool ordered; + bool bypassdocumentvalidation; + const bson_t *let; + const mongoc_write_concern_t *writeconcern; + bool verboseresults; + const bson_t *comment; + mongoc_client_session_t *session; + const bson_t *extra; + uint32_t serverid; +}; + +mongoc_bulkwriteoptions_t * +mongoc_bulkwriteoptions_new (void) +{ + return bson_malloc0 (sizeof (mongoc_bulkwriteoptions_t)); +} +void +mongoc_bulkwriteoptions_set_ordered (mongoc_bulkwriteoptions_t *self, bool ordered) +{ + BSON_ASSERT_PARAM (self); + self->ordered = ordered; +} +void +mongoc_bulkwriteoptions_set_bypassdocumentvalidation (mongoc_bulkwriteoptions_t *self, bool bypassdocumentvalidation) +{ + BSON_ASSERT_PARAM (self); + self->bypassdocumentvalidation = bypassdocumentvalidation; +} +void +mongoc_bulkwriteoptions_set_let (mongoc_bulkwriteoptions_t *self, const bson_t *let) +{ + BSON_ASSERT_PARAM (self); + self->let = let; +} +void +mongoc_bulkwriteoptions_set_writeconcern (mongoc_bulkwriteoptions_t *self, const mongoc_write_concern_t *writeconcern) +{ + BSON_ASSERT_PARAM (self); + self->writeconcern = writeconcern; +} +void +mongoc_bulkwriteoptions_set_verboseresults (mongoc_bulkwriteoptions_t *self, bool verboseresults) +{ + BSON_ASSERT_PARAM (self); + self->verboseresults = verboseresults; +} +void +mongoc_bulkwriteoptions_set_comment (mongoc_bulkwriteoptions_t *self, const bson_t *comment) +{ + BSON_ASSERT_PARAM (self); + self->comment = comment; +} +void +mongoc_bulkwriteoptions_set_session (mongoc_bulkwriteoptions_t *self, mongoc_client_session_t *session) +{ + BSON_ASSERT_PARAM (self); + self->session = session; +} +void +mongoc_bulkwriteoptions_set_extra (mongoc_bulkwriteoptions_t *self, const bson_t *extra) +{ + BSON_ASSERT_PARAM (self); + self->extra = extra; +} +BSON_EXPORT (void) +mongoc_bulkwriteoptions_set_serverid (mongoc_bulkwriteoptions_t *self, uint32_t serverid) +{ + BSON_ASSERT_PARAM (self); + self->serverid = serverid; +} +void +mongoc_bulkwriteoptions_destroy (mongoc_bulkwriteoptions_t *self) +{ + if (!self) { + return; + } + bson_free (self); +} + + +struct _mongoc_bulkwrite_t { + mongoc_client_t *client; + bool executed; +}; + + +// `mongoc_client_bulkwrite_new` creates a new bulk write operation. +mongoc_bulkwrite_t * +mongoc_client_bulkwrite_new (mongoc_client_t *self, mongoc_bulkwriteoptions_t *opts) +{ + BSON_ASSERT_PARAM (self); + BSON_UNUSED (opts); + mongoc_bulkwrite_t *bw = bson_malloc0 (sizeof (mongoc_bulkwrite_t)); + bw->client = self; + return bw; +} + +void +mongoc_bulkwrite_destroy (mongoc_bulkwrite_t *self) +{ + if (!self) { + return; + } + bson_free (self); +} + +struct _mongoc_insertoneopts_t { + bson_validate_flags_t vflags; +}; + +mongoc_insertoneopts_t * +mongoc_insertoneopts_new (void) +{ + return bson_malloc0 (sizeof (mongoc_insertoneopts_t)); +} + +void +mongoc_insertoneopts_set_validation (mongoc_insertoneopts_t *self, bson_validate_flags_t vflags) +{ + BSON_ASSERT_PARAM (self); + self->vflags = vflags; +} + +void +mongoc_insertoneopts_destroy (mongoc_insertoneopts_t *self) +{ + if (!self) { + return; + } + bson_free (self); +} + +bool +mongoc_client_bulkwrite_append_insertone (mongoc_bulkwrite_t *self, + const char *ns, + int ns_len, + const bson_t *document, + mongoc_insertoneopts_t *opts, // may be NULL + bson_error_t *error) +{ + BSON_ASSERT_PARAM (self); + BSON_UNUSED (ns); + BSON_UNUSED (ns_len); + BSON_UNUSED (document); + BSON_UNUSED (opts); + + if (self->executed) { + bson_set_error (error, MONGOC_ERROR_COMMAND, MONGOC_ERROR_COMMAND_INVALID_ARG, "bulk write already executed"); + return false; + } + + // TODO: implement. + return true; +} + + +struct _mongoc_bulkwriteresult_t { + int64_t acknowledged; + int64_t insertedCount; + int64_t upsertedcount; + int64_t matchedcount; + int64_t modifiedcount; + int64_t deletedcount; + bson_t *verbose_results; + uint32_t serverid; +}; + +bool +mongoc_bulkwriteresult_acknowledged (const mongoc_bulkwriteresult_t *self) +{ + BSON_ASSERT_PARAM (self); + return self->acknowledged; +} + +int64_t +mongoc_bulkwriteresult_insertedcount (const mongoc_bulkwriteresult_t *self) +{ + BSON_ASSERT_PARAM (self); + return self->insertedCount; +} + +int64_t +mongoc_bulkwriteresult_upsertedcount (const mongoc_bulkwriteresult_t *self) +{ + BSON_ASSERT_PARAM (self); + return self->upsertedcount; +} + +int64_t +mongoc_bulkwriteresult_matchedcount (const mongoc_bulkwriteresult_t *self) +{ + BSON_ASSERT_PARAM (self); + return self->matchedcount; +} + +int64_t +mongoc_bulkwriteresult_modifiedcount (const mongoc_bulkwriteresult_t *self) +{ + BSON_ASSERT_PARAM (self); + return self->modifiedcount; +} + +int64_t +mongoc_bulkwriteresult_deletedcount (const mongoc_bulkwriteresult_t *self) +{ + BSON_ASSERT_PARAM (self); + return self->deletedcount; +} + +const bson_t * +mongoc_bulkwriteresult_verboseresults (const mongoc_bulkwriteresult_t *self) +{ + BSON_ASSERT_PARAM (self); + return self->verbose_results; +} + +uint32_t +mongoc_bulkwriteresult_serverid (const mongoc_bulkwriteresult_t *self) +{ + BSON_ASSERT_PARAM (self); + return self->serverid; +} + +void +mongoc_bulkwriteresult_destroy (mongoc_bulkwriteresult_t *self) +{ + if (!self) { + return; + } + bson_destroy (self->verbose_results); + bson_free (self); +} + +struct _mongoc_bulkwriteexception_t { + bson_error_t error; + bson_t *error_document; +}; + +void +mongoc_bulkwriteexception_error (const mongoc_bulkwriteexception_t *self, + bson_error_t *error, + const bson_t **error_document) +{ + BSON_ASSERT_PARAM (self); + memcpy (error, &self->error, sizeof (*error)); + if (error_document) { + *error_document = self->error_document; + } +} + +void +mongoc_bulkwriteexception_destroy (mongoc_bulkwriteexception_t *self) +{ + if (!self) { + return; + } + bson_destroy (self->error_document); + bson_free (self); +} + +mongoc_bulkwritereturn_t +mongoc_bulkwrite_execute (mongoc_bulkwrite_t *self) +{ + BSON_ASSERT_PARAM (self); + // TODO: implement. + self->executed = true; + // Create stub results. + mongoc_bulkwriteresult_t *bwr = bson_malloc0 (sizeof (mongoc_bulkwriteresult_t)); + bwr->insertedCount = 123; + bwr->verbose_results = bson_new_from_json ((const uint8_t *) BSON_STR ({"foo" : "bar"}), -1, NULL); + BSON_ASSERT (bwr->verbose_results); + + mongoc_bulkwriteexception_t *bwe = bson_malloc0 (sizeof (mongoc_bulkwriteexception_t)); + bwe->error_document = bson_new_from_json ( + (const uint8_t *) BSON_STR ( + {"errorLabels" : ["RetryableWriteError"], "writeErrors" : [], "writeConcernErrors" : [], "errorReplies" : []}), + -1, + NULL); + BSON_ASSERT (bwe->error_document); + bson_set_error (&bwe->error, MONGOC_ERROR_SERVER, 123, "This is a stub error"); + return (mongoc_bulkwritereturn_t){.res = bwr, .exc = bwe}; +} diff --git a/src/libmongoc/src/mongoc/mongoc-bulkwrite.h b/src/libmongoc/src/mongoc/mongoc-bulkwrite.h new file mode 100644 index 00000000000..f483f95b60f --- /dev/null +++ b/src/libmongoc/src/mongoc/mongoc-bulkwrite.h @@ -0,0 +1,121 @@ +/* + * Copyright 2024-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mongoc-prelude.h" + +#ifndef MONGOC_BULKWRITE_H +#define MONGOC_BULKWRITE_H + +#include +#include + +BSON_BEGIN_DECLS + +typedef struct _mongoc_bulkwriteoptions_t mongoc_bulkwriteoptions_t; +BSON_EXPORT (mongoc_bulkwriteoptions_t *) +mongoc_bulkwriteoptions_new (void); +BSON_EXPORT (void) +mongoc_bulkwriteoptions_set_ordered (mongoc_bulkwriteoptions_t *self, bool ordered); +BSON_EXPORT (void) +mongoc_bulkwriteoptions_set_bypassdocumentvalidation (mongoc_bulkwriteoptions_t *self, bool bypassdocumentvalidation); +BSON_EXPORT (void) +mongoc_bulkwriteoptions_set_let (mongoc_bulkwriteoptions_t *self, const bson_t *let); +BSON_EXPORT (void) +mongoc_bulkwriteoptions_set_writeconcern (mongoc_bulkwriteoptions_t *self, const mongoc_write_concern_t *writeconcern); +BSON_EXPORT (void) +mongoc_bulkwriteoptions_set_verboseresults (mongoc_bulkwriteoptions_t *self, bool verboseresults); +BSON_EXPORT (void) +mongoc_bulkwriteoptions_set_comment (mongoc_bulkwriteoptions_t *self, const bson_t *comment); +BSON_EXPORT (void) +mongoc_bulkwriteoptions_set_session (mongoc_bulkwriteoptions_t *self, mongoc_client_session_t *session); +// `mongoc_bulkwriteoptions_set_extra` appends `extra` to bulkWrite command. +// It is intended to support future server options. +BSON_EXPORT (void) +mongoc_bulkwriteoptions_set_extra (mongoc_bulkwriteoptions_t *self, const bson_t *extra); +// `mongoc_bulkwriteoptions_set_serverid` identifies which server to perform the operation. This is intended for use by +// wrapping drivers that select a server before running the operation. +BSON_EXPORT (void) +mongoc_bulkwriteoptions_set_serverid (mongoc_bulkwriteoptions_t *self, uint32_t serverid); +BSON_EXPORT (void) +mongoc_bulkwriteoptions_destroy (mongoc_bulkwriteoptions_t *self); + +typedef struct _mongoc_bulkwriteresult_t mongoc_bulkwriteresult_t; +BSON_EXPORT (bool) +mongoc_bulkwriteresult_acknowledged (const mongoc_bulkwriteresult_t *self); +BSON_EXPORT (int64_t) +mongoc_bulkwriteresult_insertedcount (const mongoc_bulkwriteresult_t *self); +BSON_EXPORT (int64_t) +mongoc_bulkwriteresult_upsertedcount (const mongoc_bulkwriteresult_t *self); +BSON_EXPORT (int64_t) +mongoc_bulkwriteresult_matchedcount (const mongoc_bulkwriteresult_t *self); +BSON_EXPORT (int64_t) +mongoc_bulkwriteresult_modifiedcount (const mongoc_bulkwriteresult_t *self); +BSON_EXPORT (int64_t) +mongoc_bulkwriteresult_deletedcount (const mongoc_bulkwriteresult_t *self); +// `mongoc_bulkwriteresult_verboseresults` returns a document with the fields: `insertResults`, `updateResult`, +// `deleteResults`. Returns NULL if verbose results were not requested. +BSON_EXPORT (const bson_t *) +mongoc_bulkwriteresult_verboseresults (const mongoc_bulkwriteresult_t *self); +// `mongoc_bulkwriteresult_get_serverid` identifies which server to performed the operation. This may differ from a +// previously set serverid if a retry occurred. This is intended for use by wrapping drivers that select a server before +// running the operation. +BSON_EXPORT (uint32_t) +mongoc_bulkwriteresult_serverid (const mongoc_bulkwriteresult_t *self); +BSON_EXPORT (void) +mongoc_bulkwriteresult_destroy (mongoc_bulkwriteresult_t *self); + +typedef struct _mongoc_bulkwriteexception_t mongoc_bulkwriteexception_t; +// `mongoc_bulkwriteexception_error` sets `error_document` to a document with the fields: `errorLabels`, +// `writeConcernErrors`, `writeErrors`, `errorReplies`. +BSON_EXPORT (void) +mongoc_bulkwriteexception_error (const mongoc_bulkwriteexception_t *self, + bson_error_t *error, + const bson_t **error_document /* May be NULL */); +BSON_EXPORT (void) +mongoc_bulkwriteexception_destroy (mongoc_bulkwriteexception_t *self); + +// `mongoc_bulkwritereturn_t` may outlive `mongoc_bulkwrite_t`. +typedef struct { + mongoc_bulkwriteresult_t *res; + mongoc_bulkwriteexception_t *exc; // May be NULL +} mongoc_bulkwritereturn_t; + +typedef struct _mongoc_insertoneopts_t mongoc_insertoneopts_t; +BSON_EXPORT (mongoc_insertoneopts_t *) +mongoc_insertoneopts_new (void); +BSON_EXPORT (void) +mongoc_insertoneopts_set_validation (mongoc_insertoneopts_t *self, bson_validate_flags_t vflags); +BSON_EXPORT (void) +mongoc_insertoneopts_destroy (mongoc_insertoneopts_t *self); + +typedef struct _mongoc_bulkwrite_t mongoc_bulkwrite_t; +BSON_EXPORT (mongoc_bulkwrite_t *) +mongoc_client_bulkwrite_new (mongoc_client_t *self, mongoc_bulkwriteoptions_t *opts); +BSON_EXPORT (bool) +mongoc_client_bulkwrite_append_insertone (mongoc_bulkwrite_t *self, + const char *ns, + int ns_len, + const bson_t *document, + mongoc_insertoneopts_t *opts /* May be NULL */, + bson_error_t *error); +BSON_EXPORT (mongoc_bulkwritereturn_t) +mongoc_bulkwrite_execute (mongoc_bulkwrite_t *self); +BSON_EXPORT (void) +mongoc_bulkwrite_destroy (mongoc_bulkwrite_t *self); + +BSON_END_DECLS + +#endif // MONGOC_BULKWRITE_H diff --git a/src/libmongoc/src/mongoc/mongoc.h b/src/libmongoc/src/mongoc/mongoc.h index a43304cec7c..f21d3381075 100644 --- a/src/libmongoc/src/mongoc/mongoc.h +++ b/src/libmongoc/src/mongoc/mongoc.h @@ -25,6 +25,7 @@ #include "mongoc-macros.h" #include "mongoc-apm.h" #include "mongoc-bulk-operation.h" +#include "mongoc-bulkwrite.h" #include "mongoc-change-stream.h" #include "mongoc-client.h" #include "mongoc-client-pool.h"