Skip to content

Commit 65a22ef

Browse files
committed
增加一个BufferPool对象,统一管理阻塞队列
1 parent 6727f2d commit 65a22ef

28 files changed

+155
-83
lines changed

kafkademo/src/main/java/org/wowtools/hppt/kafkademo/ClientDemo.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void connectToServer(ScConfig config, Cb cb) throws Exception {
3232
}
3333
};
3434
KafkaUtil.buildConsumer("client", KafkaUtil.ServerSendTopic, clientConsumer);
35-
cb.end();//调用end方法,通知框架连接完成
35+
cb.end(null);//调用end方法,通知框架连接完成
3636
}
3737

3838
@Override

run/src/main/java/org/wowtools/hppt/common/client/ClientSession.java

+5-11
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@
22

33
import io.netty.channel.ChannelHandlerContext;
44
import lombok.extern.slf4j.Slf4j;
5+
import org.wowtools.hppt.common.util.BufferPool;
56
import org.wowtools.hppt.common.util.BytesUtil;
67

7-
import java.util.concurrent.BlockingQueue;
8-
import java.util.concurrent.LinkedBlockingQueue;
98
import java.util.concurrent.TimeUnit;
109

1110
/**
@@ -19,20 +18,15 @@ public class ClientSession {
1918
private final int sessionId;
2019
private final ChannelHandlerContext channelHandlerContext;
2120

22-
private final BlockingQueue<byte[]> sendToUserBytesQueue = new LinkedBlockingQueue<>();
21+
private final BufferPool<byte[]> sendToUserBytesQueue = new BufferPool<>("<ClientSession-sendToUserBytesQueue");
2322
private volatile boolean running = true;
2423

2524
ClientSession(int sessionId, ChannelHandlerContext channelHandlerContext, ClientSessionLifecycle lifecycle) {
2625
this.sessionId = sessionId;
2726
this.channelHandlerContext = channelHandlerContext;
2827
Thread.startVirtualThread(() -> {
2928
while (running) {
30-
byte[] bytes;
31-
try {
32-
bytes = sendToUserBytesQueue.poll(10, TimeUnit.SECONDS);
33-
} catch (InterruptedException e) {
34-
continue;
35-
}
29+
byte[] bytes = sendToUserBytesQueue.poll(10, TimeUnit.SECONDS);
3630
if (null == bytes) {
3731
continue;
3832
}
@@ -41,9 +35,9 @@ public class ClientSession {
4135
log.debug("ClientSession {} 向用户发送字节 {}", sessionId, bytes.length);
4236
Throwable e = BytesUtil.writeToChannelHandlerContext(channelHandlerContext, bytes);
4337
if (null != e) {
44-
log.warn("向用户发送字节异常",e);
38+
log.warn("向用户发送字节异常", e);
4539
close();
46-
}else if (log.isDebugEnabled()){
40+
} else if (log.isDebugEnabled()) {
4741
log.debug("ClientSession {} 向用户发送字节完成 {}", sessionId, bytes.length);
4842
}
4943
lifecycle.afterSendToUser(this, bytes);

run/src/main/java/org/wowtools/hppt/common/server/LoginClientService.java

+8-17
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import org.wowtools.hppt.common.pojo.SendAbleSessionBytes;
44
import org.wowtools.hppt.common.pojo.SessionBytes;
55
import org.wowtools.hppt.common.util.AesCipherUtil;
6+
import org.wowtools.hppt.common.util.BufferPool;
67
import org.wowtools.hppt.common.util.BytesUtil;
78

89
import java.nio.charset.StandardCharsets;
@@ -34,10 +35,10 @@ public static final class Client {
3435
public final String clientId;
3536
public final AesCipherUtil aesCipherUtil;
3637

37-
private final BlockingQueue<String> commandQueue = new LinkedBlockingQueue<>();
38+
private final BufferPool<String> commandQueue = new BufferPool<>("<LoginClientService-Client-commandQueue");
3839

39-
private final BlockingQueue<SendAbleSessionBytes> sessionBytesQueue = new LinkedBlockingQueue<>();
40-
public final BlockingQueue<byte[]> receiveClientBytes = new LinkedBlockingQueue<>();
40+
private final BufferPool<SendAbleSessionBytes> sessionBytesQueue = new BufferPool<>("<LoginClientService-Client-sessionBytesQueue");
41+
public final BufferPool<byte[]> receiveClientBytes = new BufferPool<>("<LoginClientService-Client-receiveClientBytes");
4142

4243
private final HashMap<Integer, ServerSession> sessions = new HashMap<>();
4344

@@ -57,12 +58,7 @@ public void addCommand(String cmd) {
5758

5859
//取出所有需要向客户端发送的命令 无命令则返回null
5960
public List<String> fetchCommands() {
60-
if (commandQueue.isEmpty()) {
61-
return null;
62-
}
63-
List<String> res = new LinkedList<>();
64-
commandQueue.drainTo(res);
65-
return res;
61+
return commandQueue.drainToList();
6662
}
6763

6864
public void addSession(ServerSession session) {
@@ -99,7 +95,7 @@ public List<SendAbleSessionBytes> fetchBytes(long maxReturnBodySize) {
9995
}
10096
List<SendAbleSessionBytes> bytesList = new LinkedList<>();
10197
if (maxReturnBodySize < 0) {
102-
sessionBytesQueue.drainTo(bytesList);
98+
sessionBytesQueue.drainToList(bytesList);
10399
} else {
104100
//根据maxReturnBodySize的限制取出队列中的数据返回
105101
long currentReturnBodySize = 0L;
@@ -119,12 +115,7 @@ public List<SendAbleSessionBytes> fetchBytes(long maxReturnBodySize) {
119115
//取出所有需要向客户端发送的bytes 取出的bytes会按相同sessionId进行整合 无bytes则阻塞3秒后返回
120116
public List<SendAbleSessionBytes> fetchBytesBlocked(long maxReturnBodySize) {
121117
List<SendAbleSessionBytes> bytesList = new LinkedList<>();
122-
SendAbleSessionBytes first;
123-
try {
124-
first = sessionBytesQueue.poll(3, TimeUnit.SECONDS);
125-
} catch (InterruptedException e) {
126-
return bytesList;
127-
}
118+
SendAbleSessionBytes first= sessionBytesQueue.poll(3, TimeUnit.SECONDS);
128119
if (null == first) {
129120
return bytesList;
130121
}
@@ -133,7 +124,7 @@ public List<SendAbleSessionBytes> fetchBytesBlocked(long maxReturnBodySize) {
133124
return bytesList;
134125
}
135126
if (maxReturnBodySize < 0) {
136-
sessionBytesQueue.drainTo(bytesList);
127+
sessionBytesQueue.drainToList(bytesList);
137128
return merge(bytesList);
138129
} else {
139130
//根据maxReturnBodySize的限制取出队列中的数据返回

run/src/main/java/org/wowtools/hppt/common/server/ServerSession.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.netty.channel.Channel;
44
import lombok.extern.slf4j.Slf4j;
55
import org.wowtools.hppt.common.pojo.SessionBytes;
6+
import org.wowtools.hppt.common.util.BufferPool;
67
import org.wowtools.hppt.common.util.BytesUtil;
78
import org.wowtools.hppt.common.util.DebugConfig;
89
import org.wowtools.hppt.common.util.RoughTimeUtil;
@@ -26,7 +27,7 @@ public class ServerSession {
2627

2728
private final ServerSessionLifecycle lifecycle;
2829

29-
private final BlockingQueue<SessionBytes> sendBytesQueue = new LinkedBlockingQueue<>();
30+
private final BufferPool<SessionBytes> sendBytesQueue = new BufferPool<>(">ServerSession-sendBytesQueue");
3031
//上次活跃时间
3132
private long activeTime;
3233

run/src/main/java/org/wowtools/hppt/common/server/ServerSessionManagerBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
public class ServerSessionManagerBuilder {
1111
protected EventLoopGroup group;
1212
protected ServerSessionLifecycle lifecycle;
13-
protected long sessionTimeout = 30000;
13+
protected long sessionTimeout = 60000;
1414

1515
public ServerSessionManagerBuilder setSessionTimeout(long sessionTimeout) {
1616
this.sessionTimeout = sessionTimeout;

run/src/main/java/org/wowtools/hppt/common/server/ServerTalker.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@
44
import org.wowtools.hppt.common.pojo.SendAbleSessionBytes;
55
import org.wowtools.hppt.common.pojo.SessionBytes;
66
import org.wowtools.hppt.common.pojo.TalkMessage;
7+
import org.wowtools.hppt.common.util.BufferPool;
78
import org.wowtools.hppt.common.util.CommonConfig;
89
import org.wowtools.hppt.common.util.Constant;
910
import org.wowtools.hppt.common.util.DebugConfig;
1011

1112
import java.util.ArrayList;
1213
import java.util.List;
1314
import java.util.Map;
14-
import java.util.concurrent.BlockingQueue;
15-
import java.util.concurrent.LinkedBlockingQueue;
1615

1716
/**
1817
* @author liuyu
@@ -206,7 +205,8 @@ public void run() {
206205
}
207206
}
208207

209-
private static final BlockingQueue<SendAbleSessionBytesResult> sendAbleSessionBytesResultQueue = new LinkedBlockingQueue<>();
208+
private static final BufferPool<SendAbleSessionBytesResult> sendAbleSessionBytesResultQueue
209+
= new BufferPool<>("ServerTalker.sendAbleSessionBytesResultQueue");
210210

211211
static {
212212
Thread.startVirtualThread(() -> {

run/src/main/java/org/wowtools/hppt/common/util/BufferPool.java

+42-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.wowtools.hppt.common.util;
22

3+
import lombok.extern.slf4j.Slf4j;
4+
35
import java.util.LinkedList;
46
import java.util.List;
57
import java.util.concurrent.LinkedBlockingQueue;
@@ -11,14 +13,14 @@
1113
* @author liuyu
1214
* @date 2024/10/27
1315
*/
16+
@Slf4j
1417
public class BufferPool<T> {
1518
private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
1619

1720
private final String name;
1821

1922
/**
20-
*
21-
* @param name 缓冲池名字,为便于排查,请保证业务功能上的唯一
23+
* @param name 缓冲池名字,为便于排查,请保证名称在业务层面的准确清晰
2224
*/
2325
public BufferPool(String name) {
2426
this.name = name;
@@ -30,7 +32,17 @@ public BufferPool(String name) {
3032
* @param t t
3133
*/
3234
public void add(T t) {
33-
queue.add(t);
35+
if (!DebugConfig.OpenBufferPoolDetector) {
36+
queue.add(t);
37+
} else {
38+
int n = queue.size();
39+
queue.add(t);
40+
int n1 = queue.size();
41+
if (n < DebugConfig.BufferPoolWaterline && n1 >= DebugConfig.BufferPoolWaterline) {
42+
log.debug("{} 缓冲池高水位线: {} -> {}", name, n, n1);
43+
}
44+
}
45+
3446
}
3547

3648
/**
@@ -84,4 +96,31 @@ public List<T> takeAndDrainToList() {
8496
queue.drainTo(list);
8597
return list;
8698
}
99+
100+
/**
101+
* 获取队列中当前可用的所有元素,队列为空则返回null
102+
*
103+
* @return list
104+
*/
105+
public List<T> drainToList() {
106+
if (queue.isEmpty()) {
107+
return null;
108+
}
109+
List<T> list = new LinkedList<>();
110+
queue.drainTo(list);
111+
return list;
112+
}
113+
114+
/**
115+
* 获取队列中当前可用的所有元素添加到list中
116+
*
117+
* @param list list
118+
*/
119+
public void drainToList(List<T> list) {
120+
queue.drainTo(list);
121+
}
122+
123+
public boolean isEmpty() {
124+
return queue.isEmpty();
125+
}
87126
}

run/src/main/java/org/wowtools/hppt/common/util/BytesUtil.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,11 @@ private static void waitChannelWritable(Channel channel) {
151151
int i = 0;
152152
while (!channel.isWritable() && channel.isOpen()) {
153153
i++;
154-
if (i > 30) {
154+
if (i > 3000) {
155155
throw new RuntimeException("waitChannelWritable timeout");
156156
}
157157
try {
158-
Thread.sleep(1000);
158+
Thread.sleep(10);
159159
} catch (InterruptedException e) {
160160
}
161161
}

run/src/main/java/org/wowtools/hppt/common/util/DebugConfig.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,17 @@ public class DebugConfig {
2020
//是否开启消息流水号
2121
public static final boolean OpenSerialNumber;
2222

23+
//是否开启缓冲池监控
24+
public static final boolean OpenBufferPoolDetector;
25+
26+
//缓冲池高水位线,缓冲池中元素个数超过此值且继续向其中添加要素则触发日志
27+
public static final int BufferPoolWaterline;
28+
2329
static {
2430
ResourceLeakDetector.Level _NettyResourceLeakDetectorLevel = ResourceLeakDetector.Level.DISABLED;
2531
boolean _OpenSerialNumber = false;
32+
boolean _OpenBufferPoolDetector = false;
33+
int _BufferPoolWaterline = 1000;
2634
try {
2735
String str = ResourcesReader.readStr(Run.class, "debug.ini");
2836
Map<String, String> configs = new HashMap<>();
@@ -48,11 +56,17 @@ public class DebugConfig {
4856

4957
_OpenSerialNumber = "1".equals(configs.get("OpenSerialNumber"));
5058

59+
_OpenBufferPoolDetector = "1".equals(configs.get("OpenBufferPoolDetector"));
60+
_BufferPoolWaterline = Integer.parseInt(configs.getOrDefault("BufferPoolWaterline", "1000"));
61+
5162
} catch (Exception e) {
52-
log.debug("不开启调试模式,原因 {}", e.getMessage());
63+
log.debug("不开启调试模式 ", e);
5364
}
5465

5566
NettyResourceLeakDetectorLevel = _NettyResourceLeakDetectorLevel;
5667
OpenSerialNumber = _OpenSerialNumber;
68+
69+
OpenBufferPoolDetector = _OpenBufferPoolDetector;
70+
BufferPoolWaterline = _BufferPoolWaterline;
5771
}
5872
}

run/src/main/java/org/wowtools/hppt/common/util/HttpUtil.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ public class HttpUtil {
3030
static {
3131
okHttpClient = new OkHttpClient.Builder()
3232
.sslSocketFactory(sslSocketFactory(), x509TrustManager())
33-
// 是否开启缓存
34-
.retryOnConnectionFailure(false)
33+
.retryOnConnectionFailure(true)
3534
.connectionPool(pool())
3635
.connectTimeout(60L, TimeUnit.SECONDS)
3736
.readTimeout(60L, TimeUnit.SECONDS)

run/src/main/java/org/wowtools/hppt/run/sc/RunSc.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ public static void main(String[] args) {
2727
throw new RuntimeException("读取配置文件异常", e);
2828
}
2929
while (true) {
30-
try {
31-
ClientSessionService clientSessionService = ClientSessionServiceBuilder.build(config);
30+
try (ClientSessionService clientSessionService = ClientSessionServiceBuilder.build(config)){
3231
clientSessionService.sync();
3332
} catch (Exception e) {
3433
log.warn("服务异常", e);

0 commit comments

Comments
 (0)