Skip to content

Commit

Permalink
Merge pull request #4968 from lvxiao1/fix-okex-resubscribing-login-error
Browse files Browse the repository at this point in the history
[okex-stream] all login-required channels become invalid after a network disconnection and reconnection
  • Loading branch information
timmolter authored Nov 25, 2024
2 parents b22c017 + 7dbe2ec commit c20907c
Showing 1 changed file with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,18 @@ public class OkexStreamingService extends JsonNettyStreamingService {

private final ExchangeSpecification xSpec;

private volatile boolean loginDone = false;

private volatile boolean needToResubscribeChannels = false;

public OkexStreamingService(String apiUrl, ExchangeSpecification exchangeSpecification) {
super(apiUrl);
this.xSpec = exchangeSpecification;
}

@Override
public Completable connect() {
loginDone = xSpec.getApiKey() == null;
Completable conn = super.connect();
return conn.andThen(
(CompletableSource)
Expand All @@ -82,6 +87,14 @@ public Completable connect() {
});
}

@Override
public void resubscribeChannels() {
needToResubscribeChannels = true;
if (loginDone) {
super.resubscribeChannels();
}
}

public void login() throws JsonProcessingException {
Mac mac;
try {
Expand Down Expand Up @@ -123,6 +136,18 @@ public void messageHandler(String message) {
LOG.error("Error parsing incoming message to JSON: {}", message);
return;
}
// Retry after a successful login
if (jsonNode.has("event")) {
String event = jsonNode.get("event").asText();
if ("login".equals(event)) {
loginDone = true;
if (needToResubscribeChannels) {
this.resubscribeChannels();
needToResubscribeChannels = false;
}
return;
}
}

if (processArrayMessageSeparately() && jsonNode.isArray()) {
// In case of array - handle every message separately.
Expand Down Expand Up @@ -227,4 +252,4 @@ public void pingPongDisconnectIfConnected() {
pingPongSubscription.dispose();
}
}
}
}

0 comments on commit c20907c

Please sign in to comment.