Skip to content

Commit c58d859

Browse files
committed
Make socket per core
1 parent 893eb03 commit c58d859

File tree

8 files changed

+12
-133
lines changed

8 files changed

+12
-133
lines changed

examples/tcp.proxy/README.md

Lines changed: 0 additions & 23 deletions
This file was deleted.

examples/tcp.proxy/compose.yaml

Lines changed: 0 additions & 35 deletions
This file was deleted.

examples/tcp.proxy/etc/zilla.yaml

Lines changed: 0 additions & 20 deletions
This file was deleted.

runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBinding.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.concurrent.ConcurrentMap;
2121

2222
import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpServerBindingConfig;
23-
import io.aklivity.zilla.runtime.binding.tcp.internal.util.OperatingSystem;
2423
import io.aklivity.zilla.runtime.engine.EngineContext;
2524
import io.aklivity.zilla.runtime.engine.binding.Binding;
2625
import io.aklivity.zilla.runtime.engine.binding.BindingContext;
@@ -61,10 +60,8 @@ public BindingContext supply(
6160
}
6261

6362
private TcpServerBindingConfig supplyServer(
64-
long bindingId)
63+
long index)
6564
{
66-
return OperatingSystem.detect() == OperatingSystem.OS.MACOS
67-
? servers.computeIfAbsent(bindingId, TcpServerBindingConfig::new)
68-
: new TcpServerBindingConfig(bindingId);
65+
return servers.computeIfAbsent(index, TcpServerBindingConfig::new);
6966
}
7067
}

runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/config/TcpServerBindingConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ public final class TcpServerBindingConfig
4040
private volatile ServerSocketChannel[] channels;
4141

4242
public TcpServerBindingConfig(
43-
long bindingId)
43+
long index)
4444
{
45-
this.id = bindingId;
45+
this.id = index;
4646
this.binds = new AtomicInteger();
4747
}
4848

runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,15 @@ public class TcpServerFactory implements TcpStreamFactory
9999
private final int replyMax;
100100
private final int windowThreshold;
101101
private final int proxyTypeId;
102+
private final int index;
102103
private final BindingHandler streamFactory;
103104

104105
public TcpServerFactory(
105106
TcpConfiguration config,
106107
EngineContext context,
107108
LongFunction<TcpServerBindingConfig> servers)
108109
{
109-
this.router = new TcpServerRouter(config, context, this::handleAccept, servers);
110+
this.router = new TcpServerRouter(context.index(), config, context, this::handleAccept, servers);
110111
this.writeBuffer = context.writeBuffer();
111112
this.writeByteBuffer = ByteBuffer.allocateDirect(writeBuffer.capacity()).order(nativeOrder());
112113
this.bufferPool = context.bufferPool();
@@ -116,6 +117,7 @@ public TcpServerFactory(
116117
this.supplyPollerKey = context::supplyPollerKey;
117118
this.streamFactory = context.streamFactory();
118119
this.proxyTypeId = context.supplyTypeId("proxy");
120+
this.index = context.index();
119121

120122
final int readBufferSize = writeBuffer.capacity() - DataFW.FIELD_OFFSET_PAYLOAD;
121123
this.readByteBuffer = ByteBuffer.allocateDirect(readBufferSize).order(nativeOrder());

runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,19 @@ public final class TcpServerRouter
4141
private final ToIntFunction<PollerKey> acceptHandler;
4242
private final Function<SelectableChannel, PollerKey> supplyPollerKey;
4343
private final LongFunction<TcpServerBindingConfig> lookupServer;
44+
private final int index;
4445

4546
private int remainingConnections;
4647
private boolean unbound;
4748

4849
public TcpServerRouter(
50+
int index,
4951
TcpConfiguration config,
5052
EngineContext context,
5153
ToIntFunction<PollerKey> acceptHandler,
5254
LongFunction<TcpServerBindingConfig> lookupServer)
5355
{
56+
this.index = index;
5457
this.remainingConnections = config.maxConnections();
5558
this.bindings = new Long2ObjectHashMap<>();
5659
this.supplyPollerKey = context::supplyPollerKey;
@@ -130,7 +133,7 @@ public void close(
130133
private void register(
131134
TcpBindingConfig binding)
132135
{
133-
TcpServerBindingConfig server = lookupServer.apply(binding.id);
136+
TcpServerBindingConfig server = lookupServer.apply(index);
134137
ServerSocketChannel[] channels = server.bind(binding.options);
135138

136139
PollerKey[] acceptKeys = new PollerKey[channels.length];
@@ -158,7 +161,7 @@ private void unregister(
158161
}
159162
}
160163

161-
TcpServerBindingConfig server = lookupServer.apply(binding.id);
164+
TcpServerBindingConfig server = lookupServer.apply(index);
162165
server.unbind();
163166
}
164167
}

runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/util/OperatingSystem.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

0 commit comments

Comments
 (0)