Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Pulsar.Client/Api/IProducer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,5 @@ type IProducer<'T> =
abstract member LastDisconnectedTimestamp: unit -> Task<TimeStamp>
/// Return true if the consumer is connected to the broker
abstract member IsConnected: unit -> Task<bool>
/// Flush all pending messages and wait for their acknowledgements
abstract member FlushAsync: unit -> Task<unit>
12 changes: 12 additions & 0 deletions src/Pulsar.Client/Internal/PartitionedProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,18 @@ type internal PartitionedProducerImpl<'T> private (producerConfig: ProducerConfi

member this.IsConnected() = postAndAsyncReply mb IsConnected

member this.FlushAsync() =
backgroundTask {
// Flush all partition producers
let flushTasks =
producers
|> Seq.map (fun producer -> producer.FlushAsync())
|> Seq.toArray
if flushTasks.Length > 0 then
let! _ = Task.WhenAll(flushTasks)
()
}


interface IAsyncDisposable with
member this.DisposeAsync() =
Expand Down
41 changes: 41 additions & 0 deletions src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type internal ProducerMessage<'T> =
| Close of TaskCompletionSource<ResultOrException<unit>>
| Tick of ProducerTickType
| GetStats of TaskCompletionSource<ProducerStats>
| Flush of TaskCompletionSource<unit>

type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, clientConfig: PulsarClientConfiguration, connectionPool: ConnectionPool,
partitionIndex: int, lookup: ILookupService, schema: ISchema<'T>,
Expand Down Expand Up @@ -754,6 +755,42 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
| ProducerMessage.GetStats channel ->
channel.SetResult <| stats.GetStats()

| ProducerMessage.Flush channel ->
Log.Logger.LogDebug("{0} Flush requested, pendingMessages count: {1}", prefix, pendingMessages.Count)
// First, send all batched messages
batchMessageAndSend()

// If pendingMessages queue is empty, return immediately
if pendingMessages.Count = 0 then
Log.Logger.LogDebug("{0} Flush completed immediately, no pending messages", prefix)
channel.SetResult()
else
// Get the last element from the queue by iterating through it
let mutable lastMessage = Unchecked.defaultof<PendingMessage<'T>>
for msg in pendingMessages do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole loop is not needed anymore

lastMessage <- msg

Log.Logger.LogDebug("{0} Flush waiting for last message callback, sequenceId: {1}", prefix, %lastMessage.SequenceId)
// Wait for the last message's callback to complete asynchronously
backgroundTask {
match lastMessage.Callback with
| SingleCallback (_, _, tcsOption) ->
match tcsOption with
| Some tcs ->
let! _ = tcs.Task
()
| None -> ()
| BatchCallbacks batchCallbacks ->
// Wait for all TaskCompletionSource in the batch
let tasks =
batchCallbacks
|> Array.choose (fun struct(_, _, tcsOption) ->
tcsOption |> Option.map (fun tcs -> tcs.Task :> Task))
if tasks.Length > 0 then
do! Task.WhenAll(tasks)
channel.SetResult()
} |> ignore

| ProducerMessage.Close channel ->

match connectionHandler.ConnectionState with
Expand Down Expand Up @@ -941,6 +978,10 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
| Ready _ -> trueTask
| _ -> falseTask

member this.FlushAsync() =
connectionHandler.CheckIfActive() |> throwIfNotNull
postAndAsyncReply mb (fun channel -> ProducerMessage.Flush channel)

interface IAsyncDisposable with

member this.DisposeAsync() =
Expand Down
116 changes: 116 additions & 0 deletions tests/IntegrationTests/Flush.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
module Pulsar.Client.IntegrationTests.Flush

open System
open System.Collections.Generic
open System.Text
open System.Threading
open Expecto
open Pulsar.Client.Api
open Pulsar.Client.Common
open Serilog
open Pulsar.Client.IntegrationTests.Common


