Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion tests/rptest/tests/cluster_linking_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,13 @@ def __init__(
self.consumer_properties: dict[str, Any] = (
consumer_properties if consumer_properties else {}
)
# When using compaction, the completion criteria examines per-partition
# offsets, which may be at odds with having a max_msgs set.
assert not (self.use_compaction and "max_msgs" in self.consumer_properties), (
"max_msgs is incompatible with use_compaction: completion requires "
"per-partition offset parity, which a bounded read may never reach. "
"Let the consumer tail (continuous) instead."
)
Comment on lines +235 to +241
self.timeout_sec = timeout_sec
self.validate_number_of_messages_on_target = (
validate_number_of_messages_on_target
Expand Down Expand Up @@ -275,12 +282,14 @@ def start(self):
)
self.source_consumer.start(clean=False)

# NOTE: when using compaction, the completion criteria examines
# per-partition offsets, which is at odds with having a max_msgs.
self.target_consumer = KgoVerifierConsumerGroupConsumer(
context=self.test_context,
redpanda=self.target_cluster.service,
topic=self.topic,
msg_size=self.msg_size,
max_msgs=self.msg_count,
max_msgs=None if self.use_compaction else self.msg_count,
readers=readers,
use_transactions=self.use_transactions,
group_name=f"target-cg-{self._instance_id}",
Expand Down