Skip to content

Commit 0a58aa2

Browse files
authored
Add TryGetTokenForDataLossDetection method to avoid unnecessary CancellationTokenSource creation and Timer creation on every RPC (#689)
* Add TryGetTokenForDataLossDetection method to avoid unnecessary CancellationTokenSource creation - Added TryGetTokenForDataLossDetection() method to IWatchForRedisLosingAllItsData interface - Implemented the method in WatchForRedisLosingAllItsData to return token immediately if monitoring is active - Updated RedisPendingRequestQueue to use try method first, falling back to GetTokenForDataLossDetection - Refactored DequeueAsync to use centralized DataLossCancellationToken method instead of duplicating logic - Updated test implementations (RedisNeverLosesData and CancellableDataLossWatchForRedisLosingAllItsData) - Improves performance by avoiding CancellationTokenSource allocation when monitoring is already ready - Maintains backward compatibility with existing GetTokenForDataLossDetection method - Reduces code duplication by centralizing data loss cancellation token logic Resolves TODO comment in WatchForRedisLosingAllItsData.cs * . * Add happy path test
1 parent 7de56a8 commit 0a58aa2

File tree

6 files changed

+62
-5
lines changed

6 files changed

+62
-5
lines changed

source/Halibut.Tests/Queue/Redis/RedisDataLoseDetection/WatchForRedisLosingAllItsDataFixture.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,18 @@ public async Task WhenTheConnectionToRedisCanNotBeCreated_WhenAskingForALostData
2424
await AssertException.Throws<TaskCanceledException>(watcher.GetTokenForDataLossDetection(TimeSpan.FromSeconds(1), CancellationToken));
2525
}
2626

27+
[Test]
28+
public async Task WhenTheConnectionToRedisCanBeEstablished_AndSomeoneHasAlreadyGotTheCancellationToken_TryGetCancellationTokenReturnsTheCT()
29+
{
30+
await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade();
31+
await using var watcher = new WatchForRedisLosingAllItsData(redisFacade, HalibutLog, watchInterval:TimeSpan.FromSeconds(1));
32+
33+
34+
await watcher.GetTokenForDataLossDetection(TimeSpan.FromSeconds(5), CancellationToken);
35+
36+
watcher.TryGetTokenForDataLossDetection().Should().NotBeNull();
37+
}
38+
2739
[Test]
2840
public async Task WhenTheConnectionToRedisIsInitiallyDown_WhenAskingForALostDataCancellationToken_AndTheConnectionToRedisReturns_TheCancellationTokenIsReturned()
2941
{

source/Halibut.Tests/Queue/Redis/Utils/CancellableDataLossWatchForRedisLosingAllItsData.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,14 @@ public async Task<CancellationToken> GetTokenForDataLossDetection(TimeSpan timeT
3737
return await TaskCompletionSource.Task;
3838
#pragma warning restore VSTHRD003
3939
}
40+
41+
public CancellationToken? TryGetTokenForDataLossDetection()
42+
{
43+
if (TaskCompletionSource.Task.IsCompleted && TaskCompletionSource.Task.Status == TaskStatus.RanToCompletion)
44+
{
45+
return TaskCompletionSource.Task.Result;
46+
}
47+
return null;
48+
}
4049
}
4150
}

source/Halibut.Tests/Queue/Redis/Utils/RedisNeverLosesData.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ public Task<CancellationToken> GetTokenForDataLossDetection(TimeSpan timeToWait,
1919
return Task.FromResult(CancellationToken.None);
2020
}
2121

22+
public CancellationToken? TryGetTokenForDataLossDetection()
23+
{
24+
return CancellationToken.None;
25+
}
26+
2227
public ValueTask DisposeAsync()
2328
{
2429
return ValueTask.CompletedTask;

source/Halibut/Queue/Redis/RedisDataLossDetection/IWatchForRedisLosingAllItsData.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,12 @@ public interface IWatchForRedisLosingAllItsData : IAsyncDisposable
1414
/// <param name="cancellationToken"></param>
1515
/// <returns>A cancellation token which is triggered when data loss occurs.</returns>
1616
Task<CancellationToken> GetTokenForDataLossDetection(TimeSpan timeToWait, CancellationToken cancellationToken);
17+
18+
/// <summary>
19+
/// Tries to get a cancellation token for data loss detection if monitoring is already active.
20+
/// This method returns immediately without waiting.
21+
/// </summary>
22+
/// <returns>A cancellation token if monitoring is active, null if monitoring is not yet ready.</returns>
23+
CancellationToken? TryGetTokenForDataLossDetection();
1724
}
1825
}

