Skip to content

Commit f0acf60

Browse files
LukeButtersCopilotrhysparry
authored
Support upload transfer progress of DataStreams in the Redis Queue. (#679)
* Support update progress for the Redis PRQ * Add E2E test * Add test * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * Update source/Halibut/Queue/QueuedDataStreams/HeartBeatMessage.cs Co-authored-by: Copilot <[email protected]> * Update source/Halibut/Queue/Redis/MessageStorage/RedisDataStreamTransferProgressRecorder.cs Co-authored-by: Copilot <[email protected]> * Update source/Halibut/DataStreams/StreamCopierWithProgress.cs Co-authored-by: Copilot <[email protected]> * Update source/Halibut/DataStreams/IDataStreamTransferProgress.cs Co-authored-by: Rhys Parry <[email protected]> * . * . * . --------- Co-authored-by: Copilot <[email protected]> Co-authored-by: Rhys Parry <[email protected]>
1 parent 403e9b8 commit f0acf60

File tree

39 files changed

+1273
-138
lines changed

39 files changed

+1273
-138
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
4+
using System.Linq;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using FluentAssertions;
8+
using Halibut.DataStreams;
9+
using Halibut.Queue.QueuedDataStreams;
10+
using NSubstitute;
11+
using NUnit.Framework;
12+
13+
namespace Halibut.Tests.Queue.QueuedDataStreams
14+
{
15+
[TestFixture]
16+
public class HeartBeatDrivenDataStreamProgressReporterFixture
17+
{
18+
[Test]
19+
public async Task HeartBeatReceived_WithSingleDataStream_CallsUpdateProgressAsyncWithPercentageComplete()
20+
{
21+
// Arrange
22+
const int streamSize = 100;
23+
var progressUpdates = new List<int>();
24+
var progressUpdateCalls = 0;
25+
26+
// Create a mock stream of size 100 bytes
27+
var mockStream = new MemoryStream(new byte[streamSize]);
28+
29+
// Create the updateProgressAsync function that captures progress updates
30+
Task UpdateProgressAsync(int percentageComplete, CancellationToken ct)
31+
{
32+
progressUpdates.Add(percentageComplete);
33+
Interlocked.Increment(ref progressUpdateCalls);
34+
return Task.CompletedTask;
35+
}
36+
37+
// Create a DataStream using FromStream with our progress callback
38+
var dataStream = DataStream.FromStream(mockStream, UpdateProgressAsync);
39+
40+
// Create the HeartBeatDrivenDataStreamProgressReporter with our single DataStream
41+
var progressReporter = HeartBeatDrivenDataStreamProgressReporter.CreateForDataStreams(new[] { dataStream });
42+
43+
// Create heart beat messages with different progress values
44+
var heartBeatMessage25 = new HeartBeatMessage
45+
{
46+
DataStreamProgress = new Dictionary<Guid, long>
47+
{
48+
{ dataStream.Id, 25 } // 25% complete (25 out of 100 bytes)
49+
}
50+
};
51+
52+
var heartBeatMessage50 = new HeartBeatMessage
53+
{
54+
DataStreamProgress = new Dictionary<Guid, long>
55+
{
56+
{ dataStream.Id, 50 } // 50% complete (50 out of 100 bytes)
57+
}
58+
};
59+
60+
var heartBeatMessage100 = new HeartBeatMessage
61+
{
62+
DataStreamProgress = new Dictionary<Guid, long>
63+
{
64+
{ dataStream.Id, 100 } // 100% complete (100 out of 100 bytes)
65+
}
66+
};
67+
68+
// Act
69+
await progressReporter.HeartBeatReceived(heartBeatMessage25, CancellationToken.None);
70+
await progressReporter.HeartBeatReceived(heartBeatMessage50, CancellationToken.None);
71+
await progressReporter.HeartBeatReceived(heartBeatMessage100, CancellationToken.None);
72+
73+
// Assert
74+
progressUpdateCalls.Should().Be(3, "updateProgressAsync should be called for each heart beat");
75+
progressUpdates.Should().ContainInOrder(25, 50, 100);
76+
77+
// Clean up
78+
await progressReporter.DisposeAsync();
79+
}
80+
81+
[Test]
82+
public async Task OnDisposeTheProgressShouldBeMarkedAs100PercentComplete()
83+
{
84+
// Arrange
85+
const int streamSize = 100;
86+
var progressUpdates = new List<int>();
87+
var progressUpdateCalls = 0;
88+
89+
// Create a mock stream of size 100 bytes
90+
var mockStream = new MemoryStream(new byte[streamSize]);
91+
92+
// Create the updateProgressAsync function that captures progress updates
93+
Task UpdateProgressAsync(int percentageComplete, CancellationToken ct)
94+
{
95+
progressUpdates.Add(percentageComplete);
96+
Interlocked.Increment(ref progressUpdateCalls);
97+
return Task.CompletedTask;
98+
}
99+
100+
// Create a DataStream using FromStream with our progress callback
101+
var dataStream = DataStream.FromStream(mockStream, UpdateProgressAsync);
102+
103+
// Create the HeartBeatDrivenDataStreamProgressReporter with our single DataStream
104+
var progressReporter = HeartBeatDrivenDataStreamProgressReporter.CreateForDataStreams(new[] { dataStream });
105+
106+
// Create heart beat messages with different progress values
107+
var heartBeatMessage25 = new HeartBeatMessage
108+
{
109+
DataStreamProgress = new Dictionary<Guid, long>
110+
{
111+
{ dataStream.Id, 25 } // 25% complete (25 out of 100 bytes)
112+
}
113+
};
114+
115+
await progressReporter.HeartBeatReceived(heartBeatMessage25, CancellationToken.None);
116+
117+
// Act
118+
await progressReporter.DisposeAsync();
119+
120+
121+
// Assert
122+
progressUpdates.Should().ContainInOrder(25, 100); // We should still receive 100% complete, since
123+
// on dispose we want to let the callback know it is done
124+
}
125+
}
126+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
4+
using System.Linq;
5+
using System.Threading;
6+
using FluentAssertions;
7+
using Halibut.Queue.QueuedDataStreams;
8+
using Halibut.Queue.Redis.MessageStorage;
9+
using Newtonsoft.Json;
10+
using NUnit.Framework;
11+
12+
namespace Halibut.Tests.Queue.QueuedDataStreams
13+
{
14+
public class HeartBeatMessageFixture : BaseTest
15+
{
16+
[Test]
17+
public void Deserialize_WithValidEmptyJson_ShouldReturnHeartBeatMessageWithEmptyProgress()
18+
{
19+
// Arrange
20+
var json = "{\"DataStreamProgress\":{}}";
21+
22+
// Act
23+
var heartBeatMessage = HeartBeatMessage.Deserialize(json);
24+
25+
// Assert
26+
heartBeatMessage.Should().NotBeNull();
27+
heartBeatMessage.DataStreamProgress.Should().NotBeNull();
28+
heartBeatMessage.DataStreamProgress.Should().BeEmpty();
29+
}
30+
31+
[Test]
32+
public void WhenGivenAPreviouslySerialisedHeartBeatMessage_Deserialize_ShouldReturnHeartBeatMessage()
33+
{
34+
// Arrange
35+
var json = @"{
36+
""DataStreamProgress"": {
37+
""731aba31-0272-4111-80b2-8f727ef70af1"": 0,
38+
""0103c541-a7b6-4590-84c2-f7098816b617"": 1024
39+
}
40+
}";
41+
42+
// Act
43+
var heartBeatMessage = HeartBeatMessage.Deserialize(json);
44+
45+
// Assert
46+
heartBeatMessage.Should().NotBeNull();
47+
heartBeatMessage.DataStreamProgress.Should().NotBeNull();
48+
heartBeatMessage.DataStreamProgress.Should().HaveCount(2);
49+
heartBeatMessage.DataStreamProgress.Should().ContainKey(Guid.Parse("731aba31-0272-4111-80b2-8f727ef70af1"));
50+
heartBeatMessage.DataStreamProgress[Guid.Parse("731aba31-0272-4111-80b2-8f727ef70af1")].Should().Be(0);
51+
heartBeatMessage.DataStreamProgress.Should().ContainKey(Guid.Parse("0103c541-a7b6-4590-84c2-f7098816b617"));
52+
heartBeatMessage.DataStreamProgress[Guid.Parse("0103c541-a7b6-4590-84c2-f7098816b617")].Should().Be(1024);
53+
}
54+
55+
[Test]
56+
public void Deserialize_WithEmptyStringJson_ShouldReturnEmptyHeartBeatMessage()
57+
{
58+
// Act
59+
var heartBeatMessage = HeartBeatMessage.Deserialize(string.Empty);
60+
61+
// Assert
62+
heartBeatMessage.Should().NotBeNull();
63+
heartBeatMessage.DataStreamProgress.Should().NotBeNull();
64+
heartBeatMessage.DataStreamProgress.Should().BeEmpty();
65+
}
66+
67+
[Test]
68+
public void Deserialize_WithEmptyDataShouldWork()
69+
{
70+
// Arrange
71+
var emptyJson = "{}";
72+
73+
// Act & Assert
74+
var heartBeatMessage = HeartBeatMessage.Deserialize(emptyJson);
75+
heartBeatMessage.DataStreamProgress.Should().BeEmpty();
76+
}
77+
78+
[Test]
79+
public void SerializeAndDeserialize_RoundTrip_ShouldWork()
80+
{
81+
// Arrange
82+
var dataStreamId1 = Guid.NewGuid();
83+
var dataStreamId2 = Guid.NewGuid();
84+
85+
var original = new HeartBeatMessage
86+
{
87+
DataStreamProgress = new Dictionary<Guid, long>
88+
{
89+
{ dataStreamId1, 0L },
90+
{ dataStreamId2, 1024L }
91+
}
92+
};
93+
94+
// Act
95+
var json = HeartBeatMessage.Serialize(original);
96+
var deserialized = HeartBeatMessage.Deserialize(json);
97+
98+
// Assert
99+
deserialized.Should().NotBeNull();
100+
deserialized.DataStreamProgress.Should().NotBeNull();
101+
deserialized.DataStreamProgress.Should().HaveCount(2);
102+
103+
deserialized.DataStreamProgress.Should().ContainKey(dataStreamId1);
104+
deserialized.DataStreamProgress[dataStreamId1].Should().Be(0L);
105+
106+
deserialized.DataStreamProgress.Should().ContainKey(dataStreamId2);
107+
deserialized.DataStreamProgress[dataStreamId2].Should().Be(1024L);
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)