Skip to content

Commit 670e010

Browse files
authored
Fix a bug where the watcher would not terminate if it could not subscribe to Redis. (#674)
* Fix a bug in the watcher when redis is offline before the watcher can subscribe * Unsub when we are done * .
1 parent 0dc3646 commit 670e010

File tree

3 files changed

+133
-35
lines changed

3 files changed

+133
-35
lines changed

source/Halibut.Tests/Queue/Redis/NodeHeartBeat/NodeHeartBeatSenderFixture.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,60 @@ public async Task WhenWatchingTheNodeProcessingTheRequestIsStillAlive_AndTheWatc
257257
result.Should().Be(NodeWatcherResult.NodeMayHaveDisconnected);
258258
}
259259

260+
[Test]
261+
public async Task WhenWatchingTheNodeProcessingTheRequestIsStillAlive_AndTheWatchersConnectionToRedisGoesDownBeforeItStartsWatching_ShouldReturnProcessingNodeIsLikelyDisconnected()
262+
{
263+
// Arrange
264+
var endpoint = new Uri("poll://" + Guid.NewGuid());
265+
var requestActivityId = Guid.NewGuid();
266+
var log = new TestContextLogCreator("NodeHeartBeat", LogLevel.Trace).CreateNewForPrefix("");
267+
var guid = Guid.NewGuid();
268+
269+
using var portForwarder = PortForwardingToRedisBuilder.ForwardingToRedis(Logger);
270+
await using var unstableRedisFacade = RedisFacadeBuilder.CreateRedisFacade(portForwarder, guid);
271+
await using var stableRedisFacade = RedisFacadeBuilder.CreateRedisFacade(prefix: guid);
272+
273+
var unstableRedisTransport = new HalibutRedisTransport(unstableRedisFacade);
274+
var stableRedisTransport = new HalibutRedisTransport(stableRedisFacade);
275+
276+
var request = new RequestMessageBuilder(endpoint.ToString())
277+
.WithActivityId(requestActivityId)
278+
.Build();
279+
var pendingRequest = new RedisPendingRequest(request, log);
280+
281+
// Start heartbeat sender
282+
await using var heartBeatSender = new NodeHeartBeatSender(
283+
endpoint,
284+
requestActivityId,
285+
stableRedisTransport,
286+
log,
287+
HalibutQueueNodeSendingPulses.RequestProcessorNode,
288+
defaultDelayBetweenPulses: TimeSpan.FromMilliseconds(200));
289+
290+
// Mark request as collected so watcher proceeds to monitoring phase
291+
await pendingRequest.RequestHasBeenCollectedAndWillBeTransferred();
292+
293+
// Act - Kill the connection to stop heartbeats
294+
portForwarder.EnterKillNewAndExistingConnectionsMode();
295+
296+
// Start the watcher
297+
var watcherTask = NodeHeartBeatWatcher.WatchThatNodeProcessingTheRequestIsStillAlive(
298+
endpoint,
299+
request,
300+
pendingRequest,
301+
unstableRedisTransport,
302+
timeBetweenCheckingIfRequestWasCollected: TimeSpan.FromSeconds(1),
303+
log,
304+
maxTimeBetweenHeartBeetsBeforeProcessingNodeIsAssumedToBeOffline: TimeSpan.FromSeconds(5), // Short timeout for test
305+
CancellationToken);
306+
307+
// Assert
308+
await Task.WhenAny(Task.Delay(TimeSpan.FromSeconds(20)), watcherTask);
309+
watcherTask.IsCompleted.Should().BeTrue("Since it should have detected no heart beats have been sent for some time.");
310+
var result = await watcherTask;
311+
result.Should().Be(NodeWatcherResult.NodeMayHaveDisconnected);
312+
}
313+
260314
[Test]
261315
public async Task WhenWatchingTheNodeProcessingTheRequestIsStillAlive_AndTheConnectionIsSuperStableAndWeStopWatching_WatcherShouldReturnNodeStayedConnected()
262316
{

source/Halibut.Tests/Queue/Redis/RedisPendingRequestQueueFixture.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System;
33
using System.Threading;
44
using System.Threading.Tasks;
5+
using Docker.DotNet.Models;
56
using FluentAssertions;
67
using Halibut.Diagnostics;
78
using Halibut.Exceptions;
@@ -116,6 +117,45 @@ public async Task FullSendAndReceiveShouldWork()
116117
responseMessage.Result.Should().Be("Yay");
117118
}
118119

