Skip to content

Commit

Permalink
common: Replace ThreadLocal logic with Lease logic
Browse files Browse the repository at this point in the history
... to improve performance with virtual threads in Java 21 or later.

Set -Dorg.newsclub.net.unix.virtual-threads=false to disable.
  • Loading branch information
kohlschuetter committed Apr 15, 2024
1 parent 228ceaa commit 501d9f2
Show file tree
Hide file tree
Showing 20 changed files with 654 additions and 239 deletions.
169 changes: 94 additions & 75 deletions junixsocket-common/src/main/java/org/newsclub/net/unix/AFCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,23 @@
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

import org.newsclub.net.unix.pool.MutableHolder;
import org.newsclub.net.unix.pool.ObjectPool;
import org.newsclub.net.unix.pool.ObjectPool.Lease;

/**
* The core functionality of file descriptor based I/O.
*
* @author Christian Kohlschütter
*/
class AFCore extends CleanableState {
private static final ThreadLocal<ByteBuffer> TL_BUFFER = new ThreadLocal<>();
private static final ObjectPool<MutableHolder<ByteBuffer>> TL_BUFFER = ObjectPool
.newThreadLocalPool(() -> {
return new MutableHolder<>(null);
});

private static final String PROP_TL_BUFFER_MAX_CAPACITY =
"org.newsclub.net.unix.thread-local-buffer.max-capacity"; // 0 means "no limit" (discouraged)
Expand Down Expand Up @@ -129,46 +137,49 @@ int read(ByteBuffer dst, ByteBuffer socketAddressBuffer, int options) throws IOE
int pos;

boolean direct = dst.isDirect();
if (direct) {
buf = dst;
pos = dstPos;
} else {
buf = getThreadLocalDirectByteBuffer(remaining);
remaining = Math.min(remaining, buf.remaining());
pos = buf.position();
}

if (!blocking) {
options |= NativeUnixSocket.OPT_NON_BLOCKING;
}

int count = NativeUnixSocket.receive(fdesc, buf, pos, remaining, socketAddressBuffer, options,
ancillaryDataSupport, 0);
if (count == -1) {
return count;
}
try (Lease<MutableHolder<ByteBuffer>> lease = direct ? null : getPrivateDirectByteBuffer(
remaining)) {
if (direct) {
buf = dst;
pos = dstPos;
} else {
buf = Objects.requireNonNull(lease).get().get();
remaining = Math.min(remaining, buf.remaining());
pos = buf.position();
}

if (direct) {
if (count < 0) {
throw new IllegalStateException();
if (!blocking) {
options |= NativeUnixSocket.OPT_NON_BLOCKING;
}
dst.position(pos + count);
} else {
int oldLimit = buf.limit();
if (count < oldLimit) {
buf.limit(count);

int count = NativeUnixSocket.receive(fdesc, buf, pos, remaining, socketAddressBuffer, options,
ancillaryDataSupport, 0);
if (count == -1) {
return count;
}
try {
while (buf.hasRemaining()) {
dst.put(buf);

if (direct) {
if (count < 0) {
throw new IllegalStateException();
}
} finally {
dst.position(pos + count);
} else {
int oldLimit = buf.limit();
if (count < oldLimit) {
buf.limit(oldLimit);
buf.limit(count);
}
try {
while (buf.hasRemaining()) {
dst.put(buf);
}
} finally {
if (count < oldLimit) {
buf.limit(oldLimit);
}
}
}
return count;
}
return count;
}

int write(ByteBuffer src) throws IOException {
Expand All @@ -185,48 +196,54 @@ int write(ByteBuffer src, SocketAddress target, int options) throws IOException
FileDescriptor fdesc = validFdOrException();
final ByteBuffer addressTo;
final int addressToLen;
if (target == null) {
addressTo = null;
addressToLen = 0;
} else {
addressTo = AFSocketAddress.SOCKETADDRESS_BUFFER_TL.get();
addressToLen = AFSocketAddress.unwrapAddressDirectBufferInternal(addressTo, target);
}

// accept "send buffer overflow" as packet loss
// and don't retry (which may slow things down quite a bit)
if (!blocking) {
options |= NativeUnixSocket.OPT_NON_BLOCKING;
}
try (Lease<ByteBuffer> addressToLease = target == null ? null
: AFSocketAddress.SOCKETADDRESS_BUFFER_TL.take()) {
if (addressToLease == null) {
addressTo = null;
addressToLen = 0;
} else {
addressTo = addressToLease.get();
addressToLen = AFSocketAddress.unwrapAddressDirectBufferInternal(addressTo, target);
}

int pos = src.position();
boolean isDirect = src.isDirect();
ByteBuffer buf;
int bufPos;
if (isDirect) {
buf = src;
bufPos = pos;
} else {
buf = getThreadLocalDirectByteBuffer(remaining);
remaining = Math.min(remaining, buf.remaining());

bufPos = buf.position();

while (src.hasRemaining() && buf.hasRemaining()) {
buf.put(src);
// accept "send buffer overflow" as packet loss
// and don't retry (which may slow things down quite a bit)
if (!blocking) {
options |= NativeUnixSocket.OPT_NON_BLOCKING;
}

buf.position(bufPos);
}
if (datagramMode) {
options |= NativeUnixSocket.OPT_DGRAM_MODE;
}
int pos = src.position();
boolean isDirect = src.isDirect();
ByteBuffer buf;
int bufPos;

try (Lease<MutableHolder<ByteBuffer>> lease = isDirect ? null : getPrivateDirectByteBuffer(
remaining)) {
if (isDirect) {
buf = src;
bufPos = pos;
} else {
buf = Objects.requireNonNull(lease).get().get();
remaining = Math.min(remaining, buf.remaining());

int written = NativeUnixSocket.send(fdesc, buf, bufPos, remaining, addressTo, addressToLen,
options, ancillaryDataSupport);
src.position(pos + written);
bufPos = buf.position();

return written;
while (src.hasRemaining() && buf.hasRemaining()) {
buf.put(src);
}

buf.position(bufPos);
}
if (datagramMode) {
options |= NativeUnixSocket.OPT_DGRAM_MODE;
}

int written = NativeUnixSocket.send(fdesc, buf, bufPos, remaining, addressTo, addressToLen,
options, ancillaryDataSupport);
src.position(pos + written);
return written;
}
}
}

/**
Expand All @@ -239,23 +256,25 @@ int write(ByteBuffer src, SocketAddress target, int options) throws IOException
* @param capacity The desired capacity.
* @return A byte buffer satisfying the requested capacity.
*/
ByteBuffer getThreadLocalDirectByteBuffer(int capacity) {
Lease<MutableHolder<ByteBuffer>> getPrivateDirectByteBuffer(int capacity) {
if (capacity > TL_BUFFER_MAX_CAPACITY && TL_BUFFER_MAX_CAPACITY > 0) {
// Capacity exceeds configurable maximum limit;
// allocate but do not cache direct buffer.
// This may incur a performance penalty at the cost of correctness when using such capacities.
return ByteBuffer.allocateDirect(capacity);
return ObjectPool.unpooledLease(new MutableHolder<>(ByteBuffer.allocateDirect(capacity)));
}
if (capacity < TL_BUFFER_MIN_CAPACITY) {
capacity = TL_BUFFER_MIN_CAPACITY;
}
ByteBuffer buffer = TL_BUFFER.get();
Lease<MutableHolder<ByteBuffer>> lease = TL_BUFFER.take();
MutableHolder<ByteBuffer> holder = lease.get();
ByteBuffer buffer = holder.get();
if (buffer == null || capacity > buffer.capacity()) {
buffer = ByteBuffer.allocateDirect(capacity);
TL_BUFFER.set(buffer);
holder.set(buffer);
}
buffer.clear();
return buffer;
return lease;
}

void implConfigureBlocking(boolean block) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.newsclub.net.unix.pool.MutableHolder;
import org.newsclub.net.unix.pool.ObjectPool.Lease;

/**
* A {@link DatagramSocketImpl} implemented by junixsocket.
Expand Down Expand Up @@ -106,8 +108,10 @@ final void connect(AFSocketAddress socketAddress) throws IOException {
if (socketAddress == AFSocketAddress.INTERNAL_DUMMY_CONNECT) { // NOPMD
return;
}
ByteBuffer ab = socketAddress.getNativeAddressDirectBuffer();
NativeUnixSocket.connect(ab, ab.limit(), fd, -1);
try (Lease<ByteBuffer> abLease = socketAddress.getNativeAddressDirectBuffer()) {
ByteBuffer ab = abLease.get();
NativeUnixSocket.connect(ab, ab.limit(), fd, -1);
}
this.remotePort = socketAddress.getPort();
}

Expand Down Expand Up @@ -144,13 +148,9 @@ final void bind(AFSocketAddress socketAddress) throws SocketException {
if (socketAddress == AFSocketAddress.INTERNAL_DUMMY_BIND) { // NOPMD
return;
}
try {
ByteBuffer ab;
if (socketAddress == null) {
ab = AFSocketAddress.getNativeAddressDirectBuffer(0);
} else {
ab = socketAddress.getNativeAddressDirectBuffer();
}
try (Lease<ByteBuffer> abLease = socketAddress == null ? AFSocketAddress
.getNativeAddressDirectBuffer(0) : socketAddress.getNativeAddressDirectBuffer()) {
ByteBuffer ab = abLease.get();
NativeUnixSocket.bind(ab, ab.limit(), fd, NativeUnixSocket.OPT_DGRAM_MODE);
if (socketAddress == null) {
this.localPort = 0;
Expand All @@ -174,41 +174,54 @@ private void recv(DatagramPacket p, int options) throws IOException {
int len = p.getLength();
FileDescriptor fdesc = core.validFdOrException();

ByteBuffer datagramPacketBuffer = core.getThreadLocalDirectByteBuffer(len);
len = Math.min(len, datagramPacketBuffer.capacity());
try (Lease<MutableHolder<ByteBuffer>> lease = core.getPrivateDirectByteBuffer(len)) {
ByteBuffer datagramPacketBuffer = lease.get().get();
len = Math.min(len, datagramPacketBuffer.capacity());

options |= core.isBlocking() ? 0 : NativeUnixSocket.OPT_NON_BLOCKING;

try (Lease<ByteBuffer> socketAddressBufferLease = AFSocketAddress.SOCKETADDRESS_BUFFER_TL
.take()) {
ByteBuffer socketAddressBuffer = socketAddressBufferLease.get();
int count = NativeUnixSocket.receive(fdesc, datagramPacketBuffer, 0, len,
socketAddressBuffer, options, ancillaryDataSupport, socketTimeout.get());
if (count > len) {
throw new IllegalStateException("count > len: " + count + " > " + len);
} else if (count == -1) {
throw new SocketTimeoutException();
} else if (count < 0) {
throw new IllegalStateException("count: " + count + " < 0");
}
datagramPacketBuffer.limit(count);
datagramPacketBuffer.rewind();
datagramPacketBuffer.get(p.getData(), p.getOffset(), count);

options |= core.isBlocking() ? 0 : NativeUnixSocket.OPT_NON_BLOCKING;
p.setLength(count);

ByteBuffer socketAddressBuffer = AFSocketAddress.SOCKETADDRESS_BUFFER_TL.get();
int count = NativeUnixSocket.receive(fdesc, datagramPacketBuffer, 0, len, socketAddressBuffer,
options, ancillaryDataSupport, socketTimeout.get());
if (count > len) {
throw new IllegalStateException("count > len: " + count + " > " + len);
} else if (count == -1) {
throw new SocketTimeoutException();
} else if (count < 0) {
throw new IllegalStateException("count: " + count + " < 0");
A addr = AFSocketAddress.ofInternal(socketAddressBuffer, getAddressFamily());
p.setAddress(addr == null ? null : addr.getInetAddress());
p.setPort(remotePort);
}
}
datagramPacketBuffer.limit(count);
datagramPacketBuffer.rewind();
datagramPacketBuffer.get(p.getData(), p.getOffset(), count);

p.setLength(count);

A addr = AFSocketAddress.ofInternal(socketAddressBuffer, getAddressFamily());
p.setAddress(addr == null ? null : addr.getInetAddress());
p.setPort(remotePort);
}

@Override
protected final void send(DatagramPacket p) throws IOException {
InetAddress addr = p.getAddress();
ByteBuffer sendToBuf = null;
int sendToBufLen = 0;

byte[] addrBytes;
if (addr != null) {
byte[] addrBytes = AFInetAddress.unwrapAddress(addr, getAddressFamily());
if (addrBytes != null) {
sendToBuf = AFSocketAddress.SOCKETADDRESS_BUFFER_TL.get();
addrBytes = AFInetAddress.unwrapAddress(addr, getAddressFamily());
} else {
addrBytes = null;
}

try (Lease<ByteBuffer> sendToBufLease = addrBytes == null ? null
: AFSocketAddress.SOCKETADDRESS_BUFFER_TL.take()) {
if (sendToBufLease != null) {
sendToBuf = sendToBufLease.get();
sendToBufLen = NativeUnixSocket.bytesToSockAddr(getAddressFamily().getDomain(), sendToBuf,
addrBytes);
sendToBuf.position(0);
Expand All @@ -221,14 +234,16 @@ protected final void send(DatagramPacket p) throws IOException {

int len = p.getLength();

ByteBuffer datagramPacketBuffer = core.getThreadLocalDirectByteBuffer(len);
datagramPacketBuffer.clear();
datagramPacketBuffer.put(p.getData(), p.getOffset(), p.getLength());
datagramPacketBuffer.flip();
try (Lease<MutableHolder<ByteBuffer>> lease = core.getPrivateDirectByteBuffer(len)) {
ByteBuffer datagramPacketBuffer = lease.get().get();
datagramPacketBuffer.clear();
datagramPacketBuffer.put(p.getData(), p.getOffset(), p.getLength());
datagramPacketBuffer.flip();

NativeUnixSocket.send(fdesc, datagramPacketBuffer, 0, len, sendToBuf, sendToBufLen,
/* NativeUnixSocket.OPT_NON_BLOCKING | */
NativeUnixSocket.OPT_DGRAM_MODE, ancillaryDataSupport);
NativeUnixSocket.send(fdesc, datagramPacketBuffer, 0, len, sendToBuf, sendToBufLen,
/* NativeUnixSocket.OPT_NON_BLOCKING | */
NativeUnixSocket.OPT_DGRAM_MODE, ancillaryDataSupport);
}
}

@Override
Expand Down Expand Up @@ -428,8 +443,8 @@ final boolean accept0(AFDatagramSocketImpl<A> socket) throws IOException {

final AFDatagramSocketImpl<A> si = socket;
core.incPendingAccepts();
try {
ByteBuffer ab = socketAddress.getNativeAddressDirectBuffer();
try (Lease<ByteBuffer> abLease = socketAddress.getNativeAddressDirectBuffer()) {
ByteBuffer ab = abLease.get();

SocketException caught = null;
try {
Expand Down
Loading

0 comments on commit 501d9f2

Please sign in to comment.