diff --git a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingService.java b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingService.java index e522b4d243c..48c8244643a 100644 --- a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingService.java +++ b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingService.java @@ -54,6 +54,10 @@ public class OkexStreamingService extends JsonNettyStreamingService { private final ExchangeSpecification xSpec; + private volatile boolean loginDone = false; + + private volatile boolean needToResubscribeChannels = false; + public OkexStreamingService(String apiUrl, ExchangeSpecification exchangeSpecification) { super(apiUrl); this.xSpec = exchangeSpecification; @@ -61,6 +65,7 @@ public OkexStreamingService(String apiUrl, ExchangeSpecification exchangeSpecifi @Override public Completable connect() { + loginDone = xSpec.getApiKey() == null; Completable conn = super.connect(); return conn.andThen( (CompletableSource) @@ -82,6 +87,14 @@ public Completable connect() { }); } + @Override + public void resubscribeChannels() { + needToResubscribeChannels = true; + if (loginDone) { + super.resubscribeChannels(); + } + } + public void login() throws JsonProcessingException { Mac mac; try { @@ -123,6 +136,18 @@ public void messageHandler(String message) { LOG.error("Error parsing incoming message to JSON: {}", message); return; } + // Retry after a successful login + if (jsonNode.has("event")) { + String event = jsonNode.get("event").asText(); + if ("login".equals(event)) { + loginDone = true; + if (needToResubscribeChannels) { + this.resubscribeChannels(); + needToResubscribeChannels = false; + } + return; + } + } if (processArrayMessageSeparately() && jsonNode.isArray()) { // In case of array - handle every message separately. @@ -227,4 +252,4 @@ public void pingPongDisconnectIfConnected() { pingPongSubscription.dispose(); } } -} +} \ No newline at end of file