Skip to content

Commit 4222993

Browse files
committed
增加一个流水号标识,以便调试包体后发先至的问题
1 parent 7473a94 commit 4222993

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+3225
-2816
lines changed

_doc/ProtoMessage.proto

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
syntax = "proto3";
2+
package pojo;
3+
option java_package = "org.wowtools.hppt.common.protobuf";
4+
option java_outer_classname = "ProtoMessage";
5+
6+
//消息字节
7+
message BytesPb{
8+
//真实字节
9+
bytes bytes = 1;
10+
//会话id 通过此id确认字节该与哪个端口/哪个用户端交互
11+
int32 sessionId = 2;
12+
//流水号 用于调试消息后发先至等问题,非调试时流水号为空
13+
int32 serialNumber = 3;
14+
15+
}
16+
17+
message MessagePb{
18+
//消息字节list
19+
repeated BytesPb bytesPbList = 1;
20+
//客户端/服务端需要执行的命令
21+
repeated string commandList = 2;
22+
//流水号 用于调试消息后发先至等问题,非调试时流水号为空
23+
int32 serialNumber = 3;
24+
}
25+
26+
message BytesListPb{
27+
//bytes list
28+
repeated bytes bytesList = 1;
29+
//流水号 用于调试消息后发先至等问题,非调试时流水号为空
30+
int32 serialNumber = 2;
31+
}

_doc/todo.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
post 下载文件会卡住
2+
ws 速度较慢

_localtest/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313

1414

1515
<dependencies>
16+
<dependency>
17+
<groupId>org.wowtools.hppt</groupId>
18+
<artifactId>run</artifactId>
19+
<version>1.0-SNAPSHOT</version>
20+
</dependency>
1621
<dependency>
1722
<groupId>com.squareup.okhttp3</groupId>
1823
<artifactId>okhttp</artifactId>

run/src/main/java/org/wowtools/hppt/common/client/ClientBytesSender.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.netty.channel.Channel;
44
import io.netty.channel.ChannelHandlerContext;
5+
import org.wowtools.hppt.common.pojo.SessionBytes;
56
import org.wowtools.hppt.common.util.RoughTimeUtil;
67

78
/**
@@ -37,7 +38,7 @@ public SessionIdCallBack(ChannelHandlerContext channelHandlerContext) {
3738
* 向目标发送字节的具体方式,如post请求,websocket等
3839
*
3940
* @param clientSession clientSession
40-
* @param bytes bytes
41+
* @param sessionBytes bytes
4142
*/
42-
void sendToTarget(ClientSession clientSession, byte[] bytes);
43+
void sendToTarget(ClientSession clientSession, SessionBytes sessionBytes);
4344
}

run/src/main/java/org/wowtools/hppt/common/client/ClientSessionManager.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,16 @@
66
import io.netty.channel.socket.SocketChannel;
77
import io.netty.handler.codec.ByteToMessageDecoder;
88
import lombok.extern.slf4j.Slf4j;
9+
import org.wowtools.hppt.common.pojo.SessionBytes;
910
import org.wowtools.hppt.common.util.BytesUtil;
11+
import org.wowtools.hppt.common.util.DebugConfig;
1012
import org.wowtools.hppt.common.util.NettyObjectBuilder;
1113

1214
import java.net.InetSocketAddress;
1315
import java.util.LinkedList;
1416
import java.util.List;
1517
import java.util.Map;
16-
import java.util.concurrent.BlockingQueue;
1718
import java.util.concurrent.ConcurrentHashMap;
18-
import java.util.concurrent.LinkedBlockingQueue;
19-
import java.util.concurrent.TimeUnit;
2019

