Skip to content

Commit eb2850a

Browse files
committed
Trigger rollover during ITs
Signed-off-by: Dmitrii Petukhov <[email protected]>
1 parent 47a89d5 commit eb2850a

File tree

2 files changed

+79
-26
lines changed

2 files changed

+79
-26
lines changed

src/integration-tests/test_restart_between_modes.py

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
# simulate_csl_rollover,
4848
)
4949

50+
MAX_JOURNAL_FILE_SIZE = 4567
5051
pytestmark = order(2)
5152
timeout = 20
5253

@@ -659,20 +660,20 @@ def test_restart_between_Legacy_and_FSM_unassign_queue(
659660
@pytest.fixture(
660661
params=[
661662
(restart_as_fsm_mode, restart_as_legacy_mode, False),
662-
(restart_as_legacy_mode, restart_as_fsm_mode, False),
663-
(restart_to_fsm_single_node_with_quorum_one, restart_as_legacy_mode, True),
664-
(
665-
restart_to_fsm_single_node_with_quorum_one_and_start_others,
666-
restart_as_legacy_mode,
667-
False,
668-
),
669-
],
670-
ids=[
671-
"to_fsm/from_legacy",
672-
"to_legacy/from_fsm",
673-
"to_fsm_quorum_1/from_legacy",
674-
"to_fsm_quorum_1_then_start_all/from_legacy",
663+
# (restart_as_legacy_mode, restart_as_fsm_mode, False),
664+
# (restart_to_fsm_single_node_with_quorum_one, restart_as_legacy_mode, True),
665+
# (
666+
# restart_to_fsm_single_node_with_quorum_one_and_start_others,
667+
# restart_as_legacy_mode,
668+
# False,
669+
# ),
675670
],
671+
# ids=[
672+
# "to_fsm/from_legacy",
673+
# "to_legacy/from_fsm",
674+
# "to_fsm_quorum_1/from_legacy",
675+
# "to_fsm_quorum_1_then_start_all/from_legacy",
676+
# ],
676677
)
677678
def switch_cluster_mode(request):
678679
"""
@@ -688,21 +689,47 @@ def switch_cluster_mode(request):
688689

689690

690691
def without_rollover(
691-
du: tc.DomainUrls, # pylint: disable=unused-argument
692-
leader: Broker, # pylint: disable=unused-argument
692+
queue_uri: str, # pylint: disable=unused-argument
693+
cluster: Cluster, # pylint: disable=unused-argument
693694
producer: Client, # pylint: disable=unused-argument
695+
consumer: Client, # pylint: disable=unused-argument
694696
):
695697
"""
696698
Simulate fixture scenario without rollover. Just do nothing.
697699
"""
698700

699701

700-
@pytest.fixture(
701-
params=[
702-
without_rollover,
703-
# simulate_journal_rollover
704-
]
705-
)
702+
def with_rollover(queue_uri: str, cluster: Cluster, producer: Client, consumer: Client):
703+
"""
704+
Simulate fixture scenario with rollover
705+
"""
706+
707+
consumer.open(
708+
queue_uri,
709+
flags=["read"],
710+
succeed=True,
711+
)
712+
713+
leader = cluster.last_known_leader
714+
715+
i = 0
716+
while not leader.outputs_substr("Initiating rollover", 0.01):
717+
i += 1
718+
assert i < 9999, "Failed to trigger Rollover"
719+
producer.post(queue_uri, [f"msg{i}"], succeed=True, wait_ack=True)
720+
consumer.wait_push_event()
721+
consumer.confirm(queue_uri, "*", succeed=True)
722+
723+
# log that rollover was detected
724+
test_logger.info("Rollover detected on queue %s after %d messages", queue_uri, i)
725+
726+
for node in cluster.nodes():
727+
node.wait_rollover_complete()
728+
729+
consumer.close(queue_uri, succeed=True)
730+
731+
732+
@pytest.fixture(params=[without_rollover, with_rollover])
706733
def optional_rollover(request):
707734
"""
708735
Fixture to optionally simulate rollover of CSL file.
@@ -711,6 +738,10 @@ def optional_rollover(request):
711738
return request.param
712739

713740

741+
# Set number of partitions to 1 to keep all queues on the same partition
742+
# to actually test rollover on the partition with actual data
743+
@tweak.cluster.partition_config.num_partitions(1)
744+
@tweak.cluster.partition_config.max_journal_file_size(MAX_JOURNAL_FILE_SIZE)
714745
def test_restart_between_legacy_and_fsm_add_remove_app(
715746
cluster: Cluster,
716747
domain_urls: tc.DomainUrls,
@@ -770,9 +801,10 @@ def test_restart_between_legacy_and_fsm_add_remove_app(
770801
# 1. PROLOGUE
771802
priority_queue = f"bmq://{du.domain_priority}/{test_queue}"
772803
fanout_queue = f"bmq://{du.domain_fanout}/{test_queue}"
804+
rollover_queue = f"bmq://{du.domain_priority}/rollover_test_queue"
773805

774806
# post two messages
775-
for queue in [priority_queue, fanout_queue]:
807+
for queue in [priority_queue, fanout_queue, rollover_queue]:
776808
producer.open(queue, flags=["write,ack"], succeed=True)
777809

778810
for queue in [priority_queue, fanout_queue]:
@@ -792,7 +824,7 @@ def test_restart_between_legacy_and_fsm_add_remove_app(
792824

793825
# 2. SWITCH
794826
# 2.1 Optional rollover
795-
optional_rollover(du, cluster.last_known_leader, producer)
827+
optional_rollover(rollover_queue, cluster, producer, consumer)
796828
# 2.2 Switch cluster mode
797829
switch_cluster_mode[0](cluster, producer)
798830

@@ -831,6 +863,10 @@ def test_restart_between_legacy_and_fsm_add_remove_app(
831863
check_if_queue_has_n_messages(consumer, fanout_queue + "?id=corge", 0 + 2)
832864

833865

866+
# Set number of partitions to 1 to keep all queues on the same partition
867+
# to actually test rollover on the partition with actual data
868+
@tweak.cluster.partition_config.num_partitions(1)
869+
@tweak.cluster.partition_config.max_journal_file_size(MAX_JOURNAL_FILE_SIZE)
834870
def test_restart_between_legacy_and_fsm_purge_queue_app(
835871
cluster: Cluster,
836872
domain_urls: tc.DomainUrls,
@@ -895,9 +931,10 @@ def test_restart_between_legacy_and_fsm_purge_queue_app(
895931
# 1. PROLOGUE
896932
priority_queue = f"bmq://{du.domain_priority}/{test_queue}"
897933
fanout_queue = f"bmq://{du.domain_fanout}/{test_queue}"
934+
rollover_queue = f"bmq://{du.domain_priority}/rollover_test_queue"
898935

899936
# Post one message
900-
for queue in [priority_queue, fanout_queue]:
937+
for queue in [priority_queue, fanout_queue, rollover_queue]:
901938
producer.open(queue, flags=["write,ack"], succeed=True)
902939

903940
for queue in [priority_queue, fanout_queue]:
@@ -926,7 +963,7 @@ def test_restart_between_legacy_and_fsm_purge_queue_app(
926963

927964
# 2. SWITCH
928965
# 2.1 Optional rollover
929-
optional_rollover(du, cluster.last_known_leader, producer)
966+
optional_rollover(rollover_queue, cluster, producer, consumer)
930967
# 2.2 Switch cluster mode
931968
switch_cluster_mode[0](cluster, producer)
932969

src/python/blazingmq/dev/it/process/broker.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,10 @@ def get_active_node(self):
181181
return self.last_known_active
182182

183183
def wait_status(
184-
self, wait_leader: bool, wait_ready: bool, cluster: str = "itCluster"
184+
self,
185+
wait_leader: bool,
186+
wait_ready: bool,
187+
cluster: str = "itCluster",
185188
):
186189
"""
187190
Wait until this node has an active leader if 'wait_leader' is True, and
@@ -230,6 +233,19 @@ def wait_status(
230233
self._logger.error(error)
231234
raise RuntimeError(error)
232235

236+
def wait_rollover_complete(self):
237+
"""
238+
Wait until rollover is complete on this broker.
239+
"""
240+
241+
self._logger.info(f"Waiting for rollover to complete on broker {self.name}...")
242+
243+
with internal_use(self):
244+
if not self.outputs_substr("ROLLOVER COMPLETE", timeout=BLOCK_TIMEOUT):
245+
raise RuntimeError(
246+
f"Rollover did not complete on broker {self.name} within {BLOCK_TIMEOUT}s"
247+
)
248+
233249
def dump_queue_internals(self, domain, queue):
234250
"""
235251
Dump state of the specified 'queue' in the specified 'domain'.

0 commit comments

Comments
 (0)