From f2981ff3533907f7ef41044d343a658812980aa1 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 6 Nov 2025 17:23:20 +0800 Subject: [PATCH 1/4] Support producer flush --- src/Pulsar.Client/Api/IProducer.fs | 2 + .../Internal/PartitionedProducerImpl.fs | 12 ++ src/Pulsar.Client/Internal/ProducerImpl.fs | 41 ++++++ tests/IntegrationTests/Flush.fs | 124 ++++++++++++++++++ .../IntegrationTests/IntegrationTests.fsproj | 1 + 5 files changed, 180 insertions(+) create mode 100644 tests/IntegrationTests/Flush.fs diff --git a/src/Pulsar.Client/Api/IProducer.fs b/src/Pulsar.Client/Api/IProducer.fs index c0aa8183..4897c4c4 100644 --- a/src/Pulsar.Client/Api/IProducer.fs +++ b/src/Pulsar.Client/Api/IProducer.fs @@ -75,3 +75,5 @@ type IProducer<'T> = abstract member LastDisconnectedTimestamp: unit -> Task /// Return true if the consumer is connected to the broker abstract member IsConnected: unit -> Task + /// Flush all pending messages and wait for their acknowledgements + abstract member FlushAsync: unit -> Task diff --git a/src/Pulsar.Client/Internal/PartitionedProducerImpl.fs b/src/Pulsar.Client/Internal/PartitionedProducerImpl.fs index 24ed2be1..3a7063ec 100644 --- a/src/Pulsar.Client/Internal/PartitionedProducerImpl.fs +++ b/src/Pulsar.Client/Internal/PartitionedProducerImpl.fs @@ -385,6 +385,18 @@ type internal PartitionedProducerImpl<'T> private (producerConfig: ProducerConfi member this.IsConnected() = postAndAsyncReply mb IsConnected + member this.FlushAsync() = + backgroundTask { + // Flush all partition producers + let flushTasks = + producers + |> Seq.map (fun producer -> producer.FlushAsync()) + |> Seq.toArray + if flushTasks.Length > 0 then + let! _ = Task.WhenAll(flushTasks) + () + } + interface IAsyncDisposable with member this.DisposeAsync() = diff --git a/src/Pulsar.Client/Internal/ProducerImpl.fs b/src/Pulsar.Client/Internal/ProducerImpl.fs index f964226a..990324e4 100644 --- a/src/Pulsar.Client/Internal/ProducerImpl.fs +++ b/src/Pulsar.Client/Internal/ProducerImpl.fs @@ -40,6 +40,7 @@ type internal ProducerMessage<'T> = | Close of TaskCompletionSource> | Tick of ProducerTickType | GetStats of TaskCompletionSource + | Flush of TaskCompletionSource type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, clientConfig: PulsarClientConfiguration, connectionPool: ConnectionPool, partitionIndex: int, lookup: ILookupService, schema: ISchema<'T>, @@ -754,6 +755,42 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c | ProducerMessage.GetStats channel -> channel.SetResult <| stats.GetStats() + | ProducerMessage.Flush channel -> + Log.Logger.LogDebug("{0} Flush requested, pendingMessages count: {1}", prefix, pendingMessages.Count) + // First, send all batched messages + batchMessageAndSend() + + // If pendingMessages queue is empty, return immediately + if pendingMessages.Count = 0 then + Log.Logger.LogDebug("{0} Flush completed immediately, no pending messages", prefix) + channel.SetResult() + else + // Get the last element from the queue by iterating through it + let mutable lastMessage = Unchecked.defaultof> + for msg in pendingMessages do + lastMessage <- msg + + Log.Logger.LogDebug("{0} Flush waiting for last message callback, sequenceId: {1}", prefix, %lastMessage.SequenceId) + // Wait for the last message's callback to complete asynchronously + backgroundTask { + match lastMessage.Callback with + | SingleCallback (_, _, tcsOption) -> + match tcsOption with + | Some tcs -> + let! _ = tcs.Task + () + | None -> () + | BatchCallbacks batchCallbacks -> + // Wait for all TaskCompletionSource in the batch + let tasks = + batchCallbacks + |> Array.choose (fun struct(_, _, tcsOption) -> + tcsOption |> Option.map (fun tcs -> tcs.Task :> Task)) + if tasks.Length > 0 then + do! Task.WhenAll(tasks) + channel.SetResult() + } |> ignore + | ProducerMessage.Close channel -> match connectionHandler.ConnectionState with @@ -941,6 +978,10 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c | Ready _ -> trueTask | _ -> falseTask + member this.FlushAsync() = + connectionHandler.CheckIfActive() |> throwIfNotNull + postAndAsyncReply mb (fun channel -> ProducerMessage.Flush channel) + interface IAsyncDisposable with member this.DisposeAsync() = diff --git a/tests/IntegrationTests/Flush.fs b/tests/IntegrationTests/Flush.fs new file mode 100644 index 00000000..ad3634df --- /dev/null +++ b/tests/IntegrationTests/Flush.fs @@ -0,0 +1,124 @@ +module Pulsar.Client.IntegrationTests.Flush + +open System +open System.Collections.Generic +open System.Text +open System.Threading +open Expecto +open Pulsar.Client.Api +open Pulsar.Client.Common +open Serilog +open Pulsar.Client.IntegrationTests.Common +open Microsoft.Extensions.Logging + + +let testMessageOrderAndDuplicates (messageSet: HashSet) (receivedMessage: string) (expectedMessage: string) = + if messageSet.Contains(receivedMessage) then + failwith <| sprintf "Duplicate message received: %s" receivedMessage + messageSet.Add(receivedMessage) |> ignore + if receivedMessage <> expectedMessage then + failwith <| sprintf "Incorrect message order. Expected: %s, Received: %s" expectedMessage receivedMessage + +[] +let tests = + + testList "Flush" [ + + testTask "Flush with batch enabled" { + let loggerFactory = + LoggerFactory.Create(fun builder -> + builder + .SetMinimumLevel(LogLevel.Debug) + .AddConsole() |> ignore + ) + PulsarClient.Logger <- loggerFactory.CreateLogger("PulsarLogger") + Log.Debug("-- Starting testFlushBatchEnabled test --") + let client = getClient() + let topicName = "persistent://public/default/test-flush-batch-enabled-" + Guid.NewGuid().ToString("N") + + let! (consumer : IConsumer) = + client.NewConsumer() + .Topic(topicName) + .SubscriptionName("my-subscriber-name") + .SubscribeAsync() + + let! (producer : IProducer) = + client.NewProducer() + .Topic(topicName) + .EnableBatching(true) + .BatchingMaxPublishDelay(TimeSpan.FromHours(1.0)) + .BatchingMaxMessages(10000) + .CreateAsync() + + // Send 10 messages asynchronously without waiting + for i in 0..9 do + let message = sprintf "my-message-%i" i + producer.SendAsync(Encoding.UTF8.GetBytes(message)) |> ignore + + // Flush to ensure all messages are sent and acknowledged + do! producer.FlushAsync() + + // Dispose producer + do! (producer :> IAsyncDisposable).DisposeAsync().AsTask() + + // Receive and verify messages + let messageSet = HashSet() + let cts = new CancellationTokenSource(TimeSpan.FromSeconds(5.0)) + + for i in 0..9 do + let! (msg : Message) = consumer.ReceiveAsync(cts.Token) + let receivedMessage = Encoding.UTF8.GetString(msg.GetValue()) + Log.Debug("Received message: [{0}]", receivedMessage) + let expectedMessage = sprintf "my-message-%i" i + testMessageOrderAndDuplicates messageSet receivedMessage expectedMessage + + do! (consumer :> IAsyncDisposable).DisposeAsync().AsTask() + + Log.Debug("-- Exiting testFlushBatchEnabled test --") + } + + testTask "Flush with batch disabled" { + Log.Debug("-- Starting testFlushBatchDisabled test --") + let client = getClient() + let topicName = "persistent://public/default/test-flush-batch-disabled-" + Guid.NewGuid().ToString("N") + + let! (consumer : IConsumer) = + client.NewConsumer() + .Topic(topicName) + .SubscriptionName("my-subscriber-name") + .SubscribeAsync() + + let! (producer : IProducer) = + client.NewProducer() + .Topic(topicName) + .EnableBatching(false) + .CreateAsync() + + // Send 10 messages asynchronously without waiting + for i in 0..9 do + let message = sprintf "my-message-%i" i + producer.SendAsync(Encoding.UTF8.GetBytes(message)) |> ignore + + // Flush to ensure all messages are sent and acknowledged + do! producer.FlushAsync() + + // Dispose producer + do! (producer :> IAsyncDisposable).DisposeAsync().AsTask() + + // Receive and verify messages + let messageSet = HashSet() + let cts = new CancellationTokenSource(TimeSpan.FromSeconds(5.0)) + + for i in 0..9 do + let! (msg : Message) = consumer.ReceiveAsync(cts.Token) + let receivedMessage = Encoding.UTF8.GetString(msg.GetValue()) + Log.Debug("Received message: [{0}]", receivedMessage) + let expectedMessage = sprintf "my-message-%i" i + testMessageOrderAndDuplicates messageSet receivedMessage expectedMessage + + do! (consumer :> IAsyncDisposable).DisposeAsync().AsTask() + + Log.Debug("-- Exiting testFlushBatchDisabled test --") + } + ] + diff --git a/tests/IntegrationTests/IntegrationTests.fsproj b/tests/IntegrationTests/IntegrationTests.fsproj index 3716434d..ef5e005f 100755 --- a/tests/IntegrationTests/IntegrationTests.fsproj +++ b/tests/IntegrationTests/IntegrationTests.fsproj @@ -36,6 +36,7 @@ + From 77e866df115ed6aa038843671ba1aaf1fecc16a3 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 6 Nov 2025 17:27:52 +0800 Subject: [PATCH 2/4] Cleanup codes --- tests/IntegrationTests/Flush.fs | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/tests/IntegrationTests/Flush.fs b/tests/IntegrationTests/Flush.fs index ad3634df..c89607f1 100644 --- a/tests/IntegrationTests/Flush.fs +++ b/tests/IntegrationTests/Flush.fs @@ -9,15 +9,14 @@ open Pulsar.Client.Api open Pulsar.Client.Common open Serilog open Pulsar.Client.IntegrationTests.Common -open Microsoft.Extensions.Logging let testMessageOrderAndDuplicates (messageSet: HashSet) (receivedMessage: string) (expectedMessage: string) = if messageSet.Contains(receivedMessage) then - failwith <| sprintf "Duplicate message received: %s" receivedMessage + failwith $"Duplicate message received: {receivedMessage}" messageSet.Add(receivedMessage) |> ignore if receivedMessage <> expectedMessage then - failwith <| sprintf "Incorrect message order. Expected: %s, Received: %s" expectedMessage receivedMessage + failwith $"Incorrect message order. Expected: {expectedMessage}, Received: {receivedMessage}" [] let tests = @@ -25,14 +24,7 @@ let tests = testList "Flush" [ testTask "Flush with batch enabled" { - let loggerFactory = - LoggerFactory.Create(fun builder -> - builder - .SetMinimumLevel(LogLevel.Debug) - .AddConsole() |> ignore - ) - PulsarClient.Logger <- loggerFactory.CreateLogger("PulsarLogger") - Log.Debug("-- Starting testFlushBatchEnabled test --") + Log.Debug("Started Flush with batch enabled") let client = getClient() let topicName = "persistent://public/default/test-flush-batch-enabled-" + Guid.NewGuid().ToString("N") @@ -52,7 +44,7 @@ let tests = // Send 10 messages asynchronously without waiting for i in 0..9 do - let message = sprintf "my-message-%i" i + let message = $"my-message-{i}" producer.SendAsync(Encoding.UTF8.GetBytes(message)) |> ignore // Flush to ensure all messages are sent and acknowledged @@ -69,16 +61,16 @@ let tests = let! (msg : Message) = consumer.ReceiveAsync(cts.Token) let receivedMessage = Encoding.UTF8.GetString(msg.GetValue()) Log.Debug("Received message: [{0}]", receivedMessage) - let expectedMessage = sprintf "my-message-%i" i + let expectedMessage = $"my-message-{i}" testMessageOrderAndDuplicates messageSet receivedMessage expectedMessage do! (consumer :> IAsyncDisposable).DisposeAsync().AsTask() - Log.Debug("-- Exiting testFlushBatchEnabled test --") + Log.Debug("Finished Started Flush with batch enabled") } testTask "Flush with batch disabled" { - Log.Debug("-- Starting testFlushBatchDisabled test --") + Log.Debug("Started Flush with batch disabled") let client = getClient() let topicName = "persistent://public/default/test-flush-batch-disabled-" + Guid.NewGuid().ToString("N") @@ -96,7 +88,7 @@ let tests = // Send 10 messages asynchronously without waiting for i in 0..9 do - let message = sprintf "my-message-%i" i + let message = $"my-message-{i}" producer.SendAsync(Encoding.UTF8.GetBytes(message)) |> ignore // Flush to ensure all messages are sent and acknowledged @@ -113,12 +105,12 @@ let tests = let! (msg : Message) = consumer.ReceiveAsync(cts.Token) let receivedMessage = Encoding.UTF8.GetString(msg.GetValue()) Log.Debug("Received message: [{0}]", receivedMessage) - let expectedMessage = sprintf "my-message-%i" i + let expectedMessage = $"my-message-{i}" testMessageOrderAndDuplicates messageSet receivedMessage expectedMessage do! (consumer :> IAsyncDisposable).DisposeAsync().AsTask() - Log.Debug("-- Exiting testFlushBatchDisabled test --") + Log.Debug("Finished Flush with batch disabled") } ] From 4e82455cb7a9f5ac37a586c75522baff415309a2 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 7 Nov 2025 17:29:21 +0800 Subject: [PATCH 3/4] Fix flush won't be failed when the callback failed and improve the code and logs --- src/Pulsar.Client/Internal/ProducerImpl.fs | 45 ++++++++++++---------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/src/Pulsar.Client/Internal/ProducerImpl.fs b/src/Pulsar.Client/Internal/ProducerImpl.fs index 990324e4..968be9d7 100644 --- a/src/Pulsar.Client/Internal/ProducerImpl.fs +++ b/src/Pulsar.Client/Internal/ProducerImpl.fs @@ -765,30 +765,35 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c Log.Logger.LogDebug("{0} Flush completed immediately, no pending messages", prefix) channel.SetResult() else - // Get the last element from the queue by iterating through it - let mutable lastMessage = Unchecked.defaultof> + // Get the last message from the queue + let mutable lastMessage = pendingMessages |> Seq.last for msg in pendingMessages do lastMessage <- msg - Log.Logger.LogDebug("{0} Flush waiting for last message callback, sequenceId: {1}", prefix, %lastMessage.SequenceId) + Log.Logger.LogDebug("{0} Flush waiting for last message callback, last sequenceId: {1}", prefix, %lastMessage.SequenceId) // Wait for the last message's callback to complete asynchronously backgroundTask { - match lastMessage.Callback with - | SingleCallback (_, _, tcsOption) -> - match tcsOption with - | Some tcs -> - let! _ = tcs.Task - () - | None -> () - | BatchCallbacks batchCallbacks -> - // Wait for all TaskCompletionSource in the batch - let tasks = - batchCallbacks - |> Array.choose (fun struct(_, _, tcsOption) -> - tcsOption |> Option.map (fun tcs -> tcs.Task :> Task)) - if tasks.Length > 0 then - do! Task.WhenAll(tasks) - channel.SetResult() + try + match lastMessage.Callback with + | SingleCallback (_, _, tcsOption) -> + match tcsOption with + | Some tcs -> + let! _ = tcs.Task + () + | None -> () + | BatchCallbacks batchCallbacks -> + // Wait for all TaskCompletionSource in the batch + let tasks = + batchCallbacks + |> Array.choose (fun struct(_, _, tcsOption) -> + tcsOption |> Option.map (fun tcs -> tcs.Task :> Task)) + if tasks.Length > 0 then + do! Task.WhenAll(tasks) + Log.Logger.LogDebug("{0} Flush completed, last sequenceId: {1}", prefix, %lastMessage.SequenceId) + channel.SetResult() + with Flatten ex -> + Log.Logger.LogError(ex, "{0} Flush failed, last sequenceId: {1}", prefix, %lastMessage.SequenceId) + channel.SetException ex } |> ignore | ProducerMessage.Close channel -> @@ -980,7 +985,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c member this.FlushAsync() = connectionHandler.CheckIfActive() |> throwIfNotNull - postAndAsyncReply mb (fun channel -> ProducerMessage.Flush channel) + postAndAsyncReply mb ProducerMessage.Flush interface IAsyncDisposable with From 5dc51d029d43c3826da730082da8fdba3a81fcee Mon Sep 17 00:00:00 2001 From: Vladimir Shchur Date: Sat, 8 Nov 2025 11:31:22 -0800 Subject: [PATCH 4/4] Review comments --- src/Pulsar.Client/Internal/ProducerImpl.fs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Pulsar.Client/Internal/ProducerImpl.fs b/src/Pulsar.Client/Internal/ProducerImpl.fs index 968be9d7..e6ff787a 100644 --- a/src/Pulsar.Client/Internal/ProducerImpl.fs +++ b/src/Pulsar.Client/Internal/ProducerImpl.fs @@ -766,9 +766,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c channel.SetResult() else // Get the last message from the queue - let mutable lastMessage = pendingMessages |> Seq.last - for msg in pendingMessages do - lastMessage <- msg + let lastMessage = pendingMessages |> Seq.last Log.Logger.LogDebug("{0} Flush waiting for last message callback, last sequenceId: {1}", prefix, %lastMessage.SequenceId) // Wait for the last message's callback to complete asynchronously