2120
/**
2221
* ClientSession管理器
@@ -176,7 +175,11 @@ protected synchronized void decode(ChannelHandlerContext channelHandlerContext,
176175
if (null == bytes) {
177176
return;
178177
}
179-
clientBytesSender.sendToTarget(clientSession, bytes);
178+
SessionBytes sessionBytes = new SessionBytes(clientSession.getSessionId(), bytes);
179+
if (DebugConfig.OpenSerialNumber) {
180+
log.debug("用户端发来字节 >sessionBytes-SerialNumber {}", sessionBytes.getSerialNumber());
181+
}
182+
clientBytesSender.sendToTarget(clientSession, sessionBytes);
180183
lifecycle.afterSendToTarget(clientSession, bytes);
181184
} else {
182185
log.warn("找不到channelHandlerContext对应的client session");

run/src/main/java/org/wowtools/hppt/common/client/ClientTalker.java

Lines changed: 58 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package org.wowtools.hppt.common.client;
22

3-
import com.google.protobuf.ByteString;
43
import lombok.extern.slf4j.Slf4j;
54
import org.wowtools.hppt.common.pojo.SessionBytes;
6-
import org.wowtools.hppt.common.protobuf.ProtoMessage;
5+
import org.wowtools.hppt.common.pojo.TalkMessage;
76
import org.wowtools.hppt.common.util.AesCipherUtil;
8-
import org.wowtools.hppt.common.util.BytesUtil;
97
import org.wowtools.hppt.common.util.CommonConfig;
108
import org.wowtools.hppt.common.util.Constant;
9+
import org.wowtools.hppt.common.util.DebugConfig;
1110

1211
import java.nio.charset.StandardCharsets;
1312
import java.util.LinkedList;
@@ -56,7 +55,7 @@ public static byte[] buildSendToServerBytes(CommonConfig config, long maxSendBod
5655
wait = false;
5756
}
5857
//bytes
59-
List<ProtoMessage.BytesPb> bytesPbList = new LinkedList<>();
58+
List<SessionBytes> bytesPbList = new LinkedList<>();
6059
do {
6160
if (sendBodySize >= maxSendBodySize) {
6261
break;
@@ -72,24 +71,23 @@ public static byte[] buildSendToServerBytes(CommonConfig config, long maxSendBod
7271
break;
7372
}
7473
sendBodySize += bytes.getBytes().length;
75-
bytesPbList.add(ProtoMessage.BytesPb.newBuilder()
76-
.setBytes(ByteString.copyFrom(bytes.getBytes()))
77-
.setSessionId(bytes.getSessionId())
78-
.build());
74+
bytesPbList.add(bytes);
7975
} while (true);
8076

8177
if (sendBodySize == 0) {
8278
return null;
8379
}
84-
ProtoMessage.MessagePb.Builder rBuilder = ProtoMessage.MessagePb.newBuilder();
85-
if (!commands.isEmpty()) {
86-
rBuilder.addAllCommandList(commands);
87-
}
88-
if (!bytesPbList.isEmpty()) {
89-
rBuilder.addAllBytesPbList(bytesPbList);
80+
if (DebugConfig.OpenSerialNumber) {
81+
for (SessionBytes sessionBytes : bytesPbList) {
82+
log.debug("ClientTalker收集 >sessionBytes-SerialNumber {}", sessionBytes.getSerialNumber());
83+
}
9084
}
9185

92-
byte[] bytes = rBuilder.build().toByteArray();
86+
TalkMessage talkMessage = new TalkMessage(bytesPbList, commands);
87+
if (DebugConfig.OpenSerialNumber) {
88+
log.debug("ClientTalker组装 >talkMessage-SerialNumber {}", talkMessage.getSerialNumber());
89+
}
90+
byte[] bytes = talkMessage.toProto().build().toByteArray();
9391
//加密
9492
if (config.enableEncrypt) {
9593
bytes = aesCipherUtil.encryptor.encrypt(bytes);
@@ -106,14 +104,17 @@ public static boolean receiveServerBytes(CommonConfig config, byte[] responseBod
106104
if (null == responseBody) {
107105
return true;
108106
}
109-
ProtoMessage.MessagePb rMessagePb;
107+
TalkMessage talkMessage;
110108
try {
111109
//解密
112110
if (config.enableEncrypt) {
113111
responseBody = aesCipherUtil.descriptor.decrypt(responseBody);
114112
}
115113
log.debug("收到服务端发回字节数 {}", responseBody.length);
116-
rMessagePb = ProtoMessage.MessagePb.parseFrom(responseBody);
114+
talkMessage = new TalkMessage(responseBody);
115+
if (DebugConfig.OpenSerialNumber) {
116+
log.debug("ClientTalker收到服务端发回 <talkMessage-SerialNumber {}", talkMessage.getSerialNumber());
117+
}
117118
} catch (Exception e) {
118119
log.warn("服务端响应错误 {}", new String(responseBody, StandardCharsets.UTF_8), e);
119120
Thread.sleep(10000);
@@ -122,51 +123,54 @@ public static boolean receiveServerBytes(CommonConfig config, byte[] responseBod
122123

123124
boolean isEmpty = true;
124125
//收命令
125-
for (String command : rMessagePb.getCommandListList()) {
126-
log.debug("收到服务端命令 {} ", command);
127-
char type = command.charAt(0);
128-
switch (type) {
129-
case Constant.ScCommands.InitSession -> {
130-
//sessionId,initFlag
131-
String[] params = command.substring(1).split(Constant.sessionIdJoinFlag);
132-
int sessionId = Integer.parseInt(params[0]);
133-
int initFlag = Integer.parseInt(params[1]);
134-
ClientBytesSender.SessionIdCallBack sessionIdCallBack = sessionIdCallBackMap.remove(initFlag);
135-
if (null != sessionIdCallBack) {
136-
sessionIdCallBack.cb(sessionId);
137-
} else {
138-
log.warn("没有对应的SessionIdCallBack {}", sessionIdCallBack);
126+
if (null != talkMessage.getCommands() && !talkMessage.getCommands().isEmpty()) {
127+
for (String command : talkMessage.getCommands()) {
128+
log.debug("收到服务端命令 {} ", command);
129+
char type = command.charAt(0);
130+
switch (type) {
131+
case Constant.ScCommands.InitSession -> {
132+
//sessionId,initFlag
133+
String[] params = command.substring(1).split(Constant.sessionIdJoinFlag);
134+
int sessionId = Integer.parseInt(params[0]);
135+
int initFlag = Integer.parseInt(params[1]);
136+
ClientBytesSender.SessionIdCallBack sessionIdCallBack = sessionIdCallBackMap.remove(initFlag);
137+
if (null != sessionIdCallBack) {
138+
sessionIdCallBack.cb(sessionId);
139+
} else {
140+
log.warn("没有对应的SessionIdCallBack {}", sessionIdCallBack);
141+
}
139142
}
140-
}
141-
case Constant.ScCommands.CloseSession -> {
142-
int sessionId = Integer.parseInt(command.substring(1));
143-
ClientSession session = clientSessionManager.getClientSessionBySessionId(sessionId);
144-
if (null != session) {
145-
clientSessionManager.disposeClientSession(session, "服务端发送关闭命令");
143+
case Constant.ScCommands.CloseSession -> {
144+
int sessionId = Integer.parseInt(command.substring(1));
145+
ClientSession session = clientSessionManager.getClientSessionBySessionId(sessionId);
146+
if (null != session) {
147+
clientSessionManager.disposeClientSession(session, "服务端发送关闭命令");
148+
}
146149
}
147-
}
148-
case Constant.ScCommands.CheckSessionActive -> {
149-
int sessionId = Integer.parseInt(command.substring(1));
150-
ClientSession session = clientSessionManager.getClientSessionBySessionId(sessionId);
151-
if (null != session) {
152-
//session存在,则发送存活消息
153-
sendCommandQueue.add(String.valueOf(Constant.SsCommands.ActiveSession) + sessionId);
154-
} else {
155-
//否则发送关闭消息
156-
sendCommandQueue.add(String.valueOf(Constant.SsCommands.CloseSession) + sessionId);
150+
case Constant.ScCommands.CheckSessionActive -> {
151+
int sessionId = Integer.parseInt(command.substring(1));
152+
ClientSession session = clientSessionManager.getClientSessionBySessionId(sessionId);
153+
if (null != session) {
154+
//session存在,则发送存活消息
155+
sendCommandQueue.add(String.valueOf(Constant.SsCommands.ActiveSession) + sessionId);
156+
} else {
157+
//否则发送关闭消息
158+
sendCommandQueue.add(String.valueOf(Constant.SsCommands.CloseSession) + sessionId);
159+
}
157160
}
158161
}
159162
}
160163
}
161164

165+
162166
//收字节
163-
List<ProtoMessage.BytesPb> rBytesPbListList = rMessagePb.getBytesPbListList();
164-
if (!rBytesPbListList.isEmpty()) {
167+
List<SessionBytes> sessionBytes = talkMessage.getSessionBytes();
168+
if (null != sessionBytes && !sessionBytes.isEmpty()) {
165169
isEmpty = false;
166-
for (ProtoMessage.BytesPb bytesPb : rBytesPbListList) {
167-
ClientSession clientSession = clientSessionManager.getClientSessionBySessionId(bytesPb.getSessionId());
170+
for (SessionBytes sessionByte : sessionBytes) {
171+
ClientSession clientSession = clientSessionManager.getClientSessionBySessionId(sessionByte.getSessionId());
168172
if (clientSession != null) {
169-
clientSession.sendToUser(bytesPb.getBytes().toByteArray());
173+
clientSession.sendToUser(sessionByte.getBytes());
170174
} else {
171175
//客户端没有这个session,异步等待一下看是否是未初始化完成
172176
Thread.startVirtualThread(() -> {
@@ -178,9 +182,9 @@ public static boolean receiveServerBytes(CommonConfig config, byte[] responseBod
178182
} catch (InterruptedException e) {
179183
continue;
180184
}
181-
clientSession1 = clientSessionManager.getClientSessionBySessionId(bytesPb.getSessionId());
185+
clientSession1 = clientSessionManager.getClientSessionBySessionId(sessionByte.getSessionId());
182186
if (null != clientSession1) {
183-
clientSession1.sendToUser(bytesPb.getBytes().toByteArray());
187+
clientSession1.sendToUser(sessionByte.getBytes());
184188
return;
185189
}
186190
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package org.wowtools.hppt.common.pojo;
2+
3+
import com.google.protobuf.ByteString;
4+
import com.google.protobuf.InvalidProtocolBufferException;
5+
import lombok.Getter;
6+
import org.wowtools.hppt.common.protobuf.ProtoMessage;
7+
import org.wowtools.hppt.common.util.DebugConfig;
8+
9+
import java.util.ArrayList;
10+
import java.util.Collection;
11+
import java.util.List;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
14+
/**
15+
* @author liuyu
16+
* @date 2024/10/23
17+
*/
18+
public class BytesList {
19+
private static final AtomicInteger serialNumberBuilder;
20+
21+
static {
22+
if (DebugConfig.OpenSerialNumber) {
23+
serialNumberBuilder = new AtomicInteger();
24+
} else {
25+
serialNumberBuilder = null;
26+
}
27+
}
28+
29+
private final Collection<byte[]> bytesCollection;
30+
private final int serialNumber;
31+
32+
public BytesList(Collection<byte[]> bytesCollection) {
33+
this.bytesCollection = bytesCollection;
34+
if (!DebugConfig.OpenSerialNumber) {
35+
serialNumber = 0;
36+
} else {
37+
serialNumber = serialNumberBuilder.incrementAndGet();
38+
}
39+
}
40+
41+
public BytesList(byte[] pbBytes) {
42+
ProtoMessage.BytesListPb pb;
43+
try {
44+
pb = ProtoMessage.BytesListPb.parseFrom(pbBytes);
45+
} catch (InvalidProtocolBufferException e) {
46+
throw new RuntimeException(e);
47+
}
48+
List<ByteString> byteStringList = pb.getBytesListList();
49+
ArrayList<byte[]> res = new ArrayList<>(byteStringList.size());
50+
for (ByteString s : byteStringList) {
51+
res.add(s.toByteArray());
52+
}
53+
this.bytesCollection = res;
54+
55+
if (!DebugConfig.OpenSerialNumber) {
56+
serialNumber = 0;
57+
} else {
58+
serialNumber = pb.getSerialNumber();
59+
}
60+
}
61+
62+
public Collection<byte[]> getBytes(){
63+
return bytesCollection;
64+
}
65+
66+
public int getSerialNumber() {
67+
return serialNumber;
68+
}
69+
70+
public ProtoMessage.BytesListPb.Builder toProto() {
71+
List<ByteString> byteStringList = new ArrayList<>(bytesCollection.size());
72+
for (byte[] bytes : bytesCollection) {
73+
byteStringList.add(ByteString.copyFrom(bytes));
74+
}
75+
ProtoMessage.BytesListPb.Builder builder = ProtoMessage.BytesListPb.newBuilder();
76+
builder.addAllBytesList(byteStringList);
77+
return builder;
78+
}
79+
}

0 commit comments

Comments
 (0)