Skip to content

Commit 8b66a89

Browse files
committed
修复一些内存泄露问题
1 parent f3481e7 commit 8b66a89

File tree

15 files changed

+158
-40
lines changed

15 files changed

+158
-40
lines changed

run/src/main/java/org/wowtools/hppt/common/server/ServerSession.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,15 @@ private void startSendThread() {
5252
}
5353
bytes = lifecycle.beforeSendToTarget(this, bytes);
5454
if (bytes != null) {
55-
BytesUtil.writeToChannel(channel, bytes);
55+
Throwable e = BytesUtil.writeToChannel(channel, bytes);
56+
if (null != e) {
57+
throw e;
58+
}
5659
lifecycle.afterSendToTarget(this, bytes);
5760
}
5861
} catch (Throwable e) {
5962
log.warn("SendThread err", e);
63+
close();
6064
}
6165
}
6266
log.info("{} sendThread stop", this);

run/src/main/java/org/wowtools/hppt/common/server/ServerSessionManager.java

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@
99
import org.wowtools.hppt.common.util.Constant;
1010
import org.wowtools.hppt.common.util.NettyObjectBuilder;
1111

12+
import java.net.InetSocketAddress;
1213
import java.util.HashSet;
1314
import java.util.Map;
15+
import java.util.concurrent.CompletableFuture;
1416
import java.util.concurrent.ConcurrentHashMap;
17+
import java.util.concurrent.TimeUnit;
1518
import java.util.concurrent.atomic.AtomicInteger;
1619

