Skip to content

Commit

Permalink
Merge pull request #833 from the-thing/proxy_testing
Browse files Browse the repository at this point in the history
Mina proxy handshake fix and proxy integration tests

(cherry picked from commit 3427f08)
  • Loading branch information
chrjohn committed Jan 7, 2025
1 parent 37add8a commit 1041d15
Show file tree
Hide file tree
Showing 8 changed files with 374 additions and 9 deletions.
17 changes: 16 additions & 1 deletion quickfixj-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-example</artifactId>
<version>4.1.111.Final</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
Expand Down Expand Up @@ -403,6 +408,15 @@
</configuration>
</plugin>
</plugins>

<extensions>
<!-- required by Netty cross-platform build -->
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.0.Final</version>
</extension>
</extensions>
</build>

<reporting>
Expand Down Expand Up @@ -438,3 +452,4 @@
</profile>
</profiles>
</project>

35 changes: 35 additions & 0 deletions quickfixj-core/src/main/java/quickfix/mina/CustomSslFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package quickfix.mina;

import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.ssl.SslFilter;

import javax.net.ssl.SSLContext;

/**
* Temporary {@link SslFilter} wrapper that prevents auto connect for initiators.
*/
public class CustomSslFilter extends SslFilter {

private static final boolean DEFAULT_AUTO_START = true;

private final boolean autoStart;

public CustomSslFilter(SSLContext sslContext) {
this(sslContext, DEFAULT_AUTO_START);
}

public CustomSslFilter(SSLContext sslContext, boolean autoStart) {
super(sslContext);
this.autoStart = autoStart;
}

@Override
public void onPostAdd(IoFilterChain parent, String name, NextFilter next) throws Exception {
IoSession session = parent.getSession();

if (session.isConnected() && autoStart) {
onConnected(next, session);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,21 @@ public class ProtocolFactory {

public final static int SOCKET = 0;
public final static int VM_PIPE = 1;
public final static int PROXY = 2;

public static String getTypeString(int type) {
switch (type) {
case SOCKET:
return "SOCKET";
case VM_PIPE:
return "VM_PIPE";
case PROXY:
return "PROXY";
default:
return "unknown";
}
}

public static SocketAddress createSocketAddress(int transportType, String host,
int port) throws ConfigError {
if (transportType == SOCKET || transportType == PROXY) {
if (transportType == SOCKET) {
return host != null ? new InetSocketAddress(host, port) : new InetSocketAddress(port);
} else if (transportType == VM_PIPE) {
return new VmPipeAddress(port);
Expand All @@ -94,8 +91,6 @@ public static int getTransportType(String string) {
return SOCKET;
} else if (string.equalsIgnoreCase("VM_PIPE")) {
return VM_PIPE;
} else if (string.equalsIgnoreCase("PROXY")) {
return PROXY;
} else {
throw new RuntimeError("Unknown Transport Type type: " + string);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import quickfix.SessionID;
import quickfix.SessionSettings;
import quickfix.mina.CompositeIoFilterChainBuilder;
import quickfix.mina.CustomSslFilter;
import quickfix.mina.EventHandlingStrategy;
import quickfix.mina.NetworkingOptions;
import quickfix.mina.ProtocolFactory;
Expand Down Expand Up @@ -132,7 +133,7 @@ private void installSSL(AcceptorSocketDescriptor descriptor,
log.info("Installing SSL filter for {}", descriptor.getAddress());
SSLConfig sslConfig = descriptor.getSslConfig();
SSLContext sslContext = SSLContextFactory.getInstance(sslConfig);
SslFilter sslFilter = new SslFilter(sslContext);
SslFilter sslFilter = new CustomSslFilter(sslContext);
sslFilter.setNeedClientAuth(sslConfig.isNeedClientAuth());
sslFilter.setEnabledCipherSuites(sslConfig.getEnabledCipherSuites() != null ? sslConfig.getEnabledCipherSuites()
: SSLSupport.getDefaultCipherSuites(sslContext));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import quickfix.SessionID;
import quickfix.SystemTime;
import quickfix.mina.CompositeIoFilterChainBuilder;
import quickfix.mina.CustomSslFilter;
import quickfix.mina.EventHandlingStrategy;
import quickfix.mina.NetworkingOptions;
import quickfix.mina.ProtocolFactory;
Expand Down Expand Up @@ -186,7 +187,7 @@ private void setupIoConnector() throws ConfigError, GeneralSecurityException {
private SslFilter installSslFilter(CompositeIoFilterChainBuilder ioFilterChainBuilder)
throws GeneralSecurityException {
final SSLContext sslContext = SSLContextFactory.getInstance(sslConfig);
final SslFilter sslFilter = new SslFilter(sslContext);
final SslFilter sslFilter = new CustomSslFilter(sslContext, false);
sslFilter.setEnabledCipherSuites(sslConfig.getEnabledCipherSuites() != null ? sslConfig.getEnabledCipherSuites()
: SSLSupport.getDefaultCipherSuites(sslContext));
sslFilter.setEnabledProtocols(sslConfig.getEnabledProtocols() != null ? sslConfig.getEnabledProtocols()
Expand Down
74 changes: 74 additions & 0 deletions quickfixj-core/src/test/java/quickfix/mina/SocksProxyServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package quickfix.mina;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.example.socksproxy.SocksServerInitializer;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.apache.mina.util.DaemonThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadFactory;

/**
* Simple SOCKS proxy server based on Netty examples. Only SOCKS protocols are currently supported.
* The implementation performs the proxy handshake, but it doesn't perform any user authentication.
*/
public class SocksProxyServer {

private static final Logger LOGGER = LoggerFactory.getLogger(SocksProxyServer.class);
private static final ThreadFactory THREAD_FACTORY = new DaemonThreadFactory();

private final ServerBootstrap bootstrap;
private final int port;
private Channel channel;

public SocksProxyServer(int port) {
this.bootstrap = new ServerBootstrap();
this.bootstrap.group(new NioEventLoopGroup(THREAD_FACTORY), new NioEventLoopGroup(THREAD_FACTORY))
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new SocksServerInitializer());
this.port = port;
}

public synchronized void start() {
if (channel != null) {
throw new IllegalStateException("SOCKS proxy server is running already");
}

try {
channel = bootstrap.bind(port)
.sync()
.channel();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}

LOGGER.info("SOCKS proxy server started at port: {}", port);
}

public synchronized void stop() {
if (channel == null) {
throw new IllegalStateException("SOCKS proxy server is not running");
}

try {
channel.close().sync();
channel = null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Failed to close SOCKS proxy server");
}

LOGGER.info("SOCKS proxy server stopped at port {}", port);
}

public int getPort() {
return port;
}
}
177 changes: 177 additions & 0 deletions quickfixj-core/src/test/java/quickfix/mina/SocksProxyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package quickfix.mina;

import org.apache.mina.util.AvailablePortFinder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import quickfix.Acceptor;
import quickfix.ApplicationAdapter;
import quickfix.ConfigError;
import quickfix.DefaultMessageFactory;
import quickfix.FixVersions;
import quickfix.Initiator;
import quickfix.MemoryStoreFactory;
import quickfix.MessageFactory;
import quickfix.MessageStoreFactory;
import quickfix.Session;
import quickfix.SessionFactory;
import quickfix.SessionID;
import quickfix.SessionSettings;
import quickfix.ThreadedSocketAcceptor;
import quickfix.ThreadedSocketInitiator;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;

/**
* Performs end to end tests against SOCKS proxy server.
*/
public class SocksProxyTest {

// maximum time to wait for session logon
private static final long TIMEOUT_SECONDS = 5;

private SocksProxyServer proxyServer;

@Before
public void setUp() {
int proxyPort = AvailablePortFinder.getNextAvailable();

proxyServer = new SocksProxyServer(proxyPort);
proxyServer.start();
}

@After
public void tearDown() {
proxyServer.stop();
}

@Test
public void shouldLoginViaSocks4Proxy() throws ConfigError {
shouldLoginSocksProxy("4");
}

@Test
public void shouldLoginViaSocks4aProxy() throws ConfigError {
shouldLoginSocksProxy("4a");
}

@Test
public void shouldLoginViaSocks5Proxy() throws ConfigError {
shouldLoginSocksProxy("5");
}

private void shouldLoginSocksProxy(String proxyVersion) throws ConfigError {
int port = AvailablePortFinder.getNextAvailable();
SessionConnector acceptor = createAcceptor(port);

try {
acceptor.start();

SessionConnector initiator = createInitiator(proxyVersion, proxyServer.getPort(), port);

try {
initiator.start();
assertLoggedOn(acceptor, new SessionID(FixVersions.BEGINSTRING_FIX44, "ALICE", "BOB"));
assertLoggedOn(initiator, new SessionID(FixVersions.BEGINSTRING_FIX44, "BOB", "ALICE"));
} finally {
initiator.stop();
}
} finally {
acceptor.stop();
}
}

private void assertLoggedOn(SessionConnector connector, SessionID sessionID) {
long startTimeNanos = System.nanoTime();

while (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTimeNanos) < TIMEOUT_SECONDS) {
if (isLoggedOn(connector, sessionID)) {
return;
}

try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted", e);
}
}

throw new AssertionError("Session " + sessionID + " is not logged on");
}

private boolean isLoggedOn(SessionConnector connector, SessionID sessionID) {
Session session = connector.getSessionMap().get(sessionID);

if (session == null) {
return false;
}

return session.isLoggedOn();
}

private SessionConnector createAcceptor(int port) throws ConfigError {
MessageStoreFactory messageStoreFactory = new MemoryStoreFactory();
MessageFactory messageFactory = new DefaultMessageFactory();
SessionSettings acceptorSettings = createAcceptorSettings("ALICE", "BOB", port);
return new ThreadedSocketAcceptor(new ApplicationAdapter(), messageStoreFactory, acceptorSettings, messageFactory);
}

private SessionConnector createInitiator(String proxyVersion, int proxyPort, int port) throws ConfigError {
MessageStoreFactory messageStoreFactory = new MemoryStoreFactory();
MessageFactory messageFactory = new DefaultMessageFactory();
SessionSettings initiatorSettings = createInitiatorSettings("BOB", "ALICE", proxyVersion, proxyPort, port);
return new ThreadedSocketInitiator(new ApplicationAdapter(), messageStoreFactory, initiatorSettings, messageFactory);
}

private SessionSettings createAcceptorSettings(String senderId, String targetId, int port) {
HashMap<Object, Object> defaults = new HashMap<>();
defaults.put(SessionFactory.SETTING_CONNECTION_TYPE, "acceptor");
defaults.put(Acceptor.SETTING_SOCKET_ACCEPT_PORT, Integer.toString(port));
defaults.put(Session.SETTING_START_TIME, "00:00:00");
defaults.put(Session.SETTING_END_TIME, "00:00:00");
defaults.put(Session.SETTING_HEARTBTINT, "30");

SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, senderId, targetId);

SessionSettings sessionSettings = new SessionSettings();
sessionSettings.set(defaults);
sessionSettings.setString(sessionID, "BeginString", FixVersions.BEGINSTRING_FIX44);
sessionSettings.setString(sessionID, "DataDictionary", "FIX44.xml");
sessionSettings.setString(sessionID, "SenderCompID", senderId);
sessionSettings.setString(sessionID, "TargetCompID", targetId);

return sessionSettings;
}

private SessionSettings createInitiatorSettings(String senderId, String targetId, String proxyVersion, int proxyPort, int port) {
HashMap<Object, Object> defaults = new HashMap<>();
defaults.put(SessionFactory.SETTING_CONNECTION_TYPE, "initiator");
defaults.put(Initiator.SETTING_SOCKET_CONNECT_PROTOCOL, ProtocolFactory.getTypeString(ProtocolFactory.SOCKET));
defaults.put(Initiator.SETTING_SOCKET_CONNECT_HOST, "localhost");
defaults.put(Initiator.SETTING_SOCKET_CONNECT_PORT, Integer.toString(port));
defaults.put(Initiator.SETTING_RECONNECT_INTERVAL, "2");
defaults.put(Initiator.SETTING_PROXY_HOST, "localhost");
defaults.put(Initiator.SETTING_PROXY_PORT, Integer.toString(proxyPort));
defaults.put(Initiator.SETTING_PROXY_TYPE, "socks");
defaults.put(Initiator.SETTING_PROXY_VERSION, proxyVersion);
defaults.put(Initiator.SETTING_PROXY_USER, "proxy-user");
defaults.put(Initiator.SETTING_PROXY_PASSWORD, "proxy-password");
defaults.put(Session.SETTING_START_TIME, "00:00:00");
defaults.put(Session.SETTING_END_TIME, "00:00:00");
defaults.put(Session.SETTING_HEARTBTINT, "30");

SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, senderId, targetId);

SessionSettings sessionSettings = new SessionSettings();
sessionSettings.set(defaults);
sessionSettings.setString(sessionID, "BeginString", FixVersions.BEGINSTRING_FIX44);
sessionSettings.setString(sessionID, "DataDictionary", "FIX44.xml");
sessionSettings.setString(sessionID, "SenderCompID", senderId);
sessionSettings.setString(sessionID, "TargetCompID", targetId);

return sessionSettings;
}

}
Loading

0 comments on commit 1041d15

Please sign in to comment.