From fa5bc8c478f615f5a7e3e450f23982fec04bf26a Mon Sep 17 00:00:00 2001 From: rizer1980 <4340180@gmail.com> Date: Wed, 5 Feb 2025 10:32:02 +0300 Subject: [PATCH] [BYBIT] merge auth and non auth modes in one connection Rate Limiters new TimeStampFactory orderbook stream reconnect handling [NETTY STREAMING] add connectionStateModel closed on disconnect [OKEX,BINANCE] add connection state observers --- .../knowm/xchange/bybit/BybitAdapters.java | 13 +- .../xchange/bybit/BybitAuthenticated.java | 10 +- .../knowm/xchange/bybit/BybitExchange.java | 12 + .../xchange/bybit/BybitTimeStampFactory.java | 14 ++ .../dto/trade/BybitAmendOrderPayload.java | 2 +- .../dto/trade/BybitCancelAllOrdersParams.java | 45 ++++ .../dto/trade/BybitCancelOrderParams.java | 27 +++ .../bybit/dto/trade/BybitOpenOrdersParam.java | 23 ++ .../dto/trade/details/BybitOrderDetails.java | 12 +- .../inverse/BybitInverseOrderDetail.java | 118 ++++++++++ .../bybit/service/BybitAccountService.java | 2 +- .../bybit/service/BybitAccountServiceRaw.java | 18 +- .../bybit/service/BybitBaseService.java | 10 +- .../bybit/service/BybitTradeService.java | 95 +++----- .../bybit/service/BybitTradeServiceRaw.java | 44 ++-- .../examples/BybitCancelAllOrdersExample.java | 2 +- .../examples/BybitCancelOrderExample.java | 2 +- .../examples/BybitPlaceOrderExample.java | 15 +- .../examples/BybitRateLimiterTestExample.java | 14 +- .../service/BybitAccountServiceTest.java | 10 - .../service/BybitTradeServiceRawTest.java | 8 +- .../bybit/service/BybitTradeServiceTest.java | 67 ++++-- .../src/test/resources/getOrderError.json5 | 1 + .../src/test/resources/getOrderLinear.json5 | 55 +++++ .../binance/BinanceStreamingExchange.java | 10 + .../bybit/BybitStreamingExchange.java | 143 +++++++++--- .../BybitStreamingMarketDataService.java | 61 +++-- .../bybit/BybitStreamingService.java | 100 ++++---- .../bybit/BybitStreamingTradeService.java | 14 +- .../bybit/BybitUserDataStreamingService.java | 216 ++++++++++++++++++ .../example/BybitStreamOrderBookExample.java | 48 ++-- .../BybitStreamPositionChangeExample.java | 6 +- .../example/BybitStreamTestNetExample.java | 10 +- .../src/test/resources/logback.xml | 4 +- .../okex/OkexStreamingExchange.java | 22 ++ .../service/netty/NettyStreamingService.java | 1 + 36 files changed, 944 insertions(+), 310 deletions(-) create mode 100644 xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitTimeStampFactory.java create mode 100644 xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitCancelAllOrdersParams.java create mode 100644 xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitCancelOrderParams.java create mode 100644 xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitOpenOrdersParam.java create mode 100644 xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/details/inverse/BybitInverseOrderDetail.java create mode 100644 xchange-bybit/src/test/resources/getOrderError.json5 create mode 100644 xchange-bybit/src/test/resources/getOrderLinear.json5 create mode 100644 xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitUserDataStreamingService.java diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitAdapters.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitAdapters.java index 165173677a8..3bed3e7bab4 100644 --- a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitAdapters.java +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitAdapters.java @@ -3,16 +3,13 @@ import static org.knowm.xchange.bybit.dto.BybitCategory.INVERSE; import static org.knowm.xchange.bybit.dto.BybitCategory.OPTION; import static org.knowm.xchange.bybit.dto.marketdata.instruments.option.BybitOptionInstrumentInfo.OptionType.CALL; -import static org.knowm.xchange.bybit.dto.marketdata.instruments.option.BybitOptionInstrumentInfo.OptionType.PUT; import java.math.BigDecimal; -import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; -import java.time.temporal.TemporalAccessor; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -26,7 +23,6 @@ import org.knowm.xchange.bybit.dto.marketdata.instruments.BybitInstrumentInfo; import org.knowm.xchange.bybit.dto.marketdata.instruments.linear.BybitLinearInverseInstrumentInfo; import org.knowm.xchange.bybit.dto.marketdata.instruments.option.BybitOptionInstrumentInfo; -import org.knowm.xchange.bybit.dto.marketdata.instruments.option.BybitOptionInstrumentInfo.OptionType; import org.knowm.xchange.bybit.dto.marketdata.instruments.spot.BybitSpotInstrumentInfo; import org.knowm.xchange.bybit.dto.marketdata.tickers.BybitTicker; import org.knowm.xchange.bybit.dto.marketdata.tickers.linear.BybitLinearInverseTicker; @@ -65,11 +61,12 @@ public class BybitAdapters { public static Wallet adaptBybitBalances(List coinWalletBalances) { List balances = new ArrayList<>(coinWalletBalances.size()); for (BybitCoinWalletBalance bybitCoinBalance : coinWalletBalances) { + BigDecimal availableToWithdraw = bybitCoinBalance.getAvailableToWithdraw().isEmpty() ? BigDecimal.ZERO : new BigDecimal(bybitCoinBalance.getAvailableToWithdraw()); balances.add( new Balance( new Currency(bybitCoinBalance.getCoin()), new BigDecimal(bybitCoinBalance.getEquity()), - new BigDecimal(bybitCoinBalance.getAvailableToWithdraw()))); + availableToWithdraw)); } return Wallet.Builder.from(balances).build(); } @@ -242,19 +239,19 @@ public static InstrumentMetaData symbolToCurrencyPairMetaData( .build(); } - public static Order adaptBybitOrderDetails(BybitOrderDetail bybitOrderResult) { + public static Order adaptBybitOrderDetails(BybitOrderDetail bybitOrderResult, BybitCategory category) { Order.Builder builder; switch (bybitOrderResult.getOrderType()) { case MARKET: builder = new MarketOrder.Builder( - adaptOrderType(bybitOrderResult), guessSymbol(bybitOrderResult.getSymbol())); + adaptOrderType(bybitOrderResult), convertBybitSymbolToInstrument(bybitOrderResult.getSymbol(),category)); break; case LIMIT: builder = new LimitOrder.Builder( - adaptOrderType(bybitOrderResult), guessSymbol(bybitOrderResult.getSymbol())) + adaptOrderType(bybitOrderResult), convertBybitSymbolToInstrument(bybitOrderResult.getSymbol(),category)) .limitPrice(bybitOrderResult.getPrice()); break; default: diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitAuthenticated.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitAuthenticated.java index babcba4bfd9..d539e1ef577 100644 --- a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitAuthenticated.java +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitAuthenticated.java @@ -13,6 +13,7 @@ import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.core.MediaType; import java.io.IOException; +import javax.annotation.Nonnull; import org.knowm.xchange.bybit.dto.BybitResult; import org.knowm.xchange.bybit.dto.account.BybitAccountInfoResponse; import org.knowm.xchange.bybit.dto.account.BybitCancelAllOrdersPayload; @@ -65,12 +66,12 @@ BybitResult getAllCoinsBalance( */ @GET @Path("/account/fee-rate") - BybitResult getFeeRates( + BybitResult getFeeRate( @HeaderParam(X_BAPI_API_KEY) String apiKey, @HeaderParam(X_BAPI_SIGN) ParamsDigest signature, @HeaderParam(X_BAPI_TIMESTAMP) SynchronizedValueFactory timestamp, @QueryParam("category") String category, - @QueryParam("symbol") String symbol) + @Nonnull @QueryParam("symbol") String symbol) throws IOException, BybitException; /** @@ -78,12 +79,13 @@ BybitResult getFeeRates( */ @GET @Path("/order/realtime") - BybitResult> getOpenOrders( + BybitResult> getOrders( @HeaderParam(X_BAPI_API_KEY) String apiKey, @HeaderParam(X_BAPI_SIGN) ParamsDigest signature, @HeaderParam(X_BAPI_TIMESTAMP) SynchronizedValueFactory timestamp, @QueryParam("category") String category, - @QueryParam("orderId") String orderId) + @Nonnull @QueryParam("symbol") String symbol, + @Nonnull @QueryParam("orderId") String orderId) throws IOException, BybitException; /** diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitExchange.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitExchange.java index df6969a9ddf..b16e401a28c 100644 --- a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitExchange.java +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitExchange.java @@ -1,6 +1,7 @@ package org.knowm.xchange.bybit; import java.io.IOException; +import lombok.Getter; import org.knowm.xchange.BaseExchange; import org.knowm.xchange.Exchange; import org.knowm.xchange.ExchangeSpecification; @@ -15,6 +16,7 @@ import org.knowm.xchange.bybit.service.BybitTradeService; import org.knowm.xchange.client.ResilienceRegistries; import org.knowm.xchange.exceptions.ExchangeException; +import si.mazi.rescu.SynchronizedValueFactory; public class BybitExchange extends BaseExchange implements Exchange{ @@ -28,6 +30,9 @@ public class BybitExchange extends BaseExchange implements Exchange{ private static ResilienceRegistries RESILIENCE_REGISTRIES; + @Getter + protected SynchronizedValueFactory timeStampFactory = new BybitTimeStampFactory(); + @Override protected void initServices() { marketDataService = new BybitMarketDataService(this,getResilienceRegistries()); @@ -135,4 +140,11 @@ public ResilienceRegistries getResilienceRegistries() { } return RESILIENCE_REGISTRIES; } + + @Override + public SynchronizedValueFactory getNonceFactory() { + throw new UnsupportedOperationException( + "Bybit uses timestamp/recv-window rather than a nonce"); + } + } diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitTimeStampFactory.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitTimeStampFactory.java new file mode 100644 index 00000000000..895dddb1783 --- /dev/null +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/BybitTimeStampFactory.java @@ -0,0 +1,14 @@ +package org.knowm.xchange.bybit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import si.mazi.rescu.SynchronizedValueFactory; + +public class BybitTimeStampFactory implements SynchronizedValueFactory { + private static final Logger LOG = LoggerFactory.getLogger(BybitTimeStampFactory.class); + + @Override + public Long createValue() { + return System.currentTimeMillis(); + } +} diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitAmendOrderPayload.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitAmendOrderPayload.java index 0dd551e8b9d..25e8f240a79 100644 --- a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitAmendOrderPayload.java +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitAmendOrderPayload.java @@ -6,7 +6,7 @@ import org.knowm.xchange.bybit.dto.BybitCategory; @Getter -@JsonInclude(Include.NON_NULL) +@JsonInclude(Include.NON_EMPTY) public class BybitAmendOrderPayload { BybitCategory category; diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitCancelAllOrdersParams.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitCancelAllOrdersParams.java new file mode 100644 index 00000000000..8cd67f83817 --- /dev/null +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitCancelAllOrdersParams.java @@ -0,0 +1,45 @@ +package org.knowm.xchange.bybit.dto.trade; + +import lombok.Getter; +import org.knowm.xchange.bybit.dto.BybitCategory; +import org.knowm.xchange.instrument.Instrument; +import org.knowm.xchange.service.trade.params.CancelAllOrders; + +@Getter +public class BybitCancelAllOrdersParams implements CancelAllOrders { + + private final BybitCategory category; + private final Instrument symbol; + private String baseCoin; + private String settleCoin; + private String orderFilter; + private String stopOrderType; + + public BybitCancelAllOrdersParams(BybitCategory category, Instrument symbol) { + this.category = category; + this.symbol = symbol; + } + + public BybitCancelAllOrdersParams(BybitCategory category, Instrument symbol, String baseCoin, + String settleCoin, String orderFilter, String stopOrderType) { + this.category = category; + this.symbol = symbol; + this.baseCoin = baseCoin; + this.settleCoin = settleCoin; + this.orderFilter = orderFilter; + this.stopOrderType = stopOrderType; + } + + @Override + public String toString() { + return "BybitCancelAllOrdersParams{" + + "category=" + category + + ", symbol=" + symbol + + ", baseCoin='" + baseCoin + '\'' + + ", settleCoin='" + settleCoin + '\'' + + ", orderFilter='" + orderFilter + '\'' + + ", stopOrderType='" + stopOrderType + '\'' + + '}'; + } +} + diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitCancelOrderParams.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitCancelOrderParams.java new file mode 100644 index 00000000000..bfe68fabdc2 --- /dev/null +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitCancelOrderParams.java @@ -0,0 +1,27 @@ +package org.knowm.xchange.bybit.dto.trade; + +import lombok.Getter; +import org.knowm.xchange.instrument.Instrument; +import org.knowm.xchange.service.trade.params.CancelOrderByUserReferenceParams; +import org.knowm.xchange.service.trade.params.DefaultCancelOrderByInstrumentAndIdParams; + +@Getter +public class BybitCancelOrderParams extends DefaultCancelOrderByInstrumentAndIdParams + implements CancelOrderByUserReferenceParams { + + private final String userReference; + + public BybitCancelOrderParams(Instrument instrument, String orderId, String userReference) { + super(instrument, orderId); + this.userReference = userReference; + } + + @Override + public String toString() { + return "BybitCancelOrderParams{" + + "instrument='" + getInstrument() + '\'' + + ", orderId='" + getOrderId() + '\'' + + ", userReference=" + getUserReference() + + '}'; + } +} \ No newline at end of file diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitOpenOrdersParam.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitOpenOrdersParam.java new file mode 100644 index 00000000000..458582aa768 --- /dev/null +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/BybitOpenOrdersParam.java @@ -0,0 +1,23 @@ +package org.knowm.xchange.bybit.dto.trade; + +import lombok.Getter; +import org.knowm.xchange.bybit.dto.BybitCategory; +import org.knowm.xchange.instrument.Instrument; +import org.knowm.xchange.service.trade.params.orders.DefaultOpenOrdersParamInstrument; + +@Getter +public class BybitOpenOrdersParam extends DefaultOpenOrdersParamInstrument { + private final BybitCategory category; + + public BybitOpenOrdersParam(Instrument instrument, BybitCategory category) { + super(instrument); + this.category = category; + } + @Override + public String toString() { + return "BybitOrderQueryParams{" + + "category='" + category + '\'' + + ", instrument='" + getInstrument() + '\'' + + '}'; + } +} diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/details/BybitOrderDetails.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/details/BybitOrderDetails.java index 218be4023b2..1304f6d7013 100644 --- a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/details/BybitOrderDetails.java +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/details/BybitOrderDetails.java @@ -4,23 +4,27 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; import lombok.Data; import lombok.Value; import lombok.extern.jackson.Jacksonized; import org.knowm.xchange.bybit.dto.BybitCategorizedPayload; +import org.knowm.xchange.bybit.dto.trade.details.BybitOrderDetails.BybitInverseOrderDetails; import org.knowm.xchange.bybit.dto.trade.details.BybitOrderDetails.BybitLinearOrderDetails; import org.knowm.xchange.bybit.dto.trade.details.BybitOrderDetails.BybitSpotOrderDetails; +import org.knowm.xchange.bybit.dto.trade.details.inverse.BybitInverseOrderDetail; import org.knowm.xchange.bybit.dto.trade.details.linear.BybitLinearOrderDetail; import org.knowm.xchange.bybit.dto.trade.details.spot.BybitSpotOrderDetail; @Data -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "category", visible = true) +@JsonTypeInfo(use = Id.NAME, property = "category", visible = true,defaultImpl = BybitOrderDetails.class) @JsonSubTypes({ @Type(value = BybitLinearOrderDetails.class, name = "linear"), - @Type(value = BybitOrderDetails.class, name = "inverse"), + @Type(value = BybitInverseOrderDetails.class, name = "inverse"), @Type(value = BybitOrderDetails.class, name = "option"), @Type(value = BybitSpotOrderDetails.class, name = "spot"), }) + public class BybitOrderDetails extends BybitCategorizedPayload { @JsonProperty("nextPageCursor") @@ -33,4 +37,8 @@ public static class BybitLinearOrderDetails extends BybitOrderDetails {} + + @Jacksonized + @Value + public static class BybitInverseOrderDetails extends BybitOrderDetails {} } diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/details/inverse/BybitInverseOrderDetail.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/details/inverse/BybitInverseOrderDetail.java new file mode 100644 index 00000000000..807a8353a91 --- /dev/null +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/dto/trade/details/inverse/BybitInverseOrderDetail.java @@ -0,0 +1,118 @@ +package org.knowm.xchange.bybit.dto.trade.details.inverse; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.math.BigDecimal; +import java.util.Date; +import lombok.Value; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; +import org.knowm.xchange.bybit.dto.trade.BybitOrderType; +import org.knowm.xchange.bybit.dto.trade.details.BybitOrderDetail; + +@SuperBuilder +@Jacksonized +@Value +public class BybitInverseOrderDetail extends BybitOrderDetail { + + @JsonProperty("orderLinkId") + String orderLinkId; + + @JsonProperty("blockTradeId") + String blockTradeId; + + @JsonProperty("isLeverage") + String isLeverage; + + @JsonProperty("positionIdx") + int positionIdx; + + @JsonProperty("cancelType") + String cancelType; + + @JsonProperty("rejectReason") + String rejectReason; + + @JsonProperty("leavesQty") + BigDecimal leavesQty; + + @JsonProperty("leavesValue") + BigDecimal leavesValue; + + @JsonProperty("cumExecValue") + BigDecimal cumExecValue; + + @JsonProperty("cumExecFee") + BigDecimal cumExecFee; + + @JsonProperty("timeInForce") + String timeInForce; + + @JsonProperty("orderType") + BybitOrderType orderType; + + @JsonProperty("stopOrderType") + String stopOrderType; + + @JsonProperty("orderIv") + String orderIv; + + @JsonProperty("triggerPrice") + BigDecimal triggerPrice; + + @JsonProperty("takeProfit") + BigDecimal takeProfit; + + @JsonProperty("stopLoss") + BigDecimal stopLoss; + + @JsonProperty("tpTriggerBy") + String tpTriggerBy; + + @JsonProperty("slTriggerBy") + String slTriggerBy; + + @JsonProperty("triggerDirection") + int triggerDirection; + + @JsonProperty("triggerBy") + String triggerBy; + + @JsonProperty("lastPriceOnCreated") + String lastPriceOnCreated; + + @JsonProperty("reduceOnly") + boolean reduceOnly; + + @JsonProperty("closeOnTrigger") + boolean closeOnTrigger; + + @JsonProperty("smpType") + String smpType; + + @JsonProperty("smpGroup") + int smpGroup; + + @JsonProperty("smpOrderId") + String smpOrderId; + + @JsonProperty("tpslMode") + String tpslMode; + + @JsonProperty("tpLimitPrice") + String tpLimitPrice; + + @JsonProperty("slLimitPrice") + String slLimitPrice; + + @JsonProperty("placeType") + String placeType; + + @JsonProperty("updatedTime") + Date updatedTime; + + @JsonProperty("createType") + String createType; + + @JsonProperty("marketUnit") + String marketUnit; +} diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitAccountService.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitAccountService.java index f1213d61a91..d00cf8328a7 100644 --- a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitAccountService.java +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitAccountService.java @@ -103,7 +103,7 @@ private List getAdaptedBalanceWallets() throws IOException { public BybitResult getFeeRates(BybitCategory category, Instrument instrument) throws IOException { - String symbol = ""; + String symbol = null; if (instrument != null) { symbol = BybitAdapters.convertToBybitSymbol(instrument); } diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitAccountServiceRaw.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitAccountServiceRaw.java index d3b41816b16..2dfeec82e30 100644 --- a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitAccountServiceRaw.java +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitAccountServiceRaw.java @@ -29,7 +29,7 @@ BybitResult getWalletBalances(BybitAccountType accountType) throws IOException { BybitResult walletBalances = bybitAuthenticated.getWalletBalance( - apiKey, signatureCreator, nonceFactory, accountType.name()); + apiKey, signatureCreator, exchange.getTimeStampFactory(), accountType.name()); if (!walletBalances.isSuccess()) { throw createBybitExceptionFromResult(walletBalances); } @@ -40,7 +40,7 @@ BybitResult getAllCoinsBalance(BybitAccountType accountTyp throws IOException { BybitResult allCoinsBalance = bybitAuthenticated.getAllCoinsBalance( - apiKey, signatureCreator, nonceFactory, accountType.name()); + apiKey, signatureCreator, exchange.getTimeStampFactory(), accountType.name()); if (!allCoinsBalance.isSuccess()) { throw createBybitExceptionFromResult(allCoinsBalance); } @@ -49,9 +49,11 @@ BybitResult getAllCoinsBalance(BybitAccountType accountTyp BybitResult getFeeRatesRaw(BybitCategory category, String symbol) throws IOException { - BybitResult bybitFeeRatesResult = - bybitAuthenticated.getFeeRates( - apiKey, signatureCreator, nonceFactory, category.getValue(), symbol); + BybitResult bybitFeeRatesResult; + bybitFeeRatesResult = + bybitAuthenticated.getFeeRate( + apiKey, signatureCreator, exchange.getTimeStampFactory(), category.getValue(), symbol); + if (!bybitFeeRatesResult.isSuccess()) { throw createBybitExceptionFromResult(bybitFeeRatesResult); } @@ -76,7 +78,7 @@ BybitResult setLeverageRaw(BybitCategory category, String symbol, double } BybitResult setLeverageResult = decorateApiCall(() -> - bybitAuthenticated.setLeverage(apiKey, signatureCreator, nonceFactory, payload)) + bybitAuthenticated.setLeverage(apiKey, signatureCreator, exchange.getTimeStampFactory(), payload)) .withRateLimiter(rateLimiter) .withRateLimiter(rateLimiter(GLOBAL_RATE_LIMITER)) .call(); @@ -92,7 +94,7 @@ BybitResult switchPositionModeRaw( BybitSwitchModePayload payload = new BybitSwitchModePayload(category.getValue(), symbol, coin, mode); BybitResult switchModeResult = - bybitAuthenticated.switchMode(apiKey, signatureCreator, nonceFactory, payload); + bybitAuthenticated.switchMode(apiKey, signatureCreator, exchange.getTimeStampFactory(), payload); // retCode=110025, retMsg=Position mode is not modified - also is success if (!switchModeResult.isSuccess() && switchModeResult.getRetCode() != 110025) { throw createBybitExceptionFromResult(switchModeResult); @@ -103,7 +105,7 @@ BybitResult switchPositionModeRaw( BybitResult accountInfoRaw() throws IOException { BybitResult accountInfo = - bybitAuthenticated.getAccountInfo(apiKey, signatureCreator, nonceFactory); + bybitAuthenticated.getAccountInfo(apiKey, signatureCreator, exchange.getTimeStampFactory()); if (!accountInfo.isSuccess()) { throw createBybitExceptionFromResult(accountInfo); } diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitBaseService.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitBaseService.java index 7fa0ceaa7a4..6713a013651 100644 --- a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitBaseService.java +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitBaseService.java @@ -1,26 +1,19 @@ package org.knowm.xchange.bybit.service; -import java.util.concurrent.TimeUnit; -import org.knowm.xchange.Exchange; import org.knowm.xchange.bybit.Bybit; import org.knowm.xchange.bybit.BybitAuthenticated; import org.knowm.xchange.bybit.BybitExchange; import org.knowm.xchange.client.ExchangeRestProxyBuilder; import org.knowm.xchange.client.ResilienceRegistries; -import org.knowm.xchange.service.BaseExchangeService; import org.knowm.xchange.service.BaseResilientExchangeService; -import org.knowm.xchange.service.BaseService; -import org.knowm.xchange.utils.nonce.CurrentTimeIncrementalNonceFactory; import si.mazi.rescu.ParamsDigest; -import si.mazi.rescu.SynchronizedValueFactory; public class BybitBaseService extends BaseResilientExchangeService { protected final BybitAuthenticated bybitAuthenticated; protected final Bybit bybit; protected final ParamsDigest signatureCreator; - protected final SynchronizedValueFactory nonceFactory = - new CurrentTimeIncrementalNonceFactory(TimeUnit.MILLISECONDS); + protected final String apiKey; protected BybitBaseService(BybitExchange exchange, ResilienceRegistries resilienceRegistries) { @@ -44,4 +37,5 @@ protected BybitBaseService(BybitExchange exchange, ResilienceRegistries resilien BybitDigest.createInstance(exchange.getExchangeSpecification().getSecretKey()); apiKey = exchange.getExchangeSpecification().getApiKey(); } + } diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitTradeService.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitTradeService.java index 4b49675143a..d0d47c86212 100644 --- a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitTradeService.java +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitTradeService.java @@ -4,8 +4,6 @@ import static org.knowm.xchange.bybit.BybitAdapters.convertToBybitSymbol; import static org.knowm.xchange.bybit.BybitAdapters.createBybitExceptionFromResult; import static org.knowm.xchange.bybit.dto.trade.details.BybitHedgeMode.TWOWAY; -import static org.knowm.xchange.dto.Order.OrderType.ASK; -import static org.knowm.xchange.dto.Order.OrderType.BID; import java.io.IOException; import java.util.ArrayList; @@ -13,13 +11,14 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import lombok.Getter; import org.knowm.xchange.bybit.BybitAdapters; import org.knowm.xchange.bybit.BybitExchange; import org.knowm.xchange.bybit.dto.BybitCategory; import org.knowm.xchange.bybit.dto.BybitResult; import org.knowm.xchange.bybit.dto.account.BybitCancelAllOrdersResponse; - +import org.knowm.xchange.bybit.dto.trade.BybitCancelAllOrdersParams; +import org.knowm.xchange.bybit.dto.trade.BybitCancelOrderParams; +import org.knowm.xchange.bybit.dto.trade.BybitOpenOrdersParam; import org.knowm.xchange.bybit.dto.trade.BybitOrderResponse; import org.knowm.xchange.bybit.dto.trade.BybitOrderType; import org.knowm.xchange.bybit.dto.trade.details.BybitHedgeMode; @@ -32,6 +31,7 @@ import org.knowm.xchange.dto.Order.OrderType; import org.knowm.xchange.dto.trade.LimitOrder; import org.knowm.xchange.dto.trade.MarketOrder; +import org.knowm.xchange.dto.trade.OpenOrders; import org.knowm.xchange.instrument.Instrument; import org.knowm.xchange.service.trade.TradeService; import org.knowm.xchange.service.trade.params.CancelAllOrders; @@ -39,7 +39,7 @@ import org.knowm.xchange.service.trade.params.CancelOrderByInstrument; import org.knowm.xchange.service.trade.params.CancelOrderByUserReferenceParams; import org.knowm.xchange.service.trade.params.CancelOrderParams; -import org.knowm.xchange.service.trade.params.DefaultCancelOrderByInstrumentAndIdParams; +import org.knowm.xchange.service.trade.params.orders.OpenOrdersParams; public class BybitTradeService extends BybitTradeServiceRaw implements TradeService { @@ -115,12 +115,12 @@ public Collection getOrder(String... orderIds) throws IOException { for (BybitCategory category : BybitCategory.values()) { BybitResult> bybitOrder = - getBybitOrder(category, orderId); + getBybitOrder(category, null,orderId); if (bybitOrder.getResult().getCategory().equals(category) && !bybitOrder.getResult().getList().isEmpty()) { BybitOrderDetail bybitOrderDetail = bybitOrder.getResult().getList().get(0); - Order order = adaptBybitOrderDetails(bybitOrderDetail); + Order order = adaptBybitOrderDetails(bybitOrderDetail,category); results.add(order); } } @@ -128,6 +128,28 @@ public Collection getOrder(String... orderIds) throws IOException { return results; } + @Override + public OpenOrders getOpenOrders(OpenOrdersParams params) throws IOException { + if (params instanceof BybitOpenOrdersParam) { + BybitCategory category = ((BybitOpenOrdersParam) params).getCategory(); + Instrument instrument = ((BybitOpenOrdersParam) params).getInstrument(); + if (category == null) { + throw new UnsupportedOperationException("Category is required"); + } + BybitResult> response = getBybitOrder(category, instrument, null); + List limitOrders = new ArrayList<>(); + if (response != null) { + for(BybitOrderDetail orderDetail:response.getResult().getList()) + limitOrders.add((LimitOrder) adaptBybitOrderDetails(orderDetail,category)); + } else { + throw new UnsupportedOperationException( + "Params must be instance of BybitCancelAllOrdersParams"); + } + return new OpenOrders(limitOrders); + } + return null; + } + @Override public String changeOrder(LimitOrder order) throws IOException { BybitCategory category = BybitAdapters.getCategory(order.getInstrument()); @@ -253,63 +275,4 @@ private int getPositionIdx(Order order) { return positionIdx; } - @Getter - public static final class BybitCancelOrderParams extends DefaultCancelOrderByInstrumentAndIdParams - implements CancelOrderByUserReferenceParams { - - private final String userReference; - - public BybitCancelOrderParams(Instrument instrument, String orderId, String userReference) { - super(instrument, orderId); - this.userReference = userReference; - } - - @Override - public String toString() { - return "BybitCancelOrderParams{" + - "instrument='" + getInstrument() + '\'' + - ", orderId='" + getOrderId() + '\'' + - ", userReference=" + getUserReference() + - '}'; - } - } - - @Getter - public static class BybitCancelAllOrdersParams implements CancelAllOrders { - - private final BybitCategory category; - private final Instrument symbol; - private String baseCoin; - private String settleCoin; - private String orderFilter; - private String stopOrderType; - - public BybitCancelAllOrdersParams(BybitCategory category, Instrument symbol) { - this.category = category; - this.symbol = symbol; - } - - public BybitCancelAllOrdersParams(BybitCategory category, Instrument symbol, String baseCoin, - String settleCoin, String orderFilter, String stopOrderType) { - this.category = category; - this.symbol = symbol; - this.baseCoin = baseCoin; - this.settleCoin = settleCoin; - this.orderFilter = orderFilter; - this.stopOrderType = stopOrderType; - } - - @Override - public String toString() { - return "BybitCancelAllOrdersParams{" + - "category=" + category + - ", symbol=" + symbol + - ", baseCoin='" + baseCoin + '\'' + - ", settleCoin='" + settleCoin + '\'' + - ", orderFilter='" + orderFilter + '\'' + - ", stopOrderType='" + stopOrderType + '\'' + - '}'; - } - } - } diff --git a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitTradeServiceRaw.java b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitTradeServiceRaw.java index 09ffd276fd1..3d6547ba168 100644 --- a/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitTradeServiceRaw.java +++ b/xchange-bybit/src/main/java/org/knowm/xchange/bybit/service/BybitTradeServiceRaw.java @@ -1,5 +1,6 @@ package org.knowm.xchange.bybit.service; +import static org.knowm.xchange.bybit.BybitAdapters.convertToBybitSymbol; import static org.knowm.xchange.bybit.BybitAdapters.createBybitExceptionFromResult; import static org.knowm.xchange.bybit.BybitResilience.GLOBAL_RATE_LIMITER; import static org.knowm.xchange.bybit.BybitResilience.ORDER_AMEND_LINEAR_AND_INVERSE_RATE_LIMITER; @@ -23,11 +24,9 @@ import org.knowm.xchange.bybit.dto.BybitResult; import org.knowm.xchange.bybit.dto.account.BybitCancelAllOrdersPayload; import org.knowm.xchange.bybit.dto.account.BybitCancelAllOrdersResponse; - -import org.knowm.xchange.bybit.dto.trade.BybitOrder.SlTriggerBy; - import org.knowm.xchange.bybit.dto.trade.BybitAmendOrderPayload; import org.knowm.xchange.bybit.dto.trade.BybitCancelOrderPayload; +import org.knowm.xchange.bybit.dto.trade.BybitOrder.SlTriggerBy; import org.knowm.xchange.bybit.dto.trade.BybitOrderResponse; import org.knowm.xchange.bybit.dto.trade.BybitOrderType; import org.knowm.xchange.bybit.dto.trade.BybitPlaceOrderPayload; @@ -36,22 +35,29 @@ import org.knowm.xchange.bybit.dto.trade.details.BybitOrderDetails; import org.knowm.xchange.bybit.dto.trade.details.BybitTimeInForce; import org.knowm.xchange.client.ResilienceRegistries; +import org.knowm.xchange.instrument.Instrument; public class BybitTradeServiceRaw extends BybitBaseService { - protected BybitTradeServiceRaw(BybitExchange exchange, ResilienceRegistries resilienceRegistries) { + protected BybitTradeServiceRaw(BybitExchange exchange, + ResilienceRegistries resilienceRegistries) { super(exchange, resilienceRegistries); } BybitResult> getBybitOrder( - BybitCategory category, String orderId) throws IOException { - BybitResult> order = - bybitAuthenticated.getOpenOrders( - apiKey, signatureCreator, nonceFactory, category.getValue(), orderId); - if (!order.isSuccess()) { - throw createBybitExceptionFromResult(order); + BybitCategory category,Instrument instrument, String orderId) throws IOException { + String symbol = null; + if(instrument != null) { + symbol = convertToBybitSymbol(instrument); + } + + BybitResult> bybitOrder =bybitAuthenticated.getOrders( + apiKey, signatureCreator, exchange.getTimeStampFactory(), category.getValue(), symbol, orderId); + + if (!bybitOrder.isSuccess()) { + throw createBybitExceptionFromResult(bybitOrder); } - return order; + return bybitOrder; } BybitResult amendOrder( @@ -71,10 +77,7 @@ BybitResult amendOrder( String tpLimitPrice, String slLimitPrice) throws IOException { - // if only userId is used, don't need to send id - if (orderId != null && orderId.isEmpty()) { - orderId = null; - } + RateLimiter rateLimiter = getAmendOrderRateLimiter(category); BybitAmendOrderPayload payload = new BybitAmendOrderPayload( @@ -95,7 +98,7 @@ BybitResult amendOrder( slLimitPrice); BybitResult amendOrder = decorateApiCall( - () -> bybitAuthenticated.amendOrder(apiKey, signatureCreator, nonceFactory, payload)) + () -> bybitAuthenticated.amendOrder(apiKey, signatureCreator, exchange.getTimeStampFactory(), payload)) .withRateLimiter(rateLimiter) .withRateLimiter(rateLimiter(GLOBAL_RATE_LIMITER)) .call(); @@ -143,7 +146,8 @@ BybitResult placeOrder( payload.setTimeInForce(timeInForce.getValue()); } BybitResult placeOrder = - decorateApiCall(() -> bybitAuthenticated.placeOrder(apiKey, signatureCreator, nonceFactory, payload)) + decorateApiCall( + () -> bybitAuthenticated.placeOrder(apiKey, signatureCreator, exchange.getTimeStampFactory(), payload)) .withRateLimiter(getCreateOrderRateLimiter(category)) .withRateLimiter(rateLimiter(GLOBAL_RATE_LIMITER)) .call(); @@ -160,7 +164,7 @@ BybitResult cancelOrder( BybitCancelOrderPayload payload = new BybitCancelOrderPayload(category, symbol, orderId, orderLinkId); return decorateApiCall( - () -> bybitAuthenticated.cancelOrder(apiKey, signatureCreator, nonceFactory, payload)) + () -> bybitAuthenticated.cancelOrder(apiKey, signatureCreator, exchange.getTimeStampFactory(), payload)) .withRateLimiter(rateLimiter) .withRateLimiter(rateLimiter(GLOBAL_RATE_LIMITER)) .call(); @@ -172,13 +176,14 @@ BybitResult cancelAllOrders(String category, Strin BybitCancelAllOrdersPayload payload = new BybitCancelAllOrdersPayload(category, symbol, baseCoin, settleCoin, orderFilter, stopOrderType); BybitResult response = - bybitAuthenticated.cancelAllOrders(apiKey, signatureCreator, nonceFactory, payload); + bybitAuthenticated.cancelAllOrders(apiKey, signatureCreator, exchange.getTimeStampFactory(), payload); if (!response.isSuccess()) { throw createBybitExceptionFromResult(response); } return response; } + private RateLimiter getCreateOrderRateLimiter(BybitCategory category) { switch (category) { case LINEAR: @@ -204,6 +209,7 @@ private RateLimiter getCancelOrderRateLimiter(BybitCategory category) { } return null; } + private RateLimiter getAmendOrderRateLimiter(BybitCategory category) { switch (category) { case LINEAR: diff --git a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitCancelAllOrdersExample.java b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitCancelAllOrdersExample.java index b0cf7dd1f09..7b079048a61 100644 --- a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitCancelAllOrdersExample.java +++ b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitCancelAllOrdersExample.java @@ -12,8 +12,8 @@ import org.knowm.xchange.bybit.BybitExchange; import org.knowm.xchange.bybit.dto.BybitCategory; import org.knowm.xchange.bybit.dto.account.walletbalance.BybitAccountType; +import org.knowm.xchange.bybit.dto.trade.BybitCancelAllOrdersParams; import org.knowm.xchange.bybit.service.BybitAccountService; -import org.knowm.xchange.bybit.service.BybitTradeService.BybitCancelAllOrdersParams; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.derivative.FuturesContract; import org.knowm.xchange.dto.Order.OrderType; diff --git a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitCancelOrderExample.java b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitCancelOrderExample.java index 139f11e8c82..79dc7a53554 100644 --- a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitCancelOrderExample.java +++ b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitCancelOrderExample.java @@ -12,8 +12,8 @@ import org.knowm.xchange.bybit.BybitExchange; import org.knowm.xchange.bybit.dto.BybitCategory; import org.knowm.xchange.bybit.dto.account.walletbalance.BybitAccountType; +import org.knowm.xchange.bybit.dto.trade.BybitCancelOrderParams; import org.knowm.xchange.bybit.service.BybitAccountService; -import org.knowm.xchange.bybit.service.BybitTradeService.BybitCancelOrderParams; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.derivative.FuturesContract; import org.knowm.xchange.dto.Order.OrderStatus; diff --git a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitPlaceOrderExample.java b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitPlaceOrderExample.java index 03cac8bff0d..9db3878d0d4 100644 --- a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitPlaceOrderExample.java +++ b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitPlaceOrderExample.java @@ -11,11 +11,10 @@ import org.knowm.xchange.bybit.BybitExchange; import org.knowm.xchange.bybit.dto.BybitCategory; import org.knowm.xchange.bybit.dto.account.walletbalance.BybitAccountType; - +import org.knowm.xchange.bybit.dto.trade.BybitOpenOrdersParam; import org.knowm.xchange.bybit.dto.trade.details.BybitHedgeMode; import org.knowm.xchange.bybit.dto.trade.details.BybitTimeInForce; import org.knowm.xchange.bybit.service.BybitAccountService; - import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.derivative.FuturesContract; import org.knowm.xchange.dto.Order.OrderType; @@ -76,10 +75,20 @@ public static void testOrder() throws IOException { ETH_USDT_PERP, "", new Date(), - ticker.getLow()); + ticker.getHigh()); limitOrder.addOrderFlag(BybitTimeInForce.POSTONLY); limitOrder.addOrderFlag(BybitHedgeMode.TWOWAY); String limitOrderId = exchange.getTradeService().placeLimitOrder(limitOrder); System.out.println("limit order id: " + limitOrderId); + + // Main net only +// for (Order order : exchange.getTradeService().getOrder(limitOrderId)) { +// System.out.println("get order: " + order); +// } + + BybitOpenOrdersParam param = new BybitOpenOrdersParam(ETH_USDT_PERP, BybitCategory.LINEAR); + for (LimitOrder order : exchange.getTradeService().getOpenOrders(param).getOpenOrders()) { + System.out.println("get open orders: " + order); + } } } diff --git a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitRateLimiterTestExample.java b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitRateLimiterTestExample.java index c18fb6c0c21..68c95683473 100644 --- a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitRateLimiterTestExample.java +++ b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/examples/BybitRateLimiterTestExample.java @@ -15,7 +15,9 @@ import org.knowm.xchange.ExchangeFactory; import org.knowm.xchange.ExchangeSpecification; import org.knowm.xchange.bybit.BybitExchange; +import org.knowm.xchange.bybit.dto.BybitCategory; import org.knowm.xchange.bybit.dto.account.walletbalance.BybitAccountType; +import org.knowm.xchange.bybit.service.BybitAccountService; import org.knowm.xchange.bybit.service.BybitTradeService; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.derivative.FuturesContract; @@ -50,6 +52,8 @@ private static void testRateLimits() throws IOException, InterruptedException { Ticker tickerETH_USDT_PERP = exchange.getMarketDataService().getTicker(ETH_USDT_PERP); LimitOrder limitOrderFuture = createOrder("", tickerETH_USDT_PERP.getHigh()); + BybitAccountService bybitAccountService = (BybitAccountService) exchange.getAccountService(); + bybitAccountService.switchPositionMode(BybitCategory.LINEAR, ETH_USDT_PERP, "USDT", 0); String limitFutureOrderId = exchange.getTradeService().placeLimitOrder(limitOrderFuture); BybitTradeService bybitTradeService = (BybitTradeService) exchange.getTradeService(); exchange.getResilienceRegistries().rateLimiters().rateLimiter(GLOBAL_RATE_LIMITER) @@ -68,11 +72,11 @@ private static void testRateLimits() throws IOException, InterruptedException { ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3); for (int i = 0; i < 8; i++) { int finalI = i; -// System.out.println(sdf.format(new Date()) + -// " Amend order, availablePermissions: " + exchange.getResilienceRegistries() -// .rateLimiters() -// .rateLimiter(ORDER_AMEND_LINEAR_AND_INVERSE_RATE_LIMITER).getMetrics() -// .getAvailablePermissions()); + System.out.println(sdf.format(new Date()) + + " Amend order, availablePermissions: " + exchange.getResilienceRegistries() + .rateLimiters() + .rateLimiter(ORDER_AMEND_LINEAR_AND_INVERSE_RATE_LIMITER).getMetrics() + .getAvailablePermissions()); Thread.sleep(1); executor.execute(() -> { try { diff --git a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/service/BybitAccountServiceTest.java b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/service/BybitAccountServiceTest.java index 95f85b4c725..e880d7a3bcb 100644 --- a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/service/BybitAccountServiceTest.java +++ b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/service/BybitAccountServiceTest.java @@ -1,27 +1,17 @@ package org.knowm.xchange.bybit.service; -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.post; -import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.fail; -import jakarta.ws.rs.core.Response.Status; import java.io.IOException; import java.math.BigDecimal; -import java.nio.charset.StandardCharsets; -import org.apache.commons.io.IOUtils; -import org.junit.Before; import org.junit.Test; -import org.knowm.xchange.Exchange; import org.knowm.xchange.bybit.BybitExchange; import org.knowm.xchange.bybit.dto.BybitCategory; import org.knowm.xchange.bybit.dto.BybitResult; import org.knowm.xchange.bybit.dto.account.feerates.BybitFeeRate; import org.knowm.xchange.bybit.dto.account.feerates.BybitFeeRates; import org.knowm.xchange.bybit.dto.account.walletbalance.BybitAccountType; -import org.knowm.xchange.bybit.service.BybitTradeService.BybitCancelAllOrdersParams; import org.knowm.xchange.currency.Currency; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.derivative.FuturesContract; diff --git a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/service/BybitTradeServiceRawTest.java b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/service/BybitTradeServiceRawTest.java index 34450da652a..6950613b64c 100644 --- a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/service/BybitTradeServiceRawTest.java +++ b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/service/BybitTradeServiceRawTest.java @@ -6,7 +6,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; import static org.assertj.core.api.Assertions.assertThat; import static org.knowm.xchange.bybit.BybitAdapters.convertToBybitSymbol; -import static org.knowm.xchange.bybit.dto.trade.BybitSide.BUY; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -16,7 +15,6 @@ import java.nio.charset.StandardCharsets; import org.apache.commons.io.IOUtils; import org.junit.Test; -import org.knowm.xchange.Exchange; import org.knowm.xchange.bybit.BybitAdapters; import org.knowm.xchange.bybit.BybitExchange; import org.knowm.xchange.bybit.dto.BybitCategory; @@ -24,7 +22,6 @@ import org.knowm.xchange.bybit.dto.trade.BybitOrderResponse; import org.knowm.xchange.bybit.dto.trade.BybitOrderStatus; import org.knowm.xchange.bybit.dto.trade.BybitOrderType; -import org.knowm.xchange.bybit.dto.trade.BybitSide; import org.knowm.xchange.bybit.dto.trade.details.BybitOrderDetail; import org.knowm.xchange.bybit.dto.trade.details.BybitOrderDetails; import org.knowm.xchange.bybit.dto.trade.details.BybitTimeInForce; @@ -32,7 +29,6 @@ import org.knowm.xchange.bybit.dto.trade.details.spot.BybitSpotOrderDetail; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.Order.OrderType; -import org.knowm.xchange.instrument.Instrument; public class BybitTradeServiceRawTest extends BaseWiremockTest { @@ -48,7 +44,7 @@ public void testGetBybitLinearDetailOrder() throws IOException { BybitResult> actualOrderDetails = bybitAccountServiceRaw.getBybitOrder( - BybitCategory.LINEAR, "fd4300ae-7847-404e-b947-b46980a4d140"); + BybitCategory.LINEAR,null,"fd4300ae-7847-404e-b947-b46980a4d140"); assertThat(actualOrderDetails.getResult().getList()).hasSize(1); @@ -151,7 +147,7 @@ public void testGetBybitSpotDetailOrder() throws IOException { BybitResult> actualOrderDetails = bybitAccountServiceRaw.getBybitOrder( - BybitCategory.SPOT, "fd4300ae-7847-404e-b947-b46980a4d140"); + BybitCategory.SPOT, null,"fd4300ae-7847-404e-b947-b46980a4d140"); assertThat(actualOrderDetails.getResult().getList()).hasSize(1); diff --git a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/service/BybitTradeServiceTest.java b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/service/BybitTradeServiceTest.java index f4f90d2336d..2a09341c309 100644 --- a/xchange-bybit/src/test/java/org/knowm/xchange/bybit/service/BybitTradeServiceTest.java +++ b/xchange-bybit/src/test/java/org/knowm/xchange/bybit/service/BybitTradeServiceTest.java @@ -1,21 +1,21 @@ package org.knowm.xchange.bybit.service; import static org.assertj.core.api.Assertions.assertThat; - import static org.junit.Assert.fail; import static org.knowm.xchange.currency.CurrencyPair.BTC_USDT; -import com.github.tomakehurst.wiremock.matching.ContainsPattern; import java.io.IOException; import java.math.BigDecimal; import java.util.Collection; import java.util.Date; +import java.util.List; import org.junit.Before; import org.junit.Test; import org.knowm.xchange.bybit.BybitExchange; import org.knowm.xchange.bybit.dto.BybitCategory; -import org.knowm.xchange.bybit.service.BybitTradeService.BybitCancelAllOrdersParams; -import org.knowm.xchange.bybit.service.BybitTradeService.BybitCancelOrderParams; +import org.knowm.xchange.bybit.dto.trade.BybitCancelAllOrdersParams; +import org.knowm.xchange.bybit.dto.trade.BybitCancelOrderParams; +import org.knowm.xchange.bybit.dto.trade.BybitOpenOrdersParam; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.derivative.FuturesContract; import org.knowm.xchange.dto.Order; @@ -24,46 +24,67 @@ import org.knowm.xchange.dto.trade.LimitOrder; import org.knowm.xchange.dto.trade.MarketOrder; import org.knowm.xchange.instrument.Instrument; +import org.knowm.xchange.service.trade.TradeService; public class BybitTradeServiceTest extends BaseWiremockTest { BybitExchange bybitExchange; - BybitTradeService bybitTradeService; + TradeService tradeService; @Before public void setUp() throws IOException { bybitExchange = createExchange(); - bybitTradeService = new BybitTradeService(bybitExchange, - bybitExchange.getResilienceRegistries()); + tradeService = new BybitTradeService(bybitExchange, bybitExchange.getResilienceRegistries()); } @Test - public void testGetBybitOrder() throws IOException { - initGetStub("/v5/order/realtime", "/getOrder.json5", "orderId", - new ContainsPattern("fd4300ae-7847-404e-b947-b46980a4d140")); + public void testGetBybitOrderLinear() throws IOException { + initGetStub("/v5/order/realtime", "/getOrderLinear.json5"); - Collection orders = bybitTradeService.getOrder("fd4300ae-7847-404e-b947-b46980a4d140"); + Collection orders = tradeService.getOrder("fd4300ae-7847-404e-b947-b46980a4d140"); assertThat(orders.size()).isEqualTo(1); Order order = (Order) orders.toArray()[0]; assertThat(order.getType()).isEqualTo(OrderType.BID); - assertThat(order.getInstrument()).isEqualTo(new CurrencyPair("ETH", "USDT")); + assertThat(order.getInstrument()).isEqualTo(new FuturesContract("ETH/USDT/PERP")); assertThat(order.getAveragePrice()).isEqualTo(new BigDecimal("0")); assertThat(order.getStatus()).isEqualTo(OrderStatus.NEW); assertThat(order.getOriginalAmount()).isEqualTo(new BigDecimal("0.10")); } + @Test + public void testGetOrders() throws IOException { + initGetStub("/v5/order/realtime", "/getOrderLinear.json5"); + BybitOpenOrdersParam param = new BybitOpenOrdersParam(new FuturesContract("ETH/USDT/PERP"), BybitCategory.LINEAR); + List limitOrder = tradeService.getOpenOrders(param).getOpenOrders(); + + assertThat(limitOrder.size()).isEqualTo(1); + + Order order = (Order) limitOrder.toArray()[0]; + assertThat(order.getType()).isEqualTo(OrderType.BID); + assertThat(order.getInstrument()).isEqualTo(new FuturesContract("ETH/USDT/PERP")); + assertThat(order.getStatus()).isEqualTo(OrderStatus.NEW); + assertThat(order.getOriginalAmount()).isEqualTo(new BigDecimal("0.10")); + } + + @Test(expected = BybitException.class) + public void testGetOrdersError() throws IOException { + initGetStub("/v5/order/realtime", "/getOrderError.json5"); + BybitOpenOrdersParam param = new BybitOpenOrdersParam(new FuturesContract("ETH/USDT/PERP"), BybitCategory.LINEAR); + List limitOrder = tradeService.getOpenOrders(param).getOpenOrders(); + } + @Test public void testPlaceBybitOrder() throws IOException { initPostStub("/v5/order/create", "/placeMarketOrder.json5"); MarketOrder marketOrder = new MarketOrder(OrderType.ASK, new BigDecimal("0.1"), new CurrencyPair("BTC", "USDT")); - String marketOrderId = bybitTradeService.placeMarketOrder(marketOrder); + String marketOrderId = tradeService.placeMarketOrder(marketOrder); assertThat(marketOrderId).isEqualTo("1321003749386327552"); LimitOrder limitOrder = new LimitOrder(OrderType.EXIT_BID, new BigDecimal("0.1"), new CurrencyPair("BTC", "USDT"), "", new Date(1672211918471L), new BigDecimal("110")); - String limitOrderId = bybitTradeService.placeLimitOrder(limitOrder); + String limitOrderId = tradeService.placeLimitOrder(limitOrder); assertThat(limitOrderId).isEqualTo("1321003749386327552"); } @@ -75,7 +96,7 @@ public void testChangeBybitOrder() throws IOException { new CurrencyPair("BTC", "USDT"), "", new Date(1672211918471L), new BigDecimal("110")); String orderId = - bybitTradeService.changeOrder(limitOrder); + tradeService.changeOrder(limitOrder); assertThat(orderId).isEqualTo("c6f055d9-7f21-4079-913d-e6523a9cfffa"); } @@ -86,15 +107,15 @@ public void testCancelBybitOrder() throws IOException { Instrument BTC_USDT_PERP = new FuturesContract("BTC/USDT/PERP"); try { - bybitTradeService.cancelOrder(new BybitCancelOrderParams(BTC_USDT, "", "")); + tradeService.cancelOrder(new BybitCancelOrderParams(BTC_USDT, "", "")); fail("Expected UnsupportedOperationException"); } catch (UnsupportedOperationException ignored) { } - boolean resultSpot = bybitTradeService.cancelOrder( + boolean resultSpot = tradeService.cancelOrder( new BybitCancelOrderParams(BTC_USDT, "c6f055d9-7f21-4079-913d-e6523a9cfffa", "")); - boolean resultFuture0 = bybitTradeService.cancelOrder( + boolean resultFuture0 = tradeService.cancelOrder( new BybitCancelOrderParams(BTC_USDT_PERP, "c6f055d9-7f21-4079-913d-e6523a9cfffa", "")); - boolean resultFuture1 = bybitTradeService.cancelOrder( + boolean resultFuture1 = tradeService.cancelOrder( new BybitCancelOrderParams(BTC_USDT_PERP, "", "linear-004")); assertThat(resultSpot).isTrue(); assertThat(resultFuture0).isTrue(); @@ -107,23 +128,23 @@ public void testCancelAllBybitOrder() throws IOException { Instrument BTC_USDT_PERP = new FuturesContract("BTC/USDT/PERP"); try { - bybitTradeService.cancelAllOrders(new BybitCancelAllOrdersParams( + tradeService.cancelAllOrders(new BybitCancelAllOrdersParams( null, null)); fail("Expected UnsupportedOperationException"); } catch (UnsupportedOperationException ignored) { } try { - bybitTradeService.cancelAllOrders(new BybitCancelAllOrdersParams(BybitCategory.LINEAR, null)); + tradeService.cancelAllOrders(new BybitCancelAllOrdersParams(BybitCategory.LINEAR, null)); fail("Expected UnsupportedOperationException"); } catch (UnsupportedOperationException ignored) { } - Collection resultSpot = bybitTradeService.cancelAllOrders( + Collection resultSpot = tradeService.cancelAllOrders( new BybitCancelAllOrdersParams( BybitCategory.SPOT, null)); - Collection resultFuture0 = bybitTradeService.cancelAllOrders( + Collection resultFuture0 = tradeService.cancelAllOrders( new BybitCancelAllOrdersParams(BybitCategory.LINEAR, BTC_USDT_PERP)); assertThat(resultSpot.stream().findFirst().get()).isEqualTo("1616024329462743808"); diff --git a/xchange-bybit/src/test/resources/getOrderError.json5 b/xchange-bybit/src/test/resources/getOrderError.json5 new file mode 100644 index 00000000000..5d950996701 --- /dev/null +++ b/xchange-bybit/src/test/resources/getOrderError.json5 @@ -0,0 +1 @@ +{"retCode":10032,"retMsg":"Demo trading are not supported.","result":{},"retExtInfo":{},"time":1736869113003} \ No newline at end of file diff --git a/xchange-bybit/src/test/resources/getOrderLinear.json5 b/xchange-bybit/src/test/resources/getOrderLinear.json5 new file mode 100644 index 00000000000..f6078291770 --- /dev/null +++ b/xchange-bybit/src/test/resources/getOrderLinear.json5 @@ -0,0 +1,55 @@ +{ + "retCode": 0, + "retMsg": "OK", + "result": { + "list": [ + { + "orderId": "fd4300ae-7847-404e-b947-b46980a4d140", + "orderLinkId": "test-000005", + "blockTradeId": "", + "symbol": "ETHUSDT", + "price": "1600.00", + "qty": "0.10", + "side": "Buy", + "isLeverage": "", + "positionIdx": 1, + "orderStatus": "New", + "cancelType": "UNKNOWN", + "rejectReason": "EC_NoError", + "avgPrice": "0", + "leavesQty": "0.10", + "leavesValue": "160", + "cumExecQty": "0.00", + "cumExecValue": "0", + "cumExecFee": "0", + "timeInForce": "GTC", + "orderType": "Limit", + "stopOrderType": "UNKNOWN", + "orderIv": "", + "triggerPrice": "0.00", + "takeProfit": "2500.00", + "stopLoss": "1500.00", + "tpTriggerBy": "LastPrice", + "slTriggerBy": "LastPrice", + "triggerDirection": 0, + "triggerBy": "UNKNOWN", + "lastPriceOnCreated": "", + "reduceOnly": false, + "closeOnTrigger": false, + "smpType": "None", + "smpGroup": 0, + "smpOrderId": "", + "tpslMode": "Full", + "tpLimitPrice": "", + "slLimitPrice": "", + "placeType": "", + "createdTime": "1684738540559", + "updatedTime": "1684738540561" + } + ], + "nextPageCursor": "page_args%3Dfd4300ae-7847-404e-b947-b46980a4d140%26symbol%3D6%26", + "category": "linear" + }, + "retExtInfo": {}, + "time": 1684765770483 +} \ No newline at end of file diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java index 560cabf3668..19108ae540b 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java @@ -332,4 +332,14 @@ public void setChannelInactiveHandler( WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) { streamingService.setChannelInactiveHandler(channelInactiveHandler); } + + @Override + public void resubscribeChannels() { + streamingService.resubscribeChannels(); + } + + @Override + public Observable connectionIdle() { + return streamingService.subscribeIdle(); + } } diff --git a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingExchange.java b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingExchange.java index e1956902bc2..323242bdfdd 100644 --- a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingExchange.java +++ b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingExchange.java @@ -2,8 +2,12 @@ import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.core.StreamingExchange; -import info.bitrich.xchangestream.service.netty.WebSocketClientHandler; +import info.bitrich.xchangestream.service.netty.ConnectionStateModel.State; +import info.bitrich.xchangestream.service.netty.WebSocketClientHandler.WebSocketMessageHandler; import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Observable; +import java.util.ArrayList; +import java.util.List; import org.knowm.xchange.bybit.BybitExchange; import org.knowm.xchange.bybit.dto.BybitCategory; import org.slf4j.Logger; @@ -29,36 +33,49 @@ public class BybitStreamingExchange extends BybitExchange implements StreamingEx private BybitStreamingMarketDataService streamingMarketDataService; private BybitStreamingTradeService streamingTradeService; + private BybitUserDataStreamingService userDataStreamingService; + @Override protected void initServices() { super.initServices(); this.streamingService = new BybitStreamingService(getApiUrl(), exchangeSpecification); + if (isApiKeyValid()) { + this.userDataStreamingService = new BybitUserDataStreamingService( + getApiUrlWithAuth(), exchangeSpecification); + } this.streamingMarketDataService = new BybitStreamingMarketDataService(streamingService); - this.streamingTradeService = new BybitStreamingTradeService(streamingService); + this.streamingTradeService = new BybitStreamingTradeService(userDataStreamingService); + } + + private boolean isApiKeyValid() { + return exchangeSpecification.getApiKey() != null && !exchangeSpecification.getApiKey() + .isEmpty(); } private String getApiUrl() { String apiUrl; - if (exchangeSpecification.getApiKey() == null) { - if (Boolean.TRUE.equals( - exchangeSpecification.getExchangeSpecificParametersItem(SPECIFIC_PARAM_TESTNET))) { - apiUrl = TESTNET_URI; - } else { - apiUrl = URI; - } - apiUrl += "/" + ((BybitCategory) exchangeSpecification.getExchangeSpecificParametersItem( - EXCHANGE_TYPE)).getValue(); + if (Boolean.TRUE.equals( + exchangeSpecification.getExchangeSpecificParametersItem(SPECIFIC_PARAM_TESTNET))) { + apiUrl = TESTNET_URI; + } else { + apiUrl = URI; + } + apiUrl += "/" + ((BybitCategory) exchangeSpecification.getExchangeSpecificParametersItem( + EXCHANGE_TYPE)).getValue(); + return apiUrl; + } + + private String getApiUrlWithAuth() { + String apiUrl; + if (Boolean.TRUE.equals( + exchangeSpecification.getExchangeSpecificParametersItem(USE_SANDBOX))) { + apiUrl = DEMO_AUTH_URI; } else { if (Boolean.TRUE.equals( - exchangeSpecification.getExchangeSpecificParametersItem(USE_SANDBOX))) { - apiUrl = DEMO_AUTH_URI; + exchangeSpecification.getExchangeSpecificParametersItem(SPECIFIC_PARAM_TESTNET))) { + apiUrl = TESTNET_AUTH_URI; } else { - if (Boolean.TRUE.equals( - exchangeSpecification.getExchangeSpecificParametersItem(SPECIFIC_PARAM_TESTNET))) { - apiUrl = TESTNET_AUTH_URI; - } else { - apiUrl = AUTH_URI; - } + apiUrl = AUTH_URI; } } return apiUrl; @@ -67,18 +84,51 @@ private String getApiUrl() { @Override public Completable connect(ProductSubscription... args) { LOG.info("Connect to BybitStream"); - return streamingService.connect(); + List completableList = new ArrayList<>(); + completableList.add(streamingService.connect()); + if (isApiKeyValid()) { + LOG.info("Connect to BybitStream with auth"); + completableList.add(userDataStreamingService.connect()); + } + return Completable.concat(completableList); } @Override public Completable disconnect() { - streamingService.pingPongDisconnectIfConnected(); - return streamingService.disconnect(); + List completableList = new ArrayList<>(); + if (streamingService != null) { + streamingService.pingPongDisconnectIfConnected(); + completableList.add(streamingService.disconnect()); + streamingService = null; + } + if (userDataStreamingService != null) { + userDataStreamingService.pingPongDisconnectIfConnected(); + completableList.add(userDataStreamingService.disconnect()); + userDataStreamingService = null; + } + return Completable.concat(completableList); + } + + @Override + public BybitStreamingTradeService getStreamingTradeService() { + if (userDataStreamingService != null && userDataStreamingService.isAuthorized()) { + return streamingTradeService; + } else { + throw new IllegalArgumentException("Authentication required for private streams"); + } } @Override public boolean isAlive() { - return streamingService != null && streamingService.isSocketOpen(); + // In normal situation - streamingService is always runs, userDataStreamingService - depends + if (streamingService != null && streamingService.isSocketOpen()) { + if (isApiKeyValid()) { + return userDataStreamingService != null && userDataStreamingService.isSocketOpen(); + } + return true; + } else { + return false; + } } @Override @@ -91,18 +141,53 @@ public BybitStreamingMarketDataService getStreamingMarketDataService() { return streamingMarketDataService; } - @Override - public BybitStreamingTradeService getStreamingTradeService() { - return streamingTradeService; - } - /** * Enables the user to listen on channel inactive events and react appropriately. * * @param channelInactiveHandler a WebSocketMessageHandler instance. */ public void setChannelInactiveHandler( - WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) { + WebSocketMessageHandler channelInactiveHandler) { streamingService.setChannelInactiveHandler(channelInactiveHandler); } + + public void setUserDataChannelInactiveHandler( + WebSocketMessageHandler channelInactiveHandler) { + userDataStreamingService.setChannelInactiveHandler(channelInactiveHandler); + } + + @Override + public Observable reconnectFailure() { + return streamingService.subscribeReconnectFailure(); + } + + public Observable reconnectFailurePrivateChannel() { + return userDataStreamingService.subscribeReconnectFailure(); + } + + @Override + public Observable connectionStateObservable() { + return streamingService.subscribeConnectionState(); + } + + public Observable connectionStateObservablePrivateChannel() { + return userDataStreamingService.subscribeConnectionState(); + } + + @Override + public void resubscribeChannels() { + streamingService.resubscribeChannels(); + if (userDataStreamingService != null) { + userDataStreamingService.resubscribeChannels(); + } + } + + @Override + public Observable connectionIdle() { + return streamingService.subscribeIdle(); + } + + public Observable connectionIdlePrivateChannel() { + return userDataStreamingService.subscribeIdle(); + } } diff --git a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingMarketDataService.java b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingMarketDataService.java index 8ed3f60f1d4..1bb665b869c 100644 --- a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingMarketDataService.java +++ b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingMarketDataService.java @@ -3,6 +3,7 @@ import static org.knowm.xchange.bybit.BybitAdapters.convertToBybitSymbol; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import dto.marketdata.BybitOrderbook; import dto.marketdata.BybitPublicOrder; import dto.trade.BybitTrade; @@ -14,7 +15,6 @@ import java.util.ArrayList; import java.util.Date; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -62,35 +62,51 @@ public Observable getOrderBook(Instrument instrument, Object... args) String channelUniqueId = ORDERBOOK + depth + "." + convertToBybitSymbol(instrument); return streamingService .subscribeChannel(channelUniqueId) - .flatMap( + .map( jsonNode -> { - BybitOrderbook bybitOrderBooks = mapper.treeToValue(jsonNode, BybitOrderbook.class); - String type = bybitOrderBooks.getDataType(); - if (type.equalsIgnoreCase("snapshot")) { - OrderBook orderBook = - BybitStreamAdapters.adaptOrderBook(bybitOrderBooks, instrument); - orderBookUpdateIdPrev.set(bybitOrderBooks.getData().getU()); - orderBookMap.put(channelUniqueId, orderBook); - return Observable.just(orderBook); - } else if (type.equalsIgnoreCase("delta")) { - return applyDeltaSnapshot( - channelUniqueId, instrument, bybitOrderBooks, orderBookUpdateIdPrev); + try { + BybitOrderbook bybitOrderBooks = mapper.treeToValue(jsonNode, BybitOrderbook.class); + String type = bybitOrderBooks.getDataType(); + if (type.equalsIgnoreCase("snapshot")) { + OrderBook orderBook = + BybitStreamAdapters.adaptOrderBook(bybitOrderBooks, instrument); + orderBookUpdateIdPrev.set(bybitOrderBooks.getData().getU()); + orderBookMap.put(channelUniqueId, orderBook); + return orderBook; + } else if (type.equalsIgnoreCase("delta")) { + return applyDeltaSnapshot( + channelUniqueId, instrument, bybitOrderBooks, orderBookUpdateIdPrev); + } + return new OrderBook(null, Lists.newArrayList(), Lists.newArrayList(), false); + } catch (IllegalStateException e) { + LOG.warn( + "Resubscribing {} channel after adapter error {}", instrument, e.getMessage()); + // Resubscribe to the channel, triggering a new snapshot + orderBookMap.remove(channelUniqueId); + if (streamingService.isSocketOpen()) { + streamingService.sendMessage( + streamingService.getUnsubscribeMessage(channelUniqueId, args)); + streamingService.sendMessage( + streamingService.getSubscribeMessage(channelUniqueId, args)); + } + return new OrderBook(null, Lists.newArrayList(), Lists.newArrayList(), false); } - return Observable.fromIterable(new LinkedList<>()); - }); + }) + .filter(orderBook -> !orderBook.getBids().isEmpty() && !orderBook.getAsks().isEmpty()); } - private Observable applyDeltaSnapshot( + private OrderBook applyDeltaSnapshot( String channelUniqueId, Instrument instrument, BybitOrderbook bybitOrderBookUpdate, AtomicLong orderBookUpdateIdPrev) { OrderBook orderBook = orderBookMap.getOrDefault(channelUniqueId, null); if (orderBook == null) { - LOG.error("Failed to get orderBook, channelUniqueId= {}", channelUniqueId); - return Observable.fromIterable(new LinkedList<>()); + LOG.debug("Failed to get orderBook, channelUniqueId= {}", channelUniqueId); + return new OrderBook(null, Lists.newArrayList(), Lists.newArrayList(), false); } - if (orderBookUpdateIdPrev.incrementAndGet() == bybitOrderBookUpdate.getData().getU()) { + if (orderBookUpdateIdPrev.incrementAndGet() == bybitOrderBookUpdate.getData() + .getU()) { LOG.debug( "orderBookUpdate id {}, seq {} ", bybitOrderBookUpdate.getData().getU(), @@ -111,14 +127,13 @@ private Observable applyDeltaSnapshot( if (orderBookUpdatesSubscriptions.get(instrument) != null) { orderBookUpdatesSubscriptions(instrument, asks, bids, timestamp); } - return Observable.just(orderBook); + return orderBook; } else { LOG.error( "orderBookUpdate id sequence failed, expected {}, in fact {}", - orderBookUpdateIdPrev, + orderBookUpdateIdPrev.get(), bybitOrderBookUpdate.getData().getU()); - // resubscribe or what here? - return Observable.fromIterable(new LinkedList<>()); + throw new IllegalStateException("orderBookUpdate id sequence failed"); } } diff --git a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingService.java b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingService.java index 4125cafbe80..de84da1c8e6 100644 --- a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingService.java +++ b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingService.java @@ -1,37 +1,25 @@ package info.bitrich.xchangestream.bybit; import static info.bitrich.xchangestream.bybit.BybitStreamingExchange.EXCHANGE_TYPE; -import static org.knowm.xchange.utils.DigestUtils.bytesToHex; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import dto.BybitSubscribeMessage; import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; -import info.bitrich.xchangestream.service.netty.WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler; +import info.bitrich.xchangestream.service.netty.WebSocketClientCompressionAllowClientNoContextHandler; import info.bitrich.xchangestream.service.netty.WebSocketClientHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.CompletableSource; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.security.InvalidKeyException; -import java.security.NoSuchAlgorithmException; -import java.time.Instant; import java.util.Collections; -import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import javax.crypto.Mac; -import javax.crypto.SecretKey; -import javax.crypto.spec.SecretKeySpec; -import lombok.Getter; +import lombok.Setter; import org.knowm.xchange.ExchangeSpecification; import org.knowm.xchange.bybit.dto.BybitCategory; -import org.knowm.xchange.exceptions.ExchangeException; -import org.knowm.xchange.service.BaseParamsDigest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,8 +30,10 @@ public class BybitStreamingService extends JsonNettyStreamingService { private final Observable pingPongSrc = Observable.interval(15, 20, TimeUnit.SECONDS); private Disposable pingPongSubscription; private final ExchangeSpecification spec; + @Setter private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null; - @Getter private boolean isAuthorized = false; + + public BybitStreamingService(String apiUrl, ExchangeSpecification spec) { super(apiUrl); @@ -59,9 +49,6 @@ public Completable connect() { return conn.andThen( (CompletableSource) (completable) -> { - if (spec.getApiKey() != null) { - login(); - } pingPongDisconnectIfConnected(); pingPongSubscription = pingPongSrc.subscribe(o -> this.sendMessage("{\"op\":\"ping\"}")); @@ -69,27 +56,6 @@ public Completable connect() { }); } - private void login() { - String key = spec.getApiKey(); - long expires = Instant.now().toEpochMilli() + 10000; - String _val = "GET/realtime" + expires; - try { - Mac mac = Mac.getInstance(BaseParamsDigest.HMAC_SHA_256); - final SecretKey secretKey = - new SecretKeySpec( - spec.getSecretKey().getBytes(StandardCharsets.UTF_8), BaseParamsDigest.HMAC_SHA_256); - mac.init(secretKey); - String signature = bytesToHex(mac.doFinal(_val.getBytes(StandardCharsets.UTF_8))); - List args = - Stream.of(key, String.valueOf(expires), signature).collect(Collectors.toList()); - String message = objectMapper.writeValueAsString(new BybitSubscribeMessage("auth", args)); - this.sendMessage(message); - } catch (NoSuchAlgorithmException | InvalidKeyException e) { - throw new ExchangeException("Invalid API secret", e); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } @Override protected String getChannelNameFromMessage(JsonNode message) { @@ -101,14 +67,14 @@ protected String getChannelNameFromMessage(JsonNode message) { @Override public String getSubscribeMessage(String channelName, Object... args) throws IOException { - LOG.info(" getSubscribeMessage {}", channelName); + LOG.info("getSubscribeMessage {}", channelName); return objectMapper.writeValueAsString( new BybitSubscribeMessage("subscribe", Collections.singletonList(channelName))); } @Override public String getUnsubscribeMessage(String channelName, Object... args) throws IOException { - LOG.info(" getUnsubscribeMessage {}", channelName); + LOG.info("getUnsubscribeMessage {}", channelName); return objectMapper.writeValueAsString( new BybitSubscribeMessage("unsubscribe", Collections.singletonList(channelName))); } @@ -133,19 +99,20 @@ public void messageHandler(String message) { } if (success) { switch (op) { - case "pong": case "subscribe": case "unsubscribe": { break; } - case "auth": - { - isAuthorized = true; - break; - } } return; + } else { + // different op result of public channels and private channels + // https://bybit-exchange.github.io/docs/v5/ws/connect#how-to-send-the-heartbeat-packet + if (op.equals("ping") || op.equals("pong")) { + LOG.debug("Received PONG message: {}", message); + return; + } } handleMessage(jsonNode); } @@ -158,11 +125,38 @@ public void pingPongDisconnectIfConnected() { @Override protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() { - return WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler.INSTANCE; + return WebSocketClientCompressionAllowClientNoContextHandler.INSTANCE; + } + + @Override + protected WebSocketClientHandler getWebSocketClientHandler( + WebSocketClientHandshaker handshake, WebSocketClientHandler.WebSocketMessageHandler handler) { + LOG.info("Registering BybitWebSocketClientHandler"); + return new BybitWebSocketClientHandler(handshake, handler); } - public void setChannelInactiveHandler( - WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) { - this.channelInactiveHandler = channelInactiveHandler; + /** + * Custom client handler in order to execute an external, user-provided handler on channel events. + */ + class BybitWebSocketClientHandler extends NettyWebSocketClientHandler { + + public BybitWebSocketClientHandler( + WebSocketClientHandshaker handshake, WebSocketMessageHandler handler) { + super(handshake, handler); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + super.channelInactive(ctx); + if (channelInactiveHandler != null) { + channelInactiveHandler.onMessage("WebSocket Client disconnected!"); + } + } } + } diff --git a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingTradeService.java b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingTradeService.java index a7521662635..1adf35bf58f 100644 --- a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingTradeService.java +++ b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitStreamingTradeService.java @@ -9,16 +9,17 @@ import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; import io.reactivex.rxjava3.core.Observable; import org.knowm.xchange.bybit.dto.BybitCategory; +import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.Order; import org.knowm.xchange.dto.account.OpenPosition; import org.knowm.xchange.instrument.Instrument; public class BybitStreamingTradeService implements StreamingTradeService { - private final BybitStreamingService streamingService; + private final BybitUserDataStreamingService streamingService; private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); - public BybitStreamingTradeService(BybitStreamingService streamingService) { + public BybitStreamingTradeService(BybitUserDataStreamingService streamingService) { this.streamingService = streamingService; } @@ -29,8 +30,8 @@ public BybitStreamingTradeService(BybitStreamingService streamingService) { */ public Observable getOrderChanges(Instrument instrument, Object... args) { String channelUniqueId = "order"; - if(args[0] != null && args[0] instanceof BybitCategory) { - channelUniqueId += "." + ((BybitCategory)args[0]).getValue(); + if (args[0] != null && args[0] instanceof BybitCategory) { + channelUniqueId += "." + ((BybitCategory) args[0]).getValue(); } return streamingService .subscribeChannel(channelUniqueId) @@ -43,6 +44,11 @@ public Observable getOrderChanges(Instrument instrument, Object... args) }); } + @Override + public Observable getOrderChanges(CurrencyPair pair, Object... args) { + return getOrderChanges((Instrument) pair, args); + } + public Observable getComplexOrderChanges(BybitCategory category) { String channelUniqueId = "order"; if (category != null) { diff --git a/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitUserDataStreamingService.java b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitUserDataStreamingService.java new file mode 100644 index 00000000000..b30f405c21f --- /dev/null +++ b/xchange-stream-bybit/src/main/java/info/bitrich/xchangestream/bybit/BybitUserDataStreamingService.java @@ -0,0 +1,216 @@ +package info.bitrich.xchangestream.bybit; + +import static org.knowm.xchange.utils.DigestUtils.bytesToHex; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import dto.BybitSubscribeMessage; +import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; +import info.bitrich.xchangestream.service.netty.WebSocketClientCompressionAllowClientNoContextHandler; +import info.bitrich.xchangestream.service.netty.WebSocketClientHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; +import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableSource; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.disposables.Disposable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.crypto.Mac; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; +import lombok.Getter; +import lombok.Setter; +import org.knowm.xchange.ExchangeSpecification; +import org.knowm.xchange.exceptions.ExchangeException; +import org.knowm.xchange.service.BaseParamsDigest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BybitUserDataStreamingService extends JsonNettyStreamingService { + + private static final Logger LOG = LoggerFactory.getLogger(BybitUserDataStreamingService.class); + private Disposable pingPongSubscription; + private final Observable pingPongSrc = Observable.interval(15, 20, TimeUnit.SECONDS); + private final ExchangeSpecification spec; + @Getter + private boolean isAuthorized = false; + @Setter + private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null; + + + public BybitUserDataStreamingService(String url, ExchangeSpecification spec) { + super(url); + this.spec = spec; + // this.setEnableLoggingHandler(true); + } + + @Override + public Completable connect() { + Completable conn = super.connect(); + return conn.andThen( + (CompletableSource) + (completable) -> { + login(); + pingPongDisconnectIfConnected(); + pingPongSubscription = + pingPongSrc.subscribe(o -> this.sendMessage("{\"op\":\"ping\"}")); + completable.onComplete(); + }); + } + + + private void login() { + String key = spec.getApiKey(); + long expires = Instant.now().toEpochMilli() + 10000; + String _val = "GET/realtime" + expires; + try { + Mac mac = Mac.getInstance(BaseParamsDigest.HMAC_SHA_256); + final SecretKey secretKey = + new SecretKeySpec( + spec.getSecretKey().getBytes(StandardCharsets.UTF_8), BaseParamsDigest.HMAC_SHA_256); + mac.init(secretKey); + String signature = bytesToHex(mac.doFinal(_val.getBytes(StandardCharsets.UTF_8))); + List args = + Stream.of(key, String.valueOf(expires), signature).collect(Collectors.toList()); + String message = objectMapper.writeValueAsString(new BybitSubscribeMessage("auth", args)); + this.sendMessage(message); + } catch (NoSuchAlgorithmException | InvalidKeyException e) { + throw new ExchangeException("Invalid API secret", e); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + @Override + protected String getChannelNameFromMessage(JsonNode message) throws IOException { + if (message.has("topic")) { + return message.get("topic").asText(); + } + return ""; + } + + @Override + public String getSubscribeMessage(String channelName, Object... args) throws IOException { + LOG.info("getSubscribeMessage {}", channelName); + return objectMapper.writeValueAsString( + new BybitSubscribeMessage("subscribe", Collections.singletonList(channelName))); + } + + @Override + public String getUnsubscribeMessage(String channelName, Object... args) throws IOException { + LOG.info("getUnsubscribeMessage {}", channelName); + return objectMapper.writeValueAsString( + new BybitSubscribeMessage("unsubscribe", Collections.singletonList(channelName))); + } + + @Override + public void messageHandler(String message) { + LOG.debug("Received message: {}", message); + JsonNode jsonNode; + try { + jsonNode = objectMapper.readTree(message); + } catch (IOException e) { + LOG.error("Error parsing incoming message to JSON: {}", message); + return; + } + String op = ""; + boolean success = false; + if (jsonNode.has("op")) { + op = jsonNode.get("op").asText(); + } + if (jsonNode.has("success")) { + success = jsonNode.get("success").asBoolean(); + } + if (success) { + switch (op) { + case "subscribe": + case "unsubscribe": { + break; + } + case "auth": { + isAuthorized = true; + resubscribeChannelsAfterLogin(); + break; + } + } + return; + } else { + switch (op) { + // different op result of public channels and private channels + // https://bybit-exchange.github.io/docs/v5/ws/connect#how-to-send-the-heartbeat-packet + case "pong": { + LOG.debug("Received PONG message: {}", message); + return; + } + case "auth": { + LOG.error("Received AUTH message: {}", jsonNode.get("ret_msg")); + return; + } + } + } + handleMessage(jsonNode); + } + + @Override + protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() { + return WebSocketClientCompressionAllowClientNoContextHandler.INSTANCE; + } + + public void pingPongDisconnectIfConnected() { + if (pingPongSubscription != null && !pingPongSubscription.isDisposed()) { + pingPongSubscription.dispose(); + } + } + + /** + * Custom client handler in order to execute an external, user-provided handler on channel events. + */ + class BybitUserDataWebSocketClientHandler extends NettyWebSocketClientHandler { + + public BybitUserDataWebSocketClientHandler( + WebSocketClientHandshaker handshake, WebSocketMessageHandler handler) { + super(handshake, handler); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + super.channelInactive(ctx); + if (channelInactiveHandler != null) { + channelInactiveHandler.onMessage("WebSocket Client disconnected!"); + } + } + } + @Override + public void resubscribeChannels() { + // need to authorize first + } + + private void resubscribeChannelsAfterLogin() { + for (Entry entry : channels.entrySet()) { + try { + Subscription subscription = entry.getValue(); + sendMessage(getSubscribeMessage(subscription.getChannelName(), subscription.getArgs())); + } catch (IOException e) { + LOG.error("Failed to reconnect channel: {}", entry.getKey()); + } + } + } + +} + diff --git a/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamOrderBookExample.java b/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamOrderBookExample.java index d48181599f7..53d6159265c 100644 --- a/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamOrderBookExample.java +++ b/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamOrderBookExample.java @@ -7,13 +7,15 @@ import java.util.ArrayList; import java.util.List; import org.knowm.xchange.bybit.dto.BybitCategory; -import org.knowm.xchange.currency.CurrencyPair; +import org.knowm.xchange.derivative.FuturesContract; import org.knowm.xchange.instrument.Instrument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class BybitStreamOrderBookExample { + private static final Logger log = LoggerFactory.getLogger(BybitStreamOrderBookExample.class); + public static void main(String[] args) { // Stream orderBook and OrderBookUpdates try { @@ -23,36 +25,34 @@ public static void main(String[] args) { } } + static List booksDisposable = new ArrayList<>(); + static Instrument XRP_PERP = new FuturesContract("XRP/USDT/PERP"); + static StreamingExchange exchange; + private static void getOrderBookExample() throws InterruptedException { - Instrument BTC_SPOT = new CurrencyPair("BTC/USDT"); - StreamingExchange exchange = connect(BybitCategory.SPOT, false); - List booksDisposable = new ArrayList<>(); - List booksUpdatesDisposable = new ArrayList<>(); + exchange = connect(BybitCategory.LINEAR, false); + subscribeOrderBook(); + Thread.sleep(600000L); + for (Disposable dis : booksDisposable) { + dis.dispose(); + } + exchange.disconnect().blockingAwait(); + } + private static void subscribeOrderBook() { booksDisposable.add( - exchange.getStreamingMarketDataService().getOrderBook(BTC_SPOT) - .doOnError( - error -> { - log.error(error.getMessage()); - }) - .subscribe( - orderBook -> log.info("orderBook: {}", orderBook.getTimeStamp()))); - - booksUpdatesDisposable.add( - exchange - .getStreamingMarketDataService() - .getOrderBookUpdates(BTC_SPOT) + exchange.getStreamingMarketDataService().getOrderBook(XRP_PERP) .doOnError( error -> { log.error(error.getMessage()); + for (Disposable dis : booksDisposable) { + dis.dispose(); + } + subscribeOrderBook(); }) .subscribe( - orderBookUpdates -> log.info("orderBookUpdates: {}", orderBookUpdates))); - Thread.sleep(4000L); - for (Disposable dis:booksDisposable) - dis.dispose(); - for (Disposable dis:booksUpdatesDisposable) - dis.dispose(); - exchange.disconnect().blockingAwait(); + orderBook -> System.out.print("."), throwable -> { + log.error(throwable.getMessage()); + })); } } diff --git a/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamPositionChangeExample.java b/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamPositionChangeExample.java index 288e9b4b80f..edd7a2646de 100644 --- a/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamPositionChangeExample.java +++ b/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamPositionChangeExample.java @@ -42,7 +42,7 @@ private static void positionChangeExample() throws IOException { StreamingExchange exchange = connect(BybitCategory.LINEAR, true); ticker = (exchange.getMarketDataService().getTicker(ETH_PERP)); amount = exchange.getExchangeMetaData().getInstruments().get(ETH_PERP).getMinimumAmount(); - //minimal trade size - 5 USDT + // minimal trade size - 5 USDT if (amount.multiply(ticker.getBid()).compareTo(new BigDecimal("5.0")) <= 0) { amount = new BigDecimal("5") @@ -82,9 +82,9 @@ private static void positionChangeExample() throws IOException { throw new RuntimeException(e); } positionChangesDisposable.dispose(); + tradesDisposable.dispose(); + exchange.disconnect().blockingAwait(); } } - - diff --git a/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamTestNetExample.java b/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamTestNetExample.java index ecb4fa1b718..9ac7e03da4b 100644 --- a/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamTestNetExample.java +++ b/xchange-stream-bybit/src/test/java/info/bitrich/xchangestream/bybit/example/BybitStreamTestNetExample.java @@ -33,9 +33,7 @@ public class BybitStreamTestNetExample { // Uses TEST_NET public static void main(String[] args) { try { - - auth(); - + withAuth(); } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } @@ -47,7 +45,7 @@ public static void main(String[] args) { private static final Instrument ETH_SPOT = new CurrencyPair("ETH/USDT"); - private static void auth() throws IOException, InterruptedException { + private static void withAuth() throws IOException, InterruptedException { ExchangeSpecification exchangeSpecification = new BybitStreamingExchange().getDefaultExchangeSpecification(); exchangeSpecification.setApiKey(System.getProperty("test_api_key")); @@ -77,7 +75,7 @@ private static void auth() throws IOException, InterruptedException { Thread.sleep(2000L); AtomicReference order = new AtomicReference<>(); Disposable disposableOrderChanges = - ((BybitStreamingTradeService) exchange.getStreamingTradeService()) + exchange.getStreamingTradeService() .getOrderChanges(null,BybitCategory.LINEAR) .doOnError(error -> log.error("OrderChanges error", error)) .subscribe( @@ -96,7 +94,7 @@ private static void auth() throws IOException, InterruptedException { throwable -> log.error("ComplexPosition throwable,{}", throwable.getMessage())); Thread.sleep(3000L); exchange.getTradeService().placeLimitOrder(limitOrder); - Thread.sleep(30000L); + Thread.sleep(3000L); disposableOrderChanges.dispose(); disposableComplexPositionChanges.dispose(); exchange.disconnect().blockingAwait(); diff --git a/xchange-stream-bybit/src/test/resources/logback.xml b/xchange-stream-bybit/src/test/resources/logback.xml index 211265d002a..b047dba6867 100644 --- a/xchange-stream-bybit/src/test/resources/logback.xml +++ b/xchange-stream-bybit/src/test/resources/logback.xml @@ -20,7 +20,7 @@ - + - + \ No newline at end of file diff --git a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingExchange.java b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingExchange.java index 19c3d2c235e..e42f518d251 100644 --- a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingExchange.java +++ b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingExchange.java @@ -4,8 +4,10 @@ import info.bitrich.xchangestream.core.StreamingExchange; import info.bitrich.xchangestream.core.StreamingMarketDataService; import info.bitrich.xchangestream.core.StreamingTradeService; +import info.bitrich.xchangestream.service.netty.ConnectionStateModel; import info.bitrich.xchangestream.service.netty.WebSocketClientHandler; import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Observable; import org.knowm.xchange.ExchangeSpecification; import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException; import org.knowm.xchange.okex.OkexExchange; @@ -102,4 +104,24 @@ public void setChannelInactiveHandler( WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) { streamingService.setChannelInactiveHandler(channelInactiveHandler); } + + @Override + public Observable reconnectFailure() { + return streamingService.subscribeReconnectFailure(); + } + + @Override + public Observable connectionStateObservable() { + return streamingService.subscribeConnectionState(); + } + + @Override + public void resubscribeChannels() { + streamingService.resubscribeChannels(); + } + + @Override + public Observable connectionIdle() { + return streamingService.subscribeIdle(); + } } diff --git a/xchange-stream-service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java b/xchange-stream-service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java index e4c22fe6692..b2f2cd9c0eb 100644 --- a/xchange-stream-service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java +++ b/xchange-stream-service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java @@ -555,6 +555,7 @@ protected NettyWebSocketClientHandler( @Override public void channelInactive(ChannelHandlerContext ctx) { + connectionStateModel.setState(State.CLOSED); if (isManualDisconnect.compareAndSet(true, false)) { // Don't attempt to reconnect } else {