1720
/**
@@ -32,7 +35,7 @@ public class ServerSessionManager implements AutoCloseable {
3235
//<clientId,Map<sessionId,session>>
3336
private final Map<String, Map<Integer, ServerSession>> clientIdServerSessionMap = new ConcurrentHashMap<>();
3437

35-
private final Bootstrap bootstrap = new Bootstrap();
38+
private final Bootstrap bootstrap = new Bootstrap();//TODO 这里改到每个session里,减少eventloop数,实现阻塞等待,以此避免接收目标端数据过快
3639

3740
private final ServerSessionLifecycle lifecycle;
3841
private final long sessionTimeout;
@@ -90,16 +93,77 @@ public void close() {
9093
running = false;
9194
}
9295

93-
public ServerSession createServerSession(LoginClientService.Client client, String host, int port) {
96+
//新建一个session并返回sessionId
97+
public int createServerSession(LoginClientService.Client client, String host, int port, long timeoutMillis) {
9498
int sessionId = sessionIdBuilder.addAndGet(1);
95-
log.info("new ServerSession {} {}:{} from {}", sessionId, host, port, client.clientId);
96-
Channel channel = bootstrap.connect(host, port).channel();
99+
Map<Integer, ServerSession> clientSessions = clientIdServerSessionMap.computeIfAbsent(client.clientId, (id) -> new ConcurrentHashMap<>());
100+
101+
class ChannelRes {
102+
Throwable cause;
103+
Channel channel;
104+
}
105+
CompletableFuture<ChannelRes> resFuture = new CompletableFuture<>();
106+
107+
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
108+
// 添加超时处理
109+
future.addListener((ChannelFutureListener) f -> {
110+
ChannelRes res = new ChannelRes();
111+
try {
112+
if (!f.isSuccess()) {
113+
try {
114+
future.cancel(true); // 取消未完成的连接尝试
115+
} catch (Exception e) {
116+
log.warn("future.channel, sessionId {}", sessionId, e);
117+
}
118+
res.cause = f.cause();
119+
} else {
120+
// 设置连接超时
121+
boolean isConnected = future.awaitUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS);
122+
if (!isConnected) {
123+
try {
124+
future.cancel(true); // 取消未完成的连接尝试
125+
} catch (Exception e) {
126+
log.warn("future.channel, sessionId {}", sessionId, e);
127+
}
128+
} else {
129+
res.channel = future.channel();
130+
log.info("new ServerSession {} {}:{} from {}", sessionId, host, port, client.clientId);
131+
}
132+
}
133+
} catch (Exception e) {
134+
res.cause = e;
135+
} finally {
136+
resFuture.complete(res);
137+
}
138+
139+
});
140+
141+
ChannelRes res;
142+
try {
143+
res = resFuture.get(timeoutMillis, TimeUnit.MINUTES);
144+
} catch (Exception e) {
145+
log.warn("获取ChannelRes异常 sessionId {}", sessionId, e);
146+
return sessionId;
147+
}
148+
if (null == res) {
149+
log.warn("获取channel超时 sessionId {}", sessionId);
150+
return sessionId;
151+
}
152+
if (null != res.cause) {
153+
log.warn("获取channel异常 sessionId {}", sessionId, res.cause);
154+
return sessionId;
155+
}
156+
157+
Channel channel = res.channel;
158+
if (null == channel) {
159+
log.warn("获取channel为空 sessionId {}", sessionId);
160+
return sessionId;
161+
}
97162
ServerSession serverSession = new ServerSession(sessionTimeout, sessionId, client, lifecycle, channel);
98163
channelServerSessionMap.put(channel, serverSession);
99164
serverSessionMap.put(sessionId, serverSession);
100-
Map<Integer, ServerSession> clientSessions = clientIdServerSessionMap.computeIfAbsent(client.clientId, (id) -> new ConcurrentHashMap<>());
101165
clientSessions.put(sessionId, serverSession);
102-
return serverSession;
166+
return sessionId;
103167
}
104168

105169
public void disposeServerSession(ServerSession serverSession, String type) {
@@ -154,7 +218,13 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
154218
@Override
155219
public void channelRead(ChannelHandlerContext ctx, Object msg) {
156220
ByteBuf buf = (ByteBuf) msg;
157-
byte[] bytes = BytesUtil.byteBuf2bytes(buf);
221+
byte[] bytes = null;
222+
try {
223+
bytes = BytesUtil.byteBuf2bytes(buf);
224+
}finally {
225+
//channelRead方法需要手动释放ByteBuf
226+
buf.release();
227+
}
158228
ServerSession session = getServeSession(ctx);
159229
if (null != session) {
160230
session.activeSession();

run/src/main/java/org/wowtools/hppt/common/server/ServerTalker.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import lombok.extern.slf4j.Slf4j;
55
import org.wowtools.hppt.common.pojo.SessionBytes;
66
import org.wowtools.hppt.common.protobuf.ProtoMessage;
7-
import org.wowtools.hppt.common.util.BytesUtil;
87
import org.wowtools.hppt.common.util.CommonConfig;
98
import org.wowtools.hppt.common.util.Constant;
109

@@ -22,7 +21,7 @@ public class ServerTalker {
2221

2322
//接收客户端发来的字节并做相应处理
2423
public static void receiveClientBytes(CommonConfig config, ServerSessionManager serverSessionManager,
25-
LoginClientService.Client client, byte[] bytes) throws Exception {
24+
LoginClientService.Client client, byte[] bytes, long timeoutMillis) throws Exception {
2625
if (null == bytes || bytes.length == 0) {
2726
return;
2827
}
@@ -36,7 +35,7 @@ public static void receiveClientBytes(CommonConfig config, ServerSessionManager
3635
/* 发消息 */
3736
//发命令
3837
for (String command : inputMessage.getCommandListList()) {
39-
receiveClientCommand(command, serverSessionManager, serverSessionMap, client);
38+
receiveClientCommand(command, serverSessionManager, serverSessionMap, client, timeoutMillis);
4039
}
4140
//发bytes
4241
for (ProtoMessage.BytesPb bytesPb : inputMessage.getBytesPbListList()) {
@@ -51,23 +50,35 @@ public static void receiveClientBytes(CommonConfig config, ServerSessionManager
5150
}
5251

5352
private static void receiveClientCommand(String command,
54-
ServerSessionManager serverSessionManager, Map<Integer, ServerSession> serverSessionMap, LoginClientService.Client client) {
53+
ServerSessionManager serverSessionManager, Map<Integer, ServerSession> serverSessionMap, LoginClientService.Client client, long timeoutMillis) {
5554
log.debug("收到客户端命令 {} ", command);
5655
char type = command.charAt(0);
5756
switch (type) {
5857
case Constant.SsCommands.CreateSession -> {
5958
String[] params = command.substring(1).split(Constant.sessionIdJoinFlag);
60-
ServerSession session = serverSessionManager.createServerSession(client, params[0], Integer.parseInt(params[1]));
61-
client.addCommand(String.valueOf(Constant.ScCommands.InitSession) + session.getSessionId() + Constant.sessionIdJoinFlag + params[2]);
59+
int sessionId = serverSessionManager.createServerSession(client, params[0], Integer.parseInt(params[1]), timeoutMillis);
60+
client.addCommand(String.valueOf(Constant.ScCommands.InitSession) + sessionId + Constant.sessionIdJoinFlag + params[2]);
61+
if (null == serverSessionManager.getServerSessionBySessionId(sessionId)) {
62+
//获取sessionId为空,说明刚才serverSessionManager.createServerSession失败了,所以接着发一条关闭命令给客户端
63+
client.addCommand(String.valueOf(Constant.ScCommands.CloseSession) + sessionId);
64+
}
6265
}
6366
case Constant.SsCommands.CloseSession -> {
67+
if (null == serverSessionMap) {
68+
log.info("CloseSession, serverSessionMap尚未建立,忽略命令 {}", command);
69+
return;
70+
}
6471
int sessionId = Integer.parseInt(command.substring(1));
6572
ServerSession serverSession = serverSessionMap.get(sessionId);
6673
if (null != serverSession) {
6774
serverSessionManager.disposeServerSession(serverSession, "客户端发送关闭命令");
6875
}
6976
}
7077
case Constant.SsCommands.ActiveSession -> {
78+
if (null == serverSessionMap) {
79+
log.info("ActiveSession, serverSessionMap尚未建立,忽略命令 {}", command);
80+
return;
81+
}
7182
int sessionId = Integer.parseInt(command.substring(1));
7283
ServerSession serverSession = serverSessionMap.get(sessionId);
7384
if (null != serverSession) {

run/src/main/java/org/wowtools/hppt/common/util/BytesUtil.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,16 +135,20 @@ public static ByteBuf bytes2byteBuf(Channel ctx, byte[] bytes) {
135135
return byteBuf;
136136
}
137137

138-
private static boolean afterWrite(ChannelFuture future, Object msg) {
138+
private static Throwable afterWrite(ChannelFuture future, Object msg) {
139139
boolean completed = future.awaitUninterruptibly(10, TimeUnit.SECONDS); // 同步等待完成
140140
if (completed) {
141141
if (future.isSuccess()) {
142-
return true;
142+
return null;
143143
}
144144
}
145-
log.warn("写入消息未成功!!! timeout? {}", !completed, future.cause());
145+
Throwable cause = future.cause();
146+
if (null == cause) {
147+
cause = new RuntimeException("写入消息未成功");
148+
}
149+
log.warn("写入消息未成功!!! timeout? {}", !completed, cause);
146150
ReferenceCountUtil.safeRelease(msg);
147-
return false;
151+
return cause;
148152
}
149153

150154
private static void waitChannelWritable(Channel channel) {
@@ -161,24 +165,24 @@ private static void waitChannelWritable(Channel channel) {
161165
}
162166
}
163167

164-
//把字节写入ChannelHandlerContext
165-
public static boolean writeToChannelHandlerContext(ChannelHandlerContext ctx, byte[] bytes) {
168+
//把字节写入ChannelHandlerContext 如果有异常则返回异常
169+
public static Throwable writeToChannelHandlerContext(ChannelHandlerContext ctx, byte[] bytes) {
166170
waitChannelWritable(ctx.channel());
167171
ByteBuf byteBuf = bytes2byteBuf(ctx, bytes);
168172
ChannelFuture future = ctx.writeAndFlush(byteBuf);
169173
return afterWrite(future, byteBuf);
170174
}
171175

172-
//把字节写入Channel
173-
public static boolean writeToChannel(Channel channel, byte[] bytes) {
176+
//把字节写入Channel 如果有异常则返回异常
177+
public static Throwable writeToChannel(Channel channel, byte[] bytes) {
174178
waitChannelWritable(channel);
175179
ByteBuf byteBuf = bytes2byteBuf(channel, bytes);
176180
ChannelFuture future = channel.writeAndFlush(byteBuf);
177181
return afterWrite(future, byteBuf);
178182
}
179183

180-
//把对象写入Channel
181-
public static boolean writeObjToChannel(Channel channel, Object obj) {
184+
//把对象写入Channel 如果有异常则返回异常
185+
public static Throwable writeObjToChannel(Channel channel, Object obj) {
182186
waitChannelWritable(channel);
183187
ChannelFuture future = channel.writeAndFlush(obj);
184188
return afterWrite(future, obj);

run/src/main/java/org/wowtools/hppt/run/Run.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,6 @@ public static void main(String[] args) throws Exception {
3030
} catch (Exception e) {
3131
System.out.println("未加载到根目录下logback.xml文件,使用默认配置 " + e.getMessage());
3232
}
33-
try {
34-
if ("1".equals(ResourcesReader.readStr(Run.class, "/debug.txt").trim())) {
35-
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
36-
System.out.println("开启调试");
37-
}
38-
} catch (Exception e) {
39-
}
4033
String type = args[0];
4134
switch (type) {
4235
case "ss":

run/src/main/java/org/wowtools/hppt/run/sc/RunSc.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package org.wowtools.hppt.run.sc;
22

3+
import io.netty.util.ResourceLeakDetector;
34
import lombok.extern.slf4j.Slf4j;
45
import org.wowtools.hppt.common.util.Constant;
56
import org.wowtools.hppt.common.util.ResourcesReader;
7+
import org.wowtools.hppt.run.Run;
68
import org.wowtools.hppt.run.sc.common.ClientSessionService;
79
import org.wowtools.hppt.run.sc.file.FileClientSessionService;
810
import org.wowtools.hppt.run.sc.hppt.HpptClientSessionService;
@@ -22,6 +24,13 @@
2224
public class RunSc {
2325

2426
public static void main(String[] args) {
27+
try {
28+
if ("1".equals(ResourcesReader.readStr(Run.class, "/debug.txt").trim())) {
29+
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
30+
System.out.println("开启调试");
31+
}
32+
} catch (Exception e) {
33+
}
2534
String configPath;
2635
if (args.length <= 1) {
2736
configPath = "sc.yml";

run/src/main/java/org/wowtools/hppt/run/sc/hppt/HpptClientSessionService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ protected void initChannel(SocketChannel ch) {
6464

6565
@Override
6666
public void sendBytesToServer(byte[] bytes) {
67-
if (!BytesUtil.writeToChannelHandlerContext(_ctx, bytes)) {
67+
Throwable e = BytesUtil.writeToChannelHandlerContext(_ctx, bytes);
68+
if (null != e) {
69+
log.warn("sendBytesToServer err", e);
6870
exit();
6971
}
7072
}
@@ -77,7 +79,7 @@ public MessageHandler(Cb cb) {
7779
}
7880

7981
@Override
80-
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception{
82+
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
8183
// 处理接收到的消息
8284
byte[] bytes = BytesUtil.byteBuf2bytes(msg);
8385
receiveServerBytes(bytes);

run/src/main/java/org/wowtools/hppt/run/sc/rhppt/RHpptClientSessionService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
114114

115115
@Override
116116
public void sendBytesToServer(byte[] bytes) {
117-
if (!BytesUtil.writeToChannelHandlerContext(_ctx, bytes)) {
117+
Throwable e = BytesUtil.writeToChannelHandlerContext(_ctx, bytes);
118+
if (null!=e) {
119+
log.warn("sendBytesToServer err", e);
118120
exit();
119121
}
120122
}

run/src/main/java/org/wowtools/hppt/run/sc/websocket/WebSocketClientSessionService.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public void connectToServer(ScConfig config, Cb cb) throws Exception {
3737

3838
private void newWsConn(ScConfig config, Cb cb) throws Exception {
3939
doClose();
40-
final URI webSocketURL = new URI(config.websocket.serverUrl+"/s");//随便加一个后缀防止被nginx转发时识别不到
40+
final URI webSocketURL = new URI(config.websocket.serverUrl + "/s");//随便加一个后缀防止被nginx转发时识别不到
4141
group = NettyObjectBuilder.buildEventLoopGroup(config.websocket.workerGroupNum);
4242
Bootstrap boot = new Bootstrap();
4343
boot.option(ChannelOption.SO_KEEPALIVE, true)
@@ -85,7 +85,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
8585
} catch (InterruptedException e) {
8686
throw new RuntimeException(e);
8787
}
88-
if (!BytesUtil.writeObjToChannel(ctx.channel(), new PingWebSocketFrame())) {
88+
Throwable e = BytesUtil.writeObjToChannel(ctx.channel(), new PingWebSocketFrame());
89+
if (null != e) {
90+
log.warn("ping err", e);
8991
exit();
9092
}
9193
}
@@ -106,7 +108,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
106108
@Override
107109
public void sendBytesToServer(byte[] bytes) {
108110
BinaryWebSocketFrame frame = new BinaryWebSocketFrame(BytesUtil.bytes2byteBuf(wsChannel, bytes));
109-
if (!BytesUtil.writeObjToChannel(wsChannel, frame)) {
111+
Throwable e = BytesUtil.writeObjToChannel(wsChannel, frame);
112+
if (null != e) {
113+
log.warn("sendBytesToServer err", e);
110114
exit();
111115
}
112116
}

run/src/main/java/org/wowtools/hppt/run/ss/RunSs.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package org.wowtools.hppt.run.ss;
22

3+
import io.netty.util.ResourceLeakDetector;
34
import lombok.extern.slf4j.Slf4j;
45
import org.wowtools.hppt.common.util.Constant;
56
import org.wowtools.hppt.common.util.ResourcesReader;
7+
import org.wowtools.hppt.run.Run;
68
import org.wowtools.hppt.run.ss.common.ServerSessionService;
79
import org.wowtools.hppt.run.ss.file.FileServerSessionService;
810
import org.wowtools.hppt.run.ss.hppt.HpptServerSessionService;
@@ -20,6 +22,14 @@
2022
public class RunSs {
2123

2224
public static void main(String[] args) throws Exception {
25+
try {
26+
if ("1".equals(ResourcesReader.readStr(Run.class, "/debug.txt").trim())) {
27+
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
28+
System.out.println("开启调试");
29+
}
30+
} catch (Exception e) {
31+
}
32+
2333
String configPath;
2434
if (args.length <= 1) {
2535
configPath = "ss.yml";

0 commit comments

Comments
 (0)