120+
121+
[Test]
122+
public async Task TheProcessingTimeOfARequestCanExceedWatcherTimeouts()
123+
{
124+
// Arrange
125+
var endpoint = new Uri("poll://" + Guid.NewGuid());
126+
await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade();
127+
var redisTransport = new HalibutRedisTransport(redisFacade);
128+
129+
var request = new RequestMessageBuilder("poll://test-endpoint").Build();
130+
request.Destination.PollingRequestQueueTimeout = TimeSpan.FromSeconds(4); // Setting this low makes the test more real, long requests will exceed this timeout.
131+
132+
var node1Sender = new RedisPendingRequestQueue(endpoint, new RedisNeverLosesData(), HalibutLog, redisTransport, CreateMessageSerialiserAndDataStreamStorage(), new HalibutTimeoutsAndLimits());
133+
await node1Sender.WaitUntilQueueIsSubscribedToReceiveMessages();
134+
node1Sender.RequestSenderNodeHeartBeatTimeout = TimeSpan.FromSeconds(4);
135+
node1Sender.RequestReceiverNodeHeartBeatTimeout = TimeSpan.FromSeconds(4);
136+
137+
node1Sender.RequestSenderNodeHeartBeatRate = TimeSpan.FromMilliseconds(100);
138+
node1Sender.RequestReceiverNodeHeartBeatRate = TimeSpan.FromMilliseconds(100);
139+
140+
var queueAndWaitAsync = node1Sender.QueueAndWaitAsync(request, CancellationToken.None);
141+
var requestMessageWithCancellationToken = await node1Sender.DequeueAsync(CancellationToken);
142+
143+
requestMessageWithCancellationToken.Should().NotBeNull();
144+
requestMessageWithCancellationToken!.RequestMessage.Id.Should().Be(request.Id);
145+
146+
// Act
147+
// Pretend the processing takes a long time, longer than the heart beat timeout.
148+
await Task.Delay(TimeSpan.FromSeconds(15), CancellationToken);
149+
150+
var response = ResponseMessage.FromResult(requestMessageWithCancellationToken.RequestMessage, "Yay");
151+
await node1Sender.ApplyResponse(response, requestMessageWithCancellationToken.RequestMessage.ActivityId);
152+
153+
var responseMessage = await queueAndWaitAsync;
154+
155+
// Assert
156+
responseMessage.Result.Should().Be("Yay");
157+
}
158+
119159
[Test]
120160
public async Task FullSendAndReceiveWithDataStreamShouldWork()
121161
{

source/Halibut/Queue/Redis/NodeHeartBeat/NodeHeartBeatWatcher.cs

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public static async Task<NodeWatcherResult> WatchThatNodeProcessingTheRequestIsS
2020
TimeSpan maxTimeBetweenHeartBeetsBeforeProcessingNodeIsAssumedToBeOffline,
2121
CancellationToken watchCancellationToken)
2222
{
23-
log = log.ForContext<NodeHeartBeatSender>();
23+
log = log.ForContext<NodeHeartBeatWatcher>();
2424
// Once the pending's CT has been cancelled we no longer care to keep observing
2525
await using var cts = new CancelOnDisposeCancellationToken(watchCancellationToken, redisPending.PendingRequestCancellationToken);
2626
try
@@ -74,49 +74,53 @@ static async Task<NodeWatcherResult> WatchForPulsesFromNode(
7474
log.Write(EventType.Diagnostic, "Starting to watch for pulses from {0} node, request {1}, endpoint {2}", watchingForPulsesFrom, requestActivityId, endpoint);
7575

7676
DateTimeOffset? lastHeartBeat = DateTimeOffset.Now;
77-
77+
78+
await using var subscriptionCts = new CancelOnDisposeCancellationToken(watchCancellationToken);
79+
// Non-blocking subscription for heart beats.
80+
var subscriptionTask = Task.Run(async () => await halibutRedisTransport.SubscribeToNodeHeartBeatChannel(
81+
endpoint,
82+
requestActivityId,
83+
watchingForPulsesFrom,
84+
async () =>
85+
{
86+
await Task.CompletedTask;
87+
lastHeartBeat = DateTimeOffset.Now;
88+
log.Write(EventType.Diagnostic, "Received heartbeat from {0} node, request {1}", watchingForPulsesFrom, requestActivityId);
89+
}, subscriptionCts.Token));
7890
try
7991
{
80-
// Currently we will wait until the CT is cancelled to get a subscription,
81-
// instead it would be better if we either
82-
// - waited for maxTimeBetweenHeartBeetsBeforeNodeIsAssumedToBeOffline to get a subscription.
83-
// - SubscribeToNodeHeartBeatChannel returned immediately even if it doesn't have a subscription, and instead it works
84-
// in the background to get one unless the CT is triggered, or it is disposed.
85-
// https://whimsical.com/subscribetonodeheartbeatchannel-should-timeout-while-waiting-to--NFWwmPkE7pTBdm2PRUC8Tf
86-
await using var subscription = await halibutRedisTransport.SubscribeToNodeHeartBeatChannel(
87-
endpoint,
88-
requestActivityId,
89-
watchingForPulsesFrom,
90-
async () =>
91-
{
92-
await Task.CompletedTask;
93-
lastHeartBeat = DateTimeOffset.Now;
94-
log.Write(EventType.Diagnostic, "Received heartbeat from {0} node, request {1}", watchingForPulsesFrom, requestActivityId);
95-
}, watchCancellationToken);
96-
97-
while (!watchCancellationToken.IsCancellationRequested)
92+
try
9893
{
99-
var timeSinceLastHeartBeat = DateTimeOffset.Now - lastHeartBeat.Value;
100-
if (timeSinceLastHeartBeat > maxTimeBetweenHeartBeetsBeforeNodeIsAssumedToBeOffline)
94+
95+
while (!watchCancellationToken.IsCancellationRequested)
10196
{
102-
log.Write(EventType.Diagnostic, "{0} node appears disconnected, request {1}, last heartbeat was {2} seconds ago", watchingForPulsesFrom, requestActivityId, timeSinceLastHeartBeat.TotalSeconds);
103-
return NodeWatcherResult.NodeMayHaveDisconnected;
97+
var timeSinceLastHeartBeat = DateTimeOffset.Now - lastHeartBeat.Value;
98+
if (timeSinceLastHeartBeat > maxTimeBetweenHeartBeetsBeforeNodeIsAssumedToBeOffline)
99+
{
100+
log.Write(EventType.Diagnostic, "{0} node appears disconnected, request {1}, last heartbeat was {2} seconds ago", watchingForPulsesFrom, requestActivityId, timeSinceLastHeartBeat.TotalSeconds);
101+
return NodeWatcherResult.NodeMayHaveDisconnected;
102+
}
103+
104+
var timeToWait = TimeSpanHelper.Min(
105+
TimeSpan.FromSeconds(30),
106+
maxTimeBetweenHeartBeetsBeforeNodeIsAssumedToBeOffline - timeSinceLastHeartBeat + TimeSpan.FromSeconds(1));
107+
108+
await Try.IgnoringError(async () => await Task.Delay(timeToWait, watchCancellationToken));
104109
}
105110

106-
var timeToWait = TimeSpanHelper.Min(
107-
TimeSpan.FromSeconds(30),
108-
maxTimeBetweenHeartBeetsBeforeNodeIsAssumedToBeOffline - timeSinceLastHeartBeat + TimeSpan.FromSeconds(1));
109-
110-
await Try.IgnoringError(async () => await Task.Delay(timeToWait, watchCancellationToken));
111+
log.Write(EventType.Diagnostic, "{0} node watcher cancelled, request {1}", watchingForPulsesFrom, requestActivityId);
112+
return NodeWatcherResult.NoDisconnectSeen;
113+
}
114+
catch (Exception ex) when (!watchCancellationToken.IsCancellationRequested)
115+
{
116+
log.WriteException(EventType.Diagnostic, "Error while watching {0} node, request {1}", ex, watchingForPulsesFrom, requestActivityId);
117+
throw;
111118
}
112-
113-
log.Write(EventType.Diagnostic, "{0} node watcher cancelled, request {1}", watchingForPulsesFrom, requestActivityId);
114-
return NodeWatcherResult.NoDisconnectSeen;
115119
}
116-
catch (Exception ex) when (!watchCancellationToken.IsCancellationRequested)
120+
finally
117121
{
118-
log.WriteException(EventType.Diagnostic, "Error while watching {0} node, request {1}", ex, watchingForPulsesFrom, requestActivityId);
119-
throw;
122+
await Try.IgnoringError(async () => await subscriptionCts.CancelAsync());
123+
await Try.IgnoringError(async () => await (await subscriptionTask).DisposeAsync());
120124
}
121125
}
122126

0 commit comments

Comments
 (0)