Skip to content

Commit e894e66

Browse files
authored
Fix wrong results of hasMessageAvailable after seeking by timestamp (#333)
* Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp * Remove HasSoughtByTimestamp and improve test
1 parent a554121 commit e894e66

File tree

2 files changed

+97
-6
lines changed

2 files changed

+97
-6
lines changed

src/Pulsar.Client/Internal/ConsumerImpl.fs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
9797
let mutable lastMessageIdInBroker = MessageId.Earliest
9898
let mutable lastDequeuedMessageId = MessageId.Earliest
9999
let mutable duringSeek = None
100+
let mutable hasSoughtByTimestamp = false
100101
let initialStartMessageId = startMessageId
101102
let mutable incomingMessagesSize = 0L
102103
let deadLettersProcessor = consumerConfig.DeadLetterProcessor topicName
@@ -1092,7 +1093,9 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
10921093
Log.Logger.LogInformation("{0} Seek subscription to {1}", prefix, seekData)
10931094
let payload, seekMessageId =
10941095
match seekData with
1095-
| SeekType.Timestamp timestamp -> Commands.newSeekByTimestamp consumerId requestId timestamp, MessageId.Earliest
1096+
| SeekType.Timestamp timestamp ->
1097+
hasSoughtByTimestamp <- true
1098+
Commands.newSeekByTimestamp consumerId requestId timestamp, MessageId.Earliest
10961099
| SeekType.MessageId messageId ->
10971100
match messageId.ChunkMessageIds with
10981101
| Some chunkMessageIds when chunkMessageIds.Length >0 ->
@@ -1132,9 +1135,11 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
11321135

11331136
// we haven't read yet. use startMessageId for comparison
11341137
if lastDequeuedMessageId = MessageId.Earliest then
1138+
// If the last seek is called with timestamp, startMessageId cannot represent the position to start, so we
1139+
// have to get the mark-delete position from the GetLastMessageId response to compare as well.
11351140
// if we are starting from latest, we should seek to the actual last message first.
11361141
// allow the last one to be read when read head inclusively.
1137-
if startMessageId = MessageId.Latest then
1142+
if startMessageId = MessageId.Latest || hasSoughtByTimestamp then
11381143
backgroundTask {
11391144
try
11401145
let! lastMessageIdResult = getLastMessageIdAsync()
@@ -1431,6 +1436,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
14311436
with get() = Volatile.Read(&lastMessageIdInBroker)
14321437
and private set value = Volatile.Write(&lastMessageIdInBroker, value)
14331438

1439+
14341440
override this.Equals consumer =
14351441
consumerId = (consumer :?> IConsumer<'T>).ConsumerId
14361442

tests/IntegrationTests/Reader.fs

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,85 @@ let tests =
272272

273273

274274

275+
let checkHasMessageAvailableAfterSeekTimestamp (initializeLastMessageIdInBroker: bool) =
276+
task {
277+
Log.Debug("Started HasMessageAvailableAfterSeekTimestamp initializeLastMessageIdInBroker: {0}", initializeLastMessageIdInBroker)
278+
let client = getClient()
279+
let topicName = "public/default/test-has-message-available-after-seek-timestamp-" + Guid.NewGuid().ToString("N")
280+
281+
let! (producer : IProducer<string>) =
282+
client.NewProducer(Schema.STRING())
283+
.Topic(topicName)
284+
.CreateAsync()
285+
286+
let timestampBeforeSend = %(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
287+
let! sentMsgId = producer.SendAsync("msg")
288+
do! producer.DisposeAsync()
289+
290+
let messageIds = [
291+
MessageId.Earliest
292+
sentMsgId
293+
MessageId.Latest
294+
]
295+
296+
// Test 1: Seek to future timestamp
297+
for messageId in messageIds do
298+
let! (reader : IReader<string>) =
299+
client.NewReader(Schema.STRING())
300+
.Topic(topicName)
301+
.ReceiverQueueSize(1)
302+
.StartMessageId(messageId)
303+
.CreateAsync()
304+
305+
if initializeLastMessageIdInBroker then
306+
if messageId = MessageId.Earliest then
307+
let! hasMessage = reader.HasMessageAvailableAsync()
308+
Expect.isTrue "should have message" hasMessage
309+
else
310+
let! hasMessage = reader.HasMessageAvailableAsync()
311+
Expect.isFalse "should not have message" hasMessage
312+
313+
let futureTimestamp = %(DateTimeOffset.UtcNow.AddMinutes(1.0).ToUnixTimeMilliseconds())
314+
315+
// The Seek operation does not implement a backoff mechanism. It will fail if the connection is not
316+
// ready, so wait for a short period until the connection becomes available.
317+
do! Task.Delay(1000)
318+
do! reader.SeekAsync(futureTimestamp)
319+
// HasMessageAvailableAsync does not implement a backoff mechanism. The operation may fail due to the
320+
// broker not finding the consumer during the seek operation. So wait for a short period here.
321+
do! Task.Delay(1000)
322+
let! hasMessage = reader.HasMessageAvailableAsync()
323+
Expect.isFalse "after seek to future should not have message" hasMessage
324+
do! reader.DisposeAsync()
325+
326+
// Test 2: Seek to timestamp before send
327+
for messageId in messageIds do
328+
let! (reader : IReader<string>) =
329+
client.NewReader(Schema.STRING())
330+
.Topic(topicName)
331+
.ReceiverQueueSize(1)
332+
.StartMessageId(messageId)
333+
.CreateAsync()
334+
335+
336+
if initializeLastMessageIdInBroker then
337+
if messageId = MessageId.Earliest then
338+
let! hasMessage = reader.HasMessageAvailableAsync()
339+
Expect.isTrue "should have message" hasMessage
340+
else
341+
let! hasMessage = reader.HasMessageAvailableAsync()
342+
Expect.isFalse "should not have message" hasMessage
343+
344+
do! Task.Delay(1000)
345+
do! reader.SeekAsync(timestampBeforeSend)
346+
do! Task.Delay(1000)
347+
let! hasMessage = reader.HasMessageAvailableAsync()
348+
Expect.isTrue "after seek to before send should have message" hasMessage
349+
do! reader.DisposeAsync()
350+
351+
Log.Debug("Finished HasMessageAvailableAfterSeekTimestamp initializeLastMessageIdInBroker: {0}", initializeLastMessageIdInBroker)
352+
}
353+
275354
testList "Reader" [
276355

277356
testTask "Reader non-batching configuration works fine" {
@@ -314,13 +393,19 @@ let tests =
314393
do! checkReadingFromRollback true
315394
}
316395

317-
// uncomment when https://github.com/apache/pulsar/issues/10515 is ready
318-
ptestTask "Check StartMessageFromFuturePoint without batching" {
396+
testTask "Check StartMessageFromFuturePoint without batching" {
319397
do! checkReadingFromFuture false
320398
}
321399

322-
// uncomment when https://github.com/apache/pulsar/issues/10515 is ready
323-
ptestTask "Check StartMessageFromFuturePoint with batching" {
400+
testTask "Check StartMessageFromFuturePoint with batching" {
324401
do! checkReadingFromFuture true
325402
}
403+
404+
testTask "HasMessageAvailable after SeekTimestamp without initializeLastMessageIdInBroker" {
405+
do! checkHasMessageAvailableAfterSeekTimestamp false
406+
}
407+
408+
testTask "HasMessageAvailable after SeekTimestamp with initializeLastMessageIdInBroker" {
409+
do! checkHasMessageAvailableAfterSeekTimestamp true
410+
}
326411
]

0 commit comments

Comments
 (0)