-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Optimize String write #1651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize String write #1651
Changes from 5 commits
d0f68c4
73abcc9
577f6bf
502f84b
879ddbd
759381d
e29638d
8ba1940
3980457
d9cf649
3ff0644
43f1663
dd5fe4d
e4f6f31
db424d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
|
||
package com.mongodb.internal.connection; | ||
|
||
import org.bson.BsonSerializationException; | ||
import org.bson.ByteBuf; | ||
import org.bson.io.OutputBuffer; | ||
|
||
|
@@ -25,8 +26,10 @@ | |
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
import static com.mongodb.assertions.Assertions.assertFalse; | ||
import static com.mongodb.assertions.Assertions.assertTrue; | ||
import static com.mongodb.assertions.Assertions.notNull; | ||
import static java.lang.String.format; | ||
|
||
/** | ||
* <p>This class is not part of the public API and may be removed or changed at any time</p> | ||
|
@@ -100,11 +103,17 @@ private ByteBuf getCurrentByteBuffer() { | |
return getByteBufferAtIndex(curBufferIndex); | ||
} | ||
|
||
private ByteBuf getNextByteBuffer() { | ||
assertFalse(bufferList.get(curBufferIndex).hasRemaining()); | ||
return getByteBufferAtIndex(++curBufferIndex); | ||
} | ||
|
||
private ByteBuf getByteBufferAtIndex(final int index) { | ||
if (bufferList.size() < index + 1) { | ||
bufferList.add(bufferProvider.getBuffer(index >= (MAX_SHIFT - INITIAL_SHIFT) | ||
? MAX_BUFFER_SIZE | ||
: Math.min(INITIAL_BUFFER_SIZE << index, MAX_BUFFER_SIZE))); | ||
ByteBuf buffer = bufferProvider.getBuffer(index >= (MAX_SHIFT - INITIAL_SHIFT) | ||
? MAX_BUFFER_SIZE | ||
: Math.min(INITIAL_BUFFER_SIZE << index, MAX_BUFFER_SIZE)); | ||
bufferList.add(buffer); | ||
} | ||
return bufferList.get(index); | ||
} | ||
|
@@ -147,6 +156,16 @@ public List<ByteBuf> getByteBuffers() { | |
return buffers; | ||
} | ||
|
||
public List<ByteBuf> getDuplicateByteBuffers() { | ||
ensureOpen(); | ||
|
||
List<ByteBuf> buffers = new ArrayList<>(bufferList.size()); | ||
for (final ByteBuf cur : bufferList) { | ||
buffers.add(cur.duplicate().order(ByteOrder.LITTLE_ENDIAN)); | ||
} | ||
return buffers; | ||
} | ||
|
||
|
||
@Override | ||
public int pipe(final OutputStream out) throws IOException { | ||
|
@@ -155,14 +174,18 @@ public int pipe(final OutputStream out) throws IOException { | |
byte[] tmp = new byte[INITIAL_BUFFER_SIZE]; | ||
|
||
int total = 0; | ||
for (final ByteBuf cur : getByteBuffers()) { | ||
ByteBuf dup = cur.duplicate(); | ||
while (dup.hasRemaining()) { | ||
int numBytesToCopy = Math.min(dup.remaining(), tmp.length); | ||
dup.get(tmp, 0, numBytesToCopy); | ||
out.write(tmp, 0, numBytesToCopy); | ||
List<ByteBuf> byteBuffers = getByteBuffers(); | ||
try { | ||
for (final ByteBuf cur : byteBuffers) { | ||
while (cur.hasRemaining()) { | ||
int numBytesToCopy = Math.min(cur.remaining(), tmp.length); | ||
cur.get(tmp, 0, numBytesToCopy); | ||
out.write(tmp, 0, numBytesToCopy); | ||
} | ||
total += cur.limit(); | ||
} | ||
total += dup.limit(); | ||
} finally { | ||
byteBuffers.forEach(ByteBuf::release); | ||
rozza marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return total; | ||
} | ||
|
@@ -282,4 +305,129 @@ private static final class BufferPositionPair { | |
this.position = position; | ||
} | ||
} | ||
|
||
protected int writeCharacters(final String str, final boolean checkNullTermination) { | ||
rozza marked this conversation as resolved.
Show resolved
Hide resolved
|
||
int len = str.length(); | ||
int sp = 0; | ||
int prevPos = position; | ||
|
||
ByteBuf buf = getCurrentByteBuffer(); | ||
int currBufferPos = buf.position(); | ||
int limit = buf.limit(); | ||
int remaining = limit - currBufferPos; | ||
|
||
if (buf.hasArray()) { | ||
byte[] dst = buf.array(); | ||
int arrayOffset = buf.arrayOffset(); | ||
if (remaining >= str.length() + 1) { | ||
sp = writeOnArrayAscii(str, dst, arrayOffset + currBufferPos, checkNullTermination); | ||
currBufferPos += sp; | ||
if (sp == len) { | ||
dst[arrayOffset + currBufferPos++] = 0; | ||
position += sp + 1; | ||
buf.position(currBufferPos); | ||
return sp + 1; | ||
} | ||
position += sp; | ||
buf.position(currBufferPos); | ||
} | ||
} | ||
|
||
while (sp < len) { | ||
remaining = limit - currBufferPos; | ||
int c = str.charAt(sp); | ||
|
||
if (checkNullTermination && c == 0x0) { | ||
throw new BsonSerializationException( | ||
format("BSON cstring '%s' is not valid because it contains a null character " + "at index %d", str, sp)); | ||
} | ||
|
||
if (c < 0x80) { | ||
if (remaining == 0) { | ||
buf = getNextByteBuffer(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I still suggest to give at shot at the PR I made which use a separate index to access single bytes in the internalNio Buffer within the Netty buffers, for two reasons:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point - accessing the NIO buffer directly sounds like a potential win. I’m aiming to keep this PR focused and incremental for easier review and integration. We could consider Netty-specific optimizations in a follow-up PR/ scope, once we have Netty benchmarks running in CI |
||
currBufferPos = 0; | ||
limit = buf.limit(); | ||
} | ||
buf.put((byte) c); | ||
currBufferPos++; | ||
position++; | ||
} else if (c < 0x800) { | ||
if (remaining < 2) { | ||
write((byte) (0xc0 + (c >> 6))); | ||
write((byte) (0x80 + (c & 0x3f))); | ||
|
||
buf = getCurrentByteBuffer(); | ||
currBufferPos = buf.position(); | ||
limit = buf.limit(); | ||
} else { | ||
buf.put((byte) (0xc0 + (c >> 6))); | ||
buf.put((byte) (0x80 + (c & 0x3f))); | ||
currBufferPos += 2; | ||
position += 2; | ||
} | ||
} else { | ||
c = Character.codePointAt(str, sp); | ||
if (c < 0x10000) { | ||
if (remaining < 3) { | ||
write((byte) (0xe0 + (c >> 12))); | ||
write((byte) (0x80 + ((c >> 6) & 0x3f))); | ||
write((byte) (0x80 + (c & 0x3f))); | ||
|
||
buf = getCurrentByteBuffer(); | ||
currBufferPos = buf.position(); | ||
limit = buf.limit(); | ||
} else { | ||
buf.put((byte) (0xe0 + (c >> 12))); | ||
buf.put((byte) (0x80 + ((c >> 6) & 0x3f))); | ||
buf.put((byte) (0x80 + (c & 0x3f))); | ||
currBufferPos += 3; | ||
position += 3; | ||
} | ||
} else { | ||
if (remaining < 4) { | ||
write((byte) (0xf0 + (c >> 18))); | ||
write((byte) (0x80 + ((c >> 12) & 0x3f))); | ||
write((byte) (0x80 + ((c >> 6) & 0x3f))); | ||
write((byte) (0x80 + (c & 0x3f))); | ||
|
||
buf = getCurrentByteBuffer(); | ||
currBufferPos = buf.position(); | ||
limit = buf.limit(); | ||
} else { | ||
buf.put((byte) (0xf0 + (c >> 18))); | ||
buf.put((byte) (0x80 + ((c >> 12) & 0x3f))); | ||
buf.put((byte) (0x80 + ((c >> 6) & 0x3f))); | ||
buf.put((byte) (0x80 + (c & 0x3f))); | ||
currBufferPos += 4; | ||
position += 4; | ||
} | ||
} | ||
} | ||
sp += Character.charCount(c); | ||
} | ||
|
||
getCurrentByteBuffer().put((byte) 0); | ||
position++; | ||
return position - prevPos; | ||
} | ||
|
||
private static int writeOnArrayAscii(final String str, | ||
final byte[] dst, | ||
final int arrayPosition, | ||
final boolean checkNullTermination) { | ||
int pos = arrayPosition; | ||
int sp = 0; | ||
for (; sp < str.length(); sp++, pos++) { | ||
char c = str.charAt(sp); | ||
if (checkNullTermination && c == 0) { | ||
throw new BsonSerializationException( | ||
format("BSON cstring '%s' is not valid because it contains a null character " + "at index %d", str, sp)); | ||
} | ||
if (c >= 0x80) { | ||
break; | ||
} | ||
dst[pos] = (byte) c; | ||
} | ||
return sp; | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.