-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Implement splitting and encoding ops
, nsInfo
as separate OP_MSG
sections, implement prose tests
#1495
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement splitting and encoding ops
, nsInfo
as separate OP_MSG
sections, implement prose tests
#1495
Changes from 99 commits
48b9614
af854ed
1eb7466
5567324
644d561
f566303
3d13ffd
cf46b46
e049234
0e16427
0d518d8
6b77f78
2ed8e87
07bac95
16f100b
9f8ce2c
fdb90d2
39386fe
f67af3f
53f6883
9a7b668
395af7a
8f504b0
b02a638
a11e5f6
bfbc1cc
1c3c590
1c9c19e
061e605
8320216
f41ed59
4846e0b
dbf9a26
a0005cd
35eee96
21bb22e
182b2f9
fb32fde
f026ee3
7372f13
0fb65d4
e36898f
70bb422
3584de5
5a17ea0
8e1d770
e973615
33ec764
ee893f2
e33f592
f94ad58
310426a
301a2ba
1a57fb6
6f01e61
c3224e9
c057390
dbb6ec8
cb11c44
8bd7a6a
f781bca
ec88c02
1d4a1d3
14bb86e
2127031
bcbde1e
fcbfe08
752fcbe
764d7de
f186b31
3442a73
ffd4f75
3fc86bb
f9c960c
86e5234
fb134f8
a4bf4d0
2837235
b6702a6
24ce6db
ca5d19a
6d8a3e6
076d39a
6b2b7a8
96733a7
8bda529
11653f0
8d545ff
e2aaa2a
52bb622
70fd9f1
038fafa
f430b7e
cf175d4
ea0633e
9fe35e4
056d411
6f68c0e
ee3073e
0e78b67
38c880d
0156135
f07ff6f
3c6c09f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,32 +16,63 @@ | |
|
||
package com.mongodb.internal.connection; | ||
|
||
import com.mongodb.internal.connection.DualMessageSequences.EncodeDocumentsResult; | ||
import com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker; | ||
import com.mongodb.internal.validator.NoOpFieldNameValidator; | ||
import com.mongodb.lang.Nullable; | ||
import org.bson.BsonBinaryWriter; | ||
import org.bson.BsonBinaryWriterSettings; | ||
import org.bson.BsonDocument; | ||
import org.bson.BsonElement; | ||
import org.bson.BsonMaximumSizeExceededException; | ||
import org.bson.BsonValue; | ||
import org.bson.BsonWriter; | ||
import org.bson.BsonWriterSettings; | ||
import org.bson.FieldNameValidator; | ||
import org.bson.codecs.BsonValueCodecProvider; | ||
import org.bson.codecs.Codec; | ||
import org.bson.codecs.Encoder; | ||
import org.bson.codecs.EncoderContext; | ||
import org.bson.codecs.configuration.CodecRegistry; | ||
import org.bson.io.BsonOutput; | ||
|
||
import java.util.List; | ||
|
||
import static com.mongodb.assertions.Assertions.assertTrue; | ||
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.FAIL_LIMIT_EXCEEDED; | ||
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_NOT_REACHED; | ||
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_REACHED; | ||
import static com.mongodb.internal.connection.MessageSettings.DOCUMENT_HEADROOM_SIZE; | ||
import static java.lang.String.format; | ||
import static org.bson.codecs.configuration.CodecRegistries.fromProviders; | ||
|
||
final class BsonWriterHelper { | ||
private static final int DOCUMENT_HEADROOM = 1024 * 16; | ||
/** | ||
* This class is not part of the public API and may be removed or changed at any time. | ||
*/ | ||
public final class BsonWriterHelper { | ||
private static final CodecRegistry REGISTRY = fromProviders(new BsonValueCodecProvider()); | ||
private static final EncoderContext ENCODER_CONTEXT = EncoderContext.builder().build(); | ||
|
||
static void writeElements(final BsonWriter writer, final List<BsonElement> bsonElements) { | ||
for (BsonElement bsonElement : bsonElements) { | ||
writer.writeName(bsonElement.getName()); | ||
getCodec(bsonElement.getValue()).encode(writer, bsonElement.getValue(), ENCODER_CONTEXT); | ||
static void appendElementsToDocument( | ||
final BsonOutput bsonOutputWithDocument, | ||
final int documentStartPosition, | ||
@Nullable final List<BsonElement> bsonElements) { | ||
int bsonDocumentEndingSize = 1; | ||
int appendFrom = bsonOutputWithDocument.getPosition() - bsonDocumentEndingSize; | ||
BsonBinaryWriter writer = createBsonBinaryWriter(bsonOutputWithDocument, NoOpFieldNameValidator.INSTANCE, null); | ||
// change `writer`s state so that we can append elements | ||
writer.writeStartDocument(); | ||
bsonOutputWithDocument.truncateToPosition(appendFrom); | ||
if (bsonElements != null) { | ||
for (BsonElement element : bsonElements) { | ||
String name = element.getName(); | ||
BsonValue value = element.getValue(); | ||
writer.writeName(name); | ||
encodeUsingRegistry(writer, value); | ||
} | ||
} | ||
// write the BSON document ending | ||
vbabanin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
bsonOutputWithDocument.writeByte(0); | ||
backpatchLength(documentStartPosition, bsonOutputWithDocument); | ||
} | ||
|
||
static void writePayloadArray(final BsonWriter writer, final BsonOutput bsonOutput, final MessageSettings settings, | ||
|
@@ -65,16 +96,86 @@ static void writePayload(final BsonWriter writer, final BsonOutput bsonOutput, f | |
} | ||
|
||
if (payload.getPosition() == 0) { | ||
throw new BsonMaximumSizeExceededException(format("Payload document size is larger than maximum of %d.", | ||
payloadSettings.getMaxDocumentSize())); | ||
throw createBsonMaximumSizeExceededException(payloadSettings.getMaxDocumentSize()); | ||
} | ||
} | ||
|
||
/** | ||
* @return See {@link DualMessageSequences#encodeDocuments(WritersProviderAndLimitsChecker)}. | ||
*/ | ||
static EncodeDocumentsResult writeDocumentsOfDualMessageSequences( | ||
final DualMessageSequences dualMessageSequences, | ||
final int commandDocumentSizeInBytes, | ||
final BsonOutput firstOutput, | ||
final BsonOutput secondOutput, | ||
final MessageSettings messageSettings) { | ||
BsonBinaryWriter firstWriter = createBsonBinaryWriter(firstOutput, dualMessageSequences.getFirstFieldNameValidator(), null); | ||
BsonBinaryWriter secondWriter = createBsonBinaryWriter(secondOutput, dualMessageSequences.getSecondFieldNameValidator(), null); | ||
// the size of operation-agnostic command fields (a.k.a. extra elements) is counted towards `messageOverheadInBytes` | ||
int messageOverheadInBytes = 1000; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this limitation apply only to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure if limitation is the right word here. But yes, this is unique to client bulk writes:
|
||
int maxSizeInBytes = messageSettings.getMaxMessageSize() - (messageOverheadInBytes + commandDocumentSizeInBytes); | ||
int firstStart = firstOutput.getPosition(); | ||
int secondStart = secondOutput.getPosition(); | ||
int maxBatchCount = messageSettings.getMaxBatchCount(); | ||
return dualMessageSequences.encodeDocuments(write -> { | ||
vbabanin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
int firstBeforeWritePosition = firstOutput.getPosition(); | ||
int secondBeforeWritePosition = secondOutput.getPosition(); | ||
int batchCountAfterWrite = write.doAndGetBatchCount(firstWriter, secondWriter); | ||
assertTrue(batchCountAfterWrite <= maxBatchCount); | ||
int writtenSizeInBytes = | ||
firstOutput.getPosition() - firstStart | ||
+ secondOutput.getPosition() - secondStart; | ||
if (writtenSizeInBytes < maxSizeInBytes && batchCountAfterWrite < maxBatchCount) { | ||
return OK_LIMIT_NOT_REACHED; | ||
} else if (writtenSizeInBytes > maxSizeInBytes) { | ||
firstOutput.truncateToPosition(firstBeforeWritePosition); | ||
secondOutput.truncateToPosition(secondBeforeWritePosition); | ||
if (batchCountAfterWrite == 1) { | ||
// we have failed to write a single document | ||
throw createBsonMaximumSizeExceededException(messageSettings.getMaxDocumentSize()); | ||
} | ||
return FAIL_LIMIT_EXCEEDED; | ||
} else { | ||
return OK_LIMIT_REACHED; | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
* @param messageSettings Non-{@code null} iff the document size limit must be validated. | ||
*/ | ||
static BsonBinaryWriter createBsonBinaryWriter( | ||
final BsonOutput out, | ||
final FieldNameValidator validator, | ||
@Nullable final MessageSettings messageSettings) { | ||
return new BsonBinaryWriter( | ||
new BsonWriterSettings(), | ||
messageSettings == null | ||
? new BsonBinaryWriterSettings() | ||
: new BsonBinaryWriterSettings(messageSettings.getMaxDocumentSize() + DOCUMENT_HEADROOM_SIZE), | ||
out, | ||
validator); | ||
} | ||
|
||
/** | ||
* Backpatches the document/message/sequence length into the beginning of the document/message/sequence. | ||
* | ||
* @param startPosition The start position of the document/message/sequence in {@code bsonOutput}. | ||
*/ | ||
static void backpatchLength(final int startPosition, final BsonOutput bsonOutput) { | ||
int messageLength = bsonOutput.getPosition() - startPosition; | ||
bsonOutput.writeInt32(startPosition, messageLength); | ||
} | ||
|
||
private static BsonMaximumSizeExceededException createBsonMaximumSizeExceededException(final int maxSize) { | ||
return new BsonMaximumSizeExceededException(format("Payload document size is larger than maximum of %d.", maxSize)); | ||
} | ||
|
||
private static boolean writeDocument(final BsonWriter writer, final BsonOutput bsonOutput, final MessageSettings settings, | ||
final BsonDocument document, final int messageStartPosition, final int batchItemCount, | ||
final int maxSplittableDocumentSize) { | ||
int currentPosition = bsonOutput.getPosition(); | ||
getCodec(document).encode(writer, document, ENCODER_CONTEXT); | ||
encodeUsingRegistry(writer, document); | ||
int messageSize = bsonOutput.getPosition() - messageStartPosition; | ||
int documentSize = bsonOutput.getPosition() - currentPosition; | ||
if (exceedsLimits(settings, messageSize, documentSize, batchItemCount) | ||
|
@@ -85,24 +186,25 @@ private static boolean writeDocument(final BsonWriter writer, final BsonOutput b | |
return true; | ||
} | ||
|
||
@SuppressWarnings({"unchecked"}) | ||
private static Codec<BsonValue> getCodec(final BsonValue bsonValue) { | ||
return (Codec<BsonValue>) REGISTRY.get(bsonValue.getClass()); | ||
static void encodeUsingRegistry(final BsonWriter writer, final BsonValue value) { | ||
@SuppressWarnings("unchecked") | ||
Encoder<BsonValue> encoder = (Encoder<BsonValue>) REGISTRY.get(value.getClass()); | ||
encoder.encode(writer, value, ENCODER_CONTEXT); | ||
} | ||
|
||
private static MessageSettings getPayloadMessageSettings(final SplittablePayload.Type type, final MessageSettings settings) { | ||
MessageSettings payloadMessageSettings = settings; | ||
if (type != SplittablePayload.Type.INSERT) { | ||
payloadMessageSettings = createMessageSettingsBuilder(settings) | ||
.maxDocumentSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM) | ||
.maxDocumentSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM_SIZE) | ||
.build(); | ||
} | ||
return payloadMessageSettings; | ||
} | ||
|
||
private static MessageSettings getDocumentMessageSettings(final MessageSettings settings) { | ||
return createMessageSettingsBuilder(settings) | ||
.maxMessageSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM) | ||
.maxMessageSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM_SIZE) | ||
.build(); | ||
} | ||
|
||
|
@@ -126,8 +228,6 @@ private static boolean exceedsLimits(final MessageSettings settings, final int m | |
return false; | ||
} | ||
|
||
|
||
private BsonWriterHelper() { | ||
} | ||
|
||
} |
Uh oh!
There was an error while loading. Please reload this page.