61 changes: 49 additions & 12 deletions src/integration-tests/test_restart_between_modes.py
@@ -47,6 +47,7 @@
    # simulate_csl_rollover,
)

# Small max journal file size used by the tests below so that posting a modest
# number of messages is enough to force a journal rollover.
MAX_JOURNAL_FILE_SIZE = 4567
pytestmark = order(2)
timeout = 20

@@ -688,21 +689,47 @@ def switch_cluster_mode(request):


def without_rollover(
    du: tc.DomainUrls,  # pylint: disable=unused-argument
    leader: Broker,  # pylint: disable=unused-argument
    queue_uri: str,  # pylint: disable=unused-argument
    cluster: Cluster,  # pylint: disable=unused-argument
    producer: Client,  # pylint: disable=unused-argument
    consumer: Client,  # pylint: disable=unused-argument
):
    """
    Simulate fixture scenario without rollover. Just do nothing.
    """


@pytest.fixture(
    params=[
        without_rollover,
        # simulate_journal_rollover
    ]
)
def with_rollover(queue_uri: str, cluster: Cluster, producer: Client, consumer: Client):
    """
    Simulate fixture scenario with rollover
    """

    consumer.open(
        queue_uri,
        flags=["read"],
        succeed=True,
    )

    leader = cluster.last_known_leader

    # Keep posting and confirming messages until the leader reports that a
    # journal rollover has started; the max_journal_file_size tweak on the
    # tests below keeps the journal small so this happens quickly.
    i = 0
    while not leader.outputs_substr("Initiating rollover", 0.01):
        i += 1
        assert i < 9999, "Failed to trigger Rollover"
        producer.post(queue_uri, [f"msg{i}"], succeed=True, wait_ack=True)
        consumer.wait_push_event()
        consumer.confirm(queue_uri, "*", succeed=True)

    test_logger.info("Rollover detected on queue %s after %d messages", queue_uri, i)

    # Make sure every node has finished rolling over before the queue is closed.
    for node in cluster.nodes():
        node.wait_rollover_complete()

    consumer.close(queue_uri, succeed=True)


@pytest.fixture(params=[without_rollover, with_rollover])
def optional_rollover(request):
    """
    Fixture to optionally simulate rollover of CSL file.
@@ -711,6 +738,10 @@ def optional_rollover(request):
    return request.param


# Set the number of partitions to 1 to keep all queues on the same partition,
# so that the rollover is exercised on the partition that actually holds the test data.
@tweak.cluster.partition_config.num_partitions(1)
@tweak.cluster.partition_config.max_journal_file_size(MAX_JOURNAL_FILE_SIZE)
def test_restart_between_legacy_and_fsm_add_remove_app(
    cluster: Cluster,
    domain_urls: tc.DomainUrls,
@@ -770,9 +801,10 @@ def test_restart_between_legacy_and_fsm_add_remove_app(
    # 1. PROLOGUE
    priority_queue = f"bmq://{du.domain_priority}/{test_queue}"
    fanout_queue = f"bmq://{du.domain_fanout}/{test_queue}"
    rollover_queue = f"bmq://{du.domain_priority}/rollover_test_queue"

    # post two messages
    for queue in [priority_queue, fanout_queue]:
    for queue in [priority_queue, fanout_queue, rollover_queue]:
        producer.open(queue, flags=["write,ack"], succeed=True)

    for queue in [priority_queue, fanout_queue]:
@@ -792,7 +824,7 @@

    # 2. SWITCH
    # 2.1 Optional rollover
    optional_rollover(du, cluster.last_known_leader, producer)
    optional_rollover(rollover_queue, cluster, producer, consumer)
    # 2.2 Switch cluster mode
    switch_cluster_mode[0](cluster, producer)

@@ -831,6 +863,10 @@ def test_restart_between_legacy_and_fsm_add_remove_app(
    check_if_queue_has_n_messages(consumer, fanout_queue + "?id=corge", 0 + 2)


# Set the number of partitions to 1 to keep all queues on the same partition,
# so that the rollover is exercised on the partition that actually holds the test data.
@tweak.cluster.partition_config.num_partitions(1)
@tweak.cluster.partition_config.max_journal_file_size(MAX_JOURNAL_FILE_SIZE)
def test_restart_between_legacy_and_fsm_purge_queue_app(
    cluster: Cluster,
    domain_urls: tc.DomainUrls,
@@ -895,9 +931,10 @@ def test_restart_between_legacy_and_fsm_purge_queue_app(
    # 1. PROLOGUE
    priority_queue = f"bmq://{du.domain_priority}/{test_queue}"
    fanout_queue = f"bmq://{du.domain_fanout}/{test_queue}"
    rollover_queue = f"bmq://{du.domain_priority}/rollover_test_queue"

    # Post one message
    for queue in [priority_queue, fanout_queue]:
    for queue in [priority_queue, fanout_queue, rollover_queue]:
        producer.open(queue, flags=["write,ack"], succeed=True)

    for queue in [priority_queue, fanout_queue]:
@@ -926,7 +963,7 @@ def test_restart_between_legacy_and_fsm_purge_queue_app(

    # 2. SWITCH
    # 2.1 Optional rollover
    optional_rollover(du, cluster.last_known_leader, producer)
    optional_rollover(rollover_queue, cluster, producer, consumer)
    # 2.2 Switch cluster mode
    switch_cluster_mode[0](cluster, producer)

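Note on the fixture pattern above: optional_rollover uses indirect parametrization. Its params are callables, request.param hands the selected callable back to the test, and the test then invokes it with the rollover queue, cluster, producer and consumer. A minimal, self-contained sketch of that pattern follows; the names no_op, trigger_rollover, optional_scenario and test_example are purely illustrative and not part of this change.

import pytest


def no_op(queue_uri, cluster, producer, consumer):
    """Scenario 1: do nothing."""


def trigger_rollover(queue_uri, cluster, producer, consumer):
    """Scenario 2: would drive traffic until the broker rolls over (elided here)."""


@pytest.fixture(params=[no_op, trigger_rollover])
def optional_scenario(request):
    # Each test that uses this fixture runs once per callable in `params`.
    return request.param


def test_example(optional_scenario):
    # Invoke whichever scenario the current parametrization selected.
    optional_scenario("bmq://domain/queue", None, None, None)

pytest runs test_example twice, once per scenario, which is how the tests above get both a with-rollover and a without-rollover pass.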
18 changes: 17 additions & 1 deletion src/python/blazingmq/dev/it/process/broker.py
@@ -181,7 +181,10 @@ def get_active_node(self):
        return self.last_known_active

    def wait_status(
        self, wait_leader: bool, wait_ready: bool, cluster: str = "itCluster"
        self,
        wait_leader: bool,
        wait_ready: bool,
        cluster: str = "itCluster",
    ):
        """
        Wait until this node has an active leader if 'wait_leader' is True, and
@@ -230,6 +233,19 @@ def wait_status(
        self._logger.error(error)
        raise RuntimeError(error)

    def wait_rollover_complete(self):
        """
        Wait until rollover is complete on this broker.
        """

        self._logger.info(f"Waiting for rollover to complete on broker {self.name}...")

        with internal_use(self):
            if not self.outputs_substr("ROLLOVER COMPLETE", timeout=BLOCK_TIMEOUT):
                raise RuntimeError(
                    f"Rollover did not complete on broker {self.name} within {BLOCK_TIMEOUT}s"
                )

    def dump_queue_internals(self, domain, queue):
        """
        Dump state of the specified 'queue' in the specified 'domain'.
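The new Broker.wait_rollover_complete() helper builds on outputs_substr(), which watches the broker's captured output for a substring and returns a falsy value if it does not appear within the given timeout. A rough, illustrative sketch of that wait-for-log-line idea follows, under those assumptions; wait_for_substring and read_line are hypothetical helpers, not part of the codebase.

import time
from typing import Callable, Optional


def wait_for_substring(
    read_line: Callable[[], Optional[str]], needle: str, timeout: float
) -> bool:
    """Poll lines from read_line() until needle appears or timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        line = read_line()  # next buffered output line, or None if nothing new yet
        if line is None:
            time.sleep(0.05)  # back off briefly instead of busy-waiting
            continue
        if needle in line:
            return True
    return False

In the broker helper, a miss within BLOCK_TIMEOUT is turned into a RuntimeError, as shown in the diff above.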