From 5bcbdee63ec7764096da0e0463fa577b5ebb0d22 Mon Sep 17 00:00:00 2001 From: m_aadhil Date: Sat, 25 Mar 2023 01:27:31 +0530 Subject: [PATCH] https://github.com/quickfix-j/quickfixj/issues/250 Enhancement. --- .../src/main/java/quickfix/Initiator.java | 8 +++++++- .../initiator/AbstractSocketInitiator.java | 6 +++++- .../mina/initiator/IoSessionInitiator.java | 19 +++++++++++++++---- 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/Initiator.java b/quickfixj-core/src/main/java/quickfix/Initiator.java index a0e742e876..6b5acf4f8b 100644 --- a/quickfixj-core/src/main/java/quickfix/Initiator.java +++ b/quickfixj-core/src/main/java/quickfix/Initiator.java @@ -122,5 +122,11 @@ public interface Initiator extends Connector { * AbstractSocketInitiator.createDynamicSession is called */ String SETTING_DYNAMIC_SESSION = "DynamicSession"; - + /** + * Initiator setting for reconnect attempts. Only valid when + * session connection type is "initiator". + * + * @see quickfix.SessionFactory#SETTING_CONNECTION_TYPE + */ + String SETTING_RECONNECT_ATTEMPT = "ReconnectAttempt"; } diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java index 84f9e142e9..22af3a8be0 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java @@ -147,6 +147,7 @@ private void createInitiator(final Session session, final boolean continueInitOn String proxyDomain = null; int proxyPort = -1; + int retryCount = 1; if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_TYPE)) { proxyType = settings.getString(sessionID, Initiator.SETTING_PROXY_TYPE); @@ -170,6 +171,9 @@ && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) { proxyHost = settings.getString(sessionID, Initiator.SETTING_PROXY_HOST); proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT); } + if (getSettings().isSetting(sessionID, Initiator.SETTING_RECONNECT_ATTEMPT)){ + retryCount = settings.getInt(sessionID, Initiator.SETTING_RECONNECT_ATTEMPT); + } ScheduledExecutorService scheduledExecutorService = (scheduledReconnectExecutor != null ? scheduledReconnectExecutor : getScheduledExecutorService()); try { @@ -177,7 +181,7 @@ && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) { socketAddresses, localAddress, reconnectingIntervals, scheduledExecutorService, settings, networkingOptions, getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig, - proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation); + proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation, retryCount); initiators.add(ioSessionInitiator); } catch (ConfigError e) { diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java index e6c97eeeb8..ba68acfb25 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java @@ -69,7 +69,7 @@ public IoSessionInitiator(Session fixSession, SocketAddress[] socketAddresses, EventHandlingStrategy eventHandlingStrategy, IoFilterChainBuilder userIoFilterChainBuilder, boolean sslEnabled, SSLConfig sslConfig, String proxyType, String proxyVersion, String proxyHost, int proxyPort, - String proxyUser, String proxyPassword, String proxyDomain, String proxyWorkstation) throws ConfigError { + String proxyUser, String proxyPassword, String proxyDomain, String proxyWorkstation, int retryCount) throws ConfigError { this.executor = executor; final long[] reconnectIntervalInMillis = new long[reconnectIntervalInSeconds.length]; for (int ii = 0; ii != reconnectIntervalInSeconds.length; ++ii) { @@ -79,7 +79,7 @@ public IoSessionInitiator(Session fixSession, SocketAddress[] socketAddresses, reconnectTask = new ConnectTask(sslEnabled, socketAddresses, localAddress, userIoFilterChainBuilder, fixSession, reconnectIntervalInMillis, sessionSettings, networkingOptions, eventHandlingStrategy, sslConfig, - proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation, log); + proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation, log, retryCount); } catch (GeneralSecurityException e) { throw new ConfigError(e); } @@ -106,6 +106,9 @@ private static class ConnectTask implements Runnable { private long lastConnectTime; private int nextSocketAddressIndex; private int connectionFailureCount; + private int retryCount = 1; + private int retryAttempt = 0; + private boolean isFirstTime = true; private ConnectFuture connectFuture; private final String proxyType; @@ -123,7 +126,7 @@ public ConnectTask(boolean sslEnabled, SocketAddress[] socketAddresses, SessionSettings sessionSettings, NetworkingOptions networkingOptions, EventHandlingStrategy eventHandlingStrategy, SSLConfig sslConfig, String proxyType, String proxyVersion, String proxyHost, int proxyPort, String proxyUser, String proxyPassword, String proxyDomain, - String proxyWorkstation, Logger log) throws ConfigError, GeneralSecurityException { + String proxyWorkstation, Logger log, int retryCount) throws ConfigError, GeneralSecurityException { this.sslEnabled = sslEnabled; this.socketAddresses = socketAddresses; this.localAddress = localAddress; @@ -144,6 +147,7 @@ public ConnectTask(boolean sslEnabled, SocketAddress[] socketAddresses, this.proxyPassword = proxyPassword; this.proxyDomain = proxyDomain; this.proxyWorkstation = proxyWorkstation; + this.retryCount = retryCount; setupIoConnector(); } @@ -221,7 +225,14 @@ public void run() { private void connect() { try { lastReconnectAttemptTime = SystemTime.currentTimeMillis(); - SocketAddress nextSocketAddress = getNextSocketAddress(); + SocketAddress nextSocketAddress = socketAddresses[getCurrentSocketAddressIndex()]; + if (retryCount == 1 || retryAttempt == retryCount || isFirstTime){ + nextSocketAddress = getNextSocketAddress(); + retryAttempt = 1; + isFirstTime = false; + } else { + ++retryAttempt; + } if (localAddress == null) { connectFuture = ioConnector.connect(nextSocketAddress); } else {