Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/Halibut.Tests/ManyPollingTentacleTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public async Task WhenMakingManyConcurrentRequestsToManyServices_AllRequestsComp
{
var numberOfPollingServices = 100;
int concurrency = 20;
int numberOfCallsToMake = Math.Min(numberOfPollingServices, 20);
int numberOfCallsToMake = Math.Min(numberOfPollingServices, 100);

var logFactory = new CachingLogFactory(new TestContextLogCreator("", LogLevel.Trace));
var services = GetDelegateServiceFactory();
Expand Down
1,776 changes: 891 additions & 885 deletions source/Halibut.Tests/Queue/Redis/RedisPendingRequestQueueFixture.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ public MessageSerialiserAndDataStreamStorageWithVirtualMethods(IMessageSerialise
return messageSerialiserAndDataStreamStorage.PrepareRequest(request, cancellationToken);
}

public virtual Task<(RequestMessage, RequestDataStreamsTransferProgress)> ReadRequest(RedisStoredMessage jsonRequest, CancellationToken cancellationToken)
public virtual Task<(PreparedRequestMessage, RequestDataStreamsTransferProgress)> ReadRequest(RedisStoredMessage jsonRequest, CancellationToken cancellationToken)
{
return messageSerialiserAndDataStreamStorage.ReadRequest(jsonRequest, cancellationToken);
}

public virtual Task<RedisStoredMessage> PrepareResponse(ResponseMessage response, CancellationToken cancellationToken)
public virtual Task<RedisStoredMessage> PrepareResponseForStorageInRedis(Guid activityId, ResponseBytesAndDataStreams response, CancellationToken cancellationToken)
{
return messageSerialiserAndDataStreamStorage.PrepareResponse(response, cancellationToken);
return messageSerialiserAndDataStreamStorage.PrepareResponseForStorageInRedis(activityId, response, cancellationToken);
}

public virtual Task<ResponseMessage> ReadResponse(RedisStoredMessage jsonResponse, CancellationToken cancellationToken)
public virtual Task<ResponseMessage> ReadResponseFromRedisStoredMessage(RedisStoredMessage jsonResponse, CancellationToken cancellationToken)
{
return messageSerialiserAndDataStreamStorage.ReadResponse(jsonResponse, cancellationToken);
return messageSerialiserAndDataStreamStorage.ReadResponseFromRedisStoredMessage(jsonResponse, cancellationToken);
}
}

Expand All @@ -60,7 +60,7 @@ public MessageSerialiserAndDataStreamStorageThatThrowsWhenReadingResponse(IMessa
this.exception = exception;
}