let testMessageOrderAndDuplicates (messageSet: HashSet<string>) (receivedMessage: string) (expectedMessage: string) =
if messageSet.Contains(receivedMessage) then
failwith $"Duplicate message received: {receivedMessage}"
messageSet.Add(receivedMessage) |> ignore
if receivedMessage <> expectedMessage then
failwith $"Incorrect message order. Expected: {expectedMessage}, Received: {receivedMessage}"

[<Tests>]
let tests =

testList "Flush" [

testTask "Flush with batch enabled" {
Log.Debug("Started Flush with batch enabled")
let client = getClient()
let topicName = "persistent://public/default/test-flush-batch-enabled-" + Guid.NewGuid().ToString("N")

let! (consumer : IConsumer<byte[]>) =
client.NewConsumer()
.Topic(topicName)
.SubscriptionName("my-subscriber-name")
.SubscribeAsync()

let! (producer : IProducer<byte[]>) =
client.NewProducer()
.Topic(topicName)
.EnableBatching(true)
.BatchingMaxPublishDelay(TimeSpan.FromHours(1.0))
.BatchingMaxMessages(10000)
.CreateAsync()

// Send 10 messages asynchronously without waiting
for i in 0..9 do
let message = $"my-message-{i}"
producer.SendAsync(Encoding.UTF8.GetBytes(message)) |> ignore

// Flush to ensure all messages are sent and acknowledged
do! producer.FlushAsync()

// Dispose producer
do! (producer :> IAsyncDisposable).DisposeAsync().AsTask()

// Receive and verify messages
let messageSet = HashSet<string>()
let cts = new CancellationTokenSource(TimeSpan.FromSeconds(5.0))

for i in 0..9 do
let! (msg : Message<byte[]>) = consumer.ReceiveAsync(cts.Token)
let receivedMessage = Encoding.UTF8.GetString(msg.GetValue())
Log.Debug("Received message: [{0}]", receivedMessage)
let expectedMessage = $"my-message-{i}"
testMessageOrderAndDuplicates messageSet receivedMessage expectedMessage

do! (consumer :> IAsyncDisposable).DisposeAsync().AsTask()

Log.Debug("Finished Started Flush with batch enabled")
}

testTask "Flush with batch disabled" {
Log.Debug("Started Flush with batch disabled")
let client = getClient()
let topicName = "persistent://public/default/test-flush-batch-disabled-" + Guid.NewGuid().ToString("N")

let! (consumer : IConsumer<byte[]>) =
client.NewConsumer()
.Topic(topicName)
.SubscriptionName("my-subscriber-name")
.SubscribeAsync()

let! (producer : IProducer<byte[]>) =
client.NewProducer()
.Topic(topicName)
.EnableBatching(false)
.CreateAsync()

// Send 10 messages asynchronously without waiting
for i in 0..9 do
let message = $"my-message-{i}"
producer.SendAsync(Encoding.UTF8.GetBytes(message)) |> ignore

// Flush to ensure all messages are sent and acknowledged
do! producer.FlushAsync()

// Dispose producer
do! (producer :> IAsyncDisposable).DisposeAsync().AsTask()

// Receive and verify messages
let messageSet = HashSet<string>()
let cts = new CancellationTokenSource(TimeSpan.FromSeconds(5.0))

for i in 0..9 do
let! (msg : Message<byte[]>) = consumer.ReceiveAsync(cts.Token)
let receivedMessage = Encoding.UTF8.GetString(msg.GetValue())
Log.Debug("Received message: [{0}]", receivedMessage)
let expectedMessage = $"my-message-{i}"
testMessageOrderAndDuplicates messageSet receivedMessage expectedMessage

do! (consumer :> IAsyncDisposable).DisposeAsync().AsTask()

Log.Debug("Finished Flush with batch disabled")
}
]

1 change: 1 addition & 0 deletions tests/IntegrationTests/IntegrationTests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<Compile Include="Failover.fs" />
<Compile Include="Transaction.fs" />
<Compile Include="MessageCrypto.fs" />
<Compile Include="Flush.fs" />
<Compile Include="HttpLookupService.fs" />
<Compile Include="Main.fs" />
</ItemGroup>
Expand Down
Loading