Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
public class HeartbeatMessage implements MessageTypeAware, Serializable {
private static final long serialVersionUID = -985316399527884899L;
private boolean ping = true;
private long sequenceNumber;
private ConnectionPoolInfo connectionPoolInfo;
/**
* The constant PING.
*/
Expand Down Expand Up @@ -55,4 +57,20 @@ public boolean isPing() {
public void setPing(boolean ping) {
this.ping = ping;
}

public long getSequenceNumber() {
return sequenceNumber;
}

public void setSequenceNumber(long sequenceNumber) {
this.sequenceNumber = sequenceNumber;
}

public ConnectionPoolInfo getConnectionPoolInfo() {
return connectionPoolInfo;
}

public void setConnectionPoolInfo(ConnectionPoolInfo connectionPoolInfo) {
this.connectionPoolInfo = connectionPoolInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,16 @@ private String getThreadPrefix() {
*/
protected abstract long getRpcRequestTimeout();

/**
* Create heartbeat message for sending to server.
* Subclasses can override this to provide enhanced heartbeat messages.
*
* @return heartbeat message (HeartbeatMessage or EnhancedHeartbeatMessage)
*/
protected Object createHeartbeatMessage() {
return HeartbeatMessage.PING;
}

/**
* Registers a channel event listener to receive channel events.
* If the listener is already registered, it will not be added again.
Expand Down Expand Up @@ -725,7 +735,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("will send ping msg,channel {}", ctx.channel());
}
AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), HeartbeatMessage.PING);
Object heartbeatMessage = createHeartbeatMessage();
AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), heartbeatMessage);
} catch (Throwable throwable) {
LOGGER.error("send request error: {}", throwable.getMessage(), throwable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.netty.channel.Channel;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.core.protocol.ConnectionPoolInfo;
import org.apache.seata.core.protocol.MessageType;
import org.apache.seata.core.rpc.ShutdownHook;
import org.apache.seata.core.rpc.TransactionMessageHandler;
Expand All @@ -29,6 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -43,7 +45,7 @@ public class NettyRemotingServer extends AbstractNettyRemotingServer {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyRemotingServer.class);

private TransactionMessageHandler transactionMessageHandler;

private ServerHeartbeatProcessor heartbeatMessageProcessor;
private final AtomicBoolean initialized = new AtomicBoolean(false);

private final ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(
Expand Down Expand Up @@ -125,7 +127,7 @@ private void registerProcessor() {
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 5. registry heartbeat message processor
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

Expand All @@ -134,4 +136,12 @@ public void destroy() {
super.destroy();
branchResultMessageExecutor.shutdown();
}

public Map<String, ConnectionPoolInfo> getAllClientPoolInfo() {
return heartbeatMessageProcessor.getAllPoolInfo();
}

public ConnectionPoolInfo getClientPoolInfo(String clientAddress) {
return heartbeatMessageProcessor.getClientPoolInfo(clientAddress);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@
import org.apache.seata.core.constants.ConfigurationKeys;
import org.apache.seata.core.model.Resource;
import org.apache.seata.core.model.ResourceManager;
import org.apache.seata.core.protocol.AbstractMessage;
import org.apache.seata.core.protocol.MessageType;
import org.apache.seata.core.protocol.RegisterRMRequest;
import org.apache.seata.core.protocol.RegisterRMResponse;
import org.apache.seata.core.protocol.*;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is best not to use the import *.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your suggestions. I will fix that.

import org.apache.seata.core.rpc.netty.NettyPoolKey.TransactionRole;
import org.apache.seata.core.rpc.processor.client.ClientHeartbeatProcessor;
import org.apache.seata.core.rpc.processor.client.ClientOnResponseProcessor;
Expand All @@ -48,6 +45,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import static org.apache.seata.common.Constants.DBKEYS_SPLIT_CHAR;
Expand All @@ -67,6 +65,12 @@ public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {
private String applicationId;
private String transactionServiceGroup;

// connection pool monitoring
private static final long POOL_INFO_REPORT_INTERVAL = 30000; // 30 seconds
private ConnectionPoolMonitor poolMonitor;
private final AtomicLong heartbeatSequence = new AtomicLong(0);
private volatile long lastPoolInfoReportTime = 0L;

@Override
public void init() {
// registry processor
Expand Down Expand Up @@ -321,6 +325,7 @@ public void destroy() {
initialized.getAndSet(false);
instance = null;
transactionServiceGroup = null;
poolMonitor = null;
}

@Override
Expand Down Expand Up @@ -351,6 +356,21 @@ public long getRpcRequestTimeout() {
return NettyClientConfig.getRpcRmRequestTimeout();
}

@Override
public Object createHeartbeatMessage() {
HeartbeatMessage heartbeat = HeartbeatMessage.PING;
if (shouldReportInfo()) {
heartbeat.setConnectionPoolInfo(poolMonitor.collectPoolInfo());
heartbeat.setSequenceNumber(heartbeatSequence.incrementAndGet());
lastPoolInfoReportTime = System.currentTimeMillis();
}
return heartbeat;
}

private boolean shouldReportInfo() {
return (System.currentTimeMillis() - lastPoolInfoReportTime) >= POOL_INFO_REPORT_INTERVAL;
}

private void registerProcessor() {
// 1.registry rm client handle branch commit processor
RmBranchCommitProcessor rmBranchCommitProcessor =
Expand All @@ -373,6 +393,7 @@ private void registerProcessor() {
super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
// 5.registry heartbeat message processor
poolMonitor = new ConnectionPoolMonitor(applicationId, transactionServiceGroup);
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
package org.apache.seata.core.rpc.processor.server;

import io.netty.channel.ChannelHandlerContext;
import org.apache.seata.core.protocol.ConnectionPoolInfo;
import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.rpc.RemotingServer;
import org.apache.seata.core.rpc.processor.RemotingProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* process client heartbeat message request(PING).
* <p>
Expand All @@ -37,13 +40,26 @@ public class ServerHeartbeatProcessor implements RemotingProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerHeartbeatProcessor.class);

private RemotingServer remotingServer;
private final ConnectionPoolInfoCache poolInfoCache;

public ServerHeartbeatProcessor(RemotingServer remotingServer) {
this.remotingServer = remotingServer;
this.poolInfoCache = new ConnectionPoolInfoCache();
}

@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
handleStandardHeartbeat(ctx, rpcMessage);
LOGGER.debug("received PING {}", rpcMessage.getBody());
HeartbeatMessage heartbeatMessage = (HeartbeatMessage) rpcMessage.getBody();
ConnectionPoolInfo poolInfo = heartbeatMessage.getConnectionPoolInfo();
LOGGER.debug("received PING from {}", heartbeatMessage);
String clientAddress = ctx.channel().remoteAddress().toString();
poolInfoCache.updatePoolInfo(clientAddress, poolInfo);
}


private void handleStandardHeartbeat(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
try {
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), HeartbeatMessage.PONG);
} catch (Throwable throwable) {
Expand All @@ -53,4 +69,12 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc
LOGGER.debug("received PING from {}", ctx.channel().remoteAddress());
}
}

public Map<String, ConnectionPoolInfo> getAllPoolInfo() {
return poolInfoCache.getAllPoolInfo();
}

public ConnectionPoolInfo getClientPoolInfo(String clientAddress) {
return poolInfoCache.getPoolInfo(clientAddress);
}
}
Loading