public override Task<ResponseMessage> ReadResponse(RedisStoredMessage jsonResponse, CancellationToken cancellationToken)
public override Task<ResponseMessage> ReadResponseFromRedisStoredMessage(RedisStoredMessage jsonResponse, CancellationToken cancellationToken)
{
throw exception();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public async Task ApplyResponse(ResponseMessage response, Guid requestActivityId
public async Task<ResponseMessage> QueueAndWaitAsync(RequestMessage request, CancellationToken requestCancellationToken)
=> await inner.QueueAndWaitAsync(request, requestCancellationToken);

public async Task ApplyRawResponse(ResponseBytesAndDataStreams response, Guid nextRequestActivityId)
{
await inner.ApplyRawResponse(response, nextRequestActivityId);
}

public ValueTask DisposeAsync()
{
return inner.DisposeAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public async Task<ResponseMessage> QueueAndWaitAsync(RequestMessage request, Can
return result;
}

public async Task ApplyRawResponse(ResponseBytesAndDataStreams response, Guid nextRequestActivityId)
{
await inner.ApplyRawResponse(response, nextRequestActivityId);
}

public ValueTask DisposeAsync()
{
return this.inner.DisposeAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ static byte[] DeflateString(string s)

async Task<T> ReadMessage<T>(MessageSerializer messageSerializer, RewindableBufferStream rewindableBufferStream)
{
return (await messageSerializer.ReadMessageAsync<T>(rewindableBufferStream, CancellationToken)).Message;
return (await messageSerializer.ReadMessageAsync<T>(rewindableBufferStream, false, CancellationToken)).Message;
}

async Task WriteMessage(MessageSerializer messageSerializer, Stream stream, string message)
Expand Down
10 changes: 10 additions & 0 deletions source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,11 @@ public async Task SendAsync<T>(T message, CancellationToken cancellationToken)
output.AppendLine("--> " + typeof(T).Name);
}

public Task SendPrePreparedRequestAsync(PreparedRequestMessage preparedRequestMessage, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

public Task<RequestMessage?> ReceiveRequestAsync(TimeSpan timeoutForReceivingTheFirstByte, CancellationToken cancellationToken)
{
return ReceiveAsync<RequestMessage>();
Expand All @@ -398,6 +403,11 @@ public async Task SendAsync<T>(T message, CancellationToken cancellationToken)
return ReceiveAsync<ResponseMessage>();
}

public Task<ResponseBytesAndDataStreams?> ReceiveResponseBytesAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

async Task<T?> ReceiveAsync<T>()
{
await Task.CompletedTask;
Expand Down
33 changes: 33 additions & 0 deletions source/Halibut.Tests/Util/DelayWithoutExceptionTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2012-2013 Octopus Deploy Pty. Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Threading;
using System.Threading.Tasks;
using Halibut.Util;
using NUnit.Framework;

namespace Halibut.Tests.Util
{
public class DelayWithoutExceptionTest : BaseTest
{
[Test]
public async Task DelayWithoutException_ShouldNotThrow()
{
using var cts = new CancellationTokenSource();
cts.CancelAfter(10);
await DelayWithoutException.Delay(TimeSpan.FromDays(1), cts.Token);
}
}
}
86 changes: 86 additions & 0 deletions source/Halibut/Queue/QueueMessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Halibut.Queue.MessageStreamWrapping;
using Halibut.Transport.Protocol;
using Halibut.Util;
using Newtonsoft.Json;
using Newtonsoft.Json.Bson;

namespace Halibut.Queue
{
Expand All @@ -28,6 +31,49 @@ public QueueMessageSerializer(Func<StreamCapturingJsonSerializer> createStreamCa
this.createStreamCapturingSerializer = createStreamCapturingSerializer;
this.messageStreamWrappers = messageStreamWrappers;
}

public async Task<(byte[], IReadOnlyList<DataStream>)> PrepareMessageForWireTransferAndForQueue<T>(T message)
{
IReadOnlyList<DataStream> dataStreams;

using var ms = new MemoryStream();
Stream stream = ms;
await using (var wrappedStreamDisposables = new DisposableCollection())
{
stream = WrapInMessageSerialisationStreams(messageStreamWrappers, stream, wrappedStreamDisposables);

// TODO instead store

using (var zip = new DeflateStream(stream, CompressionMode.Compress, true))
using (var buf = new BufferedStream(zip))
{

using (var jsonTextWriter = new BsonDataWriter(buf) { CloseOutput = false })
{
var streamCapturingSerializer = createStreamCapturingSerializer();
streamCapturingSerializer.Serializer.Serialize(jsonTextWriter, new MessageEnvelope<T>(message));
dataStreams = streamCapturingSerializer.DataStreams;
}
}
}

return (ms.ToArray(), dataStreams);
}

public async Task<byte[]> ReadBytesForWireTransfer(byte[] dataStoredInRedis)
{
using var ms = new MemoryStream(dataStoredInRedis);
Stream stream = ms;
await using var disposables = new DisposableCollection();
stream = WrapStreamInMessageDeserialisationStreams(messageStreamWrappers, stream, disposables);

using var output = new MemoryStream();

stream.CopyTo(output);

return output.ToArray();
}


public async Task<(byte[], IReadOnlyList<DataStream>)> WriteMessage<T>(T message)
{
Expand Down Expand Up @@ -121,5 +167,45 @@ public MessageEnvelope(T message)

public T Message { get; private set; }
}

public async Task<byte[]> PrepareBytesFromWire(byte[] responseBytes)
{

using var outputStream = new MemoryStream();

Stream wrappedStream = outputStream;
await using (var disposables = new DisposableCollection())
{
wrappedStream = WrapInMessageSerialisationStreams(messageStreamWrappers, wrappedStream, disposables);
await wrappedStream.WriteAsync(responseBytes, CancellationToken.None);
await wrappedStream.FlushAsync();
}

return outputStream.ToArray();
}

public async Task<(T response, IReadOnlyList<DataStream> dataStreams)> ConvertStoredResponseToResponseMessage<T>(byte[] storedMessageMessage)
{
using var ms = new MemoryStream(storedMessageMessage);
Stream stream = ms;
await using var disposables = new DisposableCollection();
stream = WrapStreamInMessageDeserialisationStreams(messageStreamWrappers, stream, disposables);

using var deflateStream = new DeflateStream(stream, CompressionMode.Decompress, true);
using var buf = new BufferedStream(deflateStream);
using (var bson = new BsonDataReader(buf) { CloseInput = false })
{
var streamCapturingSerializer = createStreamCapturingSerializer();
var result = streamCapturingSerializer.Serializer.Deserialize<MessageEnvelope<T>>(bson);

if (result == null)
{
throw new Exception("messageEnvelope is null");
}

return (result.Message, streamCapturingSerializer.DataStreams);
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class HeartBeatDrivenDataStreamProgressReporter : IAsyncDisposable, IGetN
{
readonly ImmutableDictionary<Guid, IDataStreamWithFileUploadProgress> dataStreamsToReportProgressOn;

readonly ConcurrentBag<Guid> completedDataStreams = new();
readonly HashSet<Guid> completedDataStreams = new();

HeartBeatDrivenDataStreamProgressReporter(ImmutableDictionary<Guid, IDataStreamWithFileUploadProgress> dataStreamsToReportProgressOn)
{
Expand All @@ -30,7 +30,11 @@ public async Task HeartBeatReceived(HeartBeatMessage heartBeatMessage, Cancellat

foreach (var keyValuePair in heartBeatMessage.DataStreamProgress)
{
if(completedDataStreams.Contains(keyValuePair.Key)) continue;
lock (completedDataStreams)
{
if(completedDataStreams.Contains(keyValuePair.Key)) continue;
}


if (dataStreamsToReportProgressOn.TryGetValue(keyValuePair.Key, out var dataStreamWithTransferProgress))
{
Expand All @@ -39,7 +43,10 @@ public async Task HeartBeatReceived(HeartBeatMessage heartBeatMessage, Cancellat

if (dataStreamWithTransferProgress.Length == keyValuePair.Value)
{
completedDataStreams.Add(keyValuePair.Key);
lock (completedDataStreams)
{
completedDataStreams.Add(keyValuePair.Key);
}
}
}
}
Expand All @@ -57,13 +64,21 @@ public async ValueTask DisposeAsync()
// this object is disposable and on dispose we note that file will no longer be uploading. Which
// for the normal percentage based file transfer progress will result in marking the DataStreams as 100% uploaded.
// If we don't do this at the end of a successful call we may find DataStream progress is reported as less than 100%.
var localCopyCompletedDataStreams = new List<Guid>();

// Because of where this is used, it is hard to be sure this object won't be used while disposing,
// so take a copy of streams we have already completed.
lock (completedDataStreams)
{
localCopyCompletedDataStreams.AddRange(completedDataStreams);
}
foreach (var keyValuePair in dataStreamsToReportProgressOn)
{
if (!completedDataStreams.Contains(keyValuePair.Key))
if (!localCopyCompletedDataStreams.Contains(keyValuePair.Key))
{
var progress = keyValuePair.Value.DataStreamTransferProgress;
// Thus may be called twice if HeartBeats are received while disposing.
await progress.NoLongerUploading(CancellationToken.None);
completedDataStreams.Add(keyValuePair.Key);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Halibut.Util;

namespace Halibut.Queue.Redis.Cancellation
{
Expand All @@ -15,14 +16,15 @@ public DelayBeforeSubscribingToRequestCancellation(TimeSpan delay)

public async Task WaitBeforeHeartBeatSendingOrReceiving(CancellationToken cancellationToken)
{
try
{
await Task.Delay(Delay, cancellationToken);
}
catch
{
// If only Delay had an option to not throw.
}
await DelayWithoutException.Delay(Delay, cancellationToken);
// try
// {
// await DelayWithoutException.Delay(Delay, cancellationToken);
// }
// catch
// {
// // If only Delay had an option to not throw.
// }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async Task WatchForCancellation(
// Also poll to see if the request is cancelled since we can miss the publication.
while (!token.IsCancellationRequested)
{
await Try.IgnoringError(async () => await Task.Delay(TimeSpan.FromSeconds(60), token));
await DelayWithoutException.Delay(TimeSpan.FromSeconds(60), token);

if(token.IsCancellationRequested) return;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Threading;
using System.Threading.Tasks;
using Halibut.Queue.QueuedDataStreams;
using Halibut.Queue.Redis.RedisHelpers;
using Halibut.Transport.Protocol;

namespace Halibut.Queue.Redis.MessageStorage
Expand All @@ -18,8 +17,8 @@ namespace Halibut.Queue.Redis.MessageStorage
public interface IMessageSerialiserAndDataStreamStorage
{
Task<(RedisStoredMessage, HeartBeatDrivenDataStreamProgressReporter)> PrepareRequest(RequestMessage request, CancellationToken cancellationToken);
Task<(RequestMessage, RequestDataStreamsTransferProgress)> ReadRequest(RedisStoredMessage jsonRequest, CancellationToken cancellationToken);
Task<RedisStoredMessage> PrepareResponse(ResponseMessage response, CancellationToken cancellationToken);
Task<ResponseMessage> ReadResponse(RedisStoredMessage jsonResponse, CancellationToken cancellationToken);
Task<(PreparedRequestMessage, RequestDataStreamsTransferProgress)> ReadRequest(RedisStoredMessage jsonRequest, CancellationToken cancellationToken);
Task<RedisStoredMessage> PrepareResponseForStorageInRedis(Guid activityId, ResponseBytesAndDataStreams response, CancellationToken cancellationToken);
Task<ResponseMessage> ReadResponseFromRedisStoredMessage(RedisStoredMessage jsonResponse, CancellationToken cancellationToken);
}
}
Loading