From 838f0fa789dd816dbc718c30eb465a4113b6ac75 Mon Sep 17 00:00:00 2001 From: Lucas Teles Date: Fri, 22 Nov 2024 15:11:11 -0300 Subject: [PATCH] synchrous message handler --- .../PingMessageHandler.cs | 21 ++++-------- .../Backdash.Benchmarks.Ping/Program.cs | 5 +-- .../Cases/UdpClientBenchmark.cs | 12 +++---- .../Backdash.Benchmarks/Network/Message.cs | 22 +++---------- samples/SpaceWar.Lobby/Scenes/LobbyScene.cs | 6 ++-- .../SpaceWar.Lobby/Services/LobbyUdpClient.cs | 2 +- samples/SpaceWar.Shared/Logic/GameState.cs | 2 +- src/Backdash/Backends/Peer2PeerBackend.cs | 4 +-- src/Backdash/Network/Client/PeerClient.cs | 2 +- src/Backdash/Network/Client/PeerObserver.cs | 21 +++++++----- src/Backdash/Network/PeerConnectionFactory.cs | 4 +-- .../Network/Protocol/Comm/IMessageSender.cs | 1 - .../Network/Protocol/Comm/ProtocolInbox.cs | 25 ++++++-------- .../Network/Protocol/Comm/ProtocolOutbox.cs | 3 -- .../Network/ProtocolInputEventQueue.cs | 26 +++++++++++---- tests/Backdash.Tests/Backdash.Tests.csproj | 12 ++++--- .../Integration/Network/UdpPeerClientTests.cs | 33 ++++++++----------- .../TestUtils/Network/PeerEventObserver.cs | 11 +++---- 18 files changed, 99 insertions(+), 113 deletions(-) diff --git a/benchmarks/Backdash.Benchmarks.Ping/PingMessageHandler.cs b/benchmarks/Backdash.Benchmarks.Ping/PingMessageHandler.cs index ec827e58..83c0c0fe 100644 --- a/benchmarks/Backdash.Benchmarks.Ping/PingMessageHandler.cs +++ b/benchmarks/Backdash.Benchmarks.Ping/PingMessageHandler.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using System.Net; using Backdash.Network.Client; @@ -8,29 +9,21 @@ sealed class PingMessageHandler(IPeerClient sender) : IPeerObserver public static long TotalProcessed => processedCount; static long processedCount; - public async ValueTask OnPeerMessage( - PingMessage message, + public void OnPeerMessage( + in PingMessage message, SocketAddress from, - int bytesReceived, - CancellationToken stoppingToken + int bytesReceived ) { - if (stoppingToken.IsCancellationRequested) - return; Interlocked.Increment(ref processedCount); + var reply = message switch { PingMessage.Ping => PingMessage.Pong, PingMessage.Pong => PingMessage.Ping, _ => throw new ArgumentOutOfRangeException(nameof(message), message, null), }; - try - { - await sender.SendTo(from, reply, null, stoppingToken); - } - catch (OperationCanceledException) - { - // skip - } + + Trace.Assert(sender.TrySendTo(from, reply)); } } diff --git a/benchmarks/Backdash.Benchmarks.Ping/Program.cs b/benchmarks/Backdash.Benchmarks.Ping/Program.cs index 136c22df..0d5158a4 100644 --- a/benchmarks/Backdash.Benchmarks.Ping/Program.cs +++ b/benchmarks/Backdash.Benchmarks.Ping/Program.cs @@ -4,7 +4,7 @@ using Backdash.Network.Client; using Backdash.Serialization; -var totalDuration = TimeSpan.FromSeconds(20); +var totalDuration = TimeSpan.FromSeconds(10); var snapshotInterval = TimeSpan.FromSeconds(0); var printSnapshots = false; @@ -28,7 +28,7 @@ measurer.Start(); IPEndPoint peer2Endpoint = new(IPAddress.Loopback, 9001); -_ = peer1.SendTo(peer2Endpoint.Serialize(), PingMessage.Ping).AsTask(); +peer1.TrySendTo(peer2Endpoint.Serialize(), PingMessage.Ping); Console.WriteLine("Press enter to stop."); SpinWait.SpinUntil(() => Console.KeyAvailable || stopToken.IsCancellationRequested); @@ -42,6 +42,7 @@ Console.Clear(); Console.WriteLine(measurer.Summary(printSnapshots)); + IPeerClient CreateClient(int port) { PeerObserverGroup observers = new(); diff --git a/benchmarks/Backdash.Benchmarks/Cases/UdpClientBenchmark.cs b/benchmarks/Backdash.Benchmarks/Cases/UdpClientBenchmark.cs index 81eaf61e..26fc6fdd 100644 --- a/benchmarks/Backdash.Benchmarks/Cases/UdpClientBenchmark.cs +++ b/benchmarks/Backdash.Benchmarks/Cases/UdpClientBenchmark.cs @@ -44,13 +44,13 @@ void OnProcessed(long count) IPEndPoint pongerEndpoint = new(IPAddress.Loopback, 9001); var pongerAddress = pongerEndpoint.Serialize(); - Task[] tasks = - [ + Trace.Assert(pinger.TrySendTo(pongerAddress, PingMessage.Ping)); + + await Task.WhenAll( pinger.Start(ct), - ponger.Start(ct), - pinger.SendTo(pongerAddress, PingMessage.Ping, null, ct).AsTask(), - ]; - await Task.WhenAll(tasks).ConfigureAwait(false); + ponger.Start(ct) + ).ConfigureAwait(false); + pingerHandler.OnProcessed -= OnProcessed; Trace.Assert(pingerHandler.BadMessages is 0, $"** Pinger: {pingerHandler.BadMessages} bad messages"); diff --git a/benchmarks/Backdash.Benchmarks/Network/Message.cs b/benchmarks/Backdash.Benchmarks/Network/Message.cs index bda11c52..a317b668 100644 --- a/benchmarks/Backdash.Benchmarks/Network/Message.cs +++ b/benchmarks/Backdash.Benchmarks/Network/Message.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using System.Net; using Backdash.Network.Client; @@ -18,35 +19,22 @@ sealed class PingMessageHandler(string name, IPeerClient sender) : public long BadMessages => badMessages; public event Action OnProcessed = delegate { }; - public async ValueTask OnPeerMessage( - PingMessage message, - SocketAddress from, - int bytesReceived, - CancellationToken stoppingToken + public void OnPeerMessage(in PingMessage message, SocketAddress from, int bytesReceived ) { - if (stoppingToken.IsCancellationRequested) - return; - if (stoppingToken.IsCancellationRequested) - return; Interlocked.Increment(ref processedCount); + if (!Enum.IsDefined(message)) Interlocked.Increment(ref badMessages); + var reply = message switch { PingMessage.Ping => PingMessage.Pong, PingMessage.Pong => PingMessage.Ping, _ => throw new ArgumentOutOfRangeException(nameof(message), message, null), }; - try - { - await sender.SendTo(from, reply, null, stoppingToken); - } - catch (OperationCanceledException) - { - // skip - } + Trace.Assert(sender.TrySendTo(from, reply)); OnProcessed(processedCount); #if DEBUG Console.WriteLine( diff --git a/samples/SpaceWar.Lobby/Scenes/LobbyScene.cs b/samples/SpaceWar.Lobby/Scenes/LobbyScene.cs index affca172..71fec07d 100644 --- a/samples/SpaceWar.Lobby/Scenes/LobbyScene.cs +++ b/samples/SpaceWar.Lobby/Scenes/LobbyScene.cs @@ -364,7 +364,7 @@ void StartPlayerBattleScene() players.Add(player.PeerId == user.PeerId ? new LocalPlayer(playerNumber) : new RemotePlayer(playerNumber, - lobbyUdpClient.GetFallbackEndpoint(user, player))); + LobbyUdpClient.GetFallbackEndpoint(user, player))); } if (lobbyInfo.SpectatorMapping.SingleOrDefault(m => m.Host == user.PeerId) @@ -373,7 +373,7 @@ void StartPlayerBattleScene() var spectators = lobbyInfo.Spectators.Where(s => spectatorIds.Contains(s.PeerId)); foreach (var spectator in spectators) { - var spectatorEndpoint = lobbyUdpClient.GetFallbackEndpoint(user, spectator); + var spectatorEndpoint = LobbyUdpClient.GetFallbackEndpoint(user, spectator); players.Add(new Spectator(spectatorEndpoint)); } } @@ -388,7 +388,7 @@ void StartSpectatorBattleScene() ?.Host; var host = lobbyInfo.Players.Single(x => x.PeerId == hostId); var playerCount = lobbyInfo.Players.Length; - var hostEndpoint = lobbyUdpClient.GetFallbackEndpoint(user, host); + var hostEndpoint = LobbyUdpClient.GetFallbackEndpoint(user, host); Window.Title = $"Space War {Config.LocalPort} - {user.Username} watching {host.Username}"; diff --git a/samples/SpaceWar.Lobby/Services/LobbyUdpClient.cs b/samples/SpaceWar.Lobby/Services/LobbyUdpClient.cs index b5827160..51e3b710 100644 --- a/samples/SpaceWar.Lobby/Services/LobbyUdpClient.cs +++ b/samples/SpaceWar.Lobby/Services/LobbyUdpClient.cs @@ -79,7 +79,7 @@ async ValueTask Receive(CancellationToken stoppingToken) } // Use local IP when over same network - public IPEndPoint GetFallbackEndpoint(User user, Peer peer) + public static IPEndPoint GetFallbackEndpoint(User user, Peer peer) { if (Equals(peer.Endpoint.Address, user.IP) && peer.LocalEndpoint is not null) return peer.LocalEndpoint; diff --git a/samples/SpaceWar.Shared/Logic/GameState.cs b/samples/SpaceWar.Shared/Logic/GameState.cs index ffa34241..65b6f2df 100644 --- a/samples/SpaceWar.Shared/Logic/GameState.cs +++ b/samples/SpaceWar.Shared/Logic/GameState.cs @@ -44,7 +44,7 @@ public void Init(int numberOfPlayers) Missile: false ); - public GameInput ParseShipInputs(PlayerInputs inputs, in Ship ship) + public static GameInput ParseShipInputs(PlayerInputs inputs, in Ship ship) { if (!ship.Active) return new(); diff --git a/src/Backdash/Backends/Peer2PeerBackend.cs b/src/Backdash/Backends/Peer2PeerBackend.cs index 7e61603e..ac261c68 100644 --- a/src/Backdash/Backends/Peer2PeerBackend.cs +++ b/src/Backdash/Backends/Peer2PeerBackend.cs @@ -28,7 +28,7 @@ sealed class Peer2PeerBackend : IRollbackSession synchronizer; readonly ConnectionsState localConnections; readonly IBackgroundJobManager backgroundJobManager; - readonly ProtocolInputEventQueue peerInputEventQueue; + readonly IProtocolInputEventQueue peerInputEventQueue; readonly IProtocolInputEventPublisher> peerCombinedInputsEventPublisher; readonly PeerConnectionFactory peerConnectionFactory; readonly List>> spectators; @@ -72,7 +72,7 @@ BackendServices services Random = services.DeterministicRandom; syncNumber = services.Random.MagicNumber(); - peerInputEventQueue = new(); + peerInputEventQueue = new ProtocolInputEventQueue(); peerCombinedInputsEventPublisher = new ProtocolCombinedInputsEventPublisher(peerInputEventQueue); inputGroupSerializer = new ConfirmedInputsSerializer(inputSerializer); localConnections = new(Max.NumberOfPlayers); diff --git a/src/Backdash/Network/Client/PeerClient.cs b/src/Backdash/Network/Client/PeerClient.cs index 358b35f5..f60345f5 100644 --- a/src/Backdash/Network/Client/PeerClient.cs +++ b/src/Backdash/Network/Client/PeerClient.cs @@ -220,7 +220,7 @@ public async Task StartReceiving(CancellationToken cancellationToken) try { serializer.Deserialize(buffer.AsSpan(..receivedSize), ref msg); - await observer.OnPeerMessage(msg, address, receivedSize, cancellationToken).ConfigureAwait(false); + observer.OnPeerMessage(in msg, address, receivedSize); } catch (NetcodeDeserializationException ex) { diff --git a/src/Backdash/Network/Client/PeerObserver.cs b/src/Backdash/Network/Client/PeerObserver.cs index b3b613f6..88b5cffe 100644 --- a/src/Backdash/Network/Client/PeerObserver.cs +++ b/src/Backdash/Network/Client/PeerObserver.cs @@ -1,5 +1,6 @@ using System.Net; using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; namespace Backdash.Network.Client; @@ -7,12 +8,12 @@ namespace Backdash.Network.Client; /// Observe a /// /// -public interface IPeerObserver where T : struct +public interface IPeerObserver where T : struct { /// /// Handle new message from peer /// - ValueTask OnPeerMessage(T message, SocketAddress from, int bytesReceived, CancellationToken stoppingToken); + void OnPeerMessage(in T message, SocketAddress from, int bytesReceived); } sealed class PeerObserverGroup : IPeerObserver @@ -22,12 +23,16 @@ sealed class PeerObserverGroup : IPeerObserver public void Add(IPeerObserver observer) => observers.Add(observer); public void Remove(IPeerObserver observer) => observers.Remove(observer); - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - public async ValueTask OnPeerMessage( - T message, SocketAddress from, int bytesReceived, CancellationToken stoppingToken - ) + public void OnPeerMessage(in T message, SocketAddress from, int bytesReceived) { - for (var i = 0; i < observers.Count; i++) - await observers[i].OnPeerMessage(message, from, bytesReceived, stoppingToken).ConfigureAwait(false); + var span = CollectionsMarshal.AsSpan(observers); + ref var pointer = ref MemoryMarshal.GetReference(span); + ref var end = ref Unsafe.Add(ref pointer, span.Length); + + while (Unsafe.IsAddressLessThan(ref pointer, ref end)) + { + pointer.OnPeerMessage(in message, from, bytesReceived); + pointer = ref Unsafe.Add(ref pointer, 1)!; + } } } diff --git a/src/Backdash/Network/PeerConnectionFactory.cs b/src/Backdash/Network/PeerConnectionFactory.cs index e1596959..460798e0 100644 --- a/src/Backdash/Network/PeerConnectionFactory.cs +++ b/src/Backdash/Network/PeerConnectionFactory.cs @@ -27,8 +27,8 @@ IProtocolInputEventPublisher inputEventQueue var timeSync = new TimeSync(timeSyncOptions, logger); var outbox = new ProtocolOutbox(state, peer, clock, logger); var syncManager = new ProtocolSynchronizer(logger, clock, random, state, options, outbox, networkEventHandler); - var inbox = new ProtocolInbox( - options, inputSerializer, state, clock, syncManager, outbox, networkEventHandler, inputEventQueue, logger); + var inbox = new ProtocolInbox(options, inputSerializer, state, clock, syncManager, outbox, + networkEventHandler, inputEventQueue, logger); var inputBuffer = new ProtocolInputBuffer(options, inputSerializer, state, logger, timeSync, outbox, inbox); diff --git a/src/Backdash/Network/Protocol/Comm/IMessageSender.cs b/src/Backdash/Network/Protocol/Comm/IMessageSender.cs index 546ded88..f9dd4cbb 100644 --- a/src/Backdash/Network/Protocol/Comm/IMessageSender.cs +++ b/src/Backdash/Network/Protocol/Comm/IMessageSender.cs @@ -4,6 +4,5 @@ namespace Backdash.Network.Protocol.Comm; interface IMessageSender { - ValueTask SendMessageAsync(in ProtocolMessage msg, CancellationToken ct); bool SendMessage(in ProtocolMessage msg); } diff --git a/src/Backdash/Network/Protocol/Comm/ProtocolInbox.cs b/src/Backdash/Network/Protocol/Comm/ProtocolInbox.cs index 1229e1c1..c2311093 100644 --- a/src/Backdash/Network/Protocol/Comm/ProtocolInbox.cs +++ b/src/Backdash/Network/Protocol/Comm/ProtocolInbox.cs @@ -35,13 +35,7 @@ Logger logger public GameInput LastReceivedInput => lastReceivedInput; public Frame LastAckedFrame { get; private set; } = Frame.Null; - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - public async ValueTask OnPeerMessage( - ProtocolMessage message, - SocketAddress from, - int bytesReceived, - CancellationToken stoppingToken - ) + public void OnPeerMessage(in ProtocolMessage message, SocketAddress from, int bytesReceived) { if (!from.Equals(state.PeerAddress.Address)) return; @@ -70,17 +64,18 @@ CancellationToken stoppingToken var skipped = (ushort)(seqNum - nextReceivedSeq); if (skipped > options.MaxSequenceDistance) { - logger.Write(LogLevel.Debug, $"dropping out of order packet (seq: {seqNum}, last seq:{nextReceivedSeq})"); + logger.Write(LogLevel.Debug, + $"dropping out of order packet (seq: {seqNum}, last seq:{nextReceivedSeq})"); return; } } nextReceivedSeq = seqNum; logger.Write(LogLevel.Trace, $"recv {message} from {state.Player}"); - if (HandleMessage(ref message, out var replyMsg)) + if (HandleMessage(in message, out var replyMsg)) { - if (replyMsg.Header.Type is not MessageType.Unknown) - await messageSender.SendMessageAsync(in replyMsg, stoppingToken).ConfigureAwait(false); + if (replyMsg.Header.Type is not MessageType.Unknown && !messageSender.SendMessage(in replyMsg)) + logger.Write(LogLevel.Warning, $"inbox response dropped (seq: {seqNum})"); state.Stats.Received.LastTime = clock.GetTimeStamp(); state.Stats.Received.TotalPackets++; @@ -93,14 +88,14 @@ CancellationToken stoppingToken } } - bool HandleMessage(ref ProtocolMessage message, out ProtocolMessage replyMsg) + bool HandleMessage(in ProtocolMessage message, out ProtocolMessage replyMsg) { replyMsg = new(MessageType.Unknown); var handled = message.Header.Type switch { MessageType.SyncRequest => OnSyncRequest(in message, ref replyMsg), MessageType.SyncReply => OnSyncReply(in message, ref replyMsg), - MessageType.Input => OnInput(ref message.Input), + MessageType.Input => OnInput(in message.Input), MessageType.QualityReport => OnQualityReport(in message, out replyMsg), MessageType.QualityReply => OnQualityReply(in message), MessageType.InputAck => OnInputAck(in message), @@ -112,7 +107,7 @@ bool HandleMessage(ref ProtocolMessage message, out ProtocolMessage replyMsg) return handled; } - bool OnInput(ref InputMessage msg) + bool OnInput(in InputMessage msg) { logger.Write(LogLevel.Trace, $"Acked Frame: {LastAckedFrame}"); /* @@ -158,7 +153,7 @@ in remoteStatus[i].LastFrame lastReceivedFrame = msg.StartFrame.Previous(); var nextFrame = lastReceivedFrame.Next(); var currentFrame = msg.StartFrame; - var decompressor = InputEncoder.GetDecompressor(ref msg); + var decompressor = InputEncoder.GetDecompressor(ref Unsafe.AsRef(in msg)); if (currentFrame < nextFrame) { var framesAhead = nextFrame.Number - currentFrame.Number; diff --git a/src/Backdash/Network/Protocol/Comm/ProtocolOutbox.cs b/src/Backdash/Network/Protocol/Comm/ProtocolOutbox.cs index a6215a55..56be358f 100644 --- a/src/Backdash/Network/Protocol/Comm/ProtocolOutbox.cs +++ b/src/Backdash/Network/Protocol/Comm/ProtocolOutbox.cs @@ -16,9 +16,6 @@ Logger logger { int nextSendSeq; - public ValueTask SendMessageAsync(in ProtocolMessage msg, CancellationToken ct) => - peer.SendTo(state.PeerAddress.Address, in msg, this, ct); - public bool SendMessage(in ProtocolMessage msg) => peer.TrySendTo(state.PeerAddress.Address, in msg, this); public void BeforeSendMessage(ref ProtocolMessage message) diff --git a/src/Backdash/Network/ProtocolInputEventQueue.cs b/src/Backdash/Network/ProtocolInputEventQueue.cs index 9a1a877c..51dc53e4 100644 --- a/src/Backdash/Network/ProtocolInputEventQueue.cs +++ b/src/Backdash/Network/ProtocolInputEventQueue.cs @@ -1,4 +1,6 @@ using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; using System.Threading.Channels; using Backdash.Synchronizing.Input; using Backdash.Synchronizing.Input.Confirmed; @@ -11,20 +13,25 @@ readonly record struct GameInputEvent(PlayerHandle Player, GameInput Input = Input; } + interface IProtocolInputEventPublisher where TInput : unmanaged { void Publish(in GameInputEvent evt); } + interface IProtocolInputEventConsumer where TInput : unmanaged { bool TryConsume(out GameInputEvent nextEvent); } + interface IProtocolInputEventQueue : IDisposable, IProtocolInputEventPublisher, IProtocolInputEventConsumer where TInput : unmanaged; + sealed class ProtocolInputEventQueue : IProtocolInputEventQueue where TInput : unmanaged { bool disposed; + readonly Channel> channel = Channel.CreateUnbounded>( new() { @@ -32,13 +39,16 @@ sealed class ProtocolInputEventQueue : IProtocolInputEventQueue SingleReader = true, AllowSynchronousContinuations = true, }); + public bool TryConsume(out GameInputEvent nextEvent) => channel.Reader.TryRead(out nextEvent); + public void Publish(in GameInputEvent evt) { if (disposed) return; var published = channel.Writer.TryWrite(evt); Trace.Assert(published); } + public void Dispose() { if (disposed) return; @@ -46,6 +56,7 @@ public void Dispose() channel.Writer.Complete(); } } + sealed class ProtocolCombinedInputsEventPublisher(IProtocolInputEventPublisher peerInputEventPublisher) : IProtocolInputEventPublisher> where TInput : unmanaged @@ -54,14 +65,15 @@ public void Publish(in GameInputEvent> evt) { var player = evt.Player; var frame = evt.Input.Frame; - for (var i = 0; i < evt.Input.Data.Count; i++) + + var span = evt.Input.Data.Inputs[..evt.Input.Data.Count]; + ref var pointer = ref MemoryMarshal.GetReference(span); + ref var end = ref Unsafe.Add(ref pointer, span.Length); + + while (Unsafe.IsAddressLessThan(ref pointer, ref end)) { - ref readonly var current = ref evt.Input.Data.Inputs[i]; - peerInputEventPublisher.Publish( - new( - player, - new(current, frame)) - ); + peerInputEventPublisher.Publish(new(player, new(pointer, frame))); + pointer = ref Unsafe.Add(ref pointer, 1)!; } } } diff --git a/tests/Backdash.Tests/Backdash.Tests.csproj b/tests/Backdash.Tests/Backdash.Tests.csproj index dec102cb..6ccda26a 100644 --- a/tests/Backdash.Tests/Backdash.Tests.csproj +++ b/tests/Backdash.Tests/Backdash.Tests.csproj @@ -6,16 +6,16 @@ false true false - CS1591 + CS1591;NU1903 - + - - + + runtime; build; native; contentfiles; analyzers; buildtransitive all @@ -24,6 +24,10 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/tests/Backdash.Tests/Specs/Integration/Network/UdpPeerClientTests.cs b/tests/Backdash.Tests/Specs/Integration/Network/UdpPeerClientTests.cs index 593abc27..85de681d 100644 --- a/tests/Backdash.Tests/Specs/Integration/Network/UdpPeerClientTests.cs +++ b/tests/Backdash.Tests/Specs/Integration/Network/UdpPeerClientTests.cs @@ -18,12 +18,11 @@ public async Task ShouldSend() var (client, server) = context; StringValue msg = "hello server"; SemaphoreSlim sem = new(0, 1); - server.Observer.OnMessage += (message, sender, _, _) => + server.Observer.OnMessage += (message, sender, _) => { message.Value.Should().Be("hello server"); sender.Should().Be(client.Address); sem.Release(); - return ValueTask.CompletedTask; }; await client.Client.SendTo(server.Address, msg); var pass = await sem.WaitAsync(TimeSpan.FromSeconds(1)); @@ -36,19 +35,18 @@ public async Task ShouldSendAndReceive() using Peer2PeerFixture context = new(new StringBinarySerializer()); var (client, server) = context; AsyncCounter counter = new(); - server.Observer.OnMessage += async (message, sender, _, token) => + server.Observer.OnMessage += (message, sender, _) => { message.Value.Should().Be("hello server"); sender.Should().Be(client.Address); counter.Inc(); - await server.Client.SendTo(sender, "hello client", null, token); + server.Client.TrySendTo(sender, "hello client", null); }; - client.Observer.OnMessage += (message, sender, _, _) => + client.Observer.OnMessage += (message, sender, _) => { message.Value.Should().Be("hello client"); sender.Should().Be(server.Address); counter.Inc(); - return ValueTask.CompletedTask; }; await client.Client.SendTo(server.Address, "hello server"); await WaitFor.BeTrue(() => counter.Value is 2); @@ -84,19 +82,17 @@ public async Task ShouldProcessConcurrent() var (client, server) = context; var totalResult = 0; AsyncCounter counter = new(); - server.Observer.OnMessage += (message, sender, _, _) => + server.Observer.OnMessage += (message, sender, _) => { sender.Should().Be(client.Address); HandleMessage(ref totalResult, message); counter.Inc(); - return ValueTask.CompletedTask; }; - client.Observer.OnMessage += (message, sender, _, _) => + client.Observer.OnMessage += (message, sender, _) => { sender.Should().Be(server.Address); HandleMessage(ref totalResult, message); counter.Inc(); - return ValueTask.CompletedTask; }; var messageCount = 100; Random rnd = new(42); @@ -125,16 +121,16 @@ public async Task ShouldSendReceiveBetween() var (client, server) = context; var totalResult = 0; AsyncCounter counter = new(); - server.Observer.OnMessage += async (message, sender, _, token) => + server.Observer.OnMessage += (message, sender, _) => { sender.Should().Be(client.Address); - await HandleMessageAsync(message, server.Client, sender, token); + HandleMessageAsync(message, server.Client, sender); counter.Inc(); }; - client.Observer.OnMessage += async (message, sender, _, token) => + client.Observer.OnMessage += (message, sender, _) => { sender.Should().Be(server.Address); - await HandleMessageAsync(message, client.Client, sender, token); + HandleMessageAsync(message, client.Client, sender); counter.Inc(); }; var messageCount = 100; @@ -154,11 +150,10 @@ public async Task ShouldSendReceiveBetween() totalResult.Should().Be(0); return; - async ValueTask HandleMessageAsync( + void HandleMessageAsync( OpMessage message, IPeerClient udpClient, - SocketAddress sender, - CancellationToken ct + SocketAddress sender ) { switch (message) @@ -168,11 +163,11 @@ CancellationToken ct break; case OpMessage.IncrementCallback: Interlocked.Increment(ref totalResult); - await udpClient.SendTo(sender, OpMessage.Decrement, null, ct); + Assert.True(udpClient.TrySendTo(sender, OpMessage.Decrement)); break; case OpMessage.DecrementCallback: Interlocked.Decrement(ref totalResult); - await udpClient.SendTo(sender, OpMessage.Increment, null, ct); + Assert.True(udpClient.TrySendTo(sender, OpMessage.Increment)); break; default: throw new ArgumentOutOfRangeException(nameof(message), message, null); diff --git a/tests/Backdash.Tests/TestUtils/Network/PeerEventObserver.cs b/tests/Backdash.Tests/TestUtils/Network/PeerEventObserver.cs index 84270f14..6a311e56 100644 --- a/tests/Backdash.Tests/TestUtils/Network/PeerEventObserver.cs +++ b/tests/Backdash.Tests/TestUtils/Network/PeerEventObserver.cs @@ -6,11 +6,8 @@ namespace Backdash.Tests.TestUtils.Network; sealed class PeerEventObserver : IPeerObserver where T : struct { - public event Func OnMessage = delegate - { - return ValueTask.CompletedTask; - }; - ValueTask IPeerObserver.OnPeerMessage( - T message, SocketAddress from, int bytesReceived, CancellationToken stoppingToken - ) => OnMessage(message, from, bytesReceived, stoppingToken); + public event Action OnMessage = delegate { }; + + void IPeerObserver.OnPeerMessage(in T message, SocketAddress from, int bytesReceived) => + OnMessage(message, from, bytesReceived); }