Skip to content

Commit 3a96e7b

Browse files
committed
add stub API for client bulk write
1 parent badc2af commit 3a96e7b

File tree

5 files changed

+504
-0
lines changed

5 files changed

+504
-0
lines changed

src/libmongoc/CMakeLists.txt

+2
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,7 @@ set (SOURCES ${SOURCES}
657657
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-stream-tls-openssl-bio.c
658658
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-openssl.c
659659
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-ocsp-cache.c
660+
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-bulkwrite.c
660661
)
661662

662663
set (HEADERS
@@ -1216,6 +1217,7 @@ if (ENABLE_EXAMPLES AND ENABLE_SHARED)
12161217
mongoc_add_example (mongoc-ping ${PROJECT_SOURCE_DIR}/examples/mongoc-ping.c)
12171218
mongoc_add_example (mongoc-tail ${PROJECT_SOURCE_DIR}/examples/mongoc-tail.c)
12181219
mongoc_add_example (example-collection-command ${PROJECT_SOURCE_DIR}/examples/example-collection-command.c)
1220+
mongoc_add_example (example-bulkwrite ${PROJECT_SOURCE_DIR}/examples/example-bulkwrite.c)
12191221

12201222
# examples/aggregation/
12211223
mongoc_add_example (aggregation1 ${PROJECT_SOURCE_DIR}/examples/aggregation/aggregation1.c)
+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// example-bulkwrite shows use of `mongoc_client_bulkwrite`.
2+
3+
#include <mongoc/mongoc.h>
4+
5+
#define HANDLE_ERROR(...) \
6+
if (1) { \
7+
fprintf (stderr, __VA_ARGS__); \
8+
fprintf (stderr, "\n"); \
9+
goto fail; \
10+
} else \
11+
(void) 0
12+
13+
int
14+
main (int argc, char *argv[])
15+
{
16+
bool ok = false;
17+
18+
mongoc_init ();
19+
20+
bson_error_t error;
21+
mongoc_client_t *client = mongoc_client_new ("mongodb://localhost:27017");
22+
mongoc_bulkwriteoptions_t *bwo = mongoc_bulkwriteoptions_new ();
23+
mongoc_bulkwriteoptions_set_verboseresults (bwo, true);
24+
mongoc_bulkwrite_t *bw = mongoc_client_bulkwrite_new (client, bwo);
25+
26+
// Insert a document to `db.coll1`
27+
{
28+
bson_t *doc = BCON_NEW ("foo", "1");
29+
if (!mongoc_client_bulkwrite_append_insertone (bw, "db.coll1", -1, doc, NULL, &error)) {
30+
HANDLE_ERROR ("error appending insert one: %s", error.message);
31+
}
32+
bson_destroy (doc);
33+
}
34+
// Insert a document to `db.coll2`
35+
{
36+
bson_t *doc = BCON_NEW ("foo", "2");
37+
if (!mongoc_client_bulkwrite_append_insertone (bw, "db.coll2", -1, doc, NULL, &error)) {
38+
HANDLE_ERROR ("error appending insert one: %s", error.message);
39+
}
40+
bson_destroy (doc);
41+
}
42+
43+
mongoc_bulkwritereturn_t bwr = mongoc_bulkwrite_execute (bw);
44+
45+
printf ("insert count: %" PRId64 "\n", mongoc_bulkwriteresult_insertedcount (bwr.res));
46+
47+
// Print verbose results.
48+
{
49+
const bson_t *vr = mongoc_bulkwriteresult_verboseresults (bwr.res);
50+
BSON_ASSERT (vr);
51+
char *vr_str = bson_as_relaxed_extended_json (vr, NULL);
52+
printf ("verbose results: %s\n", vr_str);
53+
bson_free (vr_str);
54+
}
55+
56+
// Print exception.
57+
if (bwr.exc) {
58+
const bson_t *error_doc;
59+
mongoc_bulkwriteexception_error (bwr.exc, &error, &error_doc);
60+
if (mongoc_error_has_label (error_doc, "RetryableWriteError")) {
61+
printf ("error has label: RetryableWriteError\n");
62+
}
63+
printf ("error: %s\n", error.message);
64+
char *error_doc_str = bson_as_relaxed_extended_json (error_doc, NULL);
65+
printf ("exception: %s\n", error_doc_str);
66+
bson_free (error_doc_str);
67+
}
68+
69+
mongoc_bulkwriteresult_destroy (bwr.res);
70+
mongoc_bulkwriteexception_destroy (bwr.exc);
71+
mongoc_bulkwrite_destroy (bw);
72+
73+
ok = true;
74+
fail:
75+
mongoc_client_destroy (client);
76+
mongoc_bulkwriteoptions_destroy (bwo);
77+
mongoc_cleanup ();
78+
return ok ? EXIT_SUCCESS : EXIT_FAILURE;
79+
}
+303
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
/*
2+
* Copyright 2024-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include <bson/bson.h>
18+
#include <mongoc/mongoc-error.h>
19+
#include <mongoc-bulkwrite.h>
20+
21+
struct _mongoc_bulkwriteoptions_t {
22+
bool ordered;
23+
bool bypassdocumentvalidation;
24+
const bson_t *let;
25+
const mongoc_write_concern_t *writeconcern;
26+
bool verboseresults;
27+
const bson_t *comment;
28+
mongoc_client_session_t *session;
29+
const bson_t *extra;
30+
uint32_t serverid;
31+
};
32+
33+
mongoc_bulkwriteoptions_t *
34+
mongoc_bulkwriteoptions_new (void)
35+
{
36+
return bson_malloc0 (sizeof (mongoc_bulkwriteoptions_t));
37+
}
38+
void
39+
mongoc_bulkwriteoptions_set_ordered (mongoc_bulkwriteoptions_t *self, bool ordered)
40+
{
41+
BSON_ASSERT_PARAM (self);
42+
self->ordered = ordered;
43+
}
44+
void
45+
mongoc_bulkwriteoptions_set_bypassdocumentvalidation (mongoc_bulkwriteoptions_t *self, bool bypassdocumentvalidation)
46+
{
47+
BSON_ASSERT_PARAM (self);
48+
self->bypassdocumentvalidation = bypassdocumentvalidation;
49+
}
50+
void
51+
mongoc_bulkwriteoptions_set_let (mongoc_bulkwriteoptions_t *self, const bson_t *let)
52+
{
53+
BSON_ASSERT_PARAM (self);
54+
self->let = let;
55+
}
56+
void
57+
mongoc_bulkwriteoptions_set_writeconcern (mongoc_bulkwriteoptions_t *self, const mongoc_write_concern_t *writeconcern)
58+
{
59+
BSON_ASSERT_PARAM (self);
60+
self->writeconcern = writeconcern;
61+
}
62+
void
63+
mongoc_bulkwriteoptions_set_verboseresults (mongoc_bulkwriteoptions_t *self, bool verboseresults)
64+
{
65+
BSON_ASSERT_PARAM (self);
66+
self->verboseresults = verboseresults;
67+
}
68+
void
69+
mongoc_bulkwriteoptions_set_comment (mongoc_bulkwriteoptions_t *self, const bson_t *comment)
70+
{
71+
BSON_ASSERT_PARAM (self);
72+
self->comment = comment;
73+
}
74+
void
75+
mongoc_bulkwriteoptions_set_session (mongoc_bulkwriteoptions_t *self, mongoc_client_session_t *session)
76+
{
77+
BSON_ASSERT_PARAM (self);
78+
self->session = session;
79+
}
80+
void
81+
mongoc_bulkwriteoptions_set_extra (mongoc_bulkwriteoptions_t *self, const bson_t *extra)
82+
{
83+
BSON_ASSERT_PARAM (self);
84+
self->extra = extra;
85+
}
86+
BSON_EXPORT (void)
87+
mongoc_bulkwriteoptions_set_serverid (mongoc_bulkwriteoptions_t *self, uint32_t serverid)
88+
{
89+
BSON_ASSERT_PARAM (self);
90+
self->serverid = serverid;
91+
}
92+
void
93+
mongoc_bulkwriteoptions_destroy (mongoc_bulkwriteoptions_t *self)
94+
{
95+
if (!self) {
96+
return;
97+
}
98+
bson_free (self);
99+
}
100+
101+
102+
struct _mongoc_bulkwrite_t {
103+
mongoc_client_t *client;
104+
bool executed;
105+
};
106+
107+
108+
// `mongoc_client_bulkwrite_new` creates a new bulk write operation.
109+
mongoc_bulkwrite_t *
110+
mongoc_client_bulkwrite_new (mongoc_client_t *self, mongoc_bulkwriteoptions_t *opts)
111+
{
112+
BSON_ASSERT_PARAM (self);
113+
BSON_UNUSED (opts);
114+
mongoc_bulkwrite_t *bw = bson_malloc0 (sizeof (mongoc_bulkwrite_t));
115+
bw->client = self;
116+
return bw;
117+
}
118+
119+
void
120+
mongoc_bulkwrite_destroy (mongoc_bulkwrite_t *self)
121+
{
122+
if (!self) {
123+
return;
124+
}
125+
bson_free (self);
126+
}
127+
128+
struct _mongoc_insertoneopts_t {
129+
bson_validate_flags_t vflags;
130+
};
131+
132+
mongoc_insertoneopts_t *
133+
mongoc_insertoneopts_new (void)
134+
{
135+
return bson_malloc0 (sizeof (mongoc_insertoneopts_t));
136+
}
137+
138+
void
139+
mongoc_insertoneopts_set_validation (mongoc_insertoneopts_t *self, bson_validate_flags_t vflags)
140+
{
141+
BSON_ASSERT_PARAM (self);
142+
self->vflags = vflags;
143+
}
144+
145+
void
146+
mongoc_insertoneopts_destroy (mongoc_insertoneopts_t *self)
147+
{
148+
if (!self) {
149+
return;
150+
}
151+
bson_free (self);
152+
}
153+
154+
bool
155+
mongoc_client_bulkwrite_append_insertone (mongoc_bulkwrite_t *self,
156+
const char *ns,
157+
int ns_len,
158+
const bson_t *document,
159+
mongoc_insertoneopts_t *opts, // may be NULL
160+
bson_error_t *error)
161+
{
162+
BSON_ASSERT_PARAM (self);
163+
BSON_UNUSED (ns);
164+
BSON_UNUSED (ns_len);
165+
BSON_UNUSED (document);
166+
BSON_UNUSED (opts);
167+
168+
if (self->executed) {
169+
bson_set_error (error, MONGOC_ERROR_COMMAND, MONGOC_ERROR_COMMAND_INVALID_ARG, "bulk write already executed");
170+
return false;
171+
}
172+
173+
// TODO: implement.
174+
return true;
175+
}
176+
177+
178+
struct _mongoc_bulkwriteresult_t {
179+
int64_t acknowledged;
180+
int64_t insertedCount;
181+
int64_t upsertedcount;
182+
int64_t matchedcount;
183+
int64_t modifiedcount;
184+
int64_t deletedcount;
185+
bson_t *verbose_results;
186+
uint32_t serverid;
187+
};
188+
189+
bool
190+
mongoc_bulkwriteresult_acknowledged (const mongoc_bulkwriteresult_t *self)
191+
{
192+
BSON_ASSERT_PARAM (self);
193+
return self->acknowledged;
194+
}
195+
196+
int64_t
197+
mongoc_bulkwriteresult_insertedcount (const mongoc_bulkwriteresult_t *self)
198+
{
199+
BSON_ASSERT_PARAM (self);
200+
return self->insertedCount;
201+
}
202+
203+
int64_t
204+
mongoc_bulkwriteresult_upsertedcount (const mongoc_bulkwriteresult_t *self)
205+
{
206+
BSON_ASSERT_PARAM (self);
207+
return self->upsertedcount;
208+
}
209+
210+
int64_t
211+
mongoc_bulkwriteresult_matchedcount (const mongoc_bulkwriteresult_t *self)
212+
{
213+
BSON_ASSERT_PARAM (self);
214+
return self->matchedcount;
215+
}
216+
217+
int64_t
218+
mongoc_bulkwriteresult_modifiedcount (const mongoc_bulkwriteresult_t *self)
219+
{
220+
BSON_ASSERT_PARAM (self);
221+
return self->modifiedcount;
222+
}
223+
224+
int64_t
225+
mongoc_bulkwriteresult_deletedcount (const mongoc_bulkwriteresult_t *self)
226+
{
227+
BSON_ASSERT_PARAM (self);
228+
return self->deletedcount;
229+
}
230+
231+
const bson_t *
232+
mongoc_bulkwriteresult_verboseresults (const mongoc_bulkwriteresult_t *self)
233+
{
234+
BSON_ASSERT_PARAM (self);
235+
return self->verbose_results;
236+
}
237+
238+
uint32_t
239+
mongoc_bulkwriteresult_serverid (const mongoc_bulkwriteresult_t *self)
240+
{
241+
BSON_ASSERT_PARAM (self);
242+
return self->serverid;
243+
}
244+
245+
void
246+
mongoc_bulkwriteresult_destroy (mongoc_bulkwriteresult_t *self)
247+
{
248+
if (!self) {
249+
return;
250+
}
251+
bson_destroy (self->verbose_results);
252+
bson_free (self);
253+
}
254+
255+
struct _mongoc_bulkwriteexception_t {
256+
bson_error_t error;
257+
bson_t *error_document;
258+
};
259+
260+
void
261+
mongoc_bulkwriteexception_error (const mongoc_bulkwriteexception_t *self,
262+
bson_error_t *error,
263+
const bson_t **error_document)
264+
{
265+
BSON_ASSERT_PARAM (self);
266+
memcpy (error, &self->error, sizeof (*error));
267+
if (error_document) {
268+
*error_document = self->error_document;
269+
}
270+
}
271+
272+
void
273+
mongoc_bulkwriteexception_destroy (mongoc_bulkwriteexception_t *self)
274+
{
275+
if (!self) {
276+
return;
277+
}
278+
bson_destroy (self->error_document);
279+
bson_free (self);
280+
}
281+
282+
mongoc_bulkwritereturn_t
283+
mongoc_bulkwrite_execute (mongoc_bulkwrite_t *self)
284+
{
285+
BSON_ASSERT_PARAM (self);
286+
// TODO: implement.
287+
self->executed = true;
288+
// Create stub results.
289+
mongoc_bulkwriteresult_t *bwr = bson_malloc0 (sizeof (mongoc_bulkwriteresult_t));
290+
bwr->insertedCount = 123;
291+
bwr->verbose_results = bson_new_from_json ((const uint8_t *) BSON_STR ({"foo" : "bar"}), -1, NULL);
292+
BSON_ASSERT (bwr->verbose_results);
293+
294+
mongoc_bulkwriteexception_t *bwe = bson_malloc0 (sizeof (mongoc_bulkwriteexception_t));
295+
bwe->error_document = bson_new_from_json (
296+
(const uint8_t *) BSON_STR (
297+
{"errorLabels" : ["RetryableWriteError"], "writeErrors" : [], "writeConcernErrors" : [], "errorReplies" : []}),
298+
-1,
299+
NULL);
300+
BSON_ASSERT (bwe->error_document);
301+
bson_set_error (&bwe->error, MONGOC_ERROR_SERVER, 123, "This is a stub error");
302+
return (mongoc_bulkwritereturn_t){.res = bwr, .exc = bwe};
303+
}

0 commit comments

Comments
 (0)