Skip to content

Commit 56a89c5

Browse files
CDRIVER-5869 fix bulk write crash with verbose results (#1843)
* do not use `bson_iter_t` to store `_id` location

  The `bson_iter_t` stores a pointer to the referred data. The payload buffer `mongoc_bulkwrite_t::ops` may be reallocated, which invalidates existing iterators. Instead, store integral offsets into the payload.

* fix comment describing `ops_byte_len` and `ops_doc_len`

---------

Co-authored-by: Ezra Chung <[email protected]>
1 parent 36c90f6 commit 56a89c5

File tree

2 files changed

+83
-17
lines changed

2 files changed

+83
-17
lines changed

src/libmongoc/src/mongoc/mongoc-bulkwrite.c

+22-17
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,12 @@ mongoc_bulkwriteopts_destroy (mongoc_bulkwriteopts_t *self)
139139
typedef enum { MODEL_OP_INSERT, MODEL_OP_UPDATE, MODEL_OP_DELETE } model_op_t;
140140
typedef struct {
141141
model_op_t op;
142-
bson_iter_t id_iter;
142+
// `id_loc` locates the "_id" field of an insert document.
143+
struct {
144+
size_t op_start; // Offset in `mongoc_bulkwrite_t::ops` to the BSON for the insert op: { "document": ... }
145+
size_t op_len; // Length of insert op.
146+
uint32_t id_offset; // Offset in the insert op to the "_id" field.
147+
} id_loc;
143148
char *ns;
144149
} modeldata_t;
145150

@@ -277,19 +282,14 @@ mongoc_bulkwrite_append_insertone (mongoc_bulkwrite_t *self,
277282
persisted_id_offset += existing_id_offset;
278283
}
279284

280-
BSON_ASSERT (_mongoc_buffer_append (&self->ops, bson_get_data (&op), op.len));
281-
282-
// Store an iterator to the document's `_id` in the persisted payload:
283-
bson_iter_t persisted_id_iter;
284-
{
285-
BSON_ASSERT (mcommon_in_range_size_t_unsigned (op.len));
286-
size_t start = self->ops.len - (size_t) op.len;
287-
BSON_ASSERT (bson_iter_init_from_data_at_offset (
288-
&persisted_id_iter, self->ops.data + start, (size_t) op.len, persisted_id_offset, strlen ("_id")));
289-
}
285+
size_t op_start = self->ops.len; // Save location of `op` to retrieve `_id` later.
286+
BSON_ASSERT (mcommon_in_range_size_t_unsigned (op.len));
287+
BSON_ASSERT (_mongoc_buffer_append (&self->ops, bson_get_data (&op), (size_t) op.len));
290288

291289
self->n_ops++;
292-
modeldata_t md = {.op = MODEL_OP_INSERT, .id_iter = persisted_id_iter, .ns = bson_strdup (ns)};
290+
modeldata_t md = {.op = MODEL_OP_INSERT,
291+
.id_loc = {.op_start = op_start, .op_len = (size_t) op.len, .id_offset = persisted_id_offset},
292+
.ns = bson_strdup (ns)};
293293
_mongoc_array_append_val (&self->arrayof_modeldata, md);
294294
bson_destroy (&op);
295295
return true;
@@ -1340,7 +1340,8 @@ static bool
13401340
_bulkwritereturn_apply_result (mongoc_bulkwritereturn_t *self,
13411341
const bson_t *result,
13421342
size_t ops_doc_offset,
1343-
const mongoc_array_t *arrayof_modeldata)
1343+
const mongoc_array_t *arrayof_modeldata,
1344+
const mongoc_buffer_t *ops)
13441345
{
13451346
BSON_ASSERT_PARAM (self);
13461347
BSON_ASSERT_PARAM (result);
@@ -1458,7 +1459,10 @@ _bulkwritereturn_apply_result (mongoc_bulkwritereturn_t *self,
14581459
break;
14591460
}
14601461
case MODEL_OP_INSERT: {
1461-
_bulkwriteresult_set_insertresult (self->res, &md->id_iter, models_idx);
1462+
bson_iter_t id_iter;
1463+
BSON_ASSERT (bson_iter_init_from_data_at_offset (
1464+
&id_iter, ops->data + md->id_loc.op_start, md->id_loc.op_len, md->id_loc.id_offset, strlen ("_id")));
1465+
_bulkwriteresult_set_insertresult (self->res, &id_iter, models_idx);
14621466
break;
14631467
}
14641468
default:
@@ -1705,9 +1709,9 @@ mongoc_bulkwrite_execute (mongoc_bulkwrite_t *self, const mongoc_bulkwriteopts_t
17051709
bool batch_ok = false;
17061710
bson_t cmd_reply = BSON_INITIALIZER;
17071711
mongoc_cursor_t *reply_cursor = NULL;
1708-
// `ops_byte_len` is the number of documents from `ops` to send in this batch.
1712+
// `ops_byte_len` is the number of bytes from `ops` to send in this batch.
17091713
size_t ops_byte_len = 0;
1710-
// `ops_doc_len` is the number of bytes from `ops` to send in this batch.
1714+
// `ops_doc_len` is the number of documents from `ops` to send in this batch.
17111715
size_t ops_doc_len = 0;
17121716

17131717
if (ops_byte_offset == self->ops.len) {
@@ -1903,7 +1907,8 @@ mongoc_bulkwrite_execute (mongoc_bulkwrite_t *self, const mongoc_bulkwriteopts_t
19031907
// Iterate over cursor results.
19041908
const bson_t *result;
19051909
while (mongoc_cursor_next (reply_cursor, &result)) {
1906-
if (!_bulkwritereturn_apply_result (&ret, result, ops_doc_offset, &self->arrayof_modeldata)) {
1910+
if (!_bulkwritereturn_apply_result (
1911+
&ret, result, ops_doc_offset, &self->arrayof_modeldata, &self->ops)) {
19071912
goto batch_fail;
19081913
}
19091914
}

src/libmongoc/tests/test-mongoc-bulkwrite.c

+61
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,59 @@ test_bulkwrite_execute_requires_client (void *ctx)
629629
mongoc_client_destroy (client);
630630
}
631631

632+
// `test_bulkwrite_two_large_inserts` is a regression test for CDRIVER-5869.
633+
static void
634+
test_bulkwrite_two_large_inserts (void *unused)
635+
{
636+
BSON_UNUSED (unused);
637+
638+
bson_error_t error;
639+
mongoc_client_t *client = test_framework_new_default_client ();
640+
641+
// Drop prior collection:
642+
{
643+
mongoc_collection_t *coll = mongoc_client_get_collection (client, "db", "coll");
644+
mongoc_collection_drop (coll, NULL);
645+
mongoc_collection_destroy (coll);
646+
}
647+
648+
// Allocate a large string:
649+
size_t large_len = 2095652;
650+
char *large_string = bson_malloc (large_len + 1);
651+
memset (large_string, 'a', large_len);
652+
large_string[large_len] = '\0';
653+
ASSERT (mcommon_in_range_unsigned (int, large_len));
654+
655+
// Create two large documents:
656+
bson_t *docs[2];
657+
docs[0] = BCON_NEW ("_id", "over_2mib_1");
658+
bson_append_utf8 (docs[0], "unencrypted", -1, large_string, (int) large_len);
659+
docs[1] = BCON_NEW ("_id", "over_2mib_2");
660+
bson_append_utf8 (docs[1], "unencrypted", -1, large_string, (int) large_len);
661+
662+
mongoc_bulkwriteopts_t *bw_opts = mongoc_bulkwriteopts_new ();
663+
mongoc_bulkwriteopts_set_verboseresults (bw_opts, true);
664+
665+
mongoc_bulkwrite_t *bw = mongoc_client_bulkwrite_new (client);
666+
ASSERT_OR_PRINT (mongoc_bulkwrite_append_insertone (bw, "db.coll", docs[0], NULL, &error), error);
667+
ASSERT_OR_PRINT (mongoc_bulkwrite_append_insertone (bw, "db.coll", docs[1], NULL, &error), error);
668+
669+
mongoc_bulkwritereturn_t bwr = mongoc_bulkwrite_execute (bw, bw_opts);
670+
ASSERT_NO_BULKWRITEEXCEPTION (bwr);
671+
ASSERT (bwr.res);
672+
const bson_t *insertresults = mongoc_bulkwriteresult_insertresults (bwr.res);
673+
ASSERT_MATCH (insertresults,
674+
BSON_STR ({"0" : {"insertedId" : "over_2mib_1"}}, {"1" : {"insertedId" : "over_2mib_2"}}));
675+
bson_destroy (docs[0]);
676+
bson_destroy (docs[1]);
677+
mongoc_bulkwrite_destroy (bw);
678+
mongoc_bulkwriteresult_destroy (bwr.res);
679+
mongoc_bulkwriteexception_destroy (bwr.exc);
680+
mongoc_bulkwriteopts_destroy (bw_opts);
681+
mongoc_client_destroy (client);
682+
bson_free (large_string);
683+
}
684+
632685
void
633686
test_bulkwrite_install (TestSuite *suite)
634687
{
@@ -722,4 +775,12 @@ test_bulkwrite_install (TestSuite *suite)
722775
NULL /* ctx */,
723776
test_framework_skip_if_max_wire_version_less_than_25 // require server 8.0
724777
);
778+
779+
TestSuite_AddFull (suite,
780+
"/bulkwrite/two_large_inserts",
781+
test_bulkwrite_two_large_inserts,
782+
NULL /* dtor */,
783+
NULL /* ctx */,
784+
test_framework_skip_if_max_wire_version_less_than_25 // require server 8.0
785+
);
725786
}

0 commit comments

Comments
 (0)