diff --git a/Cargo.lock b/Cargo.lock
index 708bf88b1..33ff75b18 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1095,6 +1095,7 @@ dependencies = [
  "smol",
  "tokio",
  "tracing",
+ "uuid",
 ]
 
 [[package]]
@@ -1412,6 +1413,12 @@ version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
 
+[[package]]
+name = "uuid"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
+
 [[package]]
 name = "vcpkg"
 version = "0.2.8"
diff --git a/Cargo.toml b/Cargo.toml
index b9045ce58..80e4dae6e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,6 +28,7 @@ serde_json = "1.0.0"
 slab = "0.4"
 tokio = { version = "1.18", features = ["rt", "time"], optional = true }
 tracing = { version = "0.1.30", optional = true }
+uuid = "1.10.0"
 
 [dev-dependencies]
 async-std = { version = "1.9.0", features = ["attributes"] }
diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs
index 0005073ba..3d35f0cc8 100644
--- a/rdkafka-sys/src/types.rs
+++ b/rdkafka-sys/src/types.rs
@@ -99,6 +99,18 @@ pub type RDKafkaGroupResult = bindings::rd_kafka_group_result_t;
 /// Native rdkafka mock cluster.
 pub type RDKafkaMockCluster = bindings::rd_kafka_mock_cluster_t;
 
+/// Native rdkafka topic collection.
+pub type RDKafkaTopicCollection = bindings::rd_kafka_TopicCollection_t;
+
+/// Native rdkafka node.
+pub type RDKafkaNode = bindings::rd_kafka_Node_t;
+
+/// Native rdkafka topic description.
+pub type RDKafkaTopicDescription = bindings::rd_kafka_TopicDescription_t;
+
+/// Native rdkafka topic partition info.
+pub type RDKafkaTopicPartitionInfo = bindings::rd_kafka_TopicPartitionInfo_t;
+
 // ENUMS
 
 /// Client types.
@@ -119,6 +131,15 @@ pub use bindings::rd_kafka_ResourceType_t as RDKafkaResourceType;
 /// Config source.
 pub use bindings::rd_kafka_ConfigSource_t as RDKafkaConfigSource;
 
+/// Isolation level.
+pub use bindings::rd_kafka_IsolationLevel_t as RDKafkaIsolationLevel;
+
+/// Consumer group state.
+pub use bindings::rd_kafka_consumer_group_state_t as RDKafkaConsumerGroupState;
+
+/// ACL operation.
+pub use bindings::rd_kafka_AclOperation_t as RDKafkaAclOperation;
+
 // Errors enum
 
 /// Native rdkafka error code.
diff --git a/src/admin.rs b/src/admin.rs
index 0418f0cac..25decfab5 100644
--- a/src/admin.rs
+++ b/src/admin.rs
@@ -28,6 +28,7 @@ use crate::log::{trace, warn};
 use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};
 use crate::TopicPartitionList;
+use uuid::Uuid;
 
 //
 // ********** ADMIN CLIENT **********
 //
@@ -376,6 +377,88 @@ impl<C: ClientContext> AdminClient<C> {
         Ok(rx)
     }
 
+    /// Describe topics as specified by `topic_names`.
+    pub fn describe_topics<'a, I>(
+        &self,
+        topic_names: I,
+        opts: &AdminOptions,
+    ) -> impl Future<Output = KafkaResult<Vec<TopicDescriptionResult>>>
+    where
+        I: IntoIterator<Item = &'a str>,
+    {
+        match self.describe_topics_inner(topic_names, opts) {
+            Ok(rx) => Either::Left(DescribeTopicsFuture { rx }),
+            Err(err) => Either::Right(future::err(err)),
+        }
+    }
+
+    fn describe_topics_inner<'a, I>(
+        &self,
+        topic_names: I,
+        opts: &AdminOptions,
+    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
+    where
+        I: IntoIterator<Item = &'a str>,
+    {
+        let topic_names_cstrings = topic_names
+            .into_iter()
+            .map(CString::new)
+            .collect::<Result<Vec<CString>, _>>()?;
+
+        // Don't consume topic_names_cstrings here (use .iter(), not .into_iter());
+        // the pointers collected below are only valid while the CStrings are alive.
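+        // (The CString buffers must stay alive until the FFI calls below return;
+        // librdkafka copies what it needs before this function exits, which is why
+        // both the CStrings and the collection can be dropped at the end of scope.)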
+        let mut topic_names_ptrs = topic_names_cstrings
+            .iter()
+            .map(|s| s.as_ptr())
+            .collect::<Vec<_>>();
+
+        let native_topic_collection = unsafe {
+            NativeTopicCollection::from_ptr(rdsys::rd_kafka_TopicCollection_of_topic_names(
+                topic_names_ptrs.as_mut_ptr(),
+                topic_names_ptrs.len(),
+            ))
+            .unwrap()
+        };
+
+        let mut err_buf = ErrBuf::new();
+        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
+        unsafe {
+            rdsys::rd_kafka_DescribeTopics(
+                self.client.native_ptr(),
+                native_topic_collection.ptr(),
+                native_opts.ptr(),
+                self.queue.ptr(),
+            );
+        }
+        Ok(rx)
+    }
+
+    /// Describe the Kafka cluster.
+    pub fn describe_cluster(
+        &self,
+        opts: &AdminOptions,
+    ) -> impl Future<Output = KafkaResult<ClusterDescription>> {
+        match self.describe_cluster_inner(opts) {
+            Ok(rx) => Either::Left(DescribeClusterFuture { rx }),
+            Err(err) => Either::Right(future::err(err)),
+        }
+    }
+
+    fn describe_cluster_inner(
+        &self,
+        opts: &AdminOptions,
+    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
+        let mut err_buf = ErrBuf::new();
+        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
+        unsafe {
+            rdsys::rd_kafka_DescribeCluster(
+                self.client.native_ptr(),
+                native_opts.ptr(),
+                self.queue.ptr(),
+            );
+        }
+        Ok(rx)
+    }
+
     /// Returns the client underlying this admin client.
     pub fn inner(&self) -> &Client<C> {
         &self.client
@@ -483,6 +566,10 @@ pub struct AdminOptions {
     operation_timeout: Option<Timeout>,
     validate_only: bool,
     broker_id: Option<i32>,
+    require_stable_offsets: bool,
+    include_authorized_operations: bool,
+    match_consumer_group_states: Option<Vec<RDKafkaConsumerGroupState>>,
+    isolation_level: Option<RDKafkaIsolationLevel>,
 }
 
 impl AdminOptions {
@@ -507,7 +594,7 @@ /// If unset (the default), the API calls will return immediately after
     /// triggering the operation.
     ///
-    /// Only the CreateTopics, DeleteTopics, and CreatePartitions API calls
+    /// Only the CreatePartitions, CreateTopics, DeleteTopics, and DeleteRecords API calls
     /// respect this option.
     pub fn operation_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
         self.operation_timeout = timeout.map(Into::into);
         self
@@ -518,6 +605,9 @@
     /// requested operation.
     ///
     /// Defaults to false.
+    ///
+    /// Only the CreateTopics, CreatePartitions, AlterConfigs, and IncrementalAlterConfigs
+    /// API calls respect this option.
     pub fn validate_only(mut self, validate_only: bool) -> Self {
         self.validate_only = validate_only;
         self
@@ -532,6 +622,46 @@
         self
     }
 
+    /// Whether the broker should return stable offsets (transaction-committed).
+    ///
+    /// Defaults to false.
+    ///
+    /// Only the ListConsumerGroupOffsets API call respects this option.
+    pub fn require_stable_offsets(mut self, require_stable_offsets: bool) -> Self {
+        self.require_stable_offsets = require_stable_offsets;
+        self
+    }
+
+    /// Whether the broker should return authorized operations.
+    ///
+    /// Defaults to false.
+    ///
+    /// Only the DescribeConsumerGroups, DescribeCluster, and DescribeTopics API calls
+    /// respect this option.
+    pub fn include_authorized_operations(mut self, include_authorized_operations: bool) -> Self {
+        self.include_authorized_operations = include_authorized_operations;
+        self
+    }
+
+    /// List of consumer group states to query for.
+    ///
+    /// Only the ListConsumerGroups API call respects this option.
+    pub fn match_consumer_group_states<T: Into<Vec<RDKafkaConsumerGroupState>>>(
+        mut self,
+        match_consumer_group_states: T,
+    ) -> Self {
+        self.match_consumer_group_states = Some(match_consumer_group_states.into());
+        self
+    }
+
+    /// Isolation level to use when querying offsets.
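+    ///
+    /// Only the ListOffsets API call respects this option.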
+    ///
+    /// Defaults to read uncommitted.
+    pub fn isolation_level<T: Into<RDKafkaIsolationLevel>>(mut self, isolation_level: T) -> Self {
+        self.isolation_level = Some(isolation_level.into());
+        self
+    }
+
     fn to_native(
         &self,
         client: *mut RDKafka,
@@ -593,6 +723,48 @@ impl AdminOptions {
             check_rdkafka_invalid_arg(res, err_buf)?;
         }
 
+        if self.require_stable_offsets {
+            let res = unsafe {
+                rdsys::rd_kafka_AdminOptions_set_require_stable_offsets(
+                    native_opts.ptr(),
+                    1, // true
+                )
+            };
+            let res = unsafe { rdsys::rd_kafka_error_code(res) };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+
+        if self.include_authorized_operations {
+            let res = unsafe {
+                rdsys::rd_kafka_AdminOptions_set_include_authorized_operations(
+                    native_opts.ptr(),
+                    1, // true
+                )
+            };
+            let res = unsafe { rdsys::rd_kafka_error_code(res) };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+
+        if let Some(match_consumer_group_states) = &self.match_consumer_group_states {
+            let res = unsafe {
+                rdsys::rd_kafka_AdminOptions_set_match_consumer_group_states(
+                    native_opts.ptr(),
+                    match_consumer_group_states.as_ptr(),
+                    match_consumer_group_states.len(),
+                )
+            };
+            let res = unsafe { rdsys::rd_kafka_error_code(res) };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+
+        if let Some(isolation_level) = self.isolation_level {
+            let res = unsafe {
+                rdsys::rd_kafka_AdminOptions_set_isolation_level(native_opts.ptr(), isolation_level)
+            };
+            let res = unsafe { rdsys::rd_kafka_error_code(res) };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+
         let (tx, rx) = oneshot::channel();
         let tx = Box::into_raw(Box::new(tx)) as *mut c_void;
         unsafe { rdsys::rd_kafka_AdminOptions_set_opaque(native_opts.ptr(), tx) };
@@ -1340,3 +1512,307 @@ impl Future for AlterConfigsFuture {
         Poll::Ready(Ok(out))
     }
 }
+
+//
+// Describe topics handling
+//
+
+/// The result of a DescribeTopics operation.
+pub type TopicDescriptionResult = Result<TopicDescription, (TopicDescription, RDKafkaErrorCode)>;
+
+/// Node represents a broker.
+#[derive(Debug)]
+pub struct Node {
+    /// Node id.
+    pub id: i32,
+    /// Node host.
+    pub host: String,
+    /// Node port.
+    pub port: u16,
+    /// (Optional) Node rack id.
+    pub rack: Option<String>,
+}
+
+/// Topic partition information in a DescribeTopics result.
+#[derive(Debug)]
+pub struct TopicPartitionInfo {
+    /// Partition id.
+    pub partition: i32,
+    /// Leader of the partition.
+    pub leader: Node,
+    /// List of in-sync replica nodes.
+    pub isr: Vec<Node>,
+    /// List of replica nodes.
+    pub replicas: Vec<Node>,
+}
+
+/// Apache Kafka ACL operation types. Common type for multiple Admin API functions.
+#[derive(Debug, Eq, PartialEq)]
+pub enum AclOperation {
+    /// Unknown
+    Unknown,
+    /// In a filter, matches any AclOperation
+    Any,
+    /// ALL operation
+    All,
+    /// READ operation
+    Read,
+    /// WRITE operation
+    Write,
+    /// CREATE operation
+    Create,
+    /// DELETE operation
+    Delete,
+    /// ALTER operation
+    Alter,
+    /// DESCRIBE operation
+    Describe,
+    /// CLUSTER_ACTION operation
+    ClusterAction,
+    /// DESCRIBE_CONFIGS operation
+    DescribeConfigs,
+    /// ALTER_CONFIGS operation
+    AlterConfigs,
+    /// IDEMPOTENT_WRITE operation
+    IdempotentWrite,
+}
+
+/// DescribeTopics result.
+#[derive(Debug)]
+pub struct TopicDescription {
+    /// Topic name.
+    pub name: String,
+    /// Topic id.
+    pub topic_id: Uuid,
+    /// Partitions.
+    pub partitions: Vec<TopicPartitionInfo>,
+    /// Is the topic internal to Kafka?
+    pub is_internal: bool,
+    /// Operations allowed for the topic. May be `None` if operations were not requested.
+    pub authorized_operations: Option<Vec<AclOperation>>,
+}
+
+type NativeTopicCollection = NativePtr<RDKafkaTopicCollection>;
+
+unsafe impl KafkaDrop for RDKafkaTopicCollection {
+    const TYPE: &'static str = "topic collection";
+    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_TopicCollection_destroy;
+}
+
+struct DescribeTopicsFuture {
+    rx: oneshot::Receiver<NativeEvent>,
+}
+
+impl Future for DescribeTopicsFuture {
+    type Output = KafkaResult<Vec<TopicDescriptionResult>>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
+        event.check_error()?;
+        let res = unsafe { rdsys::rd_kafka_event_DescribeTopics_result(event.ptr()) };
+        if res.is_null() {
+            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
+            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
+                "describe topics request received response of incorrect type ({})",
+                typ
+            ))));
+        }
+        let mut n = 0;
+        let topic_descriptions =
+            unsafe { rdsys::rd_kafka_DescribeTopics_result_topics(res, &mut n) };
+        let mut out = Vec::with_capacity(n);
+        for i in 0..n {
+            let topic_description = unsafe { *topic_descriptions.add(i) };
+
+            let err = unsafe {
+                let err = rdsys::rd_kafka_TopicDescription_error(topic_description);
+                rdsys::rd_kafka_error_code(err)
+            };
+
+            let mut n_operations = 0;
+            let operations = unsafe {
+                rdsys::rd_kafka_TopicDescription_authorized_operations(
+                    topic_description,
+                    &mut n_operations,
+                )
+            };
+
+            let topic_description = TopicDescription {
+                name: unsafe {
+                    cstr_to_owned(rdsys::rd_kafka_TopicDescription_name(topic_description))
+                },
+                topic_id: extract_topic_id(topic_description),
+                partitions: extract_partitions(topic_description),
+                is_internal: unsafe {
+                    rdsys::rd_kafka_TopicDescription_is_internal(topic_description)
+                } != 0,
+                authorized_operations: extract_authorized_operations(operations, n_operations)?,
+            };
+
+            if err.is_error() {
+                out.push(Err((topic_description, err.into())));
+            } else {
+                out.push(Ok(topic_description));
+            }
+        }
+        Poll::Ready(Ok(out))
+    }
+}
+
+fn extract_topic_id(topic_description: *const RDKafkaTopicDescription) -> Uuid {
+    let topic_id = unsafe { rdsys::rd_kafka_TopicDescription_topic_id(topic_description) };
+    let high_bits = unsafe { rdsys::rd_kafka_Uuid_most_significant_bits(topic_id) } as u64;
+    let low_bits = unsafe { rdsys::rd_kafka_Uuid_least_significant_bits(topic_id) } as u64;
+    Uuid::from_u64_pair(high_bits, low_bits)
+}
+
+fn extract_partitions(
+    topic_description: *const RDKafkaTopicDescription,
+) -> Vec<TopicPartitionInfo> {
+    let mut n = 0;
+    let partitions =
+        unsafe { rdsys::rd_kafka_TopicDescription_partitions(topic_description, &mut n) };
+    let mut out = Vec::with_capacity(n);
+    for i in 0..n {
+        let partition = unsafe { *partitions.add(i) };
+
+        let mut n_isr = 0;
+        let isr = unsafe { rdsys::rd_kafka_TopicPartitionInfo_isr(partition, &mut n_isr) };
+
+        let mut n_replicas = 0;
+        let replicas =
+            unsafe { rdsys::rd_kafka_TopicPartitionInfo_replicas(partition, &mut n_replicas) };
+
+        out.push(TopicPartitionInfo {
+            partition: unsafe { rdsys::rd_kafka_TopicPartitionInfo_partition(partition) },
+            leader: extract_node(unsafe { rdsys::rd_kafka_TopicPartitionInfo_leader(partition) }),
+            isr: extract_nodes(isr, n_isr),
+            replicas: extract_nodes(replicas, n_replicas),
+        });
+    }
+    out
+}
+
+fn extract_node(node: *const RDKafkaNode) -> Node {
+    let rack = unsafe { rdsys::rd_kafka_Node_rack(node) };
+    let rack = if rack.is_null() {
+        None
+    } else {
+        Some(unsafe { cstr_to_owned(rack) })
+    };
+    Node {
+        id: unsafe { rdsys::rd_kafka_Node_id(node) },
+        host: unsafe { cstr_to_owned(rdsys::rd_kafka_Node_host(node)) },
+        port: unsafe { rdsys::rd_kafka_Node_port(node) },
+        rack,
+    }
+}
+
+fn extract_nodes(nodes: *mut *const RDKafkaNode, n: usize) -> Vec<Node> {
+    let mut out = Vec::with_capacity(n);
+    for i in 0..n {
+        out.push(extract_node(unsafe { *nodes.add(i) }));
+    }
+    out
+}
+
+fn extract_authorized_operations(
+    operations: *const RDKafkaAclOperation,
+    n: usize,
+) -> KafkaResult<Option<Vec<AclOperation>>> {
+    if operations.is_null() {
+        return Ok(None);
+    }
+
+    let mut out = Vec::with_capacity(n);
+    for i in 0..n {
+        let operation = match unsafe { *operations.add(i) } {
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_UNKNOWN => AclOperation::Unknown,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ANY => AclOperation::Any,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ALL => AclOperation::All,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_READ => AclOperation::Read,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_WRITE => AclOperation::Write,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_CREATE => AclOperation::Create,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_DELETE => AclOperation::Delete,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ALTER => AclOperation::Alter,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_DESCRIBE => AclOperation::Describe,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION => {
+                AclOperation::ClusterAction
+            }
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS => {
+                AclOperation::DescribeConfigs
+            }
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS => AclOperation::AlterConfigs,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE => {
+                AclOperation::IdempotentWrite
+            }
+            _ => {
+                return Err(KafkaError::AdminOpCreation(format!(
+                    "bogus acl operation in kafka response: {:?}",
+                    unsafe { *operations.add(i) }
+                )))
+            }
+        };
+        out.push(operation);
+    }
+    Ok(Some(out))
+}
+
+//
+// Describe cluster handling
+//
+
+/// DescribeCluster result.
+#[derive(Debug)]
+pub struct ClusterDescription {
+    /// Cluster id.
+    pub cluster_id: String,
+    /// Current controller.
+    pub controller: Node,
+    /// Brokers in the cluster.
+    pub nodes: Vec<Node>,
+    /// Operations allowed for the cluster. May be `None` if operations were not requested.
+    pub authorized_operations: Option<Vec<AclOperation>>,
+}
+
+struct DescribeClusterFuture {
+    rx: oneshot::Receiver<NativeEvent>,
+}
+
+impl Future for DescribeClusterFuture {
+    type Output = KafkaResult<ClusterDescription>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
+        event.check_error()?;
+        let res = unsafe { rdsys::rd_kafka_event_DescribeCluster_result(event.ptr()) };
+        if res.is_null() {
+            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
+            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
+                "describe cluster request received response of incorrect type ({})",
+                typ
+            ))));
+        }
+
+        let mut n_nodes = 0;
+        let nodes = unsafe { rdsys::rd_kafka_DescribeCluster_result_nodes(res, &mut n_nodes) };
+
+        let mut n_operations = 0;
+        let operations = unsafe {
+            rdsys::rd_kafka_DescribeCluster_result_authorized_operations(res, &mut n_operations)
+        };
+
+        let out = Ok(ClusterDescription {
+            cluster_id: unsafe {
+                cstr_to_owned(rdsys::rd_kafka_DescribeCluster_result_cluster_id(res))
+            },
+            controller: extract_node(unsafe {
+                rdsys::rd_kafka_DescribeCluster_result_controller(res)
+            }),
+            nodes: extract_nodes(nodes, n_nodes),
+            authorized_operations: extract_authorized_operations(operations, n_operations)?,
+        });
+
+        Poll::Ready(out)
+    }
+}
diff --git a/tests/test_admin.rs b/tests/test_admin.rs
index 4cf0e9a81..6b1c9121c 100644
--- a/tests/test_admin.rs
+++ b/tests/test_admin.rs
@@ -537,6 +537,103 @@ async fn test_configs() {
     assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]);
 }
 
+#[tokio::test]
+async fn test_describe_topics() {
+    let admin_client = create_admin_client();
+    let opts = AdminOptions::new();
+
+    // Create a new topic with a single partition and a replication factor of 1.
+    let first_name = rand_test_topic("first_topic");
+    let first_topic = NewTopic::new(&first_name, 1, TopicReplication::Fixed(1));
+    let res = admin_client
+        .create_topics([&first_topic], &opts)
+        .await
+        .expect("topic creation failed");
+    assert_eq!(res, &[Ok(first_name.clone())]);
+
+    // Describe the created topic and verify its properties.
+    let res = admin_client
+        .describe_topics([first_name.as_ref()], &opts)
+        .await
+        .expect("describe topics failed");
+    assert_eq!(res.len(), 1);
+    let topic_description = res[0].as_ref().expect("describe topics failed");
+    assert_eq!(topic_description.name, first_name);
+    assert_eq!(topic_description.partitions.len(), 1);
+    assert_eq!(topic_description.partitions[0].replicas.len(), 1);
+    assert!(!topic_description.is_internal);
+    assert_eq!(topic_description.authorized_operations, None);
+
+    // Create a second topic with two partitions, each with a replication factor of 1.
+    let second_name = rand_test_topic("second_topic");
+    let second_topic = NewTopic::new(&second_name, 2, TopicReplication::Fixed(1));
+    let res = admin_client
+        .create_topics([&second_topic], &opts)
+        .await
+        .expect("topic creation failed");
+    assert_eq!(res, &[Ok(second_name.clone())]);
+
+    // Describe both topics and verify their properties.
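+    // (Results come back in the same order as the requested topic names.)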
+    let res = admin_client
+        .describe_topics([first_name.as_ref(), second_name.as_ref()], &opts)
+        .await
+        .expect("describe topics failed");
+
+    assert_eq!(res.len(), 2);
+
+    let first_topic_description = res[0].as_ref().expect("describe topics failed");
+    assert_eq!(first_topic_description.name, first_name);
+    assert_eq!(first_topic_description.partitions.len(), 1);
+    assert_eq!(first_topic_description.partitions[0].replicas.len(), 1);
+    assert!(!first_topic_description.is_internal);
+    assert_eq!(first_topic_description.authorized_operations, None);
+
+    let second_topic_description = res[1].as_ref().expect("describe topics failed");
+    assert_eq!(second_topic_description.name, second_name);
+    assert_eq!(second_topic_description.partitions.len(), 2);
+    assert_eq!(second_topic_description.partitions[0].replicas.len(), 1);
+    assert_eq!(second_topic_description.partitions[1].replicas.len(), 1);
+    assert!(!second_topic_description.is_internal);
+    assert_eq!(second_topic_description.authorized_operations, None);
+
+    // Include authorized operations in the description options and describe both topics again.
+    let opts = opts.include_authorized_operations(true);
+    let res = admin_client
+        .describe_topics([first_name.as_ref(), second_name.as_ref()], &opts)
+        .await
+        .expect("describe topics failed");
+
+    assert_eq!(res.len(), 2);
+
+    let first_topic_description = res[0].as_ref().expect("describe topics failed");
+    assert!(first_topic_description.authorized_operations.is_some());
+
+    let second_topic_description = res[1].as_ref().expect("describe topics failed");
+    assert!(second_topic_description.authorized_operations.is_some());
+}
+
+#[tokio::test]
+async fn test_describe_cluster() {
+    let admin_client = create_admin_client();
+    let opts = AdminOptions::new();
+
+    // Describe the cluster and verify its properties.
+    let res = admin_client
+        .describe_cluster(&opts)
+        .await
+        .expect("describe cluster failed");
+    assert!(!res.nodes.is_empty());
+    assert_eq!(res.authorized_operations, None);
+
+    // Describe the cluster with authorized operations and verify the properties.
+    let opts = opts.include_authorized_operations(true);
+    let res = admin_client
+        .describe_cluster(&opts)
+        .await
+        .expect("describe cluster failed");
+    assert!(res.authorized_operations.is_some());
+}
+
 #[tokio::test]
 async fn test_groups() {
     let admin_client = create_admin_client();
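
For reference, a minimal sketch of how the two new calls compose from application code. This is not part of the patch; the broker address, the tokio runtime, and the use of expect() for error handling are illustrative assumptions:

    use rdkafka::admin::{AdminClient, AdminOptions};
    use rdkafka::client::DefaultClientContext;
    use rdkafka::config::ClientConfig;

    #[tokio::main]
    async fn main() {
        // Build an admin client against a (hypothetical) local broker.
        let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
            .set("bootstrap.servers", "localhost:9092")
            .create()
            .expect("admin client creation failed");

        // Ask the brokers to include authorized operations in the responses.
        let opts = AdminOptions::new().include_authorized_operations(true);

        // Describe one topic. Each element of the result is either
        // Ok(TopicDescription) or Err((TopicDescription, error code)).
        let topics = admin
            .describe_topics(["my-topic"], &opts)
            .await
            .expect("describe topics failed");
        for topic in topics {
            match topic {
                Ok(t) => println!("{} has {} partition(s)", t.name, t.partitions.len()),
                Err((t, err)) => eprintln!("describing {} failed: {}", t.name, err),
            }
        }

        // Describe the cluster as a whole.
        let cluster = admin
            .describe_cluster(&opts)
            .await
            .expect("describe cluster failed");
        println!(
            "cluster {}: {} broker(s), controller is node {}",
            cluster.cluster_id,
            cluster.nodes.len(),
            cluster.controller.id
        );
    }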