Skip to content

Fix panic in GroupInfo::members #680

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//! [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::ffi::CString;
use std::iter::FromIterator;
use std::os::raw::c_char;
use std::ptr;
Expand Down
33 changes: 11 additions & 22 deletions src/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

use std::ffi::CStr;
use std::fmt;
use std::slice;

use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::util::{KafkaDrop, NativePtr};
use crate::util::{self, KafkaDrop, NativePtr};

/// Group member information container.
pub struct GroupMemberInfo(RDKafkaGroupMemberInfo);
Expand Down Expand Up @@ -43,28 +42,20 @@ impl GroupMemberInfo {
/// Return the metadata of the member.
pub fn metadata(&self) -> Option<&[u8]> {
unsafe {
if self.0.member_metadata.is_null() {
None
} else {
Some(slice::from_raw_parts::<u8>(
self.0.member_metadata as *const u8,
self.0.member_metadata_size as usize,
))
}
util::ptr_to_opt_slice(
self.0.member_metadata as *const u8,
self.0.member_metadata_size as usize,
)
}
}

/// Return the partition assignment of the member.
pub fn assignment(&self) -> Option<&[u8]> {
unsafe {
if self.0.member_assignment.is_null() {
None
} else {
Some(slice::from_raw_parts::<u8>(
self.0.member_assignment as *const u8,
self.0.member_assignment_size as usize,
))
}
util::ptr_to_opt_slice(
self.0.member_assignment as *const u8,
self.0.member_assignment_size as usize,
)
}
}
}
Expand All @@ -85,7 +76,7 @@ impl GroupInfo {
/// Returns the members of the group.
pub fn members(&self) -> &[GroupMemberInfo] {
unsafe {
slice::from_raw_parts(
util::ptr_to_slice(
self.0.members as *const GroupMemberInfo,
self.0.member_cnt as usize,
)
Expand Down Expand Up @@ -149,8 +140,6 @@ impl GroupList {

/// Returns all the groups in the list.
pub fn groups(&self) -> &[GroupInfo] {
unsafe {
slice::from_raw_parts(self.0.groups as *const GroupInfo, self.0.group_cnt as usize)
}
unsafe { util::ptr_to_slice(self.0.groups as *const GroupInfo, self.0.group_cnt as usize) }
}
}
10 changes: 5 additions & 5 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl Headers for BorrowedHeaders {
Some(Header {
key: CStr::from_ptr(name_ptr).to_str().unwrap(),
value: (!value_ptr.is_null())
.then(|| util::ptr_to_slice(value_ptr, value_size)),
.then(|| util::ptr_to_slice(value_ptr as *const u8, value_size)),
})
}
}
Expand Down Expand Up @@ -425,20 +425,20 @@ impl<'a> Message for BorrowedMessage<'a> {
type Headers = BorrowedHeaders;

fn key(&self) -> Option<&[u8]> {
unsafe { util::ptr_to_opt_slice((*self.ptr).key, (*self.ptr).key_len) }
unsafe { util::ptr_to_opt_slice(self.ptr.key as *const u8, self.ptr.key_len) }
}

fn payload(&self) -> Option<&[u8]> {
unsafe { util::ptr_to_opt_slice((*self.ptr).payload, (*self.ptr).len) }
unsafe { util::ptr_to_opt_slice(self.ptr.payload as *const u8, self.ptr.len) }
}

unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> {
util::ptr_to_opt_mut_slice((*self.ptr).payload, (*self.ptr).len)
util::ptr_to_opt_mut_slice(self.ptr.payload as *mut u8, self.ptr.len)
}

fn topic(&self) -> &str {
unsafe {
CStr::from_ptr(rdsys::rd_kafka_topic_name((*self.ptr).rkt))
CStr::from_ptr(rdsys::rd_kafka_topic_name(self.ptr.rkt))
.to_str()
.expect("Topic name is not valid UTF-8")
}
Expand Down
13 changes: 6 additions & 7 deletions src/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
//! Cluster metadata.

use std::ffi::CStr;
use std::slice;

use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::error::IsError;
use crate::util::{KafkaDrop, NativePtr};
use crate::util::{self, KafkaDrop, NativePtr};

/// Broker metadata information.
pub struct MetadataBroker(RDKafkaMetadataBroker);
Expand Down Expand Up @@ -60,12 +59,12 @@ impl MetadataPartition {

/// Returns the broker IDs of the replicas.
pub fn replicas(&self) -> &[i32] {
unsafe { slice::from_raw_parts(self.0.replicas, self.0.replica_cnt as usize) }
unsafe { util::ptr_to_slice(self.0.replicas, self.0.replica_cnt as usize) }
}

/// Returns the broker IDs of the in-sync replicas.
pub fn isr(&self) -> &[i32] {
unsafe { slice::from_raw_parts(self.0.isrs, self.0.isr_cnt as usize) }
unsafe { util::ptr_to_slice(self.0.isrs, self.0.isr_cnt as usize) }
}
}

Expand All @@ -85,7 +84,7 @@ impl MetadataTopic {
/// Returns the partition metadata information for all the partitions.
pub fn partitions(&self) -> &[MetadataPartition] {
unsafe {
slice::from_raw_parts(
ptr_to_slice(
self.0.partitions as *const MetadataPartition,
self.0.partition_cnt as usize,
)
Expand Down Expand Up @@ -141,7 +140,7 @@ impl Metadata {
/// Returns the metadata information for all the brokers in the cluster.
pub fn brokers(&self) -> &[MetadataBroker] {
unsafe {
slice::from_raw_parts(
util::ptr_to_slice(
self.0.brokers as *const MetadataBroker,
self.0.broker_cnt as usize,
)
Expand All @@ -151,7 +150,7 @@ impl Metadata {
/// Returns the metadata information for all the topics in the cluster.
pub fn topics(&self) -> &[MetadataTopic] {
unsafe {
slice::from_raw_parts(
util::ptr_to_slice(
self.0.topics as *const MetadataTopic,
self.0.topic_cnt as usize,
)
Expand Down
9 changes: 2 additions & 7 deletions src/producer/base_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ use std::marker::PhantomData;
use std::mem;
use std::os::raw::c_void;
use std::ptr;
use std::slice;
use std::str;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -67,7 +66,7 @@ use crate::producer::{
DefaultProducerContext, Partitioner, Producer, ProducerContext, PurgeConfig,
};
use crate::topic_partition_list::TopicPartitionList;
use crate::util::{IntoOpaque, NativePtr, Timeout};
use crate::util::{self, IntoOpaque, NativePtr, Timeout};

pub use crate::message::DeliveryResult;

Expand Down Expand Up @@ -217,11 +216,7 @@ unsafe extern "C" fn partitioner_cb<Part: Partitioner, C: ProducerContext<Part>>

let is_partition_available = |p: i32| rdsys::rd_kafka_topic_partition_available(topic, p) == 1;

let key = if keydata.is_null() {
None
} else {
Some(slice::from_raw_parts(keydata as *const u8, keylen))
};
let key = util::ptr_to_opt_slice(keydata, keylen);

let producer_context = &mut *(rkt_opaque as *mut C);

Expand Down
10 changes: 5 additions & 5 deletions src/topic_partition_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::fmt;
use std::slice;
use std::str;

use libc::c_void;
Expand Down Expand Up @@ -139,7 +138,8 @@ impl<'a> TopicPartitionListElem<'a> {

/// Returns the optional metadata associated with the entry.
pub fn metadata(&self) -> &str {
let bytes = unsafe { util::ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) };
let bytes =
unsafe { util::ptr_to_slice(self.ptr.metadata as *const u8, self.ptr.metadata_size) };
str::from_utf8(bytes).expect("Metadata is not UTF-8")
}

Expand Down Expand Up @@ -317,7 +317,7 @@ impl TopicPartitionList {

/// Sets all partitions in the list to the specified offset.
pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> {
let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
let slice = unsafe { util::ptr_to_mut_slice(self.ptr.elems, self.count()) };
for elem_ptr in slice {
let mut elem = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
elem.set_offset(offset)?;
Expand All @@ -327,7 +327,7 @@ impl TopicPartitionList {

/// Returns all the elements of the list.
pub fn elements(&self) -> Vec<TopicPartitionListElem<'_>> {
let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
let slice = unsafe { util::ptr_to_mut_slice(self.ptr.elems, self.count()) };
let mut vec = Vec::with_capacity(slice.len());
for elem_ptr in slice {
vec.push(TopicPartitionListElem::from_ptr(self, &mut *elem_ptr));
Expand All @@ -337,7 +337,7 @@ impl TopicPartitionList {

/// Returns all the elements of the list that belong to the specified topic.
pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec<TopicPartitionListElem<'a>> {
let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
let slice = unsafe { util::ptr_to_mut_slice(self.ptr.elems, self.count()) };
let mut vec = Vec::with_capacity(slice.len());
for elem_ptr in slice {
let tp = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
Expand Down
25 changes: 16 additions & 9 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,32 +105,39 @@ pub fn current_time_millis() -> i64 {

/// Converts a pointer to an array to an optional slice. If the pointer is null,
/// returns `None`.
pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> {
pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const T, size: usize) -> Option<&'a [T]> {
if ptr.is_null() {
None
} else {
Some(slice::from_raw_parts::<T>(ptr as *const T, size))
Some(slice::from_raw_parts(ptr, size))
}
}

pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>(
ptr: *const c_void,
size: usize,
) -> Option<&'a mut [T]> {
pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>(ptr: *mut T, size: usize) -> Option<&'a mut [T]> {
if ptr.is_null() {
None
} else {
Some(slice::from_raw_parts_mut::<T>(ptr as *mut T, size))
Some(slice::from_raw_parts_mut(ptr, size))
}
}

/// Converts a pointer to an array to a slice. If the pointer is null or the
/// size is zero, returns a zero-length slice..
pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] {
pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const T, size: usize) -> &'a [T] {
if ptr.is_null() || size == 0 {
&[][..]
} else {
slice::from_raw_parts::<T>(ptr as *const T, size)
slice::from_raw_parts(ptr, size)
}
}

/// Converts a pointer to an array to a mutable slice. If the pointer is null
/// or the size is zero, returns a zero-length slice.
pub(crate) unsafe fn ptr_to_mut_slice<'a, T>(ptr: *mut T, size: usize) -> &'a mut [T] {
if ptr.is_null() || size == 0 {
&mut [][..]
} else {
slice::from_raw_parts_mut(ptr, size)
}
}

Expand Down