diff --git a/src/Pulsar.Client/Api/IProducer.fs b/src/Pulsar.Client/Api/IProducer.fs index c0aa8183..4897c4c4 100644 --- a/src/Pulsar.Client/Api/IProducer.fs +++ b/src/Pulsar.Client/Api/IProducer.fs @@ -75,3 +75,5 @@ type IProducer<'T> = abstract member LastDisconnectedTimestamp: unit -> Task /// Return true if the consumer is connected to the broker abstract member IsConnected: unit -> Task + /// Flush all pending messages and wait for their acknowledgements + abstract member FlushAsync: unit -> Task diff --git a/src/Pulsar.Client/Internal/PartitionedProducerImpl.fs b/src/Pulsar.Client/Internal/PartitionedProducerImpl.fs index 24ed2be1..3a7063ec 100644 --- a/src/Pulsar.Client/Internal/PartitionedProducerImpl.fs +++ b/src/Pulsar.Client/Internal/PartitionedProducerImpl.fs @@ -385,6 +385,18 @@ type internal PartitionedProducerImpl<'T> private (producerConfig: ProducerConfi member this.IsConnected() = postAndAsyncReply mb IsConnected + member this.FlushAsync() = + backgroundTask { + // Flush all partition producers + let flushTasks = + producers + |> Seq.map (fun producer -> producer.FlushAsync()) + |> Seq.toArray + if flushTasks.Length > 0 then + let! _ = Task.WhenAll(flushTasks) + () + } + interface IAsyncDisposable with member this.DisposeAsync() = diff --git a/src/Pulsar.Client/Internal/ProducerImpl.fs b/src/Pulsar.Client/Internal/ProducerImpl.fs index f964226a..e6ff787a 100644 --- a/src/Pulsar.Client/Internal/ProducerImpl.fs +++ b/src/Pulsar.Client/Internal/ProducerImpl.fs @@ -40,6 +40,7 @@ type internal ProducerMessage<'T> = | Close of TaskCompletionSource> | Tick of ProducerTickType | GetStats of TaskCompletionSource + | Flush of TaskCompletionSource type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, clientConfig: PulsarClientConfiguration, connectionPool: ConnectionPool, partitionIndex: int, lookup: ILookupService, schema: ISchema<'T>, @@ -754,6 +755,45 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c | ProducerMessage.GetStats channel -> channel.SetResult <| stats.GetStats() + | ProducerMessage.Flush channel -> + Log.Logger.LogDebug("{0} Flush requested, pendingMessages count: {1}", prefix, pendingMessages.Count) + // First, send all batched messages + batchMessageAndSend() + + // If pendingMessages queue is empty, return immediately + if pendingMessages.Count = 0 then + Log.Logger.LogDebug("{0} Flush completed immediately, no pending messages", prefix) + channel.SetResult() + else + // Get the last message from the queue + let lastMessage = pendingMessages |> Seq.last + + Log.Logger.LogDebug("{0} Flush waiting for last message callback, last sequenceId: {1}", prefix, %lastMessage.SequenceId) + // Wait for the last message's callback to complete asynchronously + backgroundTask { + try + match lastMessage.Callback with + | SingleCallback (_, _, tcsOption) -> + match tcsOption with + | Some tcs -> + let! _ = tcs.Task + () + | None -> () + | BatchCallbacks batchCallbacks -> + // Wait for all TaskCompletionSource in the batch + let tasks = + batchCallbacks + |> Array.choose (fun struct(_, _, tcsOption) -> + tcsOption |> Option.map (fun tcs -> tcs.Task :> Task)) + if tasks.Length > 0 then + do! Task.WhenAll(tasks) + Log.Logger.LogDebug("{0} Flush completed, last sequenceId: {1}", prefix, %lastMessage.SequenceId) + channel.SetResult() + with Flatten ex -> + Log.Logger.LogError(ex, "{0} Flush failed, last sequenceId: {1}", prefix, %lastMessage.SequenceId) + channel.SetException ex + } |> ignore + | ProducerMessage.Close channel -> match connectionHandler.ConnectionState with @@ -941,6 +981,10 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c | Ready _ -> trueTask | _ -> falseTask + member this.FlushAsync() = + connectionHandler.CheckIfActive() |> throwIfNotNull + postAndAsyncReply mb ProducerMessage.Flush + interface IAsyncDisposable with member this.DisposeAsync() = diff --git a/tests/IntegrationTests/Flush.fs b/tests/IntegrationTests/Flush.fs new file mode 100644 index 00000000..c89607f1 --- /dev/null +++ b/tests/IntegrationTests/Flush.fs @@ -0,0 +1,116 @@ +module Pulsar.Client.IntegrationTests.Flush + +open System +open System.Collections.Generic +open System.Text +open System.Threading +open Expecto +open Pulsar.Client.Api +open Pulsar.Client.Common +open Serilog +open Pulsar.Client.IntegrationTests.Common + + +let testMessageOrderAndDuplicates (messageSet: HashSet) (receivedMessage: string) (expectedMessage: string) = + if messageSet.Contains(receivedMessage) then + failwith $"Duplicate message received: {receivedMessage}" + messageSet.Add(receivedMessage) |> ignore + if receivedMessage <> expectedMessage then + failwith $"Incorrect message order. Expected: {expectedMessage}, Received: {receivedMessage}" + +[] +let tests = + + testList "Flush" [ + + testTask "Flush with batch enabled" { + Log.Debug("Started Flush with batch enabled") + let client = getClient() + let topicName = "persistent://public/default/test-flush-batch-enabled-" + Guid.NewGuid().ToString("N") + + let! (consumer : IConsumer) = + client.NewConsumer() + .Topic(topicName) + .SubscriptionName("my-subscriber-name") + .SubscribeAsync() + + let! (producer : IProducer) = + client.NewProducer() + .Topic(topicName) + .EnableBatching(true) + .BatchingMaxPublishDelay(TimeSpan.FromHours(1.0)) + .BatchingMaxMessages(10000) + .CreateAsync() + + // Send 10 messages asynchronously without waiting + for i in 0..9 do + let message = $"my-message-{i}" + producer.SendAsync(Encoding.UTF8.GetBytes(message)) |> ignore + + // Flush to ensure all messages are sent and acknowledged + do! producer.FlushAsync() + + // Dispose producer + do! (producer :> IAsyncDisposable).DisposeAsync().AsTask() + + // Receive and verify messages + let messageSet = HashSet() + let cts = new CancellationTokenSource(TimeSpan.FromSeconds(5.0)) + + for i in 0..9 do + let! (msg : Message) = consumer.ReceiveAsync(cts.Token) + let receivedMessage = Encoding.UTF8.GetString(msg.GetValue()) + Log.Debug("Received message: [{0}]", receivedMessage) + let expectedMessage = $"my-message-{i}" + testMessageOrderAndDuplicates messageSet receivedMessage expectedMessage + + do! (consumer :> IAsyncDisposable).DisposeAsync().AsTask() + + Log.Debug("Finished Started Flush with batch enabled") + } + + testTask "Flush with batch disabled" { + Log.Debug("Started Flush with batch disabled") + let client = getClient() + let topicName = "persistent://public/default/test-flush-batch-disabled-" + Guid.NewGuid().ToString("N") + + let! (consumer : IConsumer) = + client.NewConsumer() + .Topic(topicName) + .SubscriptionName("my-subscriber-name") + .SubscribeAsync() + + let! (producer : IProducer) = + client.NewProducer() + .Topic(topicName) + .EnableBatching(false) + .CreateAsync() + + // Send 10 messages asynchronously without waiting + for i in 0..9 do + let message = $"my-message-{i}" + producer.SendAsync(Encoding.UTF8.GetBytes(message)) |> ignore + + // Flush to ensure all messages are sent and acknowledged + do! producer.FlushAsync() + + // Dispose producer + do! (producer :> IAsyncDisposable).DisposeAsync().AsTask() + + // Receive and verify messages + let messageSet = HashSet() + let cts = new CancellationTokenSource(TimeSpan.FromSeconds(5.0)) + + for i in 0..9 do + let! (msg : Message) = consumer.ReceiveAsync(cts.Token) + let receivedMessage = Encoding.UTF8.GetString(msg.GetValue()) + Log.Debug("Received message: [{0}]", receivedMessage) + let expectedMessage = $"my-message-{i}" + testMessageOrderAndDuplicates messageSet receivedMessage expectedMessage + + do! (consumer :> IAsyncDisposable).DisposeAsync().AsTask() + + Log.Debug("Finished Flush with batch disabled") + } + ] + diff --git a/tests/IntegrationTests/IntegrationTests.fsproj b/tests/IntegrationTests/IntegrationTests.fsproj index 3716434d..ef5e005f 100755 --- a/tests/IntegrationTests/IntegrationTests.fsproj +++ b/tests/IntegrationTests/IntegrationTests.fsproj @@ -36,6 +36,7 @@ +