diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 15264f0e503..60d81776741 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -777,8 +777,14 @@ private SendResult sendDefaultImpl( callTimeout = true; break; } - - sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); + long curTimeout = timeout - costTime; + // In order to prevent the broker from being unresponsive for a long time and thus being unable to retry next time, + // if there is another chance for retry next time, the maximum sending time is modified to the maximum sendMsgMaxTimeoutPerRequest. + if (defaultMQProducer.getSendMsgMaxTimeoutPerRequest() > -1 && times + 1 < timesTotal + && curTimeout > defaultMQProducer.getSendMsgMaxTimeoutPerRequest()) { + curTimeout = defaultMQProducer.getSendMsgMaxTimeoutPerRequest(); + } + sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, curTimeout); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true); switch (communicationMode) { diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index e3f81ad9685..11edcaa4410 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -115,6 +115,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ private int sendMsgTimeout = 3000; + /** + * Max timeout for sending messages per request. + */ + private int sendMsgMaxTimeoutPerRequest = -1; + /** * Compress message body threshold, namely, message body larger than 4k will be compressed on default. */ @@ -1259,6 +1264,14 @@ public void setSendMsgTimeout(int sendMsgTimeout) { this.sendMsgTimeout = sendMsgTimeout; } + public int getSendMsgMaxTimeoutPerRequest() { + return sendMsgMaxTimeoutPerRequest; + } + + public void setSendMsgMaxTimeoutPerRequest(int sendMsgMaxTimeoutPerRequest) { + this.sendMsgMaxTimeoutPerRequest = sendMsgMaxTimeoutPerRequest; + } + public int getCompressMsgBodyOverHowmuch() { return compressMsgBodyOverHowmuch; }