From 79d7191da38aa0ec2f3f1f5530f1ae5c287ddc9e Mon Sep 17 00:00:00 2001 From: "Patrick M. Niedzielski" Date: Fri, 21 Nov 2025 21:03:20 +0000 Subject: [PATCH] IT[test_client_timeout]: Ensure queues are assigned The `test_client_timeout.py` integration test creates producers with artificially short timeouts in order to test the behavior of clients on open, reopen, and close timeouts. However, because bmqtool only supports a single timeout flag, the short timeouts can cause the initial open queue operation to fail on slow test runners. This patch makes these tests more robust by: 1) introducing a client with a long timeout to each test, which connects and opens the necessary queues, assigning them on the primary; and 2) suspending the primary in each test as well as all other brokers in the cluster, to ensure that the queue already being assigned does not interfere with the short timeouts on the operations we want them for. Signed-off-by: Patrick M. Niedzielski --- src/integration-tests/test_client_timeout.py | 72 ++++++++++++-------- 1 file changed, 45 insertions(+), 27 deletions(-) diff --git a/src/integration-tests/test_client_timeout.py b/src/integration-tests/test_client_timeout.py index 6cbc43ea94..caae0a0404 100644 --- a/src/integration-tests/test_client_timeout.py +++ b/src/integration-tests/test_client_timeout.py @@ -38,27 +38,33 @@ def test_client_timeout_open(multi_node: Cluster, domain_urls: tc.DomainUrls): # Ensure the producer is connected to a replica producer_broker = brokers[0] if brokers[0] is not leader else brokers[1] - # Start a producer + # Start a producer with a long timeout and open queue, to ensure the queue + # is assigned. 
+ longTimeoutProducer = leader.create_client("producer_longtimeout") + longTimeoutProducer.open(uri_priority, flags=["write,ack"], succeed=True) + + # Start a producer with a short timeout producer = producer_broker.create_client("producer", options=["--timeout=1"]) - # Suspend all replicas to force future open to timeout + # Suspend rest of the cluster to force future open to timeout for broker in brokers: - if broker is not leader and broker is not producer_broker: + if broker is not producer_broker: broker.suspend() - # Open a queue while the cluster does not have quorum - producer.open(uri_priority, flags=["write,ack"], succeed=False) - - # Wait past the open timeout - time.sleep(2) + # Open a queue while the cluster does not have quorum should timeout. + assert ( + producer.open(uri_priority, flags=["write,ack"], succeed=False) + == Client.e_TIMEOUT + ) - # Release suspended replicas + # Release suspended brokers for broker in brokers: - if broker is not leader and broker is not producer_broker: + if broker is not producer_broker: broker.resume() producer_broker.capture("is back to healthy state") - # Post after timeout should result in failed Ack + # Post after short timeout should result in failed Ack (open is not + # retried). assert ( producer.post( uri_priority, ["1"], wait_ack=False, succeed=False, no_except=True @@ -88,14 +94,20 @@ def test_client_timeout_reopen(multi_node: Cluster, domain_urls: tc.DomainUrls): # Ensure the producer is connected to a replica producer_broker = brokers[0] if brokers[0] is not leader else brokers[1] - # Start a producer and open several queues + # Start a producer with a long timeout and open queue, to ensure the queues + # are assigned. 
+ longTimeoutProducer = leader.create_client("producer_longtimeout") + longTimeoutProducer.open(uri_priority, flags=["write,ack"], succeed=True) + longTimeoutProducer.open(uri_priority2, flags=["write,ack"], succeed=True) + + # Start a producer with a short timeout and open several queues producer = producer_broker.create_client("producer", options=["--timeout=1"]) producer.open(uri_priority, flags=["write,ack"], succeed=True) producer.open(uri_priority2, flags=["write,ack"], succeed=True) - # Suspend all replicas to force future reopen to timeout + # Suspend rest of the cluster to force future reopen to timeout for broker in brokers: - if broker is not leader and broker is not producer_broker: + if broker is not producer_broker: broker.suspend() # Crash connected broker and restart to force reopen @@ -103,16 +115,16 @@ def test_client_timeout_reopen(multi_node: Cluster, domain_urls: tc.DomainUrls): producer_broker.wait() producer_broker.start() - # Wait past the reopen timeout + # Wait past the short reopen timeout time.sleep(2) - # Release suspended replicas + # Release suspended brokers for broker in brokers: - if broker is not leader and broker is not producer_broker: + if broker is not producer_broker: broker.resume() producer_broker.capture("is back to healthy state") - # Post after timeout should result in successful Ack + # Post after short timeout should result in successful Ack assert ( producer.post(uri_priority, ["1"], wait_ack=True, succeed=True) == Client.e_SUCCESS @@ -141,28 +153,34 @@ def test_client_timeout_close(multi_node: Cluster, domain_urls: tc.DomainUrls): # Ensure the producer is connected to a replica producer_broker = brokers[0] if brokers[0] is not leader else brokers[1] - # Start a producer and open the queue + # Start a producer with a long timeout and open queue, to ensure the queues + # are assigned. 
+ longTimeoutProducer = leader.create_client("producer_longtimeout") + longTimeoutProducer.open(uri_priority, flags=["write,ack"], succeed=True) + + # Start a producer with a short timeout and open the queue producer = producer_broker.create_client("producer", options=["--timeout=1"]) producer.open(uri_priority, flags=["write,ack"], succeed=True) - # Suspend all replicas to force future close to timeout + # Suspend rest of the cluster to force future short timeout close to timeout for broker in brokers: - if broker is not leader and broker is not producer_broker: + if broker is not producer_broker: broker.suspend() - # Close a queue while the cluster does not have quorum should succeed. - producer.close(uri_priority, succeed=True) + # Close a queue with short timeout while the cluster does not have quorum + # should correctly report failure, but should reset the connection. + producer.close(uri_priority, succeed=False) - # Wait past the close timeout + # Wait past the short close timeout time.sleep(2) - # Release suspended replicas + # Release suspended brokers for broker in brokers: - if broker is not leader and broker is not producer_broker: + if broker is not producer_broker: broker.resume() producer_broker.capture("is back to healthy state") - # Post after timeout should result in failed Ack + # Post after short timeout should result in failed Ack assert ( producer.post( uri_priority, ["1"], wait_ack=False, succeed=False, no_except=True