diff --git a/HISTORY.md b/HISTORY.md index d63c859..cd8fdfb 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,3 +1,6 @@ +## v3.2.0 +* Added `ConnectResult` return type from `CoinbaseProWebSocket.ConnectAsync()` for better semantic connection handling. + ## v3.0.5 * Fixed `Withdrawals.GetWithdrawal()`. Previously used wrong URL path. diff --git a/README.md b/README.md index c803e55..a09ba04 100644 --- a/README.md +++ b/README.md @@ -197,8 +197,9 @@ Be sure to [check the documentation here](https://docs.pro.coinbase.com/?r=1#sub var socket = ...; //using authenticated or unauthenticated instance //Connect the websocket, -//when this connect method completes, the socket is ready -await socket.ConnectAsync(); +//when this connect method completes, the socket is ready or failure occured. +var result = await socket.ConnectAsync(); +if( !result.Success ) throw new Exception("Failed to connect."); //add an event handler for the message received event on the raw socket socket.RawSocket.MessageReceived += RawSocket_MessageReceived; diff --git a/Source/Coinbase.Pro/WebSockets/CoinbaseProWebSocket.cs b/Source/Coinbase.Pro/WebSockets/CoinbaseProWebSocket.cs index d1b963b..e62b384 100644 --- a/Source/Coinbase.Pro/WebSockets/CoinbaseProWebSocket.cs +++ b/Source/Coinbase.Pro/WebSockets/CoinbaseProWebSocket.cs @@ -25,6 +25,20 @@ public void EnsureValid() } } + public class ConnectResult + { + public ConnectResult(bool success, object sender, EventArgs eventArgs) + { + this.Success = success; + this.Sender = sender; + this.EventArgs = eventArgs; + } + + public bool Success { get; } + public object Sender { get; } + public EventArgs EventArgs { get; } + } + public class CoinbaseProWebSocket : IDisposable { public const string Endpoint = "wss://ws-feed.pro.coinbase.com"; @@ -38,7 +52,7 @@ public CoinbaseProWebSocket(WebSocketConfig config = null) public WebSocketConfig Config { get; } - protected TaskCompletionSource connectingTcs; + protected TaskCompletionSource connectingTcs; protected IProxyConnector Proxy { get; set; } @@ -46,14 +60,14 @@ public CoinbaseProWebSocket(WebSocketConfig config = null) /// Connect the websocket to Coinbase Pro. /// /// - public Task ConnectAsync() + public Task ConnectAsync() { if( this.RawSocket != null ) throw new InvalidOperationException( $"The {nameof(RawSocket)} is already created from a previous {nameof(ConnectAsync)} call. " + $"If you get this exception, you'll need to dispose of this {nameof(CoinbaseProWebSocket)} and create a new instance. " + $"Don't call {nameof(ConnectAsync)} multiple times on the same instance."); - this.connectingTcs = new TaskCompletionSource(); + this.connectingTcs = new TaskCompletionSource(); if( this.RawSocket is null ) { @@ -63,19 +77,33 @@ public Task ConnectAsync() } this.RawSocket.Opened += RawSocket_Opened; + this.RawSocket.Error += RawSocket_Error; this.RawSocket.Open(); return this.connectingTcs.Task; } + private void RawSocket_Error(object sender, ErrorEventArgs e) + { + TrySetConnectResult(false, sender, e); + } + private void RawSocket_Opened(object sender, EventArgs e) { + TrySetConnectResult(true, sender, e); + } + + protected void TrySetConnectResult(bool result, object sender, EventArgs args) + { + var connectResult = new ConnectResult(result, sender, args); + if( sender is WebSocket socket ) { socket.Opened -= RawSocket_Opened; + socket.Error -= RawSocket_Error; } - - Task.Run(() => this.connectingTcs.SetResult(true)); + + Task.Run(() => this.connectingTcs.TrySetResult(connectResult)); } public void EnableFiddlerDebugProxy(IProxyConnector proxy) diff --git a/Source/Coinbase.Tests/IntegrationTests/WebSocketTests.cs b/Source/Coinbase.Tests/IntegrationTests/WebSocketTests.cs index 53a6c15..ec855a0 100644 --- a/Source/Coinbase.Tests/IntegrationTests/WebSocketTests.cs +++ b/Source/Coinbase.Tests/IntegrationTests/WebSocketTests.cs @@ -29,7 +29,11 @@ public void BeforeEachTest() [Test] public async Task connect() { - await socket.ConnectAsync(); + var result = await socket.ConnectAsync(); + if( !result.Success ) + { + throw new Exception("Connection error."); + } //https://docs.pro.coinbase.com/?r=1#protocol-overview // Request @@ -80,7 +84,11 @@ private void RawSocket_MessageReceived(object sender, WebSocket4Net.MessageRecei [Test] public async Task connect_simple() { - await socket.ConnectAsync(); + var result = await socket.ConnectAsync(); + if( !result.Success ) + { + throw new Exception("Connect error."); + } var sub = new Subscription { diff --git a/Source/Examples/Program.cs b/Source/Examples/Program.cs index c14ba68..16ae795 100644 --- a/Source/Examples/Program.cs +++ b/Source/Examples/Program.cs @@ -83,7 +83,9 @@ public static async Task SubscribeToWebsocketEvents(Credentials creds) //socket.EnableFiddlerDebugProxy(new HttpConnectProxy(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8888))); #endif - await socket.ConnectAsync(); + var result = await socket.ConnectAsync(); + if( !result.Success ) + throw new Exception("Connect error."); WriteLine(">> Connected."); diff --git a/Source/Examples/ResilientWebSocket.cs b/Source/Examples/ResilientWebSocket.cs new file mode 100644 index 0000000..27899f5 --- /dev/null +++ b/Source/Examples/ResilientWebSocket.cs @@ -0,0 +1,167 @@ +using System; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using Coinbase.Pro.Models; +using Coinbase.Pro.WebSockets; +using SuperSocket.ClientEngine; +using SuperSocket.ClientEngine.Proxy; +using WebSocket4Net; +using static System.Console; + +namespace Examples +{ + public class ResilientWebSocket + { + private readonly Credentials credentials; + private CoinbaseProWebSocket coinbase; + private Subscription subscription; + private SemaphoreSlim locker = new SemaphoreSlim(1, 1); + private CancellationTokenSource cts; + + public ResilientWebSocket(Credentials credentials) + { + this.credentials = credentials; + } + + public Task Start(Subscription subscription) + { + this.subscription = subscription; + this.cts = new CancellationTokenSource(); + return Task.Run(() => SafeReconnect()); + } + + public async Task Stop() + { + this.cts?.Cancel(); + + WriteLine("Waiting 80 sec for shutdown..."); + this.locker.Wait(80_000); + + WriteLine("Shutdown complete."); + WriteLine("!! Websocket is closed! ResilientWebSocket stopped."); + } + + private async Task Reconnect(Credentials creds, Subscription subscription) + { + if (this.cts.IsCancellationRequested) return; + + this.coinbase = new CoinbaseProWebSocket(new WebSocketConfig + { + ApiKey = creds.ApiKey, + Secret = creds.ApiSecret, + Passphrase = creds.ApiPassphrase, + SocketUri = "wss://ws-feed-public.sandbox.pro.coinbase.com" + }); + + WriteLine(">> Connecting websocket..."); + + //Uncomment depending on your TFM if you want to debug the websocket + //connection to Coinbase Pro with Fiddler +#if !NETFRAMEWORK + coinbase.EnableFiddlerDebugProxy(new HttpConnectProxy(IPEndPoint.Parse("127.0.0.1:8888"))); +#else + coinbase.EnableFiddlerDebugProxy(new HttpConnectProxy(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8888))); +#endif + + var result = await coinbase.ConnectAsync(); + if( !result.Success ) + { + var ex = new Exception("Connect failed.") + { + Data = {{"ConnectResult", result}} + }; + throw ex; + } + + WriteLine(">> Connected."); + + coinbase.RawSocket.Closed += Websocket_Closed; + coinbase.RawSocket.Error += Websocket_Error; + coinbase.RawSocket.MessageReceived += Websocket_MessageReceived; + + WriteLine(">> Subscribing to events..."); + var sub = new Subscription + { + Channels = subscription.Channels, + ProductIds = subscription.ProductIds + }; + await coinbase.SubscribeAsync(sub); + + WriteLine(">> Subscribed."); + } + + private void Websocket_MessageReceived(object sender, MessageReceivedEventArgs e) + { + if (this.cts.IsCancellationRequested) return; + + WriteLine("Message received."); + if (WebSocketHelper.TryParse(e.Message, out var msg)) + { + if (msg is HeartbeatEvent hb) + { + WriteLine($"Sequence: {hb.Sequence}, Last Trade Id: {hb.LastTradeId}"); + } + } + } + + private void Websocket_Error(object sender, ErrorEventArgs e) + { + if (this.cts.IsCancellationRequested) return; + + WriteLine("!! Websocket Error! "); + WriteLine(e); + } + + private void Websocket_Closed(object sender, EventArgs e) + { + if (this.cts.IsCancellationRequested) return; + + WriteLine("!! The websocket closed!"); + WriteLine("!! Reconnecting..."); + Task.Run(() => SafeReconnect()); + } + + private async Task SafeReconnect() + { + if( this.cts.IsCancellationRequested ) return; + + if (!locker.Wait(0)) return; //any threads that can't acquire the lock, go away + + while ( !this.cts.IsCancellationRequested ) + { + try + { + SafeShutdown(); + await Reconnect(this.credentials, this.subscription); + break; + } + catch(Exception e) + { + WriteLine(e); + } + } + + locker.Release(); + } + + private void SafeShutdown() + { + if (this.coinbase?.RawSocket is null) return; + if (this.subscription is null) return; + + this.coinbase.RawSocket.Closed -= Websocket_Closed; + this.coinbase.RawSocket.Error -= Websocket_Error; + this.coinbase.RawSocket.MessageReceived -= Websocket_MessageReceived; + + if (this.coinbase.RawSocket.State == WebSocketState.Open) + { + this.coinbase.Unsubscribe(this.subscription); + WriteLine("!! Closing Websocket..."); + this.coinbase.RawSocket.Close(); + } + + this.coinbase.Dispose(); + } + } +}