Skip to content

Commit 6727f2d

Browse files
committed
增加一个BufferPool对象,统一管理阻塞队列
1 parent 4222993 commit 6727f2d

File tree

6 files changed

+97
-16
lines changed

6 files changed

+97
-16
lines changed

run/src/main/java/org/wowtools/hppt/common/client/ClientTalker.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@
33
import lombok.extern.slf4j.Slf4j;
44
import org.wowtools.hppt.common.pojo.SessionBytes;
55
import org.wowtools.hppt.common.pojo.TalkMessage;
6-
import org.wowtools.hppt.common.util.AesCipherUtil;
7-
import org.wowtools.hppt.common.util.CommonConfig;
8-
import org.wowtools.hppt.common.util.Constant;
9-
import org.wowtools.hppt.common.util.DebugConfig;
6+
import org.wowtools.hppt.common.util.*;
107

118
import java.nio.charset.StandardCharsets;
129
import java.util.LinkedList;
@@ -35,7 +32,7 @@ public class ClientTalker {
3532
* @throws Exception
3633
*/
3734
public static byte[] buildSendToServerBytes(CommonConfig config, long maxSendBodySize,
38-
BlockingQueue<String> sendCommandQueue, BlockingQueue<SessionBytes> sendBytesQueue,
35+
BufferPool<String> sendCommandQueue, BufferPool<SessionBytes> sendBytesQueue,
3936
AesCipherUtil aesCipherUtil, boolean wait) throws Exception {
4037
long sendBodySize = 0;//大致预估发送体积
4138
//命令
@@ -99,7 +96,7 @@ public static byte[] buildSendToServerBytes(CommonConfig config, long maxSendBod
9996

10097
//接收服务端发来的字节并做相应处理
10198
public static boolean receiveServerBytes(CommonConfig config, byte[] responseBody,
102-
ClientSessionManager clientSessionManager, AesCipherUtil aesCipherUtil, BlockingQueue<String> sendCommandQueue,
99+
ClientSessionManager clientSessionManager, AesCipherUtil aesCipherUtil, BufferPool<String> sendCommandQueue,
103100
Map<Integer, ClientBytesSender.SessionIdCallBack> sessionIdCallBackMap) throws Exception {
104101
if (null == responseBody) {
105102
return true;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package org.wowtools.hppt.common.util;
2+
3+
import java.util.LinkedList;
4+
import java.util.List;
5+
import java.util.concurrent.LinkedBlockingQueue;
6+
import java.util.concurrent.TimeUnit;
7+
8+
/**
9+
* 缓冲池,内置一个LinkedBlockingQueue,用以解耦生产者和消费者、缓冲数据并做监控
10+
*
11+
* @author liuyu
12+
* @date 2024/10/27
13+
*/
14+
public class BufferPool<T> {
15+
private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
16+
17+
private final String name;
18+
19+
/**
20+
*
21+
* @param name 缓冲池名字,为便于排查,请保证业务功能上的唯一
22+
*/
23+
public BufferPool(String name) {
24+
this.name = name;
25+
}
26+
27+
/**
28+
* 添加
29+
*
30+
* @param t t
31+
*/
32+
public void add(T t) {
33+
queue.add(t);
34+
}
35+
36+
/**
37+
* 获取,队列为空则一直阻塞等待
38+
*
39+
* @return t
40+
*/
41+
public T take() {
42+
try {
43+
return queue.take();
44+
} catch (InterruptedException e) {
45+
throw new RuntimeException(e);
46+
}
47+
}
48+
49+
/**
50+
* 获取,队列为空则返回null
51+
*
52+
* @return t or null
53+
*/
54+
public T poll() {
55+
return queue.poll();
56+
}
57+
58+
/**
59+
* 获取,队列为空则阻塞等待一段时间,超时则返回null
60+
*
61+
* @param timeout timeout
62+
* @param unit TimeUnit
63+
* @return t or null
64+
*/
65+
public T poll(long timeout, TimeUnit unit) {
66+
T t;
67+
try {
68+
t = queue.poll(timeout, unit);
69+
} catch (InterruptedException e) {
70+
return null;
71+
}
72+
return t;
73+
}
74+
75+
/**
76+
* 获取队列中当前可用的所有元素,队列为空则阻塞等待,所以list至少会有一个元素
77+
*
78+
* @return list
79+
*/
80+
public List<T> takeAndDrainToList() {
81+
List<T> list = new LinkedList<>();
82+
T t0 = take();
83+
list.add(t0);
84+
queue.drainTo(list);
85+
return list;
86+
}
87+
}

run/src/main/java/org/wowtools/hppt/run/sc/common/PortReceiver.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
import java.util.LinkedList;
1616
import java.util.List;
1717
import java.util.Map;
18-
import java.util.concurrent.BlockingQueue;
1918
import java.util.concurrent.ConcurrentHashMap;
20-
import java.util.concurrent.LinkedBlockingQueue;
2119
import java.util.concurrent.atomic.AtomicInteger;
2220

2321
/**
@@ -31,8 +29,8 @@ final class PortReceiver implements Receiver {
3129
private final ClientSessionService clientSessionService;
3230

3331

34-
private final BlockingQueue<String> sendCommandQueue = new LinkedBlockingQueue<>();
35-
private final BlockingQueue<SessionBytes> sendBytesQueue = new LinkedBlockingQueue<>();
32+
private final BufferPool<String> sendCommandQueue = new BufferPool<>(">PortReceiver-sendCommand");
33+
private final BufferPool<SessionBytes> sendBytesQueue = new BufferPool<>(">PortReceiver-sendBytesQueue");
3634

3735
private final Map<Integer, ClientBytesSender.SessionIdCallBack> sessionIdCallBackMap = new ConcurrentHashMap<>();//<newSessionFlag,cb>
3836
private AesCipherUtil aesCipherUtil;

run/src/main/java/org/wowtools/hppt/run/sc/common/SsReceiver.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
package org.wowtools.hppt.run.sc.common;
22

33
import org.wowtools.hppt.common.client.ClientSession;
4+
import org.wowtools.hppt.common.util.BufferPool;
45
import org.wowtools.hppt.common.util.RoughTimeUtil;
56
import org.wowtools.hppt.run.sc.pojo.ScConfig;
67

7-
import java.util.concurrent.BlockingQueue;
8-
import java.util.concurrent.LinkedBlockingQueue;
98

109
/**
1110
* @author liuyu
1211
* @date 2024/9/27
1312
*/
1413
public final class SsReceiver implements Receiver {
15-
public final BlockingQueue<byte[]> serverBytesQueue = new LinkedBlockingQueue<>();
14+
public final BufferPool<byte[]> serverBytesQueue = new BufferPool<>(">SsReceiver-serverBytesQueue");
1615

1716
private long lastUsedTime = RoughTimeUtil.getTimestamp();
1817

run/src/main/resources/debug.ini

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22

33

44
# netty内存泄露检查级别 0 DISABLED 1 SIMPLE 2 ADVANCED 3 PARANOID
5-
NettyResourceLeakDetectorLevel = 0
5+
NettyResourceLeakDetectorLevel = 3
66
# 是否开启消息流水号 1为开启 用于调试消息后发先至等问题,非调试时流水号为空
7-
OpenSerialNumber = 0
7+
OpenSerialNumber = 1

run/src/main/resources/logback.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
</appender>
2828

2929
<!-- Root logger configuration -->
30-
<root level="INFO">
30+
<root level="DEBUG">
3131
<!-- <appender-ref ref="FILE" />-->
3232
<appender-ref ref="CONSOLE" />
3333
</root>

0 commit comments

Comments
 (0)