Skip to content

Commit 65153b1

Browse files
committed
add stub API for client bulk write
1 parent badc2af commit 65153b1

File tree

6 files changed

+548
-0
lines changed

6 files changed

+548
-0
lines changed

src/libmongoc/CMakeLists.txt

+2
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,7 @@ set (SOURCES ${SOURCES}
657657
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-stream-tls-openssl-bio.c
658658
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-openssl.c
659659
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-ocsp-cache.c
660+
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-bulkwrite.c
660661
)
661662

662663
set (HEADERS
@@ -1216,6 +1217,7 @@ if (ENABLE_EXAMPLES AND ENABLE_SHARED)
12161217
mongoc_add_example (mongoc-ping ${PROJECT_SOURCE_DIR}/examples/mongoc-ping.c)
12171218
mongoc_add_example (mongoc-tail ${PROJECT_SOURCE_DIR}/examples/mongoc-tail.c)
12181219
mongoc_add_example (example-collection-command ${PROJECT_SOURCE_DIR}/examples/example-collection-command.c)
1220+
mongoc_add_example (example-bulkwrite ${PROJECT_SOURCE_DIR}/examples/example-bulkwrite.c)
12191221

12201222
# examples/aggregation/
12211223
mongoc_add_example (aggregation1 ${PROJECT_SOURCE_DIR}/examples/aggregation/aggregation1.c)
+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// example-bulkwrite shows use of `mongoc_client_bulkwrite`.
2+
3+
#include <mongoc/mongoc.h>
4+
5+
#define HANDLE_ERROR(...) \
6+
if (1) { \
7+
fprintf (stderr, __VA_ARGS__); \
8+
fprintf (stderr, "\n"); \
9+
goto fail; \
10+
} else \
11+
(void) 0
12+
13+
int
14+
main (int argc, char *argv[])
15+
{
16+
bool ok = false;
17+
18+
mongoc_init ();
19+
20+
bson_error_t error;
21+
mongoc_client_t *client = mongoc_client_new ("mongodb://localhost:27017");
22+
mongoc_bulkwriteoptions_t *bwo = mongoc_bulkwriteoptions_new ();
23+
mongoc_bulkwriteoptions_set_verboseresults (bwo, true);
24+
mongoc_bulkwrite_t *bw = mongoc_client_bulkwrite_new (client, bwo);
25+
26+
// Insert a document to `db.coll1`
27+
{
28+
bson_t *doc = BCON_NEW ("foo", "1");
29+
if (!mongoc_client_bulkwrite_append_insertone (bw, "db.coll1", -1, doc, NULL, &error)) {
30+
HANDLE_ERROR ("error appending insert one: %s", error.message);
31+
}
32+
bson_destroy (doc);
33+
}
34+
// Insert a document to `db.coll2`
35+
{
36+
bson_t *doc = BCON_NEW ("foo", "2");
37+
if (!mongoc_client_bulkwrite_append_insertone (bw, "db.coll2", -1, doc, NULL, &error)) {
38+
HANDLE_ERROR ("error appending insert one: %s", error.message);
39+
}
40+
bson_destroy (doc);
41+
}
42+
43+
mongoc_bulkwritereturn_t bwr = mongoc_bulkwrite_execute (bw);
44+
45+
printf ("insert count: %" PRId64 "\n", mongoc_bulkwriteresult_insertedcount (bwr.res));
46+
47+
// Print verbose results.
48+
{
49+
const bson_t *vr = mongoc_bulkwriteresult_verboseresults (bwr.res);
50+
BSON_ASSERT (vr);
51+
char *vr_str = bson_as_relaxed_extended_json (vr, NULL);
52+
printf ("verbose results: %s\n", vr_str);
53+
bson_free (vr_str);
54+
}
55+
56+
// Print exception.
57+
if (bwr.exc) {
58+
const bson_t *error_doc;
59+
mongoc_bulkwriteexception_error (bwr.exc, &error, &error_doc);
60+
if (mongoc_error_has_label (error_doc, "RetryableWriteError")) {
61+
printf ("error has label: RetryableWriteError\n");
62+
}
63+
printf ("error: %s\n", error.message);
64+
char *error_doc_str = bson_as_relaxed_extended_json (error_doc, NULL);
65+
printf ("exception: %s\n", error_doc_str);
66+
bson_free (error_doc_str);
67+
}
68+
69+
mongoc_bulkwriteresult_destroy (bwr.res);
70+
mongoc_bulkwriteexception_destroy (bwr.exc);
71+
mongoc_bulkwrite_destroy (bw);
72+
73+
ok = true;
74+
fail:
75+
mongoc_client_destroy (client);
76+
mongoc_bulkwriteoptions_destroy (bwo);
77+
mongoc_cleanup ();
78+
return ok ? EXIT_SUCCESS : EXIT_FAILURE;
79+
}
+303
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
/*
2+
* Copyright 2024-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include <bson/bson.h>
18+
#include <mongoc/mongoc-error.h>
19+
#include <mongoc-bulkwrite.h>
20+
#include <mongoc-bulkwrite.h>
21+
22+
struct _mongoc_bulkwriteoptions_t {
23+
bool ordered;
24+
bool bypassdocumentvalidation;
25+
const bson_t *let;
26+
const mongoc_write_concern_t *writeconcern;
27+
bool verboseresults;
28+
const bson_t *comment;
29+
mongoc_client_session_t *session;
30+
const bson_t *extra;
31+
uint32_t serverid;
32+
};
33+
34+
mongoc_bulkwriteoptions_t *
35+
mongoc_bulkwriteoptions_new (void)
36+
{
37+
return bson_malloc0 (sizeof (mongoc_bulkwriteoptions_t));
38+
}
39+
void
40+
mongoc_bulkwriteoptions_set_ordered (mongoc_bulkwriteoptions_t *self, bool ordered)
41+
{
42+
BSON_ASSERT_PARAM (self);
43+
self->ordered = ordered;
44+
}
45+
void
46+
mongoc_bulkwriteoptions_set_bypassdocumentvalidation (mongoc_bulkwriteoptions_t *self, bool bypassdocumentvalidation)
47+
{
48+
BSON_ASSERT_PARAM (self);
49+
self->bypassdocumentvalidation = bypassdocumentvalidation;
50+
}
51+
void
52+
mongoc_bulkwriteoptions_set_let (mongoc_bulkwriteoptions_t *self, const bson_t *let)
53+
{
54+
BSON_ASSERT_PARAM (self);
55+
self->let = let;
56+
}
57+
void
58+
mongoc_bulkwriteoptions_set_writeconcern (mongoc_bulkwriteoptions_t *self, const mongoc_write_concern_t *writeconcern)
59+
{
60+
BSON_ASSERT_PARAM (self);
61+
self->writeconcern = writeconcern;
62+
}
63+
void
64+
mongoc_bulkwriteoptions_set_verboseresults (mongoc_bulkwriteoptions_t *self, bool verboseresults)
65+
{
66+
BSON_ASSERT_PARAM (self);
67+
self->verboseresults = verboseresults;
68+
}
69+
void
70+
mongoc_bulkwriteoptions_set_comment (mongoc_bulkwriteoptions_t *self, const bson_t *comment)
71+
{
72+
BSON_ASSERT_PARAM (self);
73+
self->comment = comment;
74+
}
75+
void
76+
mongoc_bulkwriteoptions_set_session (mongoc_bulkwriteoptions_t *self, mongoc_client_session_t *session)
77+
{
78+
BSON_ASSERT_PARAM (self);
79+
self->session = session;
80+
}
81+
void
82+
mongoc_bulkwriteoptions_set_extra (mongoc_bulkwriteoptions_t *self, const bson_t *extra)
83+
{
84+
BSON_ASSERT_PARAM (self);
85+
self->extra = extra;
86+
}
87+
BSON_EXPORT (void)
88+
mongoc_bulkwriteoptions_set_serverid (mongoc_bulkwriteoptions_t *self, uint32_t serverid)
89+
{
90+
BSON_ASSERT_PARAM (self);
91+
self->serverid = serverid;
92+
}
93+
void
94+
mongoc_bulkwriteoptions_destroy (mongoc_bulkwriteoptions_t *self)
95+
{
96+
if (!self) {
97+
return;
98+
}
99+
bson_free (self);
100+
}
101+
102+
103+
struct _mongoc_bulkwrite_t {
104+
mongoc_client_t *client;
105+
bool executed;
106+
};
107+
108+
109+
// `mongoc_client_bulkwrite_new` creates a new bulk write operation.
110+
mongoc_bulkwrite_t *
111+
mongoc_client_bulkwrite_new (mongoc_client_t *self, mongoc_bulkwriteoptions_t *opts)
112+
{
113+
BSON_ASSERT_PARAM (self);
114+
BSON_UNUSED (opts);
115+
mongoc_bulkwrite_t *bw = bson_malloc0 (sizeof (mongoc_bulkwrite_t));
116+
bw->client = self;
117+
return bw;
118+
}
119+
120+
void
121+
mongoc_bulkwrite_destroy (mongoc_bulkwrite_t *self)
122+
{
123+
if (!self) {
124+
return;
125+
}
126+
bson_free (self);
127+
}
128+
129+
struct _mongoc_insertoneopts_t {
130+
bson_validate_flags_t vflags;
131+
};
132+
133+
mongoc_insertoneopts_t *
134+
mongoc_insertoneopts_new (void)
135+
{
136+
return bson_malloc0 (sizeof (mongoc_insertoneopts_t));
137+
}
138+
139+
void
140+
mongoc_insertoneopts_set_validation (mongoc_insertoneopts_t *self, bson_validate_flags_t vflags)
141+
{
142+
BSON_ASSERT_PARAM (self);
143+
self->vflags = vflags;
144+
}
145+
146+
void
147+
mongoc_insertoneopts_destroy (mongoc_insertoneopts_t *self)
148+
{
149+
if (!self) {
150+
return;
151+
}
152+
bson_free (self);
153+
}
154+
155+
bool
156+
mongoc_client_bulkwrite_append_insertone (mongoc_bulkwrite_t *self,
157+
const char *ns,
158+
int ns_len,
159+
const bson_t *document,
160+
mongoc_insertoneopts_t *opts, // may be NULL
161+
bson_error_t *error)
162+
{
163+
BSON_ASSERT_PARAM (self);
164+
BSON_UNUSED (ns);
165+
BSON_UNUSED (ns_len);
166+
BSON_UNUSED (document);
167+
BSON_UNUSED (opts);
168+
169+
if (self->executed) {
170+
bson_set_error (error, MONGOC_ERROR_COMMAND, MONGOC_ERROR_COMMAND_INVALID_ARG, "bulk write already executed");
171+
return false;
172+
}
173+
174+
// TODO: implement.
175+
return true;
176+
}
177+
178+
179+
struct _mongoc_bulkwriteresult_t {
180+
int64_t acknowledged;
181+
int64_t insertedCount;
182+
int64_t upsertedcount;
183+
int64_t matchedcount;
184+
int64_t modifiedcount;
185+
int64_t deletedcount;
186+
bson_t *verbose_results;
187+
uint32_t serverid;
188+
};
189+
190+
bool
191+
mongoc_bulkwriteresult_acknowledged (const mongoc_bulkwriteresult_t *self)
192+
{
193+
BSON_ASSERT_PARAM (self);
194+
return self->acknowledged;
195+
}
196+
197+
int64_t
198+
mongoc_bulkwriteresult_insertedcount (const mongoc_bulkwriteresult_t *self)
199+
{
200+
BSON_ASSERT_PARAM (self);
201+
return self->insertedCount;
202+
}
203+
204+
int64_t
205+
mongoc_bulkwriteresult_upsertedcount (const mongoc_bulkwriteresult_t *self)
206+
{
207+
BSON_ASSERT_PARAM (self);
208+
return self->upsertedcount;
209+
}
210+
211+
int64_t
212+
mongoc_bulkwriteresult_matchedcount (const mongoc_bulkwriteresult_t *self)
213+
{
214+
BSON_ASSERT_PARAM (self);
215+
return self->matchedcount;
216+
}
217+
218+
int64_t
219+
mongoc_bulkwriteresult_modifiedcount (const mongoc_bulkwriteresult_t *self)
220+
{
221+
BSON_ASSERT_PARAM (self);
222+
return self->modifiedcount;
223+
}
224+
225+
int64_t
226+
mongoc_bulkwriteresult_deletedcount (const mongoc_bulkwriteresult_t *self)
227+
{
228+
BSON_ASSERT_PARAM (self);
229+
return self->deletedcount;
230+
}
231+
232+
const bson_t *
233+
mongoc_bulkwriteresult_verboseresults (const mongoc_bulkwriteresult_t *self)
234+
{
235+
BSON_ASSERT_PARAM (self);
236+
return self->verbose_results;
237+
}
238+
239+
uint32_t
240+
mongoc_bulkwriteresult_serverid (const mongoc_bulkwriteresult_t *self)
241+
{
242+
BSON_ASSERT_PARAM (self);
243+
return self->serverid;
244+
}
245+
246+
void
247+
mongoc_bulkwriteresult_destroy (mongoc_bulkwriteresult_t *self)
248+
{
249+
if (!self) {
250+
return;
251+
}
252+
bson_destroy (self->verbose_results);
253+
bson_free (self);
254+
}
255+
256+
struct _mongoc_bulkwriteexception_t {
257+
bson_error_t error;
258+
bson_t *error_document;
259+
};
260+
261+
void
262+
mongoc_bulkwriteexception_error (const mongoc_bulkwriteexception_t *self,
263+
bson_error_t *error,
264+
const bson_t **error_document)
265+
{
266+
BSON_ASSERT_PARAM (self);
267+
memcpy (error, &self->error, sizeof (*error));
268+
if (error_document) {
269+
*error_document = self->error_document;
270+
}
271+
}
272+
273+
void
274+
mongoc_bulkwriteexception_destroy (mongoc_bulkwriteexception_t *self)
275+
{
276+
if (!self) {
277+
return;
278+
}
279+
bson_destroy (self->error_document);
280+
bson_free (self);
281+
}
282+
283+
mongoc_bulkwritereturn_t
284+
mongoc_bulkwrite_execute (mongoc_bulkwrite_t *self)
285+
{
286+
BSON_ASSERT_PARAM (self);
287+
// TODO: implement.
288+
// Create stub results.
289+
mongoc_bulkwriteresult_t *bwr = bson_malloc0 (sizeof (mongoc_bulkwriteresult_t));
290+
bwr->insertedCount = 123;
291+
bwr->verbose_results = bson_new_from_json ((const uint8_t *) BSON_STR ({"foo" : "bar"}), -1, NULL);
292+
BSON_ASSERT (bwr->verbose_results);
293+
294+
mongoc_bulkwriteexception_t *bwe = bson_malloc0 (sizeof (mongoc_bulkwriteexception_t));
295+
bwe->error_document = bson_new_from_json (
296+
(const uint8_t *) BSON_STR (
297+
{"errorLabels" : ["RetryableWriteError"], "writeErrors" : [], "writeConcernErrors" : [], "errorReplies" : []}),
298+
-1,
299+
NULL);
300+
BSON_ASSERT (bwe->error_document);
301+
bson_set_error (&bwe->error, MONGOC_ERROR_SERVER, 123, "This is a stub error");
302+
return (mongoc_bulkwritereturn_t){.res = bwr, .exc = bwe};
303+
}

0 commit comments

Comments
 (0)