Skip to content

Only retrieve one batch of messages at a time when responding to a large ResendRequest #643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 66 additions & 50 deletions quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,8 @@ public class Session implements Closeable {

protected static final Logger LOG = LoggerFactory.getLogger(Session.class);

private final int MAX_RESEND_BATCH_RETRIEVAL_SIZE = 1000;

Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID,
DataDictionaryProvider dataDictionaryProvider, SessionSchedule sessionSchedule, LogFactory logFactory,
MessageFactory messageFactory, int heartbeatInterval) {
Expand Down Expand Up @@ -2321,71 +2323,85 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre
private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqNo)
throws IOException, InvalidMessage, FieldNotFound {

final ArrayList<String> messages = new ArrayList<>();
try {
state.get(beginSeqNo, endSeqNo, messages);
} catch (final IOException e) {
if (forceResendWhenCorruptedStore) {
LOG.error("Cannot read messages from stores, resend HeartBeats", e);
for (int i = beginSeqNo; i < endSeqNo; i++) {
final Message heartbeat = messageFactory.create(sessionID.getBeginString(),
MsgType.HEARTBEAT);
initializeHeader(heartbeat.getHeader());
heartbeat.getHeader().setInt(MsgSeqNum.FIELD, i);
messages.add(heartbeat.toString());
}
} else {
throw e;
}
}

int msgSeqNum = 0;
int begin = 0;
int current = beginSeqNo;
boolean appMessageJustSent = false;

for (final String message : messages) {
appMessageJustSent = false;
final Message msg;
int curBatchStartSeqNo = beginSeqNo;
while (curBatchStartSeqNo <= endSeqNo) {
int endCurBatchSeqNo = endSeqNo;
if (curBatchStartSeqNo + MAX_RESEND_BATCH_RETRIEVAL_SIZE < endSeqNo) {
endCurBatchSeqNo = curBatchStartSeqNo + MAX_RESEND_BATCH_RETRIEVAL_SIZE;
}
final ArrayList<String> messages = new ArrayList<>();
try {
// QFJ-626
msg = parseMessage(message);
msgSeqNum = msg.getHeader().getInt(MsgSeqNum.FIELD);
} catch (final Exception e) {
getLog().onErrorEvent(
"Error handling ResendRequest: failed to parse message (" + e.getMessage()
+ "): " + message);
// Note: a SequenceReset message will be generated to fill the gap
continue;
state.get(curBatchStartSeqNo, endCurBatchSeqNo, messages);
} catch (final IOException e) {
if (forceResendWhenCorruptedStore) {
LOG.error("Cannot read messages from stores, resend HeartBeats", e);
for (int i = beginSeqNo; i < endSeqNo; i++) {
final Message heartbeat = messageFactory.create(sessionID.getBeginString(),
MsgType.HEARTBEAT);
initializeHeader(heartbeat.getHeader());
heartbeat.getHeader().setInt(MsgSeqNum.FIELD, i);
messages.add(heartbeat.toString());
}
} else {
throw e;
}
}
for (final String message : messages) {
appMessageJustSent = false;
final Message msg;
try {
// QFJ-626
msg = parseMessage(message);
if (msg.getException() != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@philipwhiuk I am not sure about this. This will not resend messages which were formerly (without this PR) sent. I mean we could discuss if it is sensible but I feel that this should be a separate change. WDYT?

getLog().onErrorEvent(
"Error handling ResendRequest: failed to parse message (" + msg.getException().getMessage()
+ "): " + message);
// Note: a SequenceReset message will be generated to fill the gap
continue;
}
msgSeqNum = msg.getHeader().getInt(MsgSeqNum.FIELD);
} catch (final Exception e) {
getLog().onErrorEvent(
"Error handling ResendRequest: failed to parse message (" + e.getMessage()
+ "): " + message);
// Note: a SequenceReset message will be generated to fill the gap
continue;
}

if ((current != msgSeqNum) && begin == 0) {
begin = current;
}
if ((current != msgSeqNum) && begin == 0) {
begin = current;
}

final String msgType = msg.getHeader().getString(MsgType.FIELD);
final String msgType = msg.getHeader().getString(MsgType.FIELD);

if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) {
if (begin == 0) {
begin = msgSeqNum;
}
} else {
initializeResendFields(msg);
if (resendApproved(msg)) {
if (begin != 0) {
generateSequenceReset(receivedMessage, begin, msgSeqNum);
}
getLog().onEvent("Resending message: " + msgSeqNum);
send(msg.toString());
begin = 0;
appMessageJustSent = true;
} else {
if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) {
if (begin == 0) {
begin = msgSeqNum;
}
} else {
initializeResendFields(msg);
if (resendApproved(msg)) {
if (begin != 0) {
generateSequenceReset(receivedMessage, begin, msgSeqNum);
}
getLog().onEvent("Resending message: " + msgSeqNum);
send(msg.toString());
begin = 0;
appMessageJustSent = true;
} else {
if (begin == 0) {
begin = msgSeqNum;
}
}
}
current = msgSeqNum + 1;
}
current = msgSeqNum + 1;
curBatchStartSeqNo = endCurBatchSeqNo+1;
}

int newBegin = beginSeqNo;
Expand Down
98 changes: 98 additions & 0 deletions quickfixj-core/src/test/java/quickfix/SessionResendMessagesTest
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import quickfix.*;

import java.io.IOException;
import java.util.ArrayList;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

class SessionResendMessagesTest {

private Session session;
private Application mockApplication;
private MessageStore mockStore;
private LogFactory mockLogFactory;
private Log mockLog;
private static final int HEARTBEAT_INTERVAL = 30;

@BeforeEach
void setUp() throws Exception {
mockApplication = mock(Application.class);
mockStore = mock(MessageStore.class);
MessageStoreFactory mockStoreFactory = mock(MessageStoreFactory.class);
when(mockStoreFactory.create(any(SessionID.class))).thenReturn(mockStore);
DataDictionaryProvider mockDataDictProvider = mock(DataDictionaryProvider.class);
mockLogFactory = mock(LogFactory.class);
mockLog = mock(Log.class);
when(mockLogFactory.create(any(SessionID.class))).thenReturn(mockLog);

session = new Session(
mockApplication,
mockStoreFactory,
new SessionID("FIX.4.4", "SENDER", "TARGET"),
mockDataDictProvider,
mock(SessionSchedule.class),
mockLogFactory,
mock(MessageFactory.class),
HEARTBEAT_INTERVAL
);
}

@Test
void testResendMessagesBatchedRetrieval() throws Exception {
// Prepare 1200 messages (expect two batches: 1000, then 200)
int beginSeqNo = 1;
int endSeqNo = 1200;
ArrayList<String> batch1 = new ArrayList<>();
ArrayList<String> batch2 = new ArrayList<>();
for (int i = beginSeqNo; i <= endSeqNo; i++) {
String msgStr = "8=FIX.4.4\u00019=12\u000135=0\u000134=" + i + "\u0001" +
"49=SENDER\u000156=TARGET\u000110=000\u0001";
if (i <= 1000) batch1.add(msgStr);
else batch2.add(msgStr);
}

doAnswer(invocation -> {
int start = invocation.getArgument(0);
int end = invocation.getArgument(1);
ArrayList<String> msgs = invocation.getArgument(2);
if (start == 1 && end == 1001) msgs.addAll(batch1);
else if (start == 1001 && end == 1200) msgs.addAll(batch2);
else fail("Unexpected batch: " + start + " to " + end);
return null;
}).when(mockStore).get(anyInt(), anyInt(), anyList());

Message receivedMessage = new Message();
session.resendMessages(receivedMessage, beginSeqNo, endSeqNo);

// Verify batches requested
verify(mockStore).get(1, 1001, new ArrayList<>());
verify(mockStore).get(1001, 1200, new ArrayList<>());
verifyNoMoreInteractions(mockStore);
}

@Test
void testResendMessagesSkipsCorruptedMessage() throws Exception {
int beginSeqNo = 1, endSeqNo = 2;
ArrayList<String> messages = new ArrayList<>();
// First message is valid, second is corrupted
messages.add("8=FIX.4.4\u00019=12\u000135=0\u000134=1\u000149=SENDER\u000156=TARGET\u000110=000\u0001");
messages.add("corrupted message");

doAnswer(invocation -> {
ArrayList<String> msgs = invocation.getArgument(2);
msgs.addAll(messages);
return null;
}).when(mockStore).get(anyInt(), anyInt(), anyList());

Message receivedMessage = new Message();
session.resendMessages(receivedMessage, beginSeqNo, endSeqNo);

// Should log an error for the corrupted message and process the first one
verify(mockLog, atLeastOnce()).onErrorEvent(contains("Error handling ResendRequest"));
}
}