Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion tests/kafkatest/tests/client/consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ducktape.mark.resource import cluster

from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.kafka import TopicPartition, quorum, consumer_group

import signal
Expand Down Expand Up @@ -74,6 +75,20 @@ def setup_consumer(self, topic, **kwargs):
self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
return consumer

def _node_failed_with_unreleased_instance_id(self, node):
cmd = "grep -q 'UnreleasedInstanceIdException' %s" % VerifiableConsumer.LOG_FILE
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the error is subject to change, would it be more reliable to check the process ID (PID) directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712

Thanks for the suggestion!
Given backward compatibility, checking the PID makes more sense.
I’ve made the corresponding adjustment.

return node.account.ssh(cmd, allow_fail=True) == 0

def await_conflict_consumers_fenced(self, conflict_consumer):
    """Block until every conflicting consumer node has terminated after fencing.

    Two phases, both polled through ``wait_until``:

    1. Wait (up to 60 seconds) until the number of dead nodes reported by
       ``conflict_consumer.dead_nodes()`` equals the number of nodes in
       ``conflict_consumer.nodes``.
    2. For each node in turn, wait (up to 30 seconds) until
       ``self._node_failed_with_unreleased_instance_id(node)`` reports true.

    :param conflict_consumer: service whose ``nodes`` and ``dead_nodes()``
        are inspected.
    """
    def every_node_is_dead():
        total = len(conflict_consumer.nodes)
        return len(conflict_consumer.dead_nodes()) == total

    wait_until(
        every_node_is_dead,
        timeout_sec=60,
        err_msg="Timed out waiting for conflict consumers to terminate after fencing",
    )

    for fenced_node in conflict_consumer.nodes:
        # Build the failure message up front so it names the node being checked.
        failure_msg = "Conflict consumer %s did not fail with UnreleasedInstanceIdException" % fenced_node.account.hostname

        def failed_with_unreleased_id():
            # Evaluated by wait_until within this same iteration, so
            # ``fenced_node`` still refers to the current node.
            return self._node_failed_with_unreleased_instance_id(fenced_node)

        wait_until(
            failed_with_unreleased_id,
            timeout_sec=30,
            err_msg=failure_msg,
        )

@cluster(num_nodes=7)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
Expand Down Expand Up @@ -326,7 +341,10 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
assert len(consumer.joined_nodes()) == len(consumer.nodes)
assert len(conflict_consumer.joined_nodes()) == 0


# ensure the conflict consumers terminate
self.await_conflict_consumers_fenced(conflict_consumer)

# Stop existing nodes, so conflicting ones should be able to join.
consumer.stop_all()
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
Expand Down