Skip to content

Commit f062d16

Browse files
kevinAlbseramongodb
andcommitted
CDRIVER-5869 fix bulk write crash with verbose results (#1843)
* do not use `bson_iter_t` to store `_id` location The `bson_iter_t` stores a pointer to the referred data. The payload buffer `mongoc_bulkwrite_t::ops` may be reallocated and invalidate existing iterators. Instead, store integral offsets into the payload. * fix comment describing `ops_byte_len` and `ops_doc_len` --------- Co-authored-by: Ezra Chung <[email protected]>
1 parent f5d6d9a commit f062d16

File tree

2 files changed

+83
-17
lines changed

2 files changed

+83
-17
lines changed

src/libmongoc/src/mongoc/mongoc-bulkwrite.c

+22-17
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,12 @@ mongoc_bulkwriteopts_destroy (mongoc_bulkwriteopts_t *self)
140140
typedef enum { MODEL_OP_INSERT, MODEL_OP_UPDATE, MODEL_OP_DELETE } model_op_t;
141141
typedef struct {
142142
model_op_t op;
143-
bson_iter_t id_iter;
143+
// `id_loc` locates the "_id" field of an insert document.
144+
struct {
145+
size_t op_start; // Offset in `mongoc_bulkwrite_t::ops` to the BSON for the insert op: { "document": ... }
146+
size_t op_len; // Length of insert op.
147+
uint32_t id_offset; // Offset in the insert op to the "_id" field.
148+
} id_loc;
144149
char *ns;
145150
} modeldata_t;
146151

@@ -271,19 +276,14 @@ mongoc_bulkwrite_append_insertone (mongoc_bulkwrite_t *self,
271276
persisted_id_offset += existing_id_offset;
272277
}
273278

274-
BSON_ASSERT (_mongoc_buffer_append (&self->ops, bson_get_data (&op), op.len));
275-
276-
// Store an iterator to the document's `_id` in the persisted payload:
277-
bson_iter_t persisted_id_iter;
278-
{
279-
BSON_ASSERT (mcommon_in_range_size_t_unsigned (op.len));
280-
size_t start = self->ops.len - (size_t) op.len;
281-
BSON_ASSERT (bson_iter_init_from_data_at_offset (
282-
&persisted_id_iter, self->ops.data + start, (size_t) op.len, persisted_id_offset, strlen ("_id")));
283-
}
279+
size_t op_start = self->ops.len; // Save location of `op` to retrieve `_id` later.
280+
BSON_ASSERT (mcommon_in_range_size_t_unsigned (op.len));
281+
BSON_ASSERT (_mongoc_buffer_append (&self->ops, bson_get_data (&op), (size_t) op.len));
284282

285283
self->n_ops++;
286-
modeldata_t md = {.op = MODEL_OP_INSERT, .id_iter = persisted_id_iter, .ns = bson_strdup (ns)};
284+
modeldata_t md = {.op = MODEL_OP_INSERT,
285+
.id_loc = {.op_start = op_start, .op_len = (size_t) op.len, .id_offset = persisted_id_offset},
286+
.ns = bson_strdup (ns)};
287287
_mongoc_array_append_val (&self->arrayof_modeldata, md);
288288
bson_destroy (&op);
289289
return true;
@@ -1310,7 +1310,8 @@ static bool
13101310
_bulkwritereturn_apply_result (mongoc_bulkwritereturn_t *self,
13111311
const bson_t *result,
13121312
size_t ops_doc_offset,
1313-
const mongoc_array_t *arrayof_modeldata)
1313+
const mongoc_array_t *arrayof_modeldata,
1314+
const mongoc_buffer_t *ops)
13141315
{
13151316
BSON_ASSERT_PARAM (self);
13161317
BSON_ASSERT_PARAM (result);
@@ -1428,7 +1429,10 @@ _bulkwritereturn_apply_result (mongoc_bulkwritereturn_t *self,
14281429
break;
14291430
}
14301431
case MODEL_OP_INSERT: {
1431-
_bulkwriteresult_set_insertresult (self->res, &md->id_iter, models_idx);
1432+
bson_iter_t id_iter;
1433+
BSON_ASSERT (bson_iter_init_from_data_at_offset (
1434+
&id_iter, ops->data + md->id_loc.op_start, md->id_loc.op_len, md->id_loc.id_offset, strlen ("_id")));
1435+
_bulkwriteresult_set_insertresult (self->res, &id_iter, models_idx);
14321436
break;
14331437
}
14341438
default:
@@ -1643,9 +1647,9 @@ mongoc_bulkwrite_execute (mongoc_bulkwrite_t *self, const mongoc_bulkwriteopts_t
16431647
bool batch_ok = false;
16441648
bson_t cmd_reply = BSON_INITIALIZER;
16451649
mongoc_cursor_t *reply_cursor = NULL;
1646-
// `ops_byte_len` is the number of documents from `ops` to send in this batch.
1650+
// `ops_byte_len` is the number of bytes from `ops` to send in this batch.
16471651
size_t ops_byte_len = 0;
1648-
// `ops_doc_len` is the number of bytes from `ops` to send in this batch.
1652+
// `ops_doc_len` is the number of documents from `ops` to send in this batch.
16491653
size_t ops_doc_len = 0;
16501654

16511655
if (ops_byte_offset == self->ops.len) {
@@ -1837,7 +1841,8 @@ mongoc_bulkwrite_execute (mongoc_bulkwrite_t *self, const mongoc_bulkwriteopts_t
18371841
// Iterate over cursor results.
18381842
const bson_t *result;
18391843
while (mongoc_cursor_next (reply_cursor, &result)) {
1840-
if (!_bulkwritereturn_apply_result (&ret, result, ops_doc_offset, &self->arrayof_modeldata)) {
1844+
if (!_bulkwritereturn_apply_result (
1845+
&ret, result, ops_doc_offset, &self->arrayof_modeldata, &self->ops)) {
18411846
goto batch_fail;
18421847
}
18431848
}

src/libmongoc/tests/test-mongoc-bulkwrite.c

+61
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,59 @@ test_bulkwrite_many_namespaces (void *ctx)
592592
}
593593

594594

595+
// `test_bulkwrite_two_large_inserts` is a regression test for CDRIVER-5869.
596+
static void
597+
test_bulkwrite_two_large_inserts (void *unused)
598+
{
599+
BSON_UNUSED (unused);
600+
601+
bson_error_t error;
602+
mongoc_client_t *client = test_framework_new_default_client ();
603+
604+
// Drop prior collection:
605+
{
606+
mongoc_collection_t *coll = mongoc_client_get_collection (client, "db", "coll");
607+
mongoc_collection_drop (coll, NULL);
608+
mongoc_collection_destroy (coll);
609+
}
610+
611+
// Allocate a large string:
612+
size_t large_len = 2095652;
613+
char *large_string = bson_malloc (large_len + 1);
614+
memset (large_string, 'a', large_len);
615+
large_string[large_len] = '\0';
616+
ASSERT (bson_in_range_unsigned (int, large_len));
617+
618+
// Create two large documents:
619+
bson_t *docs[2];
620+
docs[0] = BCON_NEW ("_id", "over_2mib_1");
621+
bson_append_utf8 (docs[0], "unencrypted", -1, large_string, (int) large_len);
622+
docs[1] = BCON_NEW ("_id", "over_2mib_2");
623+
bson_append_utf8 (docs[1], "unencrypted", -1, large_string, (int) large_len);
624+
625+
mongoc_bulkwriteopts_t *bw_opts = mongoc_bulkwriteopts_new ();
626+
mongoc_bulkwriteopts_set_verboseresults (bw_opts, true);
627+
628+
mongoc_bulkwrite_t *bw = mongoc_client_bulkwrite_new (client);
629+
ASSERT_OR_PRINT (mongoc_bulkwrite_append_insertone (bw, "db.coll", docs[0], NULL, &error), error);
630+
ASSERT_OR_PRINT (mongoc_bulkwrite_append_insertone (bw, "db.coll", docs[1], NULL, &error), error);
631+
632+
mongoc_bulkwritereturn_t bwr = mongoc_bulkwrite_execute (bw, bw_opts);
633+
ASSERT_NO_BULKWRITEEXCEPTION (bwr);
634+
ASSERT (bwr.res);
635+
const bson_t *insertresults = mongoc_bulkwriteresult_insertresults (bwr.res);
636+
ASSERT_MATCH (insertresults,
637+
BSON_STR ({"0" : {"insertedId" : "over_2mib_1"}}, {"1" : {"insertedId" : "over_2mib_2"}}));
638+
bson_destroy (docs[0]);
639+
bson_destroy (docs[1]);
640+
mongoc_bulkwrite_destroy (bw);
641+
mongoc_bulkwriteresult_destroy (bwr.res);
642+
mongoc_bulkwriteexception_destroy (bwr.exc);
643+
mongoc_bulkwriteopts_destroy (bw_opts);
644+
mongoc_client_destroy (client);
645+
bson_free (large_string);
646+
}
647+
595648
void
596649
test_bulkwrite_install (TestSuite *suite)
597650
{
@@ -677,4 +730,12 @@ test_bulkwrite_install (TestSuite *suite)
677730
test_framework_skip_if_max_wire_version_less_than_25, // require server 8.0
678731
test_framework_skip_if_mongos // Creating 100k collections is very slow (~5 minutes) on mongos.
679732
);
733+
734+
TestSuite_AddFull (suite,
735+
"/bulkwrite/two_large_inserts",
736+
test_bulkwrite_two_large_inserts,
737+
NULL /* dtor */,
738+
NULL /* ctx */,
739+
test_framework_skip_if_max_wire_version_less_than_25 // require server 8.0
740+
);
680741
}

0 commit comments

Comments
 (0)