source/Halibut/Queue/Redis/RedisDataLossDetection/WatchForRedisLosingAllItsData.cs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public WatchForRedisLosingAllItsData(RedisFacade redisFacade, ILog log, TimeSpan
4343
var _ = Task.Run(async () => await KeepWatchingForDataLoss(cts.Token));
4444
}
4545

46-
private TaskCompletionSource<CancellationToken> taskCompletionSource = new TaskCompletionSource<CancellationToken>();
46+
TaskCompletionSource<CancellationToken> taskCompletionSource = new TaskCompletionSource<CancellationToken>();
4747

4848
/// <summary>
4949
/// Will cause the caller to wait until we are connected to redis and so can detect datalose.
@@ -53,14 +53,30 @@ public WatchForRedisLosingAllItsData(RedisFacade redisFacade, ILog log, TimeSpan
5353
/// <returns>A cancellation token which is triggered when data lose occurs.</returns>
5454
public async Task<CancellationToken> GetTokenForDataLossDetection(TimeSpan timeToWait, CancellationToken cancellationToken)
5555
{
56-
if (taskCompletionSource.Task.IsCompleted)
56+
var localCopyOfTaskCompletionSource = taskCompletionSource;
57+
if (localCopyOfTaskCompletionSource.Task.IsCompleted)
5758
{
58-
return await taskCompletionSource.Task;
59+
return await localCopyOfTaskCompletionSource.Task;
5960
}
6061

6162
await using var cts = new CancelOnDisposeCancellationToken(cancellationToken);
6263
cts.CancelAfter(timeToWait);
63-
return await taskCompletionSource.Task.WaitAsync(cts.Token);
64+
return await localCopyOfTaskCompletionSource.Task.WaitAsync(cts.Token);
65+
}
66+
67+
/// <summary>
68+
/// Tries to get a cancellation token for data loss detection if monitoring is already active.
69+
/// This method returns immediately without waiting.
70+
/// </summary>
71+
/// <returns>A cancellation token if monitoring is active, null if monitoring is not yet ready.</returns>
72+
public CancellationToken? TryGetTokenForDataLossDetection()
73+
{
74+
var localCopyOfTaskCompletionSource = taskCompletionSource;
75+
if (localCopyOfTaskCompletionSource.Task.IsCompleted && localCopyOfTaskCompletionSource.Task.Status == TaskStatus.RanToCompletion)
76+
{
77+
return localCopyOfTaskCompletionSource.Task.Result;
78+
}
79+
return null;
6480
}
6581

6682
private async Task KeepWatchingForDataLoss(CancellationToken cancellationToken)

source/Halibut/Queue/Redis/RedisPendingRequestQueue.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,14 @@ public RedisPendingRequestQueue(
117117

118118
async Task<CancellationToken> DataLossCancellationToken(CancellationToken? cancellationToken)
119119
{
120+
// Try to get the token immediately if monitoring is already active
121+
var token = watchForRedisLosingAllItsData.TryGetTokenForDataLossDetection();
122+
if (token.HasValue)
123+
{
124+
return token.Value;
125+
}
126+
127+
// Fall back to waiting for the token if not immediately available
120128
await using var cts = new CancelOnDisposeCancellationToken(queueCts.Token, cancellationToken ?? CancellationToken.None);
121129
return await watchForRedisLosingAllItsData.GetTokenForDataLossDetection(TimeSpan.FromSeconds(30), cts.Token);
122130
}
@@ -383,7 +391,7 @@ async Task<bool> TryClearRequestFromQueue(RedisPendingRequest redisPending)
383391
{
384392
// There is a chance the data loss occured after we got the data but before here.
385393
// In that case we will just time out because of the lack of heart beats.
386-
var dataLossCT = await watchForRedisLosingAllItsData.GetTokenForDataLossDetection(TimeSpan.FromSeconds(30), queueToken);
394+
var dataLossCT = await DataLossCancellationToken(cancellationToken);
387395

388396
disposables.AddAsyncDisposable(new NodeHeartBeatSender(
389397
endpoint,

0 commit comments

Comments
 (0)