Skip to content

Commit 5b562ca

Browse files
committed
Fix for BatchReceivePolicy.maxNumMessages not working #259
1 parent 9711c64 commit 5b562ca

File tree

3 files changed

+27
-8
lines changed

3 files changed

+27
-8
lines changed

src/Pulsar.Client/Common/DTO.fs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -325,18 +325,16 @@ type Messages<'T> internal(maxNumberOfMessages: int, maxSizeOfMessages: int64) =
325325

326326
let messageList = if maxNumberOfMessages > 0 then ResizeArray<Message<'T>>(maxNumberOfMessages) else ResizeArray<Message<'T>>()
327327

328-
member this.Count with get() =
329-
currentNumberOfMessages
330-
member this.Size with get() =
331-
currentSizeOfMessages
328+
member this.Count = currentNumberOfMessages
329+
member this.Size = currentSizeOfMessages
332330

333-
member internal this.IsFull with get() =
331+
member internal this.IsFull =
334332
currentNumberOfMessages = maxNumberOfMessages
335333
|| currentSizeOfMessages = maxSizeOfMessages
336334

337335
member internal this.CanAdd(message: Message<'T>) =
338-
(maxNumberOfMessages > 0 && currentNumberOfMessages + 1 > maxNumberOfMessages)
339-
|| (maxSizeOfMessages > 0L && currentSizeOfMessages + (int64 message.Data.Length) > maxSizeOfMessages)
336+
((maxNumberOfMessages > 0 && currentNumberOfMessages + 1 > maxNumberOfMessages)
337+
|| (maxSizeOfMessages > 0L && currentSizeOfMessages + (int64 message.Data.Length) > maxSizeOfMessages))
340338
|> not
341339

342340
member internal this.Add(message: Message<'T>) =

src/Pulsar.Client/Internal/ConsumerImpl.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
528528
messages.Add(interceptors.BeforeConsume(this, msg))
529529
else
530530
shouldContinue <- false
531-
Log.Logger.LogDebug("{0} BatchFormed with size {1}", prefix, messages.Size)
531+
Log.Logger.LogDebug("{0} BatchFormed with size {1} and count {2}", prefix, messages.Size, messages.Count)
532532
channel.SetResult messages
533533

534534
let removeExpiredMessagesFromQueue (msgIds: RedeliverSet) =

tests/UnitTests/Common/MessageTests.fs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
module Pulsar.Client.UnitTests.Common.MessageTests
22

3+
open System
34
open Expecto
45
open Expecto.Flip
56
open Pulsar.Client.Common
@@ -64,4 +65,24 @@ let tests =
6465
let deserialized = MessageId.FromByteArray msgIdData
6566
Expect.equal "" msgId deserialized
6667
}
68+
69+
test "Message batching by count works correctly" {
70+
let messages = Messages(2, -1)
71+
let message = Message(MessageId.Earliest, [||], %"", false, EmptyProps, None, [||], %0L, [||], %0L, Nullable(), 0, "", "", fun () -> failwith "not implemented")
72+
messages.CanAdd(message) |> Expect.isTrue ""
73+
messages.Add(message)
74+
messages.CanAdd(message) |> Expect.isTrue ""
75+
messages.Add(message)
76+
messages.CanAdd(message) |> Expect.isFalse ""
77+
}
78+
79+
test "Message batching by size works correctly" {
80+
let messages = Messages(-1, 2)
81+
let message = Message(MessageId.Earliest, [| 0uy |], %"", false, EmptyProps, None, [||], %0L, [||], %0L, Nullable(), 0, "", "", fun () -> failwith "not implemented")
82+
messages.CanAdd(message) |> Expect.isTrue ""
83+
messages.Add(message)
84+
messages.CanAdd(message) |> Expect.isTrue ""
85+
messages.Add(message)
86+
messages.CanAdd(message) |> Expect.isFalse ""
87+
}
6788
]

0 commit comments

Comments
 (0)