Skip to content

Commit 7473a94

Browse files
committed
tmp
1 parent 8b66a89 commit 7473a94

16 files changed

+275
-54
lines changed

run/src/main/java/org/wowtools/hppt/common/client/ClientBytesSender.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.wowtools.hppt.common.client;
22

3+
import io.netty.channel.Channel;
34
import io.netty.channel.ChannelHandlerContext;
45
import org.wowtools.hppt.common.util.RoughTimeUtil;
56

@@ -14,6 +15,12 @@ public interface ClientBytesSender {
1415
*/
1516
public static abstract class SessionIdCallBack {
1617
public final long createTime = RoughTimeUtil.getTimestamp();
18+
public final ChannelHandlerContext channelHandlerContext;
19+
20+
public SessionIdCallBack(ChannelHandlerContext channelHandlerContext) {
21+
this.channelHandlerContext = channelHandlerContext;
22+
}
23+
1724
public abstract void cb(int sessionId);
1825
}
1926

run/src/main/java/org/wowtools/hppt/common/client/ClientSession.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@ public class ClientSession {
3939
bytes = lifecycle.beforeSendToUser(this, bytes);
4040
if (null != bytes) {
4141
log.debug("ClientSession {} 向用户发送字节 {}", sessionId, bytes.length);
42-
BytesUtil.writeToChannelHandlerContext(channelHandlerContext, bytes);
42+
Throwable e = BytesUtil.writeToChannelHandlerContext(channelHandlerContext, bytes);
43+
if (null != e) {
44+
log.warn("向用户发送字节异常",e);
45+
close();
46+
}else if (log.isDebugEnabled()){
47+
log.debug("ClientSession {} 向用户发送字节完成 {}", sessionId, bytes.length);
48+
}
4349
lifecycle.afterSendToUser(this, bytes);
4450
}
4551
}

run/src/main/java/org/wowtools/hppt/common/client/ClientSessionManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void channelActive(ChannelHandlerContext channelHandlerContext) throws Ex
117117
int localPort = localAddress.getPort();
118118
log.debug("client channelActive {}, port:{}", channelHandlerContext.hashCode(), localPort);
119119
//用户发起新连接 新建一个ClientSession
120-
ClientBytesSender.SessionIdCallBack cb = new ClientBytesSender.SessionIdCallBack() {
120+
ClientBytesSender.SessionIdCallBack cb = new ClientBytesSender.SessionIdCallBack(channelHandlerContext) {
121121
@Override
122122
public void cb(int sessionId) {
123123
ClientSession clientSession = new ClientSession(sessionId, channelHandlerContext, lifecycle);
@@ -150,7 +150,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
150150
}
151151

152152
@Override
153-
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
153+
protected synchronized void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
154154
byte[] bytes = BytesUtil.byteBuf2bytes(byteBuf);
155155
ClientSession clientSession = null;
156156
for (int i = 0; i < 1000; i++) {
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.wowtools.hppt.common.pojo;
2+
3+
/**
4+
* 可发送的SessionBytes,被发送后会触发回调函数并告知是否成功
5+
* @author liuyu
6+
* @date 2024/10/20
7+
*/
8+
public class SendAbleSessionBytes {
9+
public interface CallBack{
10+
void cb(boolean success);
11+
}
12+
13+
public final SessionBytes sessionBytes;
14+
15+
public final CallBack callBack;
16+
17+
public SendAbleSessionBytes(SessionBytes sessionBytes, CallBack callBack) {
18+
this.sessionBytes = sessionBytes;
19+
this.callBack = callBack;
20+
}
21+
}

run/src/main/java/org/wowtools/hppt/common/server/LoginClientService.java

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.wowtools.hppt.common.server;
22

3+
import org.wowtools.hppt.common.pojo.SendAbleSessionBytes;
34
import org.wowtools.hppt.common.pojo.SessionBytes;
45
import org.wowtools.hppt.common.util.AesCipherUtil;
56
import org.wowtools.hppt.common.util.BytesUtil;
@@ -35,7 +36,7 @@ public static final class Client {
3536

3637
private final BlockingQueue<String> commandQueue = new LinkedBlockingQueue<>();
3738

38-
private final BlockingQueue<SessionBytes> sessionBytesQueue = new LinkedBlockingQueue<>();
39+
private final BlockingQueue<SendAbleSessionBytes> sessionBytesQueue = new LinkedBlockingQueue<>();
3940
public final BlockingQueue<byte[]> receiveClientBytes = new LinkedBlockingQueue<>();
4041

4142
private final HashMap<Integer, ServerSession> sessions = new HashMap<>();
@@ -83,38 +84,42 @@ public void removeSession(ServerSession session) {
8384
}
8485

8586
//添加一条向客户端发送的bytes
86-
public void addBytes(int sessionId, byte[] bytes) {
87-
sessionBytesQueue.add(new SessionBytes(sessionId, bytes));
87+
public void addBytes(int sessionId, byte[] bytes, SendAbleSessionBytes.CallBack callBack) {
88+
SendAbleSessionBytes sessionBytes = new SendAbleSessionBytes(
89+
new SessionBytes(sessionId, bytes),
90+
callBack
91+
);
92+
sessionBytesQueue.add(sessionBytes);
8893
}
8994

9095
//取出所有需要向客户端发送的bytes 取出的bytes会按相同sessionId进行整合 无bytes则返回null
91-
public List<SessionBytes> fetchBytes(long maxReturnBodySize) {
96+
public List<SendAbleSessionBytes> fetchBytes(long maxReturnBodySize) {
9297
if (sessionBytesQueue.isEmpty()) {
9398
return null;
9499
}
95-
List<SessionBytes> bytesList = new LinkedList<>();
100+
List<SendAbleSessionBytes> bytesList = new LinkedList<>();
96101
if (maxReturnBodySize < 0) {
97102
sessionBytesQueue.drainTo(bytesList);
98103
} else {
99104
//根据maxReturnBodySize的限制取出队列中的数据返回
100105
long currentReturnBodySize = 0L;
101106
while (currentReturnBodySize < maxReturnBodySize) {
102-
SessionBytes next = sessionBytesQueue.poll();
107+
SendAbleSessionBytes next = sessionBytesQueue.poll();
103108
if (null == next) {
104109
break;
105110
}
106111
bytesList.add(next);
107-
currentReturnBodySize += next.getBytes().length;
112+
currentReturnBodySize += next.sessionBytes.getBytes().length;
108113
}
109114
}
110115
return merge(bytesList);
111116

112117
}
113118

114119
//取出所有需要向客户端发送的bytes 取出的bytes会按相同sessionId进行整合 无bytes则阻塞3秒后返回
115-
public List<SessionBytes> fetchBytesBlocked(long maxReturnBodySize) {
116-
List<SessionBytes> bytesList = new LinkedList<>();
117-
SessionBytes first;
120+
public List<SendAbleSessionBytes> fetchBytesBlocked(long maxReturnBodySize) {
121+
List<SendAbleSessionBytes> bytesList = new LinkedList<>();
122+
SendAbleSessionBytes first;
118123
try {
119124
first = sessionBytesQueue.poll(3, TimeUnit.SECONDS);
120125
} catch (InterruptedException e) {
@@ -132,29 +137,46 @@ public List<SessionBytes> fetchBytesBlocked(long maxReturnBodySize) {
132137
return merge(bytesList);
133138
} else {
134139
//根据maxReturnBodySize的限制取出队列中的数据返回
135-
long currentReturnBodySize = first.getBytes().length;
140+
long currentReturnBodySize = first.sessionBytes.getBytes().length;
136141
while (currentReturnBodySize < maxReturnBodySize) {
137-
SessionBytes next = sessionBytesQueue.poll();
142+
SendAbleSessionBytes next = sessionBytesQueue.poll();
138143
if (null == next) {
139144
break;
140145
}
141146
bytesList.add(next);
142-
currentReturnBodySize += next.getBytes().length;
147+
currentReturnBodySize += next.sessionBytes.getBytes().length;
143148
}
144149
return merge(bytesList);
145150
}
146151

147152
}
148153

149-
private static List<SessionBytes> merge(List<SessionBytes> bytesList) {
150-
Map<Integer, List<byte[]>> bytesMap = new HashMap<>();
151-
for (SessionBytes bytes : bytesList) {
152-
bytesMap.computeIfAbsent(bytes.getSessionId(), (r) -> new LinkedList<>())
153-
.add(bytes.getBytes());
154+
private static final class MergeCell {
155+
private final List<byte[]> bytesList = new LinkedList<>();
156+
private final List<SendAbleSessionBytes.CallBack> callBacks = new LinkedList<>();
157+
}
158+
159+
private static List<SendAbleSessionBytes> merge(List<SendAbleSessionBytes> bytesList) {
160+
Map<Integer, MergeCell> bytesMap = new HashMap<>();
161+
for (SendAbleSessionBytes ssb : bytesList) {
162+
MergeCell mergeCell = bytesMap.computeIfAbsent(ssb.sessionBytes.getSessionId(), (r) -> new MergeCell());
163+
mergeCell.bytesList.add(ssb.sessionBytes.getBytes());
164+
mergeCell.callBacks.add(ssb.callBack);
154165
}
155-
List<SessionBytes> res = new ArrayList<>(bytesMap.size());
156-
bytesMap.forEach((sessionId, bytes) -> {
157-
res.add(new SessionBytes(sessionId, BytesUtil.merge(bytes)));
166+
List<SendAbleSessionBytes> res = new ArrayList<>(bytesMap.size());
167+
bytesMap.forEach((sessionId, mergeCell) -> {
168+
SessionBytes sessionBytes = new SessionBytes(sessionId, BytesUtil.merge(mergeCell.bytesList));
169+
SendAbleSessionBytes.CallBack callBack;
170+
if (mergeCell.callBacks.size() == 1) {
171+
callBack = mergeCell.callBacks.get(0);
172+
} else {
173+
callBack = (success) -> {
174+
for (SendAbleSessionBytes.CallBack callBack1 : mergeCell.callBacks) {
175+
callBack1.cb(success);
176+
}
177+
};
178+
}
179+
res.add(new SendAbleSessionBytes(sessionBytes, callBack));
158180
});
159181
return res;
160182
}

run/src/main/java/org/wowtools/hppt/common/server/ServerSession.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,16 @@ private void startSendThread() {
5151
continue;
5252
}
5353
bytes = lifecycle.beforeSendToTarget(this, bytes);
54+
5455
if (bytes != null) {
5556
Throwable e = BytesUtil.writeToChannel(channel, bytes);
5657
if (null != e) {
58+
log.warn("BytesUtil.writeToChannel err", e);
5759
throw e;
5860
}
61+
if (log.isDebugEnabled()) {
62+
log.debug("向目标端口发送字节 {}", bytes.length);
63+
}
5964
lifecycle.afterSendToTarget(this, bytes);
6065
}
6166
} catch (Throwable e) {

run/src/main/java/org/wowtools/hppt/common/server/ServerSessionLifecycle.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package org.wowtools.hppt.common.server;
22

33

4+
import org.wowtools.hppt.common.pojo.SendAbleSessionBytes;
5+
46
/**
57
* ServerSession的生命周期,包含ServerSession从创建、交互、销毁各过程的触发事件
68
*
@@ -43,9 +45,10 @@ default void afterSendToTarget(ServerSession serverSession, byte[] bytes) {
4345
*
4446
* @param serverSession ServerSession
4547
* @param bytes 发送的字节
48+
* @param callBack 回调,not null
4649
*/
47-
default void sendToClientBuffer(ServerSession serverSession, byte[] bytes, LoginClientService.Client client) {
48-
client.addBytes(serverSession.getSessionId(), bytes);
50+
default void sendToClientBuffer(ServerSession serverSession, byte[] bytes, LoginClientService.Client client, SendAbleSessionBytes.CallBack callBack) {
51+
client.addBytes(serverSession.getSessionId(), bytes,callBack);
4952
}
5053

5154
/**

run/src/main/java/org/wowtools/hppt/common/server/ServerSessionManager.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
import io.netty.buffer.ByteBuf;
55
import io.netty.channel.*;
66
import io.netty.channel.socket.SocketChannel;
7+
import io.netty.handler.logging.LogLevel;
8+
import io.netty.handler.logging.LoggingHandler;
79
import lombok.extern.slf4j.Slf4j;
10+
import org.wowtools.hppt.common.pojo.SendAbleSessionBytes;
811
import org.wowtools.hppt.common.util.BytesUtil;
912
import org.wowtools.hppt.common.util.Constant;
1013
import org.wowtools.hppt.common.util.NettyObjectBuilder;
@@ -43,13 +46,15 @@ public class ServerSessionManager implements AutoCloseable {
4346
ServerSessionManager(ServerSessionManagerBuilder builder) {
4447
lifecycle = builder.lifecycle;
4548
sessionTimeout = builder.sessionTimeout;
49+
// bootstrap.option(ChannelOption.SO_RCVBUF, 1024 * 1024); // 设置接收缓冲区为1MB
50+
// bootstrap.option(ChannelOption.SO_SNDBUF, 1024 * 1024); // 设置发送缓冲区为1MB
4651
bootstrap.group(builder.group)
47-
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) sessionTimeout)
4852
.channel(NettyObjectBuilder.getSocketChannelClass())
4953
.handler(new ChannelInitializer<SocketChannel>() {
5054
@Override
5155
protected void initChannel(SocketChannel ch) {
5256
ChannelPipeline pipeline = ch.pipeline();
57+
// pipeline.addLast(new LoggingHandler(LogLevel.INFO));
5358
pipeline.addLast(new SimpleHandler());
5459
}
5560
});
@@ -215,21 +220,53 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
215220
disposeServerSession(session, "channelInactive");
216221
}
217222

223+
private static final class CallBack implements SendAbleSessionBytes.CallBack {
224+
private final CompletableFuture<Boolean> future;
225+
226+
public CallBack(CompletableFuture<Boolean> future) {
227+
this.future = future;
228+
}
229+
230+
@Override
231+
public void cb(boolean success) {
232+
//锁住当前线程直至字节发送成功,避免缓冲区积压过多数据或后发先至问题
233+
future.complete(success);
234+
}
235+
}
236+
218237
@Override
219238
public void channelRead(ChannelHandlerContext ctx, Object msg) {
220239
ByteBuf buf = (ByteBuf) msg;
221240
byte[] bytes = null;
222241
try {
223242
bytes = BytesUtil.byteBuf2bytes(buf);
224-
}finally {
243+
} finally {
225244
//channelRead方法需要手动释放ByteBuf
226245
buf.release();
227246
}
228247
ServerSession session = getServeSession(ctx);
229248
if (null != session) {
230249
session.activeSession();
231-
log.debug("serverSession {} 收到目标端口字节 {}", session, bytes.length);
232-
lifecycle.sendToClientBuffer(session, bytes, session.getClient());
250+
log.debug("serverSession {} 收到目标端口字节 {} {}", session, bytes.length, this);
251+
CompletableFuture<Boolean> future = new CompletableFuture<>();
252+
CallBack callBack = new CallBack(future);
253+
lifecycle.sendToClientBuffer(session, bytes, session.getClient(), callBack);
254+
Boolean success;
255+
try {
256+
success = future.get(30, TimeUnit.SECONDS);
257+
} catch (Exception e) {
258+
log.warn("serverSession {} 字节发送异常 {}", session, bytes.length, e);
259+
throw new RuntimeException(e);
260+
}
261+
log.debug("serverSession {} 字节发送至客户端完成 {} success? {} {}", session, bytes.length, success, this);
262+
263+
if (null == success) {
264+
throw new RuntimeException("字节发送超时, session: " + session.getSessionId());
265+
}
266+
if (!success) {
267+
throw new RuntimeException("字节发送失败, session: " + session.getSessionId());
268+
}
269+
233270
lifecycle.afterSendToTarget(session, bytes);
234271
} else {
235272
log.warn("channelRead session不存在");
@@ -250,6 +287,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
250287
@Override
251288
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
252289
super.channelReadComplete(ctx);
290+
ctx.flush();
253291
}
254292
}
255293

run/src/main/java/org/wowtools/hppt/common/server/ServerSessionManagerBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public ServerSessionManagerBuilder setGroup(EventLoopGroup group) {
2929

3030
public ServerSessionManager build() {
3131
if (group == null) {
32-
group = NettyObjectBuilder.buildVirtualThreadEventLoopGroup();
32+
group = NettyObjectBuilder.buildEventLoopGroup();
3333
}
3434

3535
if (lifecycle == null) {

0 commit comments

Comments
 (0)