diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitTradeServiceRaw.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitTradeServiceRaw.java index b972dafa8b..09ffd276fd 100644 --- a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitTradeServiceRaw.java +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitTradeServiceRaw.java @@ -54,50 +54,6 @@ BybitResult> getBybitOrder( return order; } -// BybitResult placeMarketOrder( -// BybitCategory category, String symbol, BybitSide side, BigDecimal qty, String orderLinkId) -// throws IOException { -// BybitPlaceOrderPayload payload = -// new BybitPlaceOrderPayload(category, symbol, side, MARKET, qty, orderLinkId); -// BybitResult placeOrder = -// decorateApiCall( -// () -> bybitAuthenticated.placeMarketOrder(apiKey, signatureCreator, nonceFactory, -// payload)) -// .withRateLimiter(getCreateOrderRateLimiter(category)) -// .withRateLimiter(rateLimiter(GLOBAL_RATE_LIMITER)) -// .call(); -// if (!placeOrder.isSuccess()) { -// throw createBybitExceptionFromResult(placeOrder); -// } -// return placeOrder; -// } - -//BybitResult placeLimitOrder( -// BybitCategory category, -// String symbol, -// BybitSide side, -// BigDecimal qty, -// BigDecimal limitPrice, -// String orderLinkId, -// boolean reduceOnly) -// throws IOException { -// BybitPlaceOrderPayload payload = -// new BybitPlaceOrderPayload( -// category, symbol, side, BybitOrderType.LIMIT, qty, orderLinkId, limitPrice); -// payload.setReduceOnly(String.valueOf(reduceOnly)); -// BybitResult placeOrder = -// decorateApiCall( -// () -> bybitAuthenticated.placeLimitOrder(apiKey, signatureCreator, nonceFactory, -// payload)) -// .withRateLimiter(getCreateOrderRateLimiter(category)) -// .withRateLimiter(rateLimiter(GLOBAL_RATE_LIMITER)) -// .call(); -// if (!placeOrder.isSuccess()) { -// throw createBybitExceptionFromResult(placeOrder); -// } -// return placeOrder; -// } - BybitResult amendOrder( BybitCategory category, String symbol, diff --git a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingExchange.java b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingExchange.java index c9fe13147e..e1956902bc 100644 --- a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingExchange.java +++ b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingExchange.java @@ -2,6 +2,7 @@ import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.core.StreamingExchange; +import info.bitrich.xchangestream.service.netty.WebSocketClientHandler; import io.reactivex.rxjava3.core.Completable; import org.knowm.xchange.bybit.BybitExchange; import org.knowm.xchange.bybit.dto.BybitCategory; @@ -95,4 +96,13 @@ public BybitStreamingTradeService getStreamingTradeService() { return streamingTradeService; } + /** + * Enables the user to listen on channel inactive events and react appropriately. + * + * @param channelInactiveHandler a WebSocketMessageHandler instance. + */ + public void setChannelInactiveHandler( + WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) { + streamingService.setChannelInactiveHandler(channelInactiveHandler); + } } diff --git a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingService.java b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingService.java index be7ece0918..4125cafbe8 100644 --- a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingService.java +++ b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingService.java @@ -8,6 +8,7 @@ import dto.BybitSubscribeMessage; import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; import info.bitrich.xchangestream.service.netty.WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler; +import info.bitrich.xchangestream.service.netty.WebSocketClientHandler; import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.CompletableSource; @@ -41,6 +42,7 @@ public class BybitStreamingService extends JsonNettyStreamingService { private final Observable pingPongSrc = Observable.interval(15, 20, TimeUnit.SECONDS); private Disposable pingPongSubscription; private final ExchangeSpecification spec; + private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null; @Getter private boolean isAuthorized = false; public BybitStreamingService(String apiUrl, ExchangeSpecification spec) { @@ -158,4 +160,9 @@ public void pingPongDisconnectIfConnected() { protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() { return WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler.INSTANCE; } + + public void setChannelInactiveHandler( + WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) { + this.channelInactiveHandler = channelInactiveHandler; + } }