diff --git a/pom.xml b/pom.xml
index eb939bce3ed..e1c642ec75b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,6 +121,7 @@
xchange-stream-binance
xchange-stream-bitfinex
xchange-stream-bitflyer
+ xchange-stream-bitget
xchange-stream-bitmex
xchange-stream-bitstamp
xchange-stream-btcmarkets
diff --git a/xchange-stream-bitget/.gitignore b/xchange-stream-bitget/.gitignore
new file mode 100644
index 00000000000..b1bb6c087ff
--- /dev/null
+++ b/xchange-stream-bitget/.gitignore
@@ -0,0 +1,2 @@
+http-client.private.env.json
+integration-test.env.properties
\ No newline at end of file
diff --git a/xchange-stream-bitget/README.md b/xchange-stream-bitget/README.md
new file mode 100644
index 00000000000..f728ba8eda6
--- /dev/null
+++ b/xchange-stream-bitget/README.md
@@ -0,0 +1,21 @@
+## Using IntelliJ Idea HTTP client
+
+There are *.http files stored in `src/test/resources/rest` that can be used with IntelliJ Idea HTTP Client.
+
+Some requests need authorization, so the api credentials have to be stored in `http-client.private.env.json` in module's root. Sample content can be found in `example.http-client.private.env.json`
+
+> [!CAUTION]
+> Never commit your api credentials to the repository!
+
+
+[HTTP Client documentation](https://www.jetbrains.com/help/idea/http-client-in-product-code-editor.html)
+
+## Running integration tests that require API keys
+
+Integration tests that require API keys read them from environment variables. They can be defined in `integration-test.env.properties`. Sample content can be found in `example.integration-test.env.properties`.
+
+If no keys are provided the integration tests that need them are skipped.
+
+> [!CAUTION]
+> Never commit your api credentials to the repository!
+
diff --git a/xchange-stream-bitget/example.http-client.private.env.json b/xchange-stream-bitget/example.http-client.private.env.json
new file mode 100644
index 00000000000..9cfdcc6645a
--- /dev/null
+++ b/xchange-stream-bitget/example.http-client.private.env.json
@@ -0,0 +1,7 @@
+{
+ "default": {
+ "api_key": "replace_me",
+ "api_secret": "replace_me",
+ "api_passphrase": "replace_me"
+ }
+}
\ No newline at end of file
diff --git a/xchange-stream-bitget/example.integration-test.env.properties b/xchange-stream-bitget/example.integration-test.env.properties
new file mode 100644
index 00000000000..d8c62995f61
--- /dev/null
+++ b/xchange-stream-bitget/example.integration-test.env.properties
@@ -0,0 +1,4 @@
+apiKey=change_me
+secretKey=change_me
+passphrase=change_me
+
diff --git a/xchange-stream-bitget/http-client.env.json b/xchange-stream-bitget/http-client.env.json
new file mode 100644
index 00000000000..2b35d8616aa
--- /dev/null
+++ b/xchange-stream-bitget/http-client.env.json
@@ -0,0 +1,5 @@
+{
+ "default": {
+ "base_url": "wss://ws.bitget.com"
+ }
+}
\ No newline at end of file
diff --git a/xchange-stream-bitget/lombok.config b/xchange-stream-bitget/lombok.config
new file mode 100644
index 00000000000..9bb2e34e6fc
--- /dev/null
+++ b/xchange-stream-bitget/lombok.config
@@ -0,0 +1,2 @@
+lombok.equalsAndHashCode.callSuper = call
+lombok.tostring.callsuper = call
diff --git a/xchange-stream-bitget/pom.xml b/xchange-stream-bitget/pom.xml
new file mode 100644
index 00000000000..eab8637b292
--- /dev/null
+++ b/xchange-stream-bitget/pom.xml
@@ -0,0 +1,57 @@
+
+
+ 4.0.0
+
+ org.knowm.xchange
+ xchange-parent
+ 5.2.1-SNAPSHOT
+
+ xchange-stream-bitget
+
+ XChange Bitget Stream
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+
+ org.knowm.xchange
+ xchange-bitget
+ ${project.parent.version}
+
+
+
+ org.knowm.xchange
+ xchange-stream-core
+ ${project.parent.version}
+
+
+
+ org.mockito
+ mockito-junit-jupiter
+ test
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+
+ integration-test.env.properties
+
+
+
+
+
+
+
+
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetPrivateStreamingService.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetPrivateStreamingService.java
new file mode 100644
index 00000000000..afd7bdc08ba
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetPrivateStreamingService.java
@@ -0,0 +1,77 @@
+package info.bitrich.xchangestream.bitget;
+
+import info.bitrich.xchangestream.bitget.config.Config;
+import info.bitrich.xchangestream.bitget.dto.common.Operation;
+import info.bitrich.xchangestream.bitget.dto.request.BitgetLoginRequest;
+import info.bitrich.xchangestream.bitget.dto.request.BitgetLoginRequest.LoginPayload;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetEventNotification;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetEventNotification.Event;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetWsNotification;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Map.Entry;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class BitgetPrivateStreamingService extends BitgetStreamingService {
+
+ private final String apiKey;
+ private final String apiSecret;
+ private final String apiPassword;
+
+ public BitgetPrivateStreamingService(
+ String apiUri, String apiKey, String apiSecret, String apiPassword) {
+ super(apiUri);
+ this.apiKey = apiKey;
+ this.apiSecret = apiSecret;
+ this.apiPassword = apiPassword;
+ }
+
+ /** Sends login message right after connecting */
+ @Override
+ public void resubscribeChannels() {
+ sendLoginMessage();
+ }
+
+ public void resubscribeChannelsAfterLogin() {
+ for (Entry entry : channels.entrySet()) {
+ try {
+ Subscription subscription = entry.getValue();
+ sendMessage(getSubscribeMessage(subscription.getChannelName(), subscription.getArgs()));
+ } catch (IOException e) {
+ log.error("Failed to reconnect channel: {}", entry.getKey());
+ }
+ }
+ }
+
+ @SneakyThrows
+ private void sendLoginMessage() {
+ Instant timestamp = Instant.now(Config.getInstance().getClock());
+ BitgetLoginRequest bitgetLoginRequest =
+ BitgetLoginRequest.builder()
+ .operation(Operation.LOGIN)
+ .payload(
+ LoginPayload.builder()
+ .apiKey(apiKey)
+ .passphrase(apiPassword)
+ .timestamp(timestamp)
+ .signature(BitgetStreamingAuthHelper.sign(timestamp, apiSecret))
+ .build())
+ .build();
+ sendMessage(objectMapper.writeValueAsString(bitgetLoginRequest));
+ }
+
+ @Override
+ protected void handleMessage(BitgetWsNotification message) {
+ // subscribe to channels after sucessful login confirmation
+ if (message instanceof BitgetEventNotification) {
+ BitgetEventNotification eventNotification = (BitgetEventNotification) message;
+ if (eventNotification.getEvent() == Event.LOGIN && eventNotification.getCode() == 0) {
+ resubscribeChannelsAfterLogin();
+ return;
+ }
+ }
+ super.handleMessage(message);
+ }
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingAdapters.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingAdapters.java
new file mode 100644
index 00000000000..da53b942e50
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingAdapters.java
@@ -0,0 +1,137 @@
+package info.bitrich.xchangestream.bitget;
+
+import info.bitrich.xchangestream.bitget.dto.common.BitgetChannel;
+import info.bitrich.xchangestream.bitget.dto.common.BitgetChannel.ChannelType;
+import info.bitrich.xchangestream.bitget.dto.common.BitgetChannel.MarketType;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetTickerNotification;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetTickerNotification.TickerData;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetWsOrderBookSnapshotNotification;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetWsOrderBookSnapshotNotification.OrderBookData;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetWsUserTradeNotification;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetWsUserTradeNotification.BitgetFillData;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetWsUserTradeNotification.FeeDetail;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import lombok.experimental.UtilityClass;
+import org.apache.commons.lang3.ArrayUtils;
+import org.knowm.xchange.bitget.BitgetAdapters;
+import org.knowm.xchange.currency.CurrencyPair;
+import org.knowm.xchange.dto.Order.OrderType;
+import org.knowm.xchange.dto.marketdata.OrderBook;
+import org.knowm.xchange.dto.marketdata.Ticker;
+import org.knowm.xchange.dto.trade.LimitOrder;
+import org.knowm.xchange.dto.trade.UserTrade;
+import org.knowm.xchange.instrument.Instrument;
+
+@UtilityClass
+public class BitgetStreamingAdapters {
+
+ public Ticker toTicker(BitgetTickerNotification notification) {
+ TickerData bitgetTickerDto = notification.getPayloadItems().get(0);
+
+ CurrencyPair currencyPair = BitgetAdapters.toCurrencyPair(bitgetTickerDto.getInstrument());
+ if (currencyPair == null) {
+ return null;
+ }
+
+ return new Ticker.Builder()
+ .instrument(currencyPair)
+ .open(bitgetTickerDto.getOpen24h())
+ .last(bitgetTickerDto.getLastPrice())
+ .bid(bitgetTickerDto.getBestBidPrice())
+ .ask(bitgetTickerDto.getBestAskPrice())
+ .high(bitgetTickerDto.getHigh24h())
+ .low(bitgetTickerDto.getLow24h())
+ .volume(bitgetTickerDto.getAssetVolume24h())
+ .quoteVolume(bitgetTickerDto.getQuoteVolume24h())
+ .timestamp(BitgetAdapters.toDate(bitgetTickerDto.getTimestamp()))
+ .bidSize(bitgetTickerDto.getBestBidSize())
+ .askSize(bitgetTickerDto.getBestAskSize())
+ .percentageChange(bitgetTickerDto.getChange24h())
+ .build();
+ }
+
+ /** Returns unique subscription id. Can be used as key for subscriptions caching */
+ public String toSubscriptionId(BitgetChannel bitgetChannel) {
+ return Stream.of(
+ bitgetChannel.getMarketType(),
+ bitgetChannel.getChannelType(),
+ bitgetChannel.getInstrumentId())
+ .map(String::valueOf)
+ .collect(Collectors.joining("_"));
+ }
+
+ /**
+ * Creates {@code BitgetChannel} from arguments
+ *
+ * @param args [{@code ChannelType}, {@code MarketType}, {@code Instrument}/{@code null}]
+ */
+ public BitgetChannel toBitgetChannel(Object... args) {
+ ChannelType channelType = (ChannelType) ArrayUtils.get(args, 0);
+ MarketType marketType = (MarketType) ArrayUtils.get(args, 1);
+ Instrument instrument = (Instrument) ArrayUtils.get(args, 2);
+
+ return BitgetChannel.builder()
+ .channelType(channelType)
+ .marketType(marketType)
+ .instrumentId(
+ Optional.ofNullable(instrument).map(BitgetAdapters::toString).orElse("default"))
+ .build();
+ }
+
+ public OrderBook toOrderBook(
+ BitgetWsOrderBookSnapshotNotification notification, Instrument instrument) {
+ OrderBookData orderBookData = notification.getPayloadItems().get(0);
+ List asks =
+ orderBookData.getAsks().stream()
+ .map(
+ priceSizeEntry ->
+ new LimitOrder(
+ OrderType.ASK,
+ priceSizeEntry.getSize(),
+ instrument,
+ null,
+ null,
+ priceSizeEntry.getPrice()))
+ .collect(Collectors.toList());
+
+ List bids =
+ orderBookData.getBids().stream()
+ .map(
+ priceSizeEntry ->
+ new LimitOrder(
+ OrderType.BID,
+ priceSizeEntry.getSize(),
+ instrument,
+ null,
+ null,
+ priceSizeEntry.getPrice()))
+ .collect(Collectors.toList());
+
+ return new OrderBook(BitgetAdapters.toDate(orderBookData.getTimestamp()), asks, bids);
+ }
+
+ public UserTrade toUserTrade(BitgetWsUserTradeNotification notification) {
+ BitgetFillData bitgetFillData = notification.getPayloadItems().get(0);
+ return new UserTrade(
+ bitgetFillData.getOrderSide(),
+ bitgetFillData.getAssetAmount(),
+ BitgetAdapters.toCurrencyPair(bitgetFillData.getSymbol()),
+ bitgetFillData.getPrice(),
+ BitgetAdapters.toDate(bitgetFillData.getUpdatedAt()),
+ bitgetFillData.getTradeId(),
+ bitgetFillData.getOrderId(),
+ bitgetFillData.getFeeDetails().stream()
+ .map(FeeDetail::getTotalFee)
+ .map(BigDecimal::abs)
+ .reduce(BigDecimal.ZERO, BigDecimal::add),
+ bitgetFillData.getFeeDetails().stream()
+ .map(FeeDetail::getCurrency)
+ .findFirst()
+ .orElse(null),
+ null);
+ }
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingAuthHelper.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingAuthHelper.java
new file mode 100644
index 00000000000..7fd0ceff829
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingAuthHelper.java
@@ -0,0 +1,42 @@
+package info.bitrich.xchangestream.bitget;
+
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
+import java.util.Base64;
+import javax.crypto.Mac;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+import lombok.experimental.UtilityClass;
+import org.knowm.xchange.service.BaseParamsDigest;
+
+@UtilityClass
+public class BitgetStreamingAuthHelper {
+
+ /** Generates signature based on timestamp */
+ public String sign(Instant timestamp, String secretString) {
+ final SecretKey secretKey =
+ new SecretKeySpec(
+ secretString.getBytes(StandardCharsets.UTF_8), BaseParamsDigest.HMAC_SHA_256);
+ Mac mac = createMac(secretKey, secretKey.getAlgorithm());
+
+ String payloadToSign = String.format("%sGET/user/verify", timestamp.getEpochSecond());
+ mac.update(payloadToSign.getBytes(StandardCharsets.UTF_8));
+
+ return Base64.getEncoder().encodeToString(mac.doFinal());
+ }
+
+ private Mac createMac(SecretKey secretKey, String hmacString) {
+ try {
+ Mac mac = Mac.getInstance(hmacString);
+ mac.init(secretKey);
+ return mac;
+ } catch (InvalidKeyException e) {
+ throw new IllegalArgumentException("Invalid key for hmac initialization.", e);
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(
+ "Illegal algorithm for post body digest. Check the implementation.");
+ }
+ }
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingExchange.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingExchange.java
new file mode 100644
index 00000000000..8d5d758d604
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingExchange.java
@@ -0,0 +1,64 @@
+package info.bitrich.xchangestream.bitget;
+
+import info.bitrich.xchangestream.bitget.config.Config;
+import info.bitrich.xchangestream.core.ProductSubscription;
+import info.bitrich.xchangestream.core.StreamingAccountService;
+import info.bitrich.xchangestream.core.StreamingExchange;
+import info.bitrich.xchangestream.core.StreamingMarketDataService;
+import info.bitrich.xchangestream.core.StreamingTradeService;
+import io.reactivex.rxjava3.core.Completable;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+import org.knowm.xchange.bitget.BitgetExchange;
+
+@Getter
+public class BitgetStreamingExchange extends BitgetExchange implements StreamingExchange {
+
+ private BitgetStreamingService publicStreamingService;
+ private BitgetPrivateStreamingService privateStreamingService;
+ private StreamingMarketDataService streamingMarketDataService;
+ private StreamingTradeService streamingTradeService;
+ private StreamingAccountService streamingAccountService;
+
+ @Override
+ public Completable connect(ProductSubscription... args) {
+ publicStreamingService = new BitgetStreamingService(Config.V2_PUBLIC_WS_URL);
+ if (StringUtils.isNoneBlank(
+ exchangeSpecification.getApiKey(),
+ exchangeSpecification.getSecretKey(),
+ exchangeSpecification.getPassword())) {
+ privateStreamingService =
+ new BitgetPrivateStreamingService(
+ Config.V2_PRIVATE_WS_URL,
+ exchangeSpecification.getApiKey(),
+ exchangeSpecification.getSecretKey(),
+ exchangeSpecification.getPassword());
+ streamingTradeService = new BitgetStreamingTradeService(privateStreamingService);
+ privateStreamingService.connect().blockingAwait();
+ }
+ applyStreamingSpecification(exchangeSpecification, publicStreamingService);
+ streamingMarketDataService = new BitgetStreamingMarketDataService(publicStreamingService);
+
+ return publicStreamingService.connect();
+ }
+
+ @Override
+ public Completable disconnect() {
+ BitgetStreamingService service = publicStreamingService;
+ publicStreamingService = null;
+ streamingMarketDataService = null;
+ streamingTradeService = null;
+ streamingAccountService = null;
+ return service.disconnect();
+ }
+
+ @Override
+ public boolean isAlive() {
+ return publicStreamingService != null && publicStreamingService.isSocketOpen();
+ }
+
+ @Override
+ public void useCompressedMessages(boolean compressedMessages) {
+ publicStreamingService.useCompressedMessages(compressedMessages);
+ }
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingMarketDataService.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingMarketDataService.java
new file mode 100644
index 00000000000..951b4df98ad
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingMarketDataService.java
@@ -0,0 +1,55 @@
+package info.bitrich.xchangestream.bitget;
+
+import info.bitrich.xchangestream.bitget.dto.common.BitgetChannel.ChannelType;
+import info.bitrich.xchangestream.bitget.dto.common.BitgetChannel.MarketType;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetTickerNotification;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetWsOrderBookSnapshotNotification;
+import info.bitrich.xchangestream.core.StreamingMarketDataService;
+import io.reactivex.rxjava3.core.Observable;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.Validate;
+import org.knowm.xchange.currency.CurrencyPair;
+import org.knowm.xchange.dto.marketdata.OrderBook;
+import org.knowm.xchange.dto.marketdata.Ticker;
+
+public class BitgetStreamingMarketDataService implements StreamingMarketDataService {
+
+ private final BitgetStreamingService service;
+
+ public BitgetStreamingMarketDataService(BitgetStreamingService service) {
+ this.service = service;
+ }
+
+ /**
+ * @param currencyPair Currency pair of the order book
+ * @param args Order book level: {@link Integer} 1, 5 or 15
+ */
+ @Override
+ public Observable getOrderBook(CurrencyPair currencyPair, Object... args) {
+ Integer orderBookLevel = (Integer) ArrayUtils.get(args, 0, null);
+ Validate.notNull(orderBookLevel, "Not implemented");
+ Validate.inclusiveBetween(1, 15, orderBookLevel, "Not implemented");
+
+ ChannelType channelType;
+ if (orderBookLevel == 1) {
+ channelType = ChannelType.DEPTH1;
+ } else if (orderBookLevel <= 5) {
+ channelType = ChannelType.DEPTH5;
+ } else {
+ channelType = ChannelType.DEPTH15;
+ }
+
+ return service
+ .subscribeChannel(null, channelType, MarketType.SPOT, currencyPair)
+ .map(BitgetWsOrderBookSnapshotNotification.class::cast)
+ .map(notification -> BitgetStreamingAdapters.toOrderBook(notification, currencyPair));
+ }
+
+ @Override
+ public Observable getTicker(CurrencyPair currencyPair, Object... args) {
+ return service
+ .subscribeChannel(null, ChannelType.TICKER, MarketType.SPOT, currencyPair)
+ .map(BitgetTickerNotification.class::cast)
+ .map(BitgetStreamingAdapters::toTicker);
+ }
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingService.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingService.java
new file mode 100644
index 00000000000..a2e2bb5e251
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingService.java
@@ -0,0 +1,127 @@
+package info.bitrich.xchangestream.bitget;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import info.bitrich.xchangestream.bitget.config.Config;
+import info.bitrich.xchangestream.bitget.dto.common.Action;
+import info.bitrich.xchangestream.bitget.dto.common.BitgetChannel;
+import info.bitrich.xchangestream.bitget.dto.common.Operation;
+import info.bitrich.xchangestream.bitget.dto.request.BitgetWsRequest;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetEventNotification;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetWsNotification;
+import info.bitrich.xchangestream.service.netty.NettyStreamingService;
+import java.io.IOException;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class BitgetStreamingService extends NettyStreamingService {
+
+ protected final ObjectMapper objectMapper = Config.getInstance().getObjectMapper();
+
+ public BitgetStreamingService(String apiUri) {
+ super(apiUri, Integer.MAX_VALUE);
+ }
+
+ @Override
+ protected String getChannelNameFromMessage(BitgetWsNotification message) {
+ return BitgetStreamingAdapters.toSubscriptionId(message.getChannel());
+ }
+
+ /**
+ * @param channelName ignored
+ * @param args [{@code ChannelType}, {@code MarketType}, {@code Instrument}/{@code null}]
+ * @return message to be sent for subscribing
+ * @see BitgetStreamingAdapters#toSubscriptionId
+ */
+ @Override
+ public String getSubscribeMessage(String channelName, Object... args) throws IOException {
+ BitgetChannel bitgetChannel = BitgetStreamingAdapters.toBitgetChannel(args);
+
+ BitgetWsRequest request =
+ BitgetWsRequest.builder().operation(Operation.SUBSCRIBE).channel(bitgetChannel).build();
+ return objectMapper.writeValueAsString(request);
+ }
+
+ /**
+ * @param channelName ignored
+ * @param args [{@code ChannelType}, {@code MarketType}, {@code Instrument}/{@code null}]
+ * @return message to be sent for unsubscribing
+ * @see BitgetStreamingAdapters#toSubscriptionId
+ */
+ @Override
+ public String getUnsubscribeMessage(String channelName, Object... args) throws IOException {
+ BitgetChannel bitgetChannel = BitgetStreamingAdapters.toBitgetChannel(args);
+
+ BitgetWsRequest request =
+ BitgetWsRequest.builder().operation(Operation.UNSUBSCRIBE).channel(bitgetChannel).build();
+ return objectMapper.writeValueAsString(request);
+ }
+
+ @Override
+ protected void handleMessage(BitgetWsNotification message) {
+ log.debug("Processing {}", message.toString());
+ // no special processing of event messages
+ if (message instanceof BitgetEventNotification) {
+ return;
+ }
+ super.handleMessage(message);
+ }
+
+ @Override
+ protected void handleChannelMessage(String channel, BitgetWsNotification message) {
+ if (message.getAction() == null || message.getAction() != Action.SNAPSHOT) {
+ return;
+ }
+ super.handleChannelMessage(channel, message);
+ }
+
+ /**
+ * @param channelName name of channel
+ * @param args array with [{@code MarketType}, {@code Instrument}, ...]
+ * @return subscription id in form of "marketType_channelName_instrument1_instrumentX"
+ */
+ @Override
+ public String getSubscriptionUniqueId(String channelName, Object... args) {
+ BitgetChannel bitgetChannel = BitgetStreamingAdapters.toBitgetChannel(args);
+
+ return BitgetStreamingAdapters.toSubscriptionId(bitgetChannel);
+ }
+
+ @Override
+ public void messageHandler(String message) {
+ log.debug("Received message: {}", message);
+ BitgetWsNotification bitgetWsNotification;
+
+ // Parse incoming message to JSON
+ try {
+ JsonNode jsonNode = objectMapper.readTree(message);
+
+ // try to parse event
+ if (jsonNode.has("event")) {
+ ((ObjectNode) jsonNode).put("messageType", "event");
+ }
+ // copy nested value of arg.channel to the root of json to detect deserialization type
+ else if (jsonNode.has("arg") && jsonNode.get("arg").has("channel")) {
+ ((ObjectNode) jsonNode).put("messageType", jsonNode.get("arg").get("channel").asText());
+ }
+
+ bitgetWsNotification = objectMapper.treeToValue(jsonNode, BitgetWsNotification.class);
+
+ } catch (IOException e) {
+ log.error("Error parsing incoming message to JSON: {}", message);
+ log.error(e.getMessage(), e);
+ return;
+ }
+
+ // if payload has several items process each item as a separate notification
+ if (bitgetWsNotification.getPayloadItems() != null
+ && bitgetWsNotification.getPayloadItems().size() > 1) {
+ for (Object payloadItem : bitgetWsNotification.getPayloadItems()) {
+ handleMessage(bitgetWsNotification.toBuilder().payloadItem(payloadItem).build());
+ }
+ } else {
+ handleMessage(bitgetWsNotification);
+ }
+ }
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingTradeService.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingTradeService.java
new file mode 100644
index 00000000000..c5ef2b6fff5
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/BitgetStreamingTradeService.java
@@ -0,0 +1,29 @@
+package info.bitrich.xchangestream.bitget;
+
+import info.bitrich.xchangestream.bitget.dto.common.BitgetChannel.ChannelType;
+import info.bitrich.xchangestream.bitget.dto.common.BitgetChannel.MarketType;
+import info.bitrich.xchangestream.bitget.dto.response.BitgetWsUserTradeNotification;
+import info.bitrich.xchangestream.core.StreamingTradeService;
+import io.reactivex.rxjava3.core.Observable;
+import lombok.AllArgsConstructor;
+import org.knowm.xchange.currency.CurrencyPair;
+import org.knowm.xchange.dto.trade.UserTrade;
+
+@AllArgsConstructor
+public class BitgetStreamingTradeService implements StreamingTradeService {
+
+ private final BitgetStreamingService service;
+
+ @Override
+ public Observable getUserTrades(CurrencyPair currencyPair, Object... args) {
+ return service
+ .subscribeChannel(null, ChannelType.FILL, MarketType.SPOT, currencyPair)
+ .map(BitgetWsUserTradeNotification.class::cast)
+ .map(BitgetStreamingAdapters::toUserTrade);
+ }
+
+ @Override
+ public Observable getUserTrades() {
+ return getUserTrades(null);
+ }
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/config/Config.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/config/Config.java
new file mode 100644
index 00000000000..49d11e5e859
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/config/Config.java
@@ -0,0 +1,44 @@
+package info.bitrich.xchangestream.bitget.config;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import java.time.Clock;
+import lombok.Data;
+
+@Data
+public final class Config {
+
+ public static final String V2_PUBLIC_WS_URL = "wss://ws.bitget.com/v2/ws/public";
+ public static final String V2_PRIVATE_WS_URL = "wss://ws.bitget.com/v2/ws/private";
+
+ private ObjectMapper objectMapper;
+ private Clock clock;
+
+ private static Config instance = new Config();
+
+ private Config() {
+ clock = Clock.systemDefaultZone();
+
+ objectMapper = new ObjectMapper();
+
+ // by default read and write timetamps as milliseconds
+ objectMapper.configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
+ objectMapper.configure(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
+
+ // don't fail un unknown properties
+ objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ // don't write nulls
+ objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+ // enable parsing to Instant
+ objectMapper.registerModule(new JavaTimeModule());
+ }
+
+ public static Config getInstance() {
+ return instance;
+ }
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/config/converter/InstantToTimestampSecondsConverter.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/config/converter/InstantToTimestampSecondsConverter.java
new file mode 100644
index 00000000000..4cf7ebc8483
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/config/converter/InstantToTimestampSecondsConverter.java
@@ -0,0 +1,13 @@
+package info.bitrich.xchangestream.bitget.config.converter;
+
+import com.fasterxml.jackson.databind.util.StdConverter;
+import java.time.Instant;
+
+/** Converts {@code Instant} to timestamp in seconds */
+public class InstantToTimestampSecondsConverter extends StdConverter {
+
+ @Override
+ public Long convert(Instant value) {
+ return value.getEpochSecond();
+ }
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/common/Action.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/common/Action.java
new file mode 100644
index 00000000000..e28d50dc336
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/common/Action.java
@@ -0,0 +1,8 @@
+package info.bitrich.xchangestream.bitget.dto.common;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum Action {
+ @JsonProperty("snapshot")
+ SNAPSHOT
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/common/BitgetChannel.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/common/BitgetChannel.java
new file mode 100644
index 00000000000..5c5011d636a
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/common/BitgetChannel.java
@@ -0,0 +1,55 @@
+package info.bitrich.xchangestream.bitget.dto.common;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.jackson.Jacksonized;
+
+@Data
+@Builder
+@Jacksonized
+public class BitgetChannel {
+
+ @JsonProperty("instType")
+ private MarketType marketType;
+
+ @JsonProperty("channel")
+ private ChannelType channelType;
+
+ @JsonProperty("instId")
+ private String instrumentId;
+
+ @Getter
+ @AllArgsConstructor
+ public static enum MarketType {
+ SPOT("SPOT");
+
+ @JsonValue private final String value;
+
+ public String toString() {
+ return value;
+ }
+ }
+
+ @Getter
+ @AllArgsConstructor
+ public static enum ChannelType {
+ TICKER("ticker"),
+
+ DEPTH("books"),
+ DEPTH1("books1"),
+ DEPTH5("books5"),
+ DEPTH15("books15"),
+
+ FILL("fill");
+
+ @JsonValue private final String value;
+
+ public String toString() {
+ return value;
+ }
+ }
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/common/Operation.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/common/Operation.java
new file mode 100644
index 00000000000..88520a389a7
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/common/Operation.java
@@ -0,0 +1,14 @@
+package info.bitrich.xchangestream.bitget.dto.common;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum Operation {
+ @JsonProperty("subscribe")
+ SUBSCRIBE,
+
+ @JsonProperty("unsubscribe")
+ UNSUBSCRIBE,
+
+ @JsonProperty("login")
+ LOGIN
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/request/BitgetLoginRequest.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/request/BitgetLoginRequest.java
new file mode 100644
index 00000000000..6f0a4a47327
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/request/BitgetLoginRequest.java
@@ -0,0 +1,44 @@
+package info.bitrich.xchangestream.bitget.dto.request;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import info.bitrich.xchangestream.bitget.config.converter.InstantToTimestampSecondsConverter;
+import info.bitrich.xchangestream.bitget.dto.common.Operation;
+import java.time.Instant;
+import java.util.List;
+import lombok.Builder;
+import lombok.Data;
+import lombok.Singular;
+import lombok.extern.jackson.Jacksonized;
+
+@Data
+@Builder
+@Jacksonized
+public class BitgetLoginRequest {
+
+ @JsonProperty("op")
+ private Operation operation;
+
+ @Singular
+ @JsonProperty("args")
+ private List payloads;
+
+ @Data
+ @Builder
+ @Jacksonized
+ public static class LoginPayload {
+
+ @JsonProperty("apiKey")
+ private String apiKey;
+
+ @JsonProperty("passphrase")
+ private String passphrase;
+
+ @JsonProperty("timestamp")
+ @JsonSerialize(converter = InstantToTimestampSecondsConverter.class)
+ private Instant timestamp;
+
+ @JsonProperty("sign")
+ private String signature;
+ }
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/request/BitgetWsRequest.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/request/BitgetWsRequest.java
new file mode 100644
index 00000000000..d85eda60e8c
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/request/BitgetWsRequest.java
@@ -0,0 +1,23 @@
+package info.bitrich.xchangestream.bitget.dto.request;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import info.bitrich.xchangestream.bitget.dto.common.BitgetChannel;
+import info.bitrich.xchangestream.bitget.dto.common.Operation;
+import java.util.List;
+import lombok.Data;
+import lombok.Singular;
+import lombok.experimental.SuperBuilder;
+import lombok.extern.jackson.Jacksonized;
+
+@Data
+@SuperBuilder
+@Jacksonized
+public class BitgetWsRequest {
+
+ @JsonProperty("op")
+ private Operation operation;
+
+ @Singular
+ @JsonProperty("args")
+ private List channels;
+}
diff --git a/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/response/BitgetEventNotification.java b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/response/BitgetEventNotification.java
new file mode 100644
index 00000000000..ec8aabdb628
--- /dev/null
+++ b/xchange-stream-bitget/src/main/java/info/bitrich/xchangestream/bitget/dto/response/BitgetEventNotification.java
@@ -0,0 +1,35 @@
+package info.bitrich.xchangestream.bitget.dto.response;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+import lombok.extern.jackson.Jacksonized;
+
+@Data
+@SuperBuilder(toBuilder = true)
+@Jacksonized
+public class BitgetEventNotification extends BitgetWsNotification