Skip to content

Commit 58564f6

Browse files
committed
增加一些可配置的延迟参数,以便人工控制请求频率
1 parent 6b422fa commit 58564f6

File tree

12 files changed

+98
-106
lines changed

12 files changed

+98
-106
lines changed

.gitignore

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@ target/
44
!**/src/test/**/target/
55

66
### IntelliJ IDEA ###
7-
.idea/modules.xml
8-
.idea/jarRepositories.xml
9-
.idea/compiler.xml
10-
.idea/libraries/
7+
.idea/
118
*.iws
129
*.iml
1310
*.ipr

common/src/main/java/org/wowtools/hppt/common/client/ClientTalker.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,7 @@ public static byte[] buildSendToServerBytes(CommonConfig config, long maxSendBod
9090
}
9191

9292
byte[] bytes = rBuilder.build().toByteArray();
93-
//压缩 加密
94-
if (config.enableCompress) {
95-
bytes = BytesUtil.compress(bytes);
96-
}
93+
//加密
9794
if (config.enableEncrypt) {
9895
bytes = aesCipherUtil.encryptor.encrypt(bytes);
9996
}
@@ -111,13 +108,10 @@ public static boolean receiveServerBytes(CommonConfig config, byte[] responseBod
111108
}
112109
ProtoMessage.MessagePb rMessagePb;
113110
try {
114-
//解密、解压
111+
//解密
115112
if (config.enableEncrypt) {
116113
responseBody = aesCipherUtil.descriptor.decrypt(responseBody);
117114
}
118-
if (config.enableCompress) {
119-
responseBody = BytesUtil.decompress(responseBody);
120-
}
121115
log.debug("收到服务端发回字节数 {}", responseBody.length);
122116
rMessagePb = ProtoMessage.MessagePb.parseFrom(responseBody);
123117
} catch (Exception e) {

common/src/main/java/org/wowtools/hppt/common/server/ServerTalker.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,10 @@ public static void receiveClientBytes(CommonConfig config, ServerSessionManager
2626
if (null == bytes || bytes.length == 0) {
2727
return;
2828
}
29-
//解密 解压
29+
//解密
3030
if (config.enableEncrypt) {
3131
bytes = client.aesCipherUtil.descriptor.decrypt(bytes);
3232
}
33-
if (config.enableCompress) {
34-
bytes = BytesUtil.decompress(bytes);
35-
}
36-
3733
ProtoMessage.MessagePb inputMessage = ProtoMessage.MessagePb.parseFrom(bytes);
3834
Map<Integer, ServerSession> serverSessionMap = serverSessionManager.getServerSessionMapByClientId(client.clientId);
3935

@@ -116,10 +112,7 @@ public static byte[] replyToClient(CommonConfig config, ServerSessionManager ser
116112

117113

118114
byte[] bytes = rBuilder.build().toByteArray();
119-
//压缩 加密
120-
if (config.enableCompress) {
121-
bytes = BytesUtil.compress(bytes);
122-
}
115+
//加密
123116
if (config.enableEncrypt) {
124117
bytes = client.aesCipherUtil.encryptor.encrypt(bytes);
125118
}

common/src/main/java/org/wowtools/hppt/common/util/CommonConfig.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
* @date 2024/1/25
66
*/
77
public class CommonConfig {
8-
/**
9-
* 是否启用压缩,默认启用
10-
*/
11-
public boolean enableCompress = true;
8+
// /**
9+
// * 是否启用压缩,默认启用
10+
// */
11+
// public boolean enableCompress = true;
1212

1313
/**
1414
* 是否启用内容加密,默认启用

run/src/main/java/org/wowtools/hppt/run/sc/common/ClientSessionService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public abstract class ClientSessionService {
4040
private boolean firstLoginErr = true;
4141
private boolean noLogin = true;
4242

43-
protected volatile boolean actived = true;
43+
protected volatile boolean running = true;
4444

4545
/**
4646
* 当一个事件结束时发起的回调
@@ -145,7 +145,7 @@ protected void receiveServerBytes(byte[] bytes) throws Exception {
145145
* 当发生难以修复的异常等情况时,主动调用此方法结束当前服务,以便后续自动重启等操作
146146
*/
147147
public void exit() {
148-
actived = false;
148+
running = false;
149149
clientSessionManager.close();
150150
try {
151151
doClose();
@@ -180,7 +180,7 @@ public void sync() {
180180

181181
private Thread buildSendThread() {
182182
return new Thread(() -> {
183-
while (actived) {
183+
while (running) {
184184
try {
185185
byte[] sendBytes = ClientTalker.buildSendToServerBytes(config, config.maxSendBodySize, sendCommandQueue, sendBytesQueue, aesCipherUtil, true);
186186
if (null != sendBytes) {

run/src/main/java/org/wowtools/hppt/run/sc/post/PostClientSessionService.java

Lines changed: 52 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.wowtools.hppt.run.sc.common.ClientSessionService;
99
import org.wowtools.hppt.run.sc.pojo.ScConfig;
1010

11+
import java.nio.charset.StandardCharsets;
1112
import java.util.LinkedList;
1213
import java.util.List;
1314
import java.util.UUID;
@@ -45,32 +46,32 @@ protected void connectToServer(ScConfig config, Cb cb) {
4546
private void startSendThread(Cb cb) {
4647
Thread.startVirtualThread(() -> {
4748
//等待初始化完成
48-
while (null == sendQueue) {
49-
try {
50-
Thread.sleep(10);
51-
} catch (InterruptedException e) {
52-
throw new RuntimeException(e);
53-
}
54-
}
5549
cb.end();
50+
//起一个while循环不断发送数据
5651
final long sendSleepTime = config.post.sendSleepTime;
57-
while (actived) {
58-
if (sendSleepTime > 0) {
59-
try {
60-
Thread.sleep(sendSleepTime);
61-
} catch (InterruptedException e) {
62-
throw new RuntimeException(e);
63-
}
64-
}
52+
while (running) {
6553
try {
6654
byte[] sendBytes;
6755
List<byte[]> bytesList = new LinkedList<>();
6856
sendBytes = sendQueue.take();
6957
bytesList.add(sendBytes);
58+
if (sendSleepTime > 0) {
59+
try {
60+
Thread.sleep(sendSleepTime);
61+
} catch (InterruptedException e) {
62+
throw new RuntimeException(e);
63+
}
64+
}
7065
sendQueue.drainTo(bytesList);
7166
sendBytes = BytesUtil.bytesCollection2PbBytes(bytesList);
72-
try (Response ignored = HttpUtil.doPost(sendUrl, sendBytes)) {
73-
log.debug("startSendThread 发送完成");
67+
try (Response r = HttpUtil.doPost(sendUrl, sendBytes)) {
68+
assert r.body() != null;
69+
byte[] rBytes = r.body().bytes();
70+
if (rBytes.length == 0) {
71+
log.debug("SendThread 发送完成");
72+
} else {
73+
throw new RuntimeException("异常的响应值" + new String(rBytes, StandardCharsets.UTF_8));
74+
}
7475
}
7576
} catch (Exception e) {
7677
log.warn("SendThread异常", e);
@@ -86,24 +87,23 @@ private void startReplyThread() {
8687
Thread.startVirtualThread(() -> {
8788
boolean empty = false;
8889
final long sendSleepTime = config.post.sendSleepTime;
89-
while (actived) {
90-
if (sendSleepTime > 0) {
91-
try {
92-
Thread.sleep(sendSleepTime);
93-
} catch (InterruptedException e) {
94-
throw new RuntimeException(e);
95-
}
96-
}
90+
while (running) {
91+
//检测是否需要挂起接收线程
9792
if (empty && !isNoLogin() && clientSessionManager.getSessionNum() == 0) {
9893
synchronized (replyThreadEmptyLock) {
99-
log.info("无客户端,睡眠接收线程");
94+
log.info("无客户端,挂起接收线程");
10095
try {
10196
replyThreadEmptyLock.wait();
10297
} catch (InterruptedException e) {
10398
log.info("唤醒接收线程");
99+
if (!running) {
100+
log.info("退出已停止ReplyThread");
101+
return;
102+
}
104103
}
105104
}
106105
}
106+
//发一个接收请求接数据
107107
try {
108108
byte[] responseBytes;
109109
try (Response response = HttpUtil.doPost(replyUrl, null)) {
@@ -112,18 +112,32 @@ private void startReplyThread() {
112112
}
113113
if (null != responseBytes && responseBytes.length > 0) {
114114
log.debug("收到服务端响应字节数 {}", responseBytes.length);
115-
List<byte[]> bytesList = BytesUtil.pbBytes2BytesList(responseBytes);
115+
List<byte[]> bytesList;
116+
try {
117+
bytesList = BytesUtil.pbBytes2BytesList(responseBytes);
118+
} catch (Exception e) {
119+
log.warn("解析protobuf字节异常 {}", new String(responseBytes, StandardCharsets.UTF_8));
120+
throw e;
121+
}
116122
for (byte[] bytes : bytesList) {
117123
receiveServerBytes(bytes);
118124
}
119125
empty = bytesList.isEmpty();
120-
}else {
126+
} else {
121127
empty = true;
122128
}
123129
} catch (Exception e) {
124130
log.warn("ReplyThread异常", e);
125131
exit();
126132
}
133+
//按需做等待
134+
if (sendSleepTime > 0) {
135+
try {
136+
Thread.sleep(sendSleepTime);
137+
} catch (InterruptedException e) {
138+
throw new RuntimeException(e);
139+
}
140+
}
127141
}
128142
});
129143
}
@@ -140,4 +154,14 @@ protected void newConnected() {
140154
replyThreadEmptyLock.notify();
141155
}
142156
}
157+
158+
@Override
159+
public void exit() {
160+
super.exit();
161+
Thread.startVirtualThread(() -> {
162+
synchronized (replyThreadEmptyLock) {
163+
replyThreadEmptyLock.notify();
164+
}
165+
});
166+
}
143167
}

run/src/main/java/org/wowtools/hppt/run/ss/common/ServerSessionService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ public void receiveClientBytes(CTX ctx, byte[] bytes) {
8080
if (null == bytes || bytes.length == 0) {
8181
return;
8282
}
83-
//TODO 锁ctx异步
8483
log.debug("收到客户端字节数 {}", bytes.length);
8584
// 若客户端为空,则进行对时或登录
8685
ClientCell clientCell = ctxClientCellMap.get(ctx);

run/src/main/java/org/wowtools/hppt/run/ss/pojo/SsConfig.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,14 @@ public class SsConfig extends CommonConfig {
4545

4646
public static final class PostConfig {
4747
/**
48-
* 等待真实端口返回数据的毫秒数
48+
* 等待真实端口返回数据的毫秒数,一般设一个略小于http服务超时时间的值
4949
*/
5050
public long waitResponseTime = 30000;
51+
52+
/**
53+
* 回复的servlet人为设置的延迟,避免客户端过于频繁的发请求
54+
*/
55+
public long replyDelayTime = 0;
5156
}
5257

5358
public PostConfig post = new PostConfig();

run/src/main/java/org/wowtools/hppt/run/ss/post/PostCtx.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,4 @@ public PostCtx(String cookie) {
1515
this.cookie = cookie;
1616
}
1717

18-
Thread waitResponseThread;
19-
2018
}

run/src/main/java/org/wowtools/hppt/run/ss/post/PostServerSessionService.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import jakarta.servlet.http.HttpServletResponse;
66
import lombok.extern.slf4j.Slf4j;
77
import org.eclipse.jetty.server.Server;
8+
import org.eclipse.jetty.server.ServerConnector;
89
import org.eclipse.jetty.servlet.ErrorPageErrorHandler;
910
import org.eclipse.jetty.servlet.ServletContextHandler;
1011
import org.eclipse.jetty.servlet.ServletHolder;
@@ -32,10 +33,18 @@ public PostServerSessionService(SsConfig ssConfig) throws Exception {
3233
@Override
3334
public void init(SsConfig ssConfig) throws Exception {
3435
log.info("*********");
35-
server = new Server(ssConfig.port);
36+
server = new Server();
37+
// 创建一个ServerConnector
38+
ServerConnector connector = new ServerConnector(server);
39+
connector.setPort(ssConfig.port);
40+
// 设置请求超时时间(以毫秒为单位)
41+
connector.setIdleTimeout(ssConfig.post.waitResponseTime * 2);
42+
// 将connector添加到server
43+
server.addConnector(connector);
44+
3645
ServletContextHandler context = new ServletContextHandler(server, "/");
3746
context.addServlet(new ServletHolder(new SendServlet(this)), "/s");
38-
context.addServlet(new ServletHolder(new ReplyServlet(this, ssConfig.post.waitResponseTime)), "/r");
47+
context.addServlet(new ServletHolder(new ReplyServlet(this, ssConfig.post.waitResponseTime, ssConfig.post.replyDelayTime)), "/r");
3948
context.addServlet(new ServletHolder(new ErrorServlet()), "/e");
4049

4150
ErrorPageErrorHandler errorHandler = new ErrorPageErrorHandler();

0 commit comments

Comments
 (0)