diff --git a/src/integration-tests/test_client_timeout.py b/src/integration-tests/test_client_timeout.py index 6cbc43ea94..caae0a0404 100644 --- a/src/integration-tests/test_client_timeout.py +++ b/src/integration-tests/test_client_timeout.py @@ -38,27 +38,33 @@ def test_client_timeout_open(multi_node: Cluster, domain_urls: tc.DomainUrls): # Ensure the producer is connected to a replica producer_broker = brokers[0] if brokers[0] is not leader else brokers[1] - # Start a producer + # Start a producer with a long timeout and open queue, to ensure the queue + # is assigned. + longTimeoutProducer = leader.create_client("producer_longtimeout") + longTimeoutProducer.open(uri_priority, flags=["write,ack"], succeed=True) + + # Start a producer with a short timeout producer = producer_broker.create_client("producer", options=["--timeout=1"]) - # Suspend all replicas to force future open to timeout + # Suspend rest of the cluster to force future open to timeout for broker in brokers: - if broker is not leader and broker is not producer_broker: + if broker is not producer_broker: broker.suspend() - # Open a queue while the cluster does not have quorum - producer.open(uri_priority, flags=["write,ack"], succeed=False) - - # Wait past the open timeout - time.sleep(2) + # Open a queue while the cluster does not have quorum should timeout. + assert ( + producer.open(uri_priority, flags=["write,ack"], succeed=False) + == Client.e_TIMEOUT + ) - # Release suspended replicas + # Release suspended brokers for broker in brokers: - if broker is not leader and broker is not producer_broker: + if broker is not producer_broker: broker.resume() producer_broker.capture("is back to healthy state") - # Post after timeout should result in failed Ack + # Post after short timeout should result in failed Ack (open is not + # retried). assert ( producer.post( uri_priority, ["1"], wait_ack=False, succeed=False, no_except=True @@ -88,14 +94,20 @@ def test_client_timeout_reopen(multi_node: Cluster, domain_urls: tc.DomainUrls): # Ensure the producer is connected to a replica producer_broker = brokers[0] if brokers[0] is not leader else brokers[1] - # Start a producer and open several queues + # Start a producer with a long timeout and open queue, to ensure the queues + # are assigned. + longTimeoutProducer = leader.create_client("producer_longtimeout") + longTimeoutProducer.open(uri_priority, flags=["write,ack"], succeed=True) + longTimeoutProducer.open(uri_priority2, flags=["write,ack"], succeed=True) + + # Start a producer with a short timeout and open several queues producer = producer_broker.create_client("producer", options=["--timeout=1"]) producer.open(uri_priority, flags=["write,ack"], succeed=True) producer.open(uri_priority2, flags=["write,ack"], succeed=True) - # Suspend all replicas to force future reopen to timeout + # Suspend rest of the cluster to force future reopen to timeout for broker in brokers: - if broker is not leader and broker is not producer_broker: + if broker is not producer_broker: broker.suspend() # Crash connected broker and restart to force reopen @@ -103,16 +115,16 @@ def test_client_timeout_reopen(multi_node: Cluster, domain_urls: tc.DomainUrls): producer_broker.wait() producer_broker.start() - # Wait past the reopen timeout + # Wait past the short reopen timeout time.sleep(2) - # Release suspended replicas + # Release suspended brokers for broker in brokers: - if broker is not leader and broker is not producer_broker: + if broker is not producer_broker: broker.resume() producer_broker.capture("is back to healthy state") - # Post after timeout should result in successful Ack + # Post after short timeout should result in successful Ack assert ( producer.post(uri_priority, ["1"], wait_ack=True, succeed=True) == Client.e_SUCCESS @@ -141,28 +153,34 @@ def test_client_timeout_close(multi_node: Cluster, domain_urls: tc.DomainUrls): # Ensure the producer is connected to a replica producer_broker = brokers[0] if brokers[0] is not leader else brokers[1] - # Start a producer and open the queue + # Start a producer with a long timeout and open queue, to ensure the queues + # are assigned. + longTimeoutProducer = leader.create_client("producer_longtimeout") + longTimeoutProducer.open(uri_priority, flags=["write,ack"], succeed=True) + + # Start a producer with a short timeout and open the queue producer = producer_broker.create_client("producer", options=["--timeout=1"]) producer.open(uri_priority, flags=["write,ack"], succeed=True) - # Suspend all replicas to force future close to timeout + # Suspend rest of the cluster to force future short timeout close to timeout for broker in brokers: - if broker is not leader and broker is not producer_broker: + if broker is not producer_broker: broker.suspend() - # Close a queue while the cluster does not have quorum should succeed. - producer.close(uri_priority, succeed=True) + # Close a queue with short timeout while the cluster does not have quorum + # should correctly report failure, but should reset the connection. + producer.close(uri_priority, succeed=False) - # Wait past the close timeout + # Wait past the short close timeout time.sleep(2) - # Release suspended replicas + # Release suspended brokers for broker in brokers: - if broker is not leader and broker is not producer_broker: + if broker is not producer_broker: broker.resume() producer_broker.capture("is back to healthy state") - # Post after timeout should result in failed Ack + # Post after short timeout should result in failed Ack assert ( producer.post( uri_priority, ["1"], wait_ack=False, succeed=False, no_except=True