Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Pulsar.Client/Api/IProducer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,5 @@ type IProducer<'T> =
abstract member LastDisconnectedTimestamp: unit -> Task<TimeStamp>
/// Return true if the consumer is connected to the broker
abstract member IsConnected: unit -> Task<bool>
/// Flush all pending messages and wait for their acknowledgements
abstract member FlushAsync: unit -> Task<unit>
12 changes: 12 additions & 0 deletions src/Pulsar.Client/Internal/PartitionedProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,18 @@ type internal PartitionedProducerImpl<'T> private (producerConfig: ProducerConfi

member this.IsConnected() = postAndAsyncReply mb IsConnected

member this.FlushAsync() =
backgroundTask {
// Flush all partition producers
let flushTasks =
producers
|> Seq.map (fun producer -> producer.FlushAsync())
|> Seq.toArray
if flushTasks.Length > 0 then
let! _ = Task.WhenAll(flushTasks)
()
}


interface IAsyncDisposable with
member this.DisposeAsync() =
Expand Down
44 changes: 44 additions & 0 deletions src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type internal ProducerMessage<'T> =
| Close of TaskCompletionSource<ResultOrException<unit>>
| Tick of ProducerTickType
| GetStats of TaskCompletionSource<ProducerStats>
| Flush of TaskCompletionSource<unit>

type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, clientConfig: PulsarClientConfiguration, connectionPool: ConnectionPool,
partitionIndex: int, lookup: ILookupService, schema: ISchema<'T>,
Expand Down Expand Up @@ -754,6 +755,45 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
| ProducerMessage.GetStats channel ->
channel.SetResult <| stats.GetStats()

| ProducerMessage.Flush channel ->
Log.Logger.LogDebug("{0} Flush requested, pendingMessages count: {1}", prefix, pendingMessages.Count)
// First, send all batched messages
batchMessageAndSend()

// If pendingMessages queue is empty, return immediately
if pendingMessages.Count = 0 then
Log.Logger.LogDebug("{0} Flush completed immediately, no pending messages", prefix)
channel.SetResult()
else
// Get the last message from the queue
let lastMessage = pendingMessages |> Seq.last

Log.Logger.LogDebug("{0} Flush waiting for last message callback, last sequenceId: {1}", prefix, %lastMessage.SequenceId)
// Wait for the last message's callback to complete asynchronously
backgroundTask {
try
match lastMessage.Callback with
| SingleCallback (_, _, tcsOption) ->
match tcsOption with
| Some tcs ->
let! _ = tcs.Task
()
| None -> ()
| BatchCallbacks batchCallbacks ->
// Wait for all TaskCompletionSource in the batch
let tasks =
batchCallbacks
|> Array.choose (fun struct(_, _, tcsOption) ->
tcsOption |> Option.map (fun tcs -> tcs.Task :> Task))
if tasks.Length > 0 then
do! Task.WhenAll(tasks)
Log.Logger.LogDebug("{0} Flush completed, last sequenceId: {1}", prefix, %lastMessage.SequenceId)
channel.SetResult()
with Flatten ex ->
Log.Logger.LogError(ex, "{0} Flush failed, last sequenceId: {1}", prefix, %lastMessage.SequenceId)
channel.SetException ex
} |> ignore

| ProducerMessage.Close channel ->

match connectionHandler.ConnectionState with
Expand Down Expand Up @@ -941,6 +981,10 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
| Ready _ -> trueTask
| _ -> falseTask

member this.FlushAsync() =
connectionHandler.CheckIfActive() |> throwIfNotNull
postAndAsyncReply mb ProducerMessage.Flush

interface IAsyncDisposable with

member this.DisposeAsync() =
Expand Down
116 changes: 116 additions & 0 deletions tests/IntegrationTests/Flush.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
module Pulsar.Client.IntegrationTests.Flush

open System
open System.Collections.Generic
open System.Text
open System.Threading
open Expecto
open Pulsar.Client.Api
open Pulsar.Client.Common
open Serilog
open Pulsar.Client.IntegrationTests.Common


