Skip to content

Commit e9a954b

Browse files
Copilotchrjohn
andcommitted
Fix session-level message resend issue when ForceResendWhenCorruptedStore is enabled
- Modified Session.resendMessages() to remove dependency on forceResendWhenCorruptedStore flag for admin message detection - Added special handling for Reject messages (MsgType=3) - only Reject messages among admin messages are now resent - Other session-level messages (Logon, Logout, Heartbeat, etc.) are properly skipped with SequenceReset GapFill - Handle corrupted store by adding null placeholders instead of fake Heartbeat messages - Added comprehensive test to verify the fix - Added setter for forceResendWhenCorruptedStore in SessionFactoryTestSupport.Builder Co-authored-by: chrjohn <6644028+chrjohn@users.noreply.github.com>
1 parent bcd2e1f commit e9a954b

File tree

3 files changed

+186
-10
lines changed

3 files changed

+186
-10
lines changed

quickfixj-core/src/main/java/quickfix/Session.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2352,13 +2352,10 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN
23522352
state.get(beginSeqNo, endSeqNo, messages);
23532353
} catch (final IOException e) {
23542354
if (forceResendWhenCorruptedStore) {
2355-
LOG.error("Cannot read messages from stores, resend HeartBeats", e);
2355+
LOG.error("Cannot read messages from stores, will gap fill over missing messages", e);
2356+
// Add null placeholders for corrupted messages instead of fake Heartbeats
23562357
for (int i = beginSeqNo; i < endSeqNo; i++) {
2357-
final Message heartbeat = messageFactory.create(sessionID.getBeginString(),
2358-
MsgType.HEARTBEAT);
2359-
initializeHeader(heartbeat.getHeader());
2360-
heartbeat.getHeader().setInt(MsgSeqNum.FIELD, i);
2361-
messages.add(heartbeat.toString());
2358+
messages.add(null);
23622359
}
23632360
} else {
23642361
throw e;
@@ -2375,6 +2372,15 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN
23752372
final Message msg;
23762373
try {
23772374
// QFJ-626
2375+
// Handle null placeholders from corrupted store
2376+
if (message == null) {
2377+
// Mark for gap fill
2378+
if (begin == 0) {
2379+
begin = current;
2380+
}
2381+
current++;
2382+
continue;
2383+
}
23782384
msg = parseMessage(message);
23792385
msgSeqNum = msg.getHeader().getInt(MsgSeqNum.FIELD);
23802386
} catch (final Exception e) {
@@ -2391,11 +2397,32 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN
23912397

23922398
final String msgType = msg.getHeader().getString(MsgType.FIELD);
23932399

2394-
if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) {
2395-
if (begin == 0) {
2396-
begin = msgSeqNum;
2400+
// Check if message is an admin message
2401+
// According to FIX spec, only Reject messages should be resent among admin messages
2402+
if (MessageUtils.isAdminMessage(msgType)) {
2403+
if (MsgType.REJECT.equals(msgType)) {
2404+
// Reject messages should be resent, but don't call toApp() for admin messages
2405+
initializeResendFields(msg);
2406+
if (begin != 0) {
2407+
generateSequenceReset(receivedMessage, begin, msgSeqNum);
2408+
}
2409+
getLog().onEvent("Resending Reject message: " + msgSeqNum);
2410+
boolean sent = send(msg.toString());
2411+
if (!sent) {
2412+
// Abort resend operation immediately - don't send any more messages
2413+
getLog().onWarnEvent("Resending messages aborted.");
2414+
return;
2415+
}
2416+
begin = 0;
2417+
appMessageJustSent = true;
2418+
} else {
2419+
// Other admin messages should NOT be resent, mark for gap fill
2420+
if (begin == 0) {
2421+
begin = msgSeqNum;
2422+
}
23972423
}
23982424
} else {
2425+
// Application message - resend normally
23992426
initializeResendFields(msg);
24002427
if (resendApproved(msg)) {
24012428
if (begin != 0) {

quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public static final class Builder {
106106
private final boolean rejectInvalidMessage = true;
107107
private final boolean rejectMessageOnUnhandledException = false;
108108
private final boolean requiresOrigSendingTime = true;
109-
private final boolean forceResendWhenCorruptedStore = false;
109+
private boolean forceResendWhenCorruptedStore = false;
110110
private final Set<InetAddress> allowedRemoteAddresses = null;
111111
private final boolean validateIncomingMessage = true;
112112
private final int resendRequestChunkSize = 0;
@@ -245,5 +245,10 @@ public Builder setEnableNextExpectedMsgSeqNum(final boolean enableNextExpectedMs
245245
this.enableNextExpectedMsgSeqNum = enableNextExpectedMsgSeqNum;
246246
return this;
247247
}
248+
249+
public Builder setForceResendWhenCorruptedStore(final boolean forceResendWhenCorruptedStore) {
250+
this.forceResendWhenCorruptedStore = forceResendWhenCorruptedStore;
251+
return this;
252+
}
248253
}
249254
}

quickfixj-core/src/test/java/quickfix/SessionTest.java

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3239,4 +3239,148 @@ public void testResendAbortsWhenSendReturnsFalse() throws Exception {
32393239
assertEquals("Only 2 messages should succeed", 2, responder.sentMessages.size());
32403240
}
32413241
}
3242+
3243+
// Test for issue #597: Session-level messages should not be resent when ForceResendWhenCorruptedStore is enabled
3244+
@Test
3245+
public void testResendDoesNotResendSessionLevelMessagesExceptReject() throws Exception {
3246+
final UnitTestApplication application = new UnitTestApplication();
3247+
final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
3248+
3249+
// Create a session with ForceResendWhenCorruptedStore enabled
3250+
final Session session = new SessionFactoryTestSupport.Builder()
3251+
.setSessionId(sessionID)
3252+
.setApplication(application)
3253+
.setIsInitiator(false)
3254+
.setPersistMessages(true)
3255+
.setForceResendWhenCorruptedStore(true)
3256+
.build();
3257+
3258+
// Use a capturing responder to track all sent messages
3259+
final List<String> sentMessages = new ArrayList<>();
3260+
final Responder capturingResponder = new Responder() {
3261+
@Override
3262+
public boolean send(String data) {
3263+
sentMessages.add(data);
3264+
return true;
3265+
}
3266+
3267+
@Override
3268+
public String getRemoteAddress() {
3269+
return null;
3270+
}
3271+
3272+
@Override
3273+
public void disconnect() {
3274+
}
3275+
};
3276+
session.setResponder(capturingResponder);
3277+
3278+
try {
3279+
// Logon to establish session
3280+
logonTo(session);
3281+
assertTrue(session.isLoggedOn());
3282+
3283+
// Clear lists after logon
3284+
application.clear();
3285+
sentMessages.clear();
3286+
3287+
// Get the message store
3288+
final MessageStore messageStore = session.getStore();
3289+
3290+
// Store some messages in the message store:
3291+
// Seq 2: Heartbeat (session-level, should NOT be resent)
3292+
String heartbeatMsg = "8=FIX.4.4\0019=60\00135=0\00134=2\00149=SENDER\00156=TARGET\001" +
3293+
"52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) + "\00110=000\001";
3294+
messageStore.set(2, heartbeatMsg);
3295+
3296+
// Seq 3: Application message (should be resent)
3297+
String appMsg1 = "8=FIX.4.4\0019=100\00135=D\00134=3\00149=SENDER\00156=TARGET\001" +
3298+
"52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) +
3299+
"\00155=EUR/USD\00154=1\00138=1000000\00140=1\00110=000\001";
3300+
messageStore.set(3, appMsg1);
3301+
3302+
// Seq 4: Logout (session-level, should NOT be resent)
3303+
String logoutMsg = "8=FIX.4.4\0019=60\00135=5\00134=4\00149=SENDER\00156=TARGET\001" +
3304+
"52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) + "\00110=000\001";
3305+
messageStore.set(4, logoutMsg);
3306+
3307+
// Seq 5: Reject (session-level, SHOULD be resent)
3308+
String rejectMsg = "8=FIX.4.4\0019=80\00135=3\00134=5\00149=SENDER\00156=TARGET\001" +
3309+
"52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) +
3310+
"\00145=100\00158=Invalid message\00110=000\001";
3311+
messageStore.set(5, rejectMsg);
3312+
3313+
// Seq 6: Application message (should be resent)
3314+
String appMsg2 = "8=FIX.4.4\0019=100\00135=D\00134=6\00149=SENDER\00156=TARGET\001" +
3315+
"52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) +
3316+
"\00155=GBP/USD\00154=2\00138=2000000\00140=1\00110=000\001";
3317+
messageStore.set(6, appMsg2);
3318+
3319+
// Update the next sender sequence number to 7
3320+
messageStore.setNextSenderMsgSeqNum(7);
3321+
3322+
// Receive a ResendRequest for messages 2-6
3323+
final ResendRequest resendRequest = new ResendRequest();
3324+
resendRequest.getHeader().setString(SenderCompID.FIELD, "TARGET");
3325+
resendRequest.getHeader().setString(TargetCompID.FIELD, "SENDER");
3326+
resendRequest.getHeader().setInt(MsgSeqNum.FIELD, 2);
3327+
resendRequest.getHeader().setUtcTimeStamp(SendingTime.FIELD, LocalDateTime.now(ZoneOffset.UTC));
3328+
resendRequest.setInt(BeginSeqNo.FIELD, 2);
3329+
resendRequest.setInt(EndSeqNo.FIELD, 6);
3330+
resendRequest.toString(); // calculate length/checksum
3331+
3332+
session.next(resendRequest);
3333+
3334+
// Verify expectations:
3335+
// 1. toApp() should only be called for application messages (seq 3 and 6)
3336+
assertEquals("toApp should be called twice for app messages", 2, application.toAppMessages.size());
3337+
3338+
// 2. Parse all sent messages and verify what was sent
3339+
boolean rejectWasResent = false;
3340+
boolean heartbeatWasResent = false;
3341+
boolean logoutWasResent = false;
3342+
int sequenceResetCount = 0;
3343+
int appMessageCount = 0;
3344+
3345+
for (String sentMsg : sentMessages) {
3346+
try {
3347+
Message msg = new Message(sentMsg);
3348+
String msgType = msg.getHeader().getString(MsgType.FIELD);
3349+
int seqNum = msg.getHeader().getInt(MsgSeqNum.FIELD);
3350+
3351+
if (msgType.equals(MsgType.SEQUENCE_RESET)) {
3352+
sequenceResetCount++;
3353+
} else if (msgType.equals(MsgType.REJECT) && seqNum == 5) {
3354+
rejectWasResent = true;
3355+
// Verify Reject has PossDupFlag
3356+
assertTrue("Reject should have PossDupFlag set",
3357+
msg.getHeader().isSetField(PossDupFlag.FIELD) &&
3358+
msg.getHeader().getBoolean(PossDupFlag.FIELD));
3359+
} else if (msgType.equals(MsgType.HEARTBEAT) && seqNum == 2) {
3360+
heartbeatWasResent = true;
3361+
} else if (msgType.equals(MsgType.LOGOUT) && seqNum == 4) {
3362+
logoutWasResent = true;
3363+
} else if (msgType.equals("D")) { // NewOrderSingle
3364+
appMessageCount++;
3365+
// Verify app messages have PossDupFlag
3366+
assertTrue("Application messages should have PossDupFlag set",
3367+
msg.getHeader().isSetField(PossDupFlag.FIELD) &&
3368+
msg.getHeader().getBoolean(PossDupFlag.FIELD));
3369+
}
3370+
} catch (Exception e) {
3371+
// Skip unparseable messages
3372+
}
3373+
}
3374+
3375+
// Verify results
3376+
assertTrue("Reject message should have been resent", rejectWasResent);
3377+
assertFalse("Heartbeat should NOT have been resent", heartbeatWasResent);
3378+
assertFalse("Logout should NOT have been resent", logoutWasResent);
3379+
assertEquals("Two application messages should have been resent", 2, appMessageCount);
3380+
assertTrue("At least one SequenceReset should be sent for skipped admin messages", sequenceResetCount >= 1);
3381+
3382+
} finally {
3383+
session.close();
3384+
}
3385+
}
32423386
}

0 commit comments

Comments
 (0)