diff --git a/.circleci/config.yml b/.circleci/config.yml
index 54ac46f7..b084cf31 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -267,6 +267,10 @@ jobs:
       TEST_INTEGRATION: 1
       # Kafka support DeleteRecords
       TEST_DELETE_RECORDS: 1
+      # Kafka supports ElectLeaders
+      TEST_ELECT_LEADERS: 1
+      # Kafka supports ReassignPartitions
+      TEST_REASSIGN_PARTITIONS: 1
       TEST_JAVA_INTEROPT: 1
       # Don't use the first node here since this is likely the controller and we want to ensure that we automatically
       # pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
diff --git a/src/client/controller.rs b/src/client/controller.rs
index 34a12857..142e1901 100644
--- a/src/client/controller.rs
+++ b/src/client/controller.rs
@@ -11,12 +11,32 @@ use crate::{
     messenger::RequestError,
     protocol::{
         error::Error as ProtocolError,
-        messages::{CreateTopicRequest, CreateTopicsRequest},
-        primitives::{Int16, Int32, NullableString, String_},
+        messages::{
+            AlterPartitionReassignmentsPartitionRequest, AlterPartitionReassignmentsRequest,
+            AlterPartitionReassignmentsTopicRequest, CreateTopicRequest, CreateTopicsRequest,
+            ElectLeadersRequest, ElectLeadersTopicRequest,
+        },
+        primitives::{
+            Array, CompactArray, CompactString, Int16, Int32, Int8, NullableString, String_,
+            TaggedFields,
+        },
     },
     validation::ExactlyOne,
 };
 
+/// Election type of [`ControllerClient::elect_leaders`].
+///
+/// The names in this enum are borrowed from the
+/// [Kafka source code](https://github.com/a0x8o/kafka/blob/5383311a5cfbdaf147411004106449e3ad8081fb/core/src/main/scala/kafka/controller/KafkaController.scala#L2186-L2194).
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ElectionType {
+    /// Elects the preferred replica.
+    Preferred,
+
+    /// Elects the first live replica if there are no in-sync replicas.
+    Unclean,
+}
+
 #[derive(Debug)]
 pub struct ControllerClient {
     brokers: Arc<BrokerConnector>,
@@ -78,6 +98,113 @@ impl ControllerClient {
         .await
     }
 
+    /// Re-assign the replicas of a partition.
+    pub async fn reassign_partitions(
+        &self,
+        topic: impl Into<String> + Send,
+        partition: i32,
+        replicas: Vec<i32>,
+        timeout_ms: i32,
+    ) -> Result<()> {
+        let request = &AlterPartitionReassignmentsRequest {
+            topics: vec![AlterPartitionReassignmentsTopicRequest {
+                name: CompactString(topic.into()),
+                partitions: vec![AlterPartitionReassignmentsPartitionRequest {
+                    partition_index: Int32(partition),
+                    replicas: CompactArray(Some(replicas.into_iter().map(Int32).collect())),
+                    tagged_fields: TaggedFields::default(),
+                }],
+                tagged_fields: TaggedFields::default(),
+            }],
+            timeout_ms: Int32(timeout_ms),
+            tagged_fields: TaggedFields::default(),
+        };
+
+        maybe_retry(
+            &self.backoff_config,
+            self,
+            "reassign_partitions",
+            || async move {
+                let broker = self.get().await?;
+                let response = broker.request(request).await?;
+
+                if let Some(protocol_error) = response.error {
+                    return Err(Error::ServerError(protocol_error, Default::default()));
+                }
+
+                let topic = response
+                    .responses
+                    .exactly_one()
+                    .map_err(Error::exactly_one_topic)?;
+
+                let partition = topic
+                    .partitions
+                    .exactly_one()
+                    .map_err(Error::exactly_one_partition)?;
+
+                match partition.error {
+                    None => Ok(()),
+                    Some(protocol_error) => Err(Error::ServerError(
+                        protocol_error,
+                        partition.error_message.0.unwrap_or_default(),
+                    )),
+                }
+            },
+        )
+        .await
+    }
+
+    /// Elect leaders for the given topic and partition.
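+    ///
+    /// # Example
+    ///
+    /// A minimal sketch of triggering a preferred-leader election; the broker address and
+    /// topic name are placeholders, and the example is `no_run` because it needs a live
+    /// cluster:
+    ///
+    /// ```no_run
+    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
+    /// use rskafka::client::{controller::ElectionType, ClientBuilder};
+    ///
+    /// let client = ClientBuilder::new(vec!["broker:9092".to_owned()]).build().await?;
+    /// let controller = client.controller_client()?;
+    ///
+    /// // Ask the controller to elect the preferred replica as leader of partition 0.
+    /// controller
+    ///     .elect_leaders("my_topic", 0, ElectionType::Preferred, 5_000)
+    ///     .await?;
+    /// # Ok(())
+    /// # }
+    /// ```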
+    pub async fn elect_leaders(
+        &self,
+        topic: impl Into<String> + Send,
+        partition: i32,
+        election_type: ElectionType,
+        timeout_ms: i32,
+    ) -> Result<()> {
+        let request = &ElectLeadersRequest {
+            election_type: Int8(match election_type {
+                ElectionType::Preferred => 0,
+                ElectionType::Unclean => 1,
+            }),
+            topic_partitions: vec![ElectLeadersTopicRequest {
+                topic: String_(topic.into()),
+                partitions: Array(Some(vec![Int32(partition)])),
+                tagged_fields: None,
+            }],
+            timeout_ms: Int32(timeout_ms),
+            tagged_fields: None,
+        };
+
+        maybe_retry(&self.backoff_config, self, "elect_leaders", || async move {
+            let broker = self.get().await?;
+            let response = broker.request(request).await?;
+
+            if let Some(protocol_error) = response.error {
+                return Err(Error::ServerError(protocol_error, Default::default()));
+            }
+
+            let topic = response
+                .replica_election_results
+                .exactly_one()
+                .map_err(Error::exactly_one_topic)?;
+
+            let partition = topic
+                .partition_results
+                .exactly_one()
+                .map_err(Error::exactly_one_partition)?;
+
+            match partition.error {
+                None => Ok(()),
+                Some(protocol_error) => Err(Error::ServerError(
+                    protocol_error,
+                    partition.error_message.0.unwrap_or_default(),
+                )),
+            }
+        })
+        .await
+    }
+
     /// Retrieve the broker ID of the controller
     async fn get_controller_id(&self) -> Result<i32> {
         let metadata = self.brokers.request_metadata(None, Some(vec![])).await?;
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 9bddeda8..40ff8e58 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -5,6 +5,7 @@ use thiserror::Error;
 use crate::{
     client::partition::PartitionClient,
     connection::{BrokerConnector, TlsConfig},
+    metadata::{Metadata, MetadataBroker, MetadataPartition, MetadataTopic},
     protocol::primitives::Boolean,
     topic::Topic,
 };
@@ -145,4 +146,53 @@ impl Client {
             })
             .collect())
     }
+
+    /// Return cluster-wide metadata.
+    pub async fn metadata(&self) -> Result<Metadata> {
+        let response = self.brokers.request_metadata(None, None).await?;
+
+        Ok(Metadata {
+            brokers: response
+                .brokers
+                .into_iter()
+                .map(|response| MetadataBroker {
+                    node_id: response.node_id.0,
+                    host: response.host.0,
+                    port: response.port.0,
+                    rack: response.rack.and_then(|s| s.0),
+                })
+                .collect(),
+            controller_id: response.controller_id.map(|id| id.0),
+            topics: response
+                .topics
+                .into_iter()
+                .map(|response| MetadataTopic {
+                    name: response.name.0,
+                    is_internal: response.is_internal.map(|b| b.0),
+                    partitions: response
+                        .partitions
+                        .into_iter()
+                        .map(|response| MetadataPartition {
+                            partition_index: response.partition_index.0,
+                            leader_id: response.leader_id.0,
+                            replica_nodes: response
+                                .replica_nodes
+                                .0
+                                .unwrap_or_default()
+                                .into_iter()
+                                .map(|i| i.0)
+                                .collect(),
+                            isr_nodes: response
+                                .isr_nodes
+                                .0
+                                .unwrap_or_default()
+                                .into_iter()
+                                .map(|i| i.0)
+                                .collect(),
+                        })
+                        .collect(),
+                })
+                .collect(),
+        })
+    }
 }
diff --git a/src/lib.rs b/src/lib.rs
index b7543fdd..dbb0524c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -22,11 +22,14 @@
 mod backoff;
 pub mod client;
 mod connection;
+
 #[cfg(feature = "unstable-fuzzing")]
 pub mod messenger;
 #[cfg(not(feature = "unstable-fuzzing"))]
 mod messenger;
+pub mod metadata;
+
 #[cfg(feature = "unstable-fuzzing")]
 pub mod protocol;
 #[cfg(not(feature = "unstable-fuzzing"))]
diff --git a/src/metadata.rs b/src/metadata.rs
new file mode 100644
index 00000000..db653adb
--- /dev/null
+++ b/src/metadata.rs
@@ -0,0 +1,59 @@
+//! Cluster-wide Kafka metadata.
+
+/// Metadata container for the entire cluster.
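+///
+/// Returned by `Client::metadata`. A minimal usage sketch (the broker address is a
+/// placeholder, and the example is `no_run` because it needs a live cluster):
+///
+/// ```no_run
+/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
+/// use rskafka::client::ClientBuilder;
+///
+/// let client = ClientBuilder::new(vec!["broker:9092".to_owned()]).build().await?;
+/// let metadata = client.metadata().await?;
+///
+/// // Walk the cluster topology.
+/// for broker in &metadata.brokers {
+///     println!("broker {} at {}:{}", broker.node_id, broker.host, broker.port);
+/// }
+/// for topic in &metadata.topics {
+///     println!("topic {} has {} partitions", topic.name, topic.partitions.len());
+/// }
+/// # Ok(())
+/// # }
+/// ```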
+#[derive(Debug, PartialEq)]
+pub struct Metadata {
+    /// Brokers.
+    pub brokers: Vec<MetadataBroker>,
+
+    /// The ID of the controller broker.
+    pub controller_id: Option<i32>,
+
+    /// Topics.
+    pub topics: Vec<MetadataTopic>,
+}
+
+/// Metadata for a certain broker.
+#[derive(Debug, PartialEq)]
+pub struct MetadataBroker {
+    /// The broker ID.
+    pub node_id: i32,
+
+    /// The broker hostname.
+    pub host: String,
+
+    /// The broker port.
+    pub port: i32,
+
+    /// Rack.
+    pub rack: Option<String>,
+}
+
+/// Metadata for a certain topic.
+#[derive(Debug, PartialEq)]
+pub struct MetadataTopic {
+    /// The topic name.
+    pub name: String,
+
+    /// True if the topic is internal.
+    pub is_internal: Option<bool>,
+
+    /// Each partition in the topic.
+    pub partitions: Vec<MetadataPartition>,
+}
+
+/// Metadata for a certain partition.
+#[derive(Debug, PartialEq)]
+pub struct MetadataPartition {
+    /// The partition index.
+    pub partition_index: i32,
+
+    /// The ID of the leader broker.
+    pub leader_id: i32,
+
+    /// The set of all nodes that host this partition.
+    pub replica_nodes: Vec<i32>,
+
+    /// The set of all nodes that are in sync with the leader for this partition.
+    pub isr_nodes: Vec<i32>,
+}
diff --git a/src/protocol/messages/alter_partition_reassignments.rs b/src/protocol/messages/alter_partition_reassignments.rs
new file mode 100644
index 00000000..45216db1
--- /dev/null
+++ b/src/protocol/messages/alter_partition_reassignments.rs
@@ -0,0 +1,234 @@
+use std::io::{Read, Write};
+
+use crate::protocol::{
+    api_key::ApiKey,
+    api_version::{ApiVersion, ApiVersionRange},
+    error::Error,
+    messages::{read_compact_versioned_array, write_compact_versioned_array},
+    primitives::{CompactArray, CompactNullableString, CompactString, Int16, Int32, TaggedFields},
+    traits::{ReadType, WriteType},
+};
+
+use super::{
+    ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
+};
+
+#[derive(Debug)]
+pub struct AlterPartitionReassignmentsRequest {
+    /// The time in ms to wait for the request to complete.
+    pub timeout_ms: Int32,
+
+    /// The topics to reassign.
+    pub topics: Vec<AlterPartitionReassignmentsTopicRequest>,
+
+    /// The tagged fields.
+    pub tagged_fields: TaggedFields,
+}
+
+impl<W> WriteVersionedType<W> for AlterPartitionReassignmentsRequest
+where
+    W: Write,
+{
+    fn write_versioned(
+        &self,
+        writer: &mut W,
+        version: ApiVersion,
+    ) -> Result<(), WriteVersionedError> {
+        let v = version.0 .0;
+        assert!(v <= 0);
+
+        self.timeout_ms.write(writer)?;
+        write_compact_versioned_array(writer, version, Some(&self.topics))?;
+        self.tagged_fields.write(writer)?;
+
+        Ok(())
+    }
+}
+
+impl RequestBody for AlterPartitionReassignmentsRequest {
+    type ResponseBody = AlterPartitionReassignmentsResponse;
+
+    const API_KEY: ApiKey = ApiKey::AlterPartitionReassignments;
+
+    /// All versions.
+    const API_VERSION_RANGE: ApiVersionRange =
+        ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(0)));
+
+    const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(0));
+}
+
+#[derive(Debug)]
+pub struct AlterPartitionReassignmentsTopicRequest {
+    /// The topic name.
+    pub name: CompactString,
+
+    /// The partitions to reassign.
+    pub partitions: Vec<AlterPartitionReassignmentsPartitionRequest>,
+
+    /// The tagged fields.
+    pub tagged_fields: TaggedFields,
+}
+
+impl<W> WriteVersionedType<W> for AlterPartitionReassignmentsTopicRequest
+where
+    W: Write,
+{
+    fn write_versioned(
+        &self,
+        writer: &mut W,
+        version: ApiVersion,
+    ) -> Result<(), WriteVersionedError> {
+        let v = version.0 .0;
+        assert!(v <= 0);
+
+        self.name.write(writer)?;
+        write_compact_versioned_array(writer, version, Some(&self.partitions))?;
+        self.tagged_fields.write(writer)?;
+
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub struct AlterPartitionReassignmentsPartitionRequest {
+    /// The partition index.
+    pub partition_index: Int32,
+
+    /// The replicas to place the partitions on, or null to cancel a pending reassignment for this partition.
+    pub replicas: CompactArray<Int32>,
+
+    /// The tagged fields.
+    pub tagged_fields: TaggedFields,
+}
+
+impl<W> WriteVersionedType<W> for AlterPartitionReassignmentsPartitionRequest
+where
+    W: Write,
+{
+    fn write_versioned(
+        &self,
+        writer: &mut W,
+        version: ApiVersion,
+    ) -> Result<(), WriteVersionedError> {
+        let v = version.0 .0;
+        assert!(v <= 0);
+
+        self.partition_index.write(writer)?;
+        self.replicas.write(writer)?;
+        self.tagged_fields.write(writer)?;
+
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub struct AlterPartitionReassignmentsResponse {
+    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the
+    /// request did not violate any quota.
+    pub throttle_time_ms: Int32,
+
+    /// The top-level error code, or 0 if there was no error.
+    pub error: Option<Error>,
+
+    /// The top-level error message, or null if there was no error.
+    pub error_message: CompactNullableString,
+
+    /// The responses to topics to reassign.
+    pub responses: Vec<AlterPartitionReassignmentsTopicResponse>,
+
+    /// The tagged fields.
+    pub tagged_fields: TaggedFields,
+}
+
+impl<R> ReadVersionedType<R> for AlterPartitionReassignmentsResponse
+where
+    R: Read,
+{
+    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
+        let v = version.0 .0;
+        assert!(v <= 0);
+
+        let throttle_time_ms = Int32::read(reader)?;
+        let error = Error::new(Int16::read(reader)?.0);
+        let error_message = CompactNullableString::read(reader)?;
+        let responses = read_compact_versioned_array(reader, version)?.unwrap_or_default();
+        let tagged_fields = TaggedFields::read(reader)?;
+
+        Ok(Self {
+            throttle_time_ms,
+            error,
+            error_message,
+            responses,
+            tagged_fields,
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct AlterPartitionReassignmentsTopicResponse {
+    /// The topic name.
+    pub name: CompactString,
+
+    /// The responses to partitions to reassign.
+    pub partitions: Vec<AlterPartitionReassignmentsPartitionResponse>,
+
+    /// The tagged fields.
+    pub tagged_fields: TaggedFields,
+}
+
+impl<R> ReadVersionedType<R> for AlterPartitionReassignmentsTopicResponse
+where
+    R: Read,
+{
+    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
+        let v = version.0 .0;
+        assert!(v <= 0);
+
+        let name = CompactString::read(reader)?;
+        let partitions = read_compact_versioned_array(reader, version)?.unwrap_or_default();
+        let tagged_fields = TaggedFields::read(reader)?;
+
+        Ok(Self {
+            name,
+            partitions,
+            tagged_fields,
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct AlterPartitionReassignmentsPartitionResponse {
+    /// The partition index.
+    pub partition_index: Int32,
+
+    /// The error code for this partition, or 0 if there was no error.
+    pub error: Option<Error>,
+
+    /// The error message for this partition, or null if there was no error.
+    pub error_message: CompactNullableString,
+
+    /// The tagged fields.
+    pub tagged_fields: TaggedFields,
+}
+
+impl<R> ReadVersionedType<R> for AlterPartitionReassignmentsPartitionResponse
+where
+    R: Read,
+{
+    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
+        let v = version.0 .0;
+        assert!(v <= 0);
+
+        let partition_index = Int32::read(reader)?;
+        let error = Error::new(Int16::read(reader)?.0);
+        let error_message = CompactNullableString::read(reader)?;
+        let tagged_fields = TaggedFields::read(reader)?;
+
+        Ok(Self {
+            partition_index,
+            error,
+            error_message,
+            tagged_fields,
+        })
+    }
+}
diff --git a/src/protocol/messages/elect_leaders.rs b/src/protocol/messages/elect_leaders.rs
new file mode 100644
index 00000000..97959b23
--- /dev/null
+++ b/src/protocol/messages/elect_leaders.rs
@@ -0,0 +1,281 @@
+use std::io::{Read, Write};
+
+use crate::protocol::{
+    api_key::ApiKey,
+    api_version::{ApiVersion, ApiVersionRange},
+    error::Error,
+    messages::{
+        read_compact_versioned_array, read_versioned_array, write_compact_versioned_array,
+        write_versioned_array,
+    },
+    primitives::{
+        Array, CompactArrayRef, CompactNullableString, CompactString, CompactStringRef, Int16,
+        Int32, Int8, NullableString, String_, TaggedFields,
+    },
+    traits::{ReadType, WriteType},
+};
+
+use super::{
+    ReadVersionedError, ReadVersionedType, RequestBody, WriteVersionedError, WriteVersionedType,
+};
+
+#[derive(Debug)]
+pub struct ElectLeadersRequest {
+    /// Type of elections to conduct for the partition.
+    ///
+    /// A value of `0` elects the preferred replica. A value of `1` elects the first live replica if there are no
+    /// in-sync replicas.
+    ///
+    /// Added in version 1.
+    pub election_type: Int8,
+
+    /// The topic partitions for which to elect leaders.
+    pub topic_partitions: Vec<ElectLeadersTopicRequest>,
+
+    /// The time in ms to wait for the election to complete.
+    pub timeout_ms: Int32,
+
+    /// The tagged fields.
+    ///
+    /// Added in version 2.
+    pub tagged_fields: Option<TaggedFields>,
+}
+
+impl<W> WriteVersionedType<W> for ElectLeadersRequest
+where
+    W: Write,
+{
+    fn write_versioned(
+        &self,
+        writer: &mut W,
+        version: ApiVersion,
+    ) -> Result<(), WriteVersionedError> {
+        let v = version.0 .0;
+        assert!(v <= 2);
+
+        if v >= 1 {
+            self.election_type.write(writer)?;
+        }
+
+        if v >= 2 {
+            write_compact_versioned_array(writer, version, Some(&self.topic_partitions))?;
+        } else {
+            write_versioned_array(writer, version, Some(&self.topic_partitions))?;
+        }
+
+        self.timeout_ms.write(writer)?;
+
+        if v >= 2 {
+            match self.tagged_fields.as_ref() {
+                Some(tagged_fields) => {
+                    tagged_fields.write(writer)?;
+                }
+                None => {
+                    TaggedFields::default().write(writer)?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+impl RequestBody for ElectLeadersRequest {
+    type ResponseBody = ElectLeadersResponse;
+
+    const API_KEY: ApiKey = ApiKey::ElectLeaders;
+
+    /// All versions.
+    const API_VERSION_RANGE: ApiVersionRange =
+        ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(2)));
+
+    const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(2));
+}
+
+#[derive(Debug)]
+pub struct ElectLeadersTopicRequest {
+    /// The name of a topic.
+    pub topic: String_,
+
+    /// The partitions of this topic whose leader should be elected.
+    pub partitions: Array<Int32>,
+
+    /// The tagged fields.
+    ///
+    /// Added in version 2.
+    pub tagged_fields: Option<TaggedFields>,
+}
+
+impl<W> WriteVersionedType<W> for ElectLeadersTopicRequest
+where
+    W: Write,
+{
+    fn write_versioned(
+        &self,
+        writer: &mut W,
+        version: ApiVersion,
+    ) -> Result<(), WriteVersionedError> {
+        let v = version.0 .0;
+        assert!(v <= 2);
+
+        if v >= 2 {
+            CompactStringRef(&self.topic.0).write(writer)?;
+        } else {
+            self.topic.write(writer)?;
+        }
+
+        if v >= 2 {
+            CompactArrayRef(self.partitions.0.as_deref()).write(writer)?;
+        } else {
+            self.partitions.write(writer)?;
+        }
+
+        if v >= 2 {
+            match self.tagged_fields.as_ref() {
+                Some(tagged_fields) => {
+                    tagged_fields.write(writer)?;
+                }
+                None => {
+                    TaggedFields::default().write(writer)?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub struct ElectLeadersResponse {
+    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the
+    /// request did not violate any quota.
+    pub throttle_time_ms: Int32,
+
+    /// The top level response error code.
+    ///
+    /// Added in version 1.
+    pub error: Option<Error>,
+
+    /// The election results, or an empty array if the requester did not have permission and the request asks for all
+    /// partitions.
+    pub replica_election_results: Vec<ElectLeadersTopicResponse>,
+
+    /// The tagged fields.
+    ///
+    /// Added in version 2.
+    pub tagged_fields: Option<TaggedFields>,
+}
+
+impl<R> ReadVersionedType<R> for ElectLeadersResponse
+where
+    R: Read,
+{
+    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
+        let v = version.0 .0;
+        assert!(v <= 2);
+
+        let throttle_time_ms = Int32::read(reader)?;
+        let error = (v >= 1)
+            .then(|| Int16::read(reader))
+            .transpose()?
+            .and_then(|e| Error::new(e.0));
+        let replica_election_results = if v >= 2 {
+            read_compact_versioned_array(reader, version)?.unwrap_or_default()
+        } else {
+            read_versioned_array(reader, version)?.unwrap_or_default()
+        };
+        let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;
+
+        Ok(Self {
+            throttle_time_ms,
+            error,
+            replica_election_results,
+            tagged_fields,
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct ElectLeadersTopicResponse {
+    /// The topic name.
+    pub topic: String_,
+
+    /// The results for each partition.
+    pub partition_results: Vec<ElectLeadersPartitionResponse>,
+
+    /// The tagged fields.
+    ///
+    /// Added in version 2.
+    pub tagged_fields: Option<TaggedFields>,
+}
+
+impl<R> ReadVersionedType<R> for ElectLeadersTopicResponse
+where
+    R: Read,
+{
+    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
+        let v = version.0 .0;
+        assert!(v <= 2);
+
+        let topic = if v >= 2 {
+            String_(CompactString::read(reader)?.0)
+        } else {
+            String_::read(reader)?
+        };
+        let partition_results = if v >= 2 {
+            read_compact_versioned_array(reader, version)?.unwrap_or_default()
+        } else {
+            read_versioned_array(reader, version)?.unwrap_or_default()
+        };
+        let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;
+
+        Ok(Self {
+            topic,
+            partition_results,
+            tagged_fields,
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct ElectLeadersPartitionResponse {
+    /// The partition id.
+    pub partition_id: Int32,
+
+    /// The result error, or zero if there was no error.
+    pub error: Option<Error>,
+
+    /// The result message, or null if there was no error.
+    pub error_message: NullableString,
+
+    /// The tagged fields.
+    ///
+    /// Added in version 2.
+    pub tagged_fields: Option<TaggedFields>,
+}
+
+impl<R> ReadVersionedType<R> for ElectLeadersPartitionResponse
+where
+    R: Read,
+{
+    fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
+        let v = version.0 .0;
+        assert!(v <= 2);
+
+        let partition_id = Int32::read(reader)?;
+        let error = Error::new(Int16::read(reader)?.0);
+        let error_message = if v >= 2 {
+            NullableString(CompactNullableString::read(reader)?.0)
+        } else {
+            NullableString::read(reader)?
+        };
+        let tagged_fields = (v >= 2).then(|| TaggedFields::read(reader)).transpose()?;
+
+        Ok(Self {
+            partition_id,
+            error,
+            error_message,
+            tagged_fields,
+        })
+    }
+}
diff --git a/src/protocol/messages/mod.rs b/src/protocol/messages/mod.rs
index c7c1d4ca..e06b48b9 100644
--- a/src/protocol/messages/mod.rs
+++ b/src/protocol/messages/mod.rs
@@ -16,6 +16,8 @@ use super::{
     vec_builder::VecBuilder,
 };
 
+mod alter_partition_reassignments;
+pub use alter_partition_reassignments::*;
 mod api_versions;
 pub use api_versions::*;
 mod constants;
@@ -24,6 +26,8 @@ mod create_topics;
 pub use create_topics::*;
 mod delete_records;
 pub use delete_records::*;
+mod elect_leaders;
+pub use elect_leaders::*;
 mod fetch;
 pub use fetch::*;
 mod header;
diff --git a/tests/client.rs b/tests/client.rs
index 8871858c..abb6f99e 100644
--- a/tests/client.rs
+++ b/tests/client.rs
@@ -1,7 +1,8 @@
 use assert_matches::assert_matches;
 use rskafka::{
     client::{
-        error::{Error as ClientError, ProtocolError},
+        controller::ElectionType,
+        error::{Error as ClientError, ProtocolError, RequestError},
         partition::{Compression, OffsetAt},
         ClientBuilder,
     },
@@ -540,6 +541,99 @@ async fn test_delete_records() {
     );
 }
 
+#[tokio::test]
+async fn test_metadata() {
+    maybe_start_logging();
+
+    let connection = maybe_skip_kafka_integration!();
+    let topic_name = random_topic_name();
+
+    let client = ClientBuilder::new(connection).build().await.unwrap();
+
+    let controller_client = client.controller_client().unwrap();
+    controller_client
+        .create_topic(&topic_name, 1, 1, 5_000)
+        .await
+        .unwrap();
+
+    let md = client.metadata().await.unwrap();
+    assert!(!md.brokers.is_empty());
+
+    // topic metadata might take a while to converge
+    tokio::time::timeout(Duration::from_millis(1_000), async {
+        loop {
+            let md = client.metadata().await.unwrap();
+            let topic = md.topics.into_iter().find(|topic| topic.name == topic_name);
+            if topic.is_some() {
+                return;
+            }
+
+            tokio::time::sleep(Duration::from_millis(10)).await;
+        }
+    })
+    .await
+    .unwrap();
+}
+
+#[tokio::test]
+async fn test_reassign_partitions() {
+    maybe_start_logging();
+
+    let connection = maybe_skip_kafka_integration!();
+    let topic_name = random_topic_name();
+
+    let client = ClientBuilder::new(connection).build().await.unwrap();
+
+    let controller_client = client.controller_client().unwrap();
+    controller_client
+        .create_topic(&topic_name, 1, 1, 5_000)
+        .await
+        .unwrap();
+
+    let res = controller_client
+        .reassign_partitions(&topic_name, 0, vec![0, 1], 5_000)
+        .await;
+    match res {
+        Ok(()) => {}
+        Err(ClientError::Request(RequestError::NoVersionMatch { .. }))
+            if std::env::var("TEST_REASSIGN_PARTITIONS").is_err() =>
+        {
+            println!("Skip test_reassign_partitions");
+        }
+        Err(e) => panic!("Unexpected error: {e}"),
+    }
+}
+
+#[tokio::test]
+async fn test_elect_leaders() {
+    maybe_start_logging();
+
+    let connection = maybe_skip_kafka_integration!();
+    let topic_name = random_topic_name();
+
+    let client = ClientBuilder::new(connection).build().await.unwrap();
+
+    let controller_client = client.controller_client().unwrap();
+    controller_client
+        .create_topic(&topic_name, 1, 3, 5_000)
+        .await
+        .unwrap();
+
+    let res = controller_client
+        .elect_leaders(&topic_name, 0, ElectionType::Preferred, 5_000)
+        .await;
+    match res {
+        Ok(()) => {}
+        Err(ClientError::ServerError(ProtocolError::ElectionNotNeeded, _)) => {}
+        Err(ClientError::Request(RequestError::NoVersionMatch { .. }))
+            if std::env::var("TEST_ELECT_LEADERS").is_err() =>
+        {
+            println!("Skip test_elect_leaders");
+        }
+        Err(e) => panic!("Unexpected error: {e}"),
+    }
+}
+
 pub fn large_record() -> Record {
     Record {
         key: Some(b"".to_vec()),