let testMessageOrderAndDuplicates (messageSet: HashSet<string>) (receivedMessage: string) (expectedMessage: string) =
if messageSet.Contains(receivedMessage) then
failwith $"Duplicate message received: {receivedMessage}"
messageSet.Add(receivedMessage) |> ignore
if receivedMessage <> expectedMessage then
failwith $"Incorrect message order. Expected: {expectedMessage}, Received: {receivedMessage}"

[<Tests>]
let tests =

testList "Flush" [

testTask "Flush with batch enabled" {
Log.Debug("Started Flush with batch enabled")
let client = getClient()
let topicName = "persistent://public/default/test-flush-batch-enabled-" + Guid.NewGuid().ToString("N")

let! (consumer : IConsumer<byte[]>) =
client.NewConsumer()
.Topic(topicName)
.SubscriptionName("my-subscriber-name")
.SubscribeAsync()

let! (producer : IProducer<byte[]>) =
client.NewProducer()
.Topic(topicName)
.EnableBatching(true)
.BatchingMaxPublishDelay(TimeSpan.FromHours(1.0))
.BatchingMaxMessages(10000)
.CreateAsync()

// Send 10 messages asynchronously without waiting
for i in 0..9 do
let message = $"my-message-{i}"
producer.SendAsync(Encoding.UTF8.GetBytes(message)) |> ignore

// Flush to ensure all messages are sent and acknowledged
do! producer.FlushAsync()

// Dispose producer
do! (producer :> IAsyncDisposable).DisposeAsync().AsTask()

// Receive and verify messages
let messageSet = HashSet<string>()
let cts = new CancellationTokenSource(TimeSpan.FromSeconds(5.0))

for i in 0..9 do
let! (msg : Message<byte[]>) = consumer.ReceiveAsync(cts.Token)
let receivedMessage = Encoding.UTF8.GetString(msg.GetValue())
Log.Debug("Received message: [{0}]", receivedMessage)
let expectedMessage = $"my-message-{i}"
testMessageOrderAndDuplicates messageSet receivedMessage expectedMessage

do! (consumer :> IAsyncDisposable).DisposeAsync().AsTask()

Log.Debug("Finished Started Flush with batch enabled")
}

testTask "Flush with batch disabled" {
Log.Debug("Started Flush with batch disabled")
let client = getClient()
let topicName = "persistent://public/default/test-flush-batch-disabled-" + Guid.NewGuid().ToString("N")

let! (consumer : IConsumer<byte[]>) =
client.NewConsumer()
.Topic(topicName)
.SubscriptionName("my-subscriber-name")
.SubscribeAsync()

let! (producer : IProducer<byte[]>) =
client.NewProducer()
.Topic(topicName)
.EnableBatching(false)
.CreateAsync()

// Send 10 messages asynchronously without waiting
for i in 0..9 do
let message = $"my-message-{i}"
producer.SendAsync(Encoding.UTF8.GetBytes(message)) |> ignore

// Flush to ensure all messages are sent and acknowledged
do! producer.FlushAsync()

// Dispose producer
do! (producer :> IAsyncDisposable).DisposeAsync().AsTask()

// Receive and verify messages
let messageSet = HashSet<string>()
let cts = new CancellationTokenSource(TimeSpan.FromSeconds(5.0))

for i in 0..9 do
let! (msg : Message<byte[]>) = consumer.ReceiveAsync(cts.Token)
let receivedMessage = Encoding.UTF8.GetString(msg.GetValue())
Log.Debug("Received message: [{0}]", receivedMessage)
let expectedMessage = $"my-message-{i}"
testMessageOrderAndDuplicates messageSet receivedMessage expectedMessage

do! (consumer :> IAsyncDisposable).DisposeAsync().AsTask()

Log.Debug("Finished Flush with batch disabled")
}
]

1 change: 1 addition & 0 deletions tests/IntegrationTests/IntegrationTests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<Compile Include="Failover.fs" />
<Compile Include="Transaction.fs" />
<Compile Include="MessageCrypto.fs" />
<Compile Include="Flush.fs" />
<Compile Include="HttpLookupService.fs" />
<Compile Include="Main.fs" />
</ItemGroup>
Expand Down
Loading