Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8997] Ensure there is an opportunity to send a retry message when broker no response #9137

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -777,8 +777,14 @@ private SendResult sendDefaultImpl(
callTimeout = true;
break;
}

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
long curTimeout = timeout - costTime;
// In order to prevent the broker from being unresponsive for a long time and thus being unable to retry next time,
// if there is another chance for retry next time, the maximum sending time is modified to the maximum sendMsgMaxTimeoutPerRequest.
if (defaultMQProducer.getSendMsgMaxTimeoutPerRequest() > -1 && times + 1 < timesTotal
&& curTimeout > defaultMQProducer.getSendMsgMaxTimeoutPerRequest()) {
curTimeout = defaultMQProducer.getSendMsgMaxTimeoutPerRequest();
}
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, curTimeout);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
switch (communicationMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private int sendMsgTimeout = 3000;

/**
* Max timeout for sending messages per request.
*/
private int sendMsgMaxTimeoutPerRequest = -1;

/**
* Compress message body threshold, namely, message body larger than 4k will be compressed on default.
*/
Expand Down Expand Up @@ -1259,6 +1264,14 @@ public void setSendMsgTimeout(int sendMsgTimeout) {
this.sendMsgTimeout = sendMsgTimeout;
}

public int getSendMsgMaxTimeoutPerRequest() {
return sendMsgMaxTimeoutPerRequest;
}

public void setSendMsgMaxTimeoutPerRequest(int sendMsgMaxTimeoutPerRequest) {
this.sendMsgMaxTimeoutPerRequest = sendMsgMaxTimeoutPerRequest;
}

public int getCompressMsgBodyOverHowmuch() {
return compressMsgBodyOverHowmuch;
}
Expand Down
Loading