Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 45 additions & 27 deletions src/integration-tests/test_client_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,33 @@ def test_client_timeout_open(multi_node: Cluster, domain_urls: tc.DomainUrls):
# Ensure the producer is connected to a replica
producer_broker = brokers[0] if brokers[0] is not leader else brokers[1]

# Start a producer
# Start a producer with a long timeout and open queue, to ensure the queue
# is assigned.
longTimeoutProducer = leader.create_client("producer_longtimeout")
longTimeoutProducer.open(uri_priority, flags=["write,ack"], succeed=True)

# Start a producer with a short timeout
producer = producer_broker.create_client("producer", options=["--timeout=1"])

# Suspend all replicas to force future open to timeout
# Suspend rest of the cluster to force future open to timeout
for broker in brokers:
if broker is not leader and broker is not producer_broker:
if broker is not producer_broker:
broker.suspend()

# Open a queue while the cluster does not have quorum
producer.open(uri_priority, flags=["write,ack"], succeed=False)

# Wait past the open timeout
time.sleep(2)
# Open a queue while the cluster does not have quorum should timeout.
assert (
producer.open(uri_priority, flags=["write,ack"], succeed=False)
== Client.e_TIMEOUT
)

# Release suspended replicas
# Release suspended brokers
for broker in brokers:
if broker is not leader and broker is not producer_broker:
if broker is not producer_broker:
broker.resume()
producer_broker.capture("is back to healthy state")

# Post after timeout should result in failed Ack
# Post after short timeout should result in failed Ack (open is not
# retried).
assert (
producer.post(
uri_priority, ["1"], wait_ack=False, succeed=False, no_except=True
Expand Down Expand Up @@ -88,31 +94,37 @@ def test_client_timeout_reopen(multi_node: Cluster, domain_urls: tc.DomainUrls):
# Ensure the producer is connected to a replica
producer_broker = brokers[0] if brokers[0] is not leader else brokers[1]

# Start a producer and open several queues
# Start a producer with a long timeout and open queue, to ensure the queues
# are assigned.
longTimeoutProducer = leader.create_client("producer_longtimeout")
longTimeoutProducer.open(uri_priority, flags=["write,ack"], succeed=True)
longTimeoutProducer.open(uri_priority2, flags=["write,ack"], succeed=True)

# Start a producer with a short timeout and open several queues
producer = producer_broker.create_client("producer", options=["--timeout=1"])
producer.open(uri_priority, flags=["write,ack"], succeed=True)
producer.open(uri_priority2, flags=["write,ack"], succeed=True)

# Suspend all replicas to force future reopen to timeout
# Suspend rest of the cluster to force future reopen to timeout
for broker in brokers:
if broker is not leader and broker is not producer_broker:
if broker is not producer_broker:
broker.suspend()

# Crash connected broker and restart to force reopen
producer_broker.stop()
producer_broker.wait()
producer_broker.start()

# Wait past the reopen timeout
# Wait past the short reopen timeout
time.sleep(2)

# Release suspended replicas
# Release suspended brokers
for broker in brokers:
if broker is not leader and broker is not producer_broker:
if broker is not producer_broker:
broker.resume()
producer_broker.capture("is back to healthy state")

# Post after timeout should result in successful Ack
# Post after short timeout should result in successful Ack
assert (
producer.post(uri_priority, ["1"], wait_ack=True, succeed=True)
== Client.e_SUCCESS
Expand Down Expand Up @@ -141,28 +153,34 @@ def test_client_timeout_close(multi_node: Cluster, domain_urls: tc.DomainUrls):
# Ensure the producer is connected to a replica
producer_broker = brokers[0] if brokers[0] is not leader else brokers[1]

# Start a producer and open the queue
# Start a producer with a long timeout and open queue, to ensure the queues
# are assigned.
longTimeoutProducer = leader.create_client("producer_longtimeout")
longTimeoutProducer.open(uri_priority, flags=["write,ack"], succeed=True)

# Start a producer with a short timeout and open the queue
producer = producer_broker.create_client("producer", options=["--timeout=1"])
producer.open(uri_priority, flags=["write,ack"], succeed=True)

# Suspend all replicas to force future close to timeout
# Suspend rest of the cluster to force future short timeout close to timeout
for broker in brokers:
if broker is not leader and broker is not producer_broker:
if broker is not producer_broker:
broker.suspend()

# Close a queue while the cluster does not have quorum should succeed.
producer.close(uri_priority, succeed=True)
# Close a queue with short timeout while the cluster does not have quorum
# should correctly report failure, but should reset the connection.
producer.close(uri_priority, succeed=False)

# Wait past the close timeout
# Wait past the short close timeout
time.sleep(2)

# Release suspended replicas
# Release suspended brokers
for broker in brokers:
if broker is not leader and broker is not producer_broker:
if broker is not producer_broker:
broker.resume()
producer_broker.capture("is back to healthy state")

# Post after timeout should result in failed Ack
# Post after short timeout should result in failed Ack
assert (
producer.post(
uri_priority, ["1"], wait_ack=False, succeed=False, no_except=True
Expand Down
Loading