diff --git a/src/config.rs b/src/config.rs index 296d9f867..d4eeb6f60 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,7 +23,7 @@ //! [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md use std::collections::HashMap; -use std::ffi::{CStr, CString}; +use std::ffi::CString; use std::iter::FromIterator; use std::os::raw::c_char; use std::ptr; diff --git a/src/groups.rs b/src/groups.rs index 2c805dc79..69ed060c7 100644 --- a/src/groups.rs +++ b/src/groups.rs @@ -2,12 +2,11 @@ use std::ffi::CStr; use std::fmt; -use std::slice; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; -use crate::util::{KafkaDrop, NativePtr}; +use crate::util::{self, KafkaDrop, NativePtr}; /// Group member information container. pub struct GroupMemberInfo(RDKafkaGroupMemberInfo); @@ -43,28 +42,20 @@ impl GroupMemberInfo { /// Return the metadata of the member. pub fn metadata(&self) -> Option<&[u8]> { unsafe { - if self.0.member_metadata.is_null() { - None - } else { - Some(slice::from_raw_parts::( - self.0.member_metadata as *const u8, - self.0.member_metadata_size as usize, - )) - } + util::ptr_to_opt_slice( + self.0.member_metadata as *const u8, + self.0.member_metadata_size as usize, + ) } } /// Return the partition assignment of the member. pub fn assignment(&self) -> Option<&[u8]> { unsafe { - if self.0.member_assignment.is_null() { - None - } else { - Some(slice::from_raw_parts::( - self.0.member_assignment as *const u8, - self.0.member_assignment_size as usize, - )) - } + util::ptr_to_opt_slice( + self.0.member_assignment as *const u8, + self.0.member_assignment_size as usize, + ) } } } @@ -85,7 +76,7 @@ impl GroupInfo { /// Returns the members of the group. pub fn members(&self) -> &[GroupMemberInfo] { unsafe { - slice::from_raw_parts( + util::ptr_to_slice( self.0.members as *const GroupMemberInfo, self.0.member_cnt as usize, ) @@ -149,8 +140,6 @@ impl GroupList { /// Returns all the groups in the list. pub fn groups(&self) -> &[GroupInfo] { - unsafe { - slice::from_raw_parts(self.0.groups as *const GroupInfo, self.0.group_cnt as usize) - } + unsafe { util::ptr_to_slice(self.0.groups as *const GroupInfo, self.0.group_cnt as usize) } } } diff --git a/src/message.rs b/src/message.rs index 76bac9c39..96f0195c9 100644 --- a/src/message.rs +++ b/src/message.rs @@ -283,7 +283,7 @@ impl Headers for BorrowedHeaders { Some(Header { key: CStr::from_ptr(name_ptr).to_str().unwrap(), value: (!value_ptr.is_null()) - .then(|| util::ptr_to_slice(value_ptr, value_size)), + .then(|| util::ptr_to_slice(value_ptr as *const u8, value_size)), }) } } @@ -425,20 +425,20 @@ impl<'a> Message for BorrowedMessage<'a> { type Headers = BorrowedHeaders; fn key(&self) -> Option<&[u8]> { - unsafe { util::ptr_to_opt_slice((*self.ptr).key, (*self.ptr).key_len) } + unsafe { util::ptr_to_opt_slice(self.ptr.key as *const u8, self.ptr.key_len) } } fn payload(&self) -> Option<&[u8]> { - unsafe { util::ptr_to_opt_slice((*self.ptr).payload, (*self.ptr).len) } + unsafe { util::ptr_to_opt_slice(self.ptr.payload as *const u8, self.ptr.len) } } unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> { - util::ptr_to_opt_mut_slice((*self.ptr).payload, (*self.ptr).len) + util::ptr_to_opt_mut_slice(self.ptr.payload as *mut u8, self.ptr.len) } fn topic(&self) -> &str { unsafe { - CStr::from_ptr(rdsys::rd_kafka_topic_name((*self.ptr).rkt)) + CStr::from_ptr(rdsys::rd_kafka_topic_name(self.ptr.rkt)) .to_str() .expect("Topic name is not valid UTF-8") } diff --git a/src/metadata.rs b/src/metadata.rs index a70ab1787..2e4320751 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -1,13 +1,12 @@ //! Cluster metadata. use std::ffi::CStr; -use std::slice; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; use crate::error::IsError; -use crate::util::{KafkaDrop, NativePtr}; +use crate::util::{self, KafkaDrop, NativePtr}; /// Broker metadata information. pub struct MetadataBroker(RDKafkaMetadataBroker); @@ -60,12 +59,12 @@ impl MetadataPartition { /// Returns the broker IDs of the replicas. pub fn replicas(&self) -> &[i32] { - unsafe { slice::from_raw_parts(self.0.replicas, self.0.replica_cnt as usize) } + unsafe { util::ptr_to_slice(self.0.replicas, self.0.replica_cnt as usize) } } /// Returns the broker IDs of the in-sync replicas. pub fn isr(&self) -> &[i32] { - unsafe { slice::from_raw_parts(self.0.isrs, self.0.isr_cnt as usize) } + unsafe { util::ptr_to_slice(self.0.isrs, self.0.isr_cnt as usize) } } } @@ -85,7 +84,7 @@ impl MetadataTopic { /// Returns the partition metadata information for all the partitions. pub fn partitions(&self) -> &[MetadataPartition] { unsafe { - slice::from_raw_parts( + ptr_to_slice( self.0.partitions as *const MetadataPartition, self.0.partition_cnt as usize, ) @@ -141,7 +140,7 @@ impl Metadata { /// Returns the metadata information for all the brokers in the cluster. pub fn brokers(&self) -> &[MetadataBroker] { unsafe { - slice::from_raw_parts( + util::ptr_to_slice( self.0.brokers as *const MetadataBroker, self.0.broker_cnt as usize, ) @@ -151,7 +150,7 @@ impl Metadata { /// Returns the metadata information for all the topics in the cluster. pub fn topics(&self) -> &[MetadataTopic] { unsafe { - slice::from_raw_parts( + util::ptr_to_slice( self.0.topics as *const MetadataTopic, self.0.topic_cnt as usize, ) diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 1cc6e05ce..472d72562 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -46,7 +46,6 @@ use std::marker::PhantomData; use std::mem; use std::os::raw::c_void; use std::ptr; -use std::slice; use std::str; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -67,7 +66,7 @@ use crate::producer::{ DefaultProducerContext, Partitioner, Producer, ProducerContext, PurgeConfig, }; use crate::topic_partition_list::TopicPartitionList; -use crate::util::{IntoOpaque, NativePtr, Timeout}; +use crate::util::{self, IntoOpaque, NativePtr, Timeout}; pub use crate::message::DeliveryResult; @@ -217,11 +216,7 @@ unsafe extern "C" fn partitioner_cb> let is_partition_available = |p: i32| rdsys::rd_kafka_topic_partition_available(topic, p) == 1; - let key = if keydata.is_null() { - None - } else { - Some(slice::from_raw_parts(keydata as *const u8, keylen)) - }; + let key = util::ptr_to_opt_slice(keydata, keylen); let producer_context = &mut *(rkt_opaque as *mut C); diff --git a/src/topic_partition_list.rs b/src/topic_partition_list.rs index 1d8e77ce9..71c28eacb 100644 --- a/src/topic_partition_list.rs +++ b/src/topic_partition_list.rs @@ -5,7 +5,6 @@ use std::collections::HashMap; use std::ffi::{CStr, CString}; use std::fmt; -use std::slice; use std::str; use libc::c_void; @@ -139,7 +138,8 @@ impl<'a> TopicPartitionListElem<'a> { /// Returns the optional metadata associated with the entry. pub fn metadata(&self) -> &str { - let bytes = unsafe { util::ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) }; + let bytes = + unsafe { util::ptr_to_slice(self.ptr.metadata as *const u8, self.ptr.metadata_size) }; str::from_utf8(bytes).expect("Metadata is not UTF-8") } @@ -317,7 +317,7 @@ impl TopicPartitionList { /// Sets all partitions in the list to the specified offset. pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> { - let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) }; + let slice = unsafe { util::ptr_to_mut_slice(self.ptr.elems, self.count()) }; for elem_ptr in slice { let mut elem = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr); elem.set_offset(offset)?; @@ -327,7 +327,7 @@ impl TopicPartitionList { /// Returns all the elements of the list. pub fn elements(&self) -> Vec> { - let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) }; + let slice = unsafe { util::ptr_to_mut_slice(self.ptr.elems, self.count()) }; let mut vec = Vec::with_capacity(slice.len()); for elem_ptr in slice { vec.push(TopicPartitionListElem::from_ptr(self, &mut *elem_ptr)); @@ -337,7 +337,7 @@ impl TopicPartitionList { /// Returns all the elements of the list that belong to the specified topic. pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec> { - let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) }; + let slice = unsafe { util::ptr_to_mut_slice(self.ptr.elems, self.count()) }; let mut vec = Vec::with_capacity(slice.len()); for elem_ptr in slice { let tp = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr); diff --git a/src/util.rs b/src/util.rs index 543481d3f..e319859b2 100644 --- a/src/util.rs +++ b/src/util.rs @@ -105,32 +105,39 @@ pub fn current_time_millis() -> i64 { /// Converts a pointer to an array to an optional slice. If the pointer is null, /// returns `None`. -pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> { +pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const T, size: usize) -> Option<&'a [T]> { if ptr.is_null() { None } else { - Some(slice::from_raw_parts::(ptr as *const T, size)) + Some(slice::from_raw_parts(ptr, size)) } } -pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>( - ptr: *const c_void, - size: usize, -) -> Option<&'a mut [T]> { +pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>(ptr: *mut T, size: usize) -> Option<&'a mut [T]> { if ptr.is_null() { None } else { - Some(slice::from_raw_parts_mut::(ptr as *mut T, size)) + Some(slice::from_raw_parts_mut(ptr, size)) } } /// Converts a pointer to an array to a slice. If the pointer is null or the /// size is zero, returns a zero-length slice.. -pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] { +pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const T, size: usize) -> &'a [T] { if ptr.is_null() || size == 0 { &[][..] } else { - slice::from_raw_parts::(ptr as *const T, size) + slice::from_raw_parts(ptr, size) + } +} + +/// Converts a pointer to an array to a mutable slice. If the pointer is null +/// or the size is zero, returns a zero-length slice. +pub(crate) unsafe fn ptr_to_mut_slice<'a, T>(ptr: *mut T, size: usize) -> &'a mut [T] { + if ptr.is_null() || size == 0 { + &mut [][..] + } else { + slice::from_raw_parts_mut(ptr, size) } }