Skip to content

Commit 9e70e7f

Browse files
committed
ClientSession向客户端发数据的逻辑增加一个缓冲池,以减少各客户端之间的相互等待
1 parent fd14575 commit 9e70e7f

File tree

1 file changed

+29
-8
lines changed

1 file changed

+29
-8
lines changed

common/src/main/java/org/wowtools/hppt/common/client/ClientSession.java

+29-8
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
import lombok.extern.slf4j.Slf4j;
55
import org.wowtools.hppt.common.util.BytesUtil;
66

7+
import java.util.concurrent.BlockingQueue;
8+
import java.util.concurrent.LinkedBlockingQueue;
9+
import java.util.concurrent.TimeUnit;
10+
711
/**
812
* 客户端会话
913
*
@@ -14,12 +18,33 @@
1418
public class ClientSession {
1519
private final int sessionId;
1620
private final ChannelHandlerContext channelHandlerContext;
17-
private final ClientSessionLifecycle lifecycle;
21+
22+
private final BlockingQueue<byte[]> sendToUserBytesQueue = new LinkedBlockingQueue<>();
23+
private volatile boolean running = true;
1824

1925
ClientSession(int sessionId, ChannelHandlerContext channelHandlerContext, ClientSessionLifecycle lifecycle) {
2026
this.sessionId = sessionId;
2127
this.channelHandlerContext = channelHandlerContext;
22-
this.lifecycle = lifecycle;
28+
Thread.startVirtualThread(() -> {
29+
while (running) {
30+
byte[] bytes;
31+
try {
32+
bytes = sendToUserBytesQueue.poll(10, TimeUnit.SECONDS);
33+
} catch (InterruptedException e) {
34+
continue;
35+
}
36+
if (null == bytes) {
37+
continue;
38+
}
39+
bytes = lifecycle.beforeSendToUser(this, bytes);
40+
if (null != bytes) {
41+
log.debug("ClientSession {} 向用户发送字节 {}", sessionId, bytes.length);
42+
BytesUtil.writeToChannelHandlerContext(channelHandlerContext, bytes);
43+
lifecycle.afterSendToUser(this, bytes);
44+
}
45+
}
46+
log.debug("ClientSession {} 接收线程结束", sessionId);
47+
});
2348
}
2449

2550
/**
@@ -28,12 +53,7 @@ public class ClientSession {
2853
* @param bytes bytes
2954
*/
3055
public void sendToUser(byte[] bytes) {
31-
bytes = lifecycle.beforeSendToUser(this, bytes);
32-
if (null != bytes) {
33-
log.debug("ClientSession {} 向用户发送字节 {}", sessionId, bytes.length);
34-
BytesUtil.writeToChannelHandlerContext(channelHandlerContext, bytes);
35-
lifecycle.afterSendToUser(this, bytes);
36-
}
56+
sendToUserBytesQueue.add(bytes);
3757
}
3858

3959

@@ -46,6 +66,7 @@ public ChannelHandlerContext getChannelHandlerContext() {
4666
}
4767

4868
void close() {
69+
running = false;
4970
channelHandlerContext.close();
5071
}
5172

0 commit comments

Comments
 (0)