@@ -38,27 +38,33 @@ def test_client_timeout_open(multi_node: Cluster, domain_urls: tc.DomainUrls):
3838 # Ensure the producer is connected to a replica
3939 producer_broker = brokers [0 ] if brokers [0 ] is not leader else brokers [1 ]
4040
41- # Start a producer
41+ # Start a producer with a long timeout and open queue, to ensure the queue
42+ # is assigned.
43+ longTimeoutProducer = leader .create_client ("producer_longtimeout" )
44+ longTimeoutProducer .open (uri_priority , flags = ["write,ack" ], succeed = True )
45+
46+ # Start a producer with a short timeout
4247 producer = producer_broker .create_client ("producer" , options = ["--timeout=1" ])
4348
44- # Suspend all replicas to force future open to timeout
49+ # Suspend rest of the cluster to force future open to timeout
4550 for broker in brokers :
46- if broker is not leader and broker is not producer_broker :
51+ if broker is not producer_broker :
4752 broker .suspend ()
4853
49- # Open a queue while the cluster does not have quorum
50- producer . open ( uri_priority , flags = [ "write,ack" ], succeed = False )
51-
52- # Wait past the open timeout
53- time . sleep ( 2 )
54+ # Open a queue while the cluster does not have quorum should timeout.
55+ assert (
56+ producer . open ( uri_priority , flags = [ "write,ack" ], succeed = False )
57+ == Client . e_TIMEOUT
58+ )
5459
55- # Release suspended replicas
60+ # Release suspended brokers
5661 for broker in brokers :
57- if broker is not leader and broker is not producer_broker :
62+ if broker is not producer_broker :
5863 broker .resume ()
5964 producer_broker .capture ("is back to healthy state" )
6065
61- # Post after timeout should result in failed Ack
66+ # Post after short timeout should result in failed Ack (open is not
67+ # retried).
6268 assert (
6369 producer .post (
6470 uri_priority , ["1" ], wait_ack = False , succeed = False , no_except = True
@@ -88,31 +94,37 @@ def test_client_timeout_reopen(multi_node: Cluster, domain_urls: tc.DomainUrls):
8894 # Ensure the producer is connected to a replica
8995 producer_broker = brokers [0 ] if brokers [0 ] is not leader else brokers [1 ]
9096
91- # Start a producer and open several queues
97+ # Start a producer with a long timeout and open queue, to ensure the queues
98+ # are assigned.
99+ longTimeoutProducer = leader .create_client ("producer_longtimeout" )
100+ longTimeoutProducer .open (uri_priority , flags = ["write,ack" ], succeed = True )
101+ longTimeoutProducer .open (uri_priority2 , flags = ["write,ack" ], succeed = True )
102+
103+ # Start a producer with a short timeout and open several queues
92104 producer = producer_broker .create_client ("producer" , options = ["--timeout=1" ])
93105 producer .open (uri_priority , flags = ["write,ack" ], succeed = True )
94106 producer .open (uri_priority2 , flags = ["write,ack" ], succeed = True )
95107
96- # Suspend all replicas to force future reopen to timeout
108+ # Suspend rest of the cluster to force future reopen to timeout
97109 for broker in brokers :
98- if broker is not leader and broker is not producer_broker :
110+ if broker is not producer_broker :
99111 broker .suspend ()
100112
101113 # Crash connected broker and restart to force reopen
102114 producer_broker .stop ()
103115 producer_broker .wait ()
104116 producer_broker .start ()
105117
106- # Wait past the reopen timeout
118+ # Wait past the short reopen timeout
107119 time .sleep (2 )
108120
109- # Release suspended replicas
121+ # Release suspended brokers
110122 for broker in brokers :
111- if broker is not leader and broker is not producer_broker :
123+ if broker is not producer_broker :
112124 broker .resume ()
113125 producer_broker .capture ("is back to healthy state" )
114126
115- # Post after timeout should result in successful Ack
127+ # Post after short timeout should result in successful Ack
116128 assert (
117129 producer .post (uri_priority , ["1" ], wait_ack = True , succeed = True )
118130 == Client .e_SUCCESS
@@ -141,28 +153,34 @@ def test_client_timeout_close(multi_node: Cluster, domain_urls: tc.DomainUrls):
141153 # Ensure the producer is connected to a replica
142154 producer_broker = brokers [0 ] if brokers [0 ] is not leader else brokers [1 ]
143155
144- # Start a producer and open the queue
156+ # Start a producer with a long timeout and open queue, to ensure the queues
157+ # are assigned.
158+ longTimeoutProducer = leader .create_client ("producer_longtimeout" )
159+ longTimeoutProducer .open (uri_priority , flags = ["write,ack" ], succeed = True )
160+
161+ # Start a producer with a short timeout and open the queue
145162 producer = producer_broker .create_client ("producer" , options = ["--timeout=1" ])
146163 producer .open (uri_priority , flags = ["write,ack" ], succeed = True )
147164
148- # Suspend all replicas to force future close to timeout
165+ # Suspend rest of the cluster to force future short timeout close to timeout
149166 for broker in brokers :
150- if broker is not leader and broker is not producer_broker :
167+ if broker is not producer_broker :
151168 broker .suspend ()
152169
153- # Close a queue while the cluster does not have quorum should succeed.
154- producer .close (uri_priority , succeed = True )
170+ # Close a queue with short timeout while the cluster does not have quorum
171+ # should correctly report failure, but should reset the connection.
172+ producer .close (uri_priority , succeed = False )
155173
156- # Wait past the close timeout
174+ # Wait past the short close timeout
157175 time .sleep (2 )
158176
159- # Release suspended replicas
177+ # Release suspended brokers
160178 for broker in brokers :
161- if broker is not leader and broker is not producer_broker :
179+ if broker is not producer_broker :
162180 broker .resume ()
163181 producer_broker .capture ("is back to healthy state" )
164182
165- # Post after timeout should result in failed Ack
183+ # Post after short timeout should result in failed Ack
166184 assert (
167185 producer .post (
168186 uri_priority , ["1" ], wait_ack = False , succeed = False , no_except = True
0 commit comments