From 7dbe2ecc2111ae7ff5502cd0fdb378f6c49586d5 Mon Sep 17 00:00:00 2001 From: lvxiao Date: Mon, 25 Nov 2024 10:54:43 +0800 Subject: [PATCH] **fix(okex): Fix issue with resubscribing to channels after login** - Added `loginDone` and `needToResubscribeChannels` flags - Initialized the `loginDone` flag during connection setup - Implemented the `resubscribeChannels` method to decide whether to resubscribe based on login status - Checked if channel resubscription is needed after a successful login --- .../okex/OkexStreamingService.java | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingService.java b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingService.java index e522b4d243c..48c8244643a 100644 --- a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingService.java +++ b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingService.java @@ -54,6 +54,10 @@ public class OkexStreamingService extends JsonNettyStreamingService { private final ExchangeSpecification xSpec; + private volatile boolean loginDone = false; + + private volatile boolean needToResubscribeChannels = false; + public OkexStreamingService(String apiUrl, ExchangeSpecification exchangeSpecification) { super(apiUrl); this.xSpec = exchangeSpecification; @@ -61,6 +65,7 @@ public OkexStreamingService(String apiUrl, ExchangeSpecification exchangeSpecifi @Override public Completable connect() { + loginDone = xSpec.getApiKey() == null; Completable conn = super.connect(); return conn.andThen( (CompletableSource) @@ -82,6 +87,14 @@ public Completable connect() { }); } + @Override + public void resubscribeChannels() { + needToResubscribeChannels = true; + if (loginDone) { + super.resubscribeChannels(); + } + } + public void login() throws JsonProcessingException { Mac mac; try { @@ -123,6 +136,18 @@ public void messageHandler(String message) { LOG.error("Error parsing incoming message to JSON: {}", message); return; } + // Retry after a successful login + if (jsonNode.has("event")) { + String event = jsonNode.get("event").asText(); + if ("login".equals(event)) { + loginDone = true; + if (needToResubscribeChannels) { + this.resubscribeChannels(); + needToResubscribeChannels = false; + } + return; + } + } if (processArrayMessageSeparately() && jsonNode.isArray()) { // In case of array - handle every message separately. @@ -227,4 +252,4 @@ public void pingPongDisconnectIfConnected() { pingPongSubscription.dispose(); } } -} +} \ No newline at end of file