Skip to content

Commit 82f93ad

Browse files
Single active consumer implementation (#248)
* implementing Consumer_Update command * implementing ConsumerUpdateRequest command * SAC: starting implementation * Implementing callback support and consumer_update response * adding basic test * improved test * expand unit test scope * adding example * Adding README * enabling naming for super_stream consumers and setting up sac properties internally * expanding test * few improvements and test for simple SAC * making consumer_update callback able to call async methods * making Delivery export client in order to use store_offset and review super_stream example
1 parent ecd54d7 commit 82f93ad

File tree

18 files changed

+962
-36
lines changed

18 files changed

+962
-36
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
Single active consumer
2+
---
3+
4+
This is an example showing how to enable the single active consumer functionality for a super-stream:
5+
https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams
6+
https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams
7+
8+
This folder contains a consumer and a super-stream consumer configured to enable it.
9+
You can use the example in the super-stream folder to produce messages for a super-stream.
10+
11+
You can then run the single_active_consumer_super_stream.rs in this folder.
12+
Assuming the super-stream is composed of three streams, you can see that the Consumer will consume messages from all the streams that are part of the super-stream.
13+
14+
You can then run another consumer in parallel.
15+
Now you'll see that one of the two consumers will consume from two streams while the other consumes from one stream.
16+
17+
If you run another you'll see that every Consumer will read from a single stream.
18+
19+
If you then stop one of the Consumers, you'll notice that the related stream is now read by the Consumer which is still running.
20+
21+
22+
23+
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
use futures::StreamExt;
2+
use rabbitmq_stream_client::error::StreamCreateError;
3+
use rabbitmq_stream_client::types::{
4+
ByteCapacity, OffsetSpecification, ResponseCode,
5+
};
6+
7+
8+
#[tokio::main]
9+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
10+
use rabbitmq_stream_client::Environment;
11+
let environment = Environment::builder().build().await?;
12+
let message_count = 1000000;
13+
let stream = "hello-rust-super-stream-2";
14+
15+
let create_response = environment
16+
.stream_creator()
17+
.max_length(ByteCapacity::GB(5))
18+
.create(stream)
19+
.await;
20+
21+
if let Err(e) = create_response {
22+
if let StreamCreateError::Create { stream, status } = e {
23+
match status {
24+
// we can ignore this error because the stream already exists
25+
ResponseCode::StreamAlreadyExists => {}
26+
err => {
27+
println!("Error creating stream: {:?} {:?}", stream, err);
28+
}
29+
}
30+
}
31+
}
32+
println!(
33+
"Super stream consumer example, consuming messages from the super stream {}",
34+
stream
35+
);
36+
37+
let mut consumer = environment
38+
.consumer()
39+
// Mandatory if sac is enabled
40+
.name("consumer-group-1")
41+
.offset(OffsetSpecification::First)
42+
.enable_single_active_consumer(true)
43+
.client_provided_name("my super stream consumer for hello rust")
44+
.consumer_update(move |active, message_context| async move {
45+
let name = message_context.name();
46+
let stream = message_context.stream();
47+
let client = message_context.client();
48+
49+
println!(
50+
"single active consumer: is active: {} on stream: {} with consumer_name: {}",
51+
active, stream, name
52+
);
53+
let stored_offset = client.query_offset(name, stream.as_str()).await;
54+
55+
if let Err(e) = stored_offset {
56+
return OffsetSpecification::First;
57+
}
58+
59+
let stored_offset_u = stored_offset.unwrap();
60+
println!("restarting from stored_offset: {}", stored_offset_u);
61+
OffsetSpecification::Offset(stored_offset_u)
62+
63+
})
64+
.build(stream)
65+
.await
66+
.unwrap();
67+
68+
for i in 0..message_count {
69+
let delivery = consumer.next().await.unwrap();
70+
{
71+
let delivery = delivery.unwrap();
72+
println!(
73+
"Got message: {:#?} from stream: {} with offset: {}",
74+
delivery
75+
.message()
76+
.data()
77+
.map(|data| String::from_utf8(data.to_vec()).unwrap())
78+
.unwrap(),
79+
delivery.stream(),
80+
delivery.offset()
81+
);
82+
83+
//store an offset
84+
if i == 10000 {
85+
let _ = consumer
86+
.store_offset(i)
87+
.await
88+
.unwrap_or_else(|e| println!("Err: {}", e));
89+
}
90+
}
91+
}
92+
93+
println!("Stopping consumer...");
94+
let _ = consumer.handle().close().await;
95+
println!("consumer stopped");
96+
Ok(())
97+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
use futures::StreamExt;
2+
use rabbitmq_stream_client::error::StreamCreateError;
3+
use rabbitmq_stream_client::types::{
4+
ByteCapacity, OffsetSpecification, ResponseCode, SuperStreamConsumer,
5+
};
6+
use std::collections::HashMap;
7+
8+
#[tokio::main]
9+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
10+
use rabbitmq_stream_client::Environment;
11+
let environment = Environment::builder().build().await?;
12+
let message_count = 1000000;
13+
let super_stream = "hello-rust-super-stream";
14+
15+
let create_response = environment
16+
.stream_creator()
17+
.max_length(ByteCapacity::GB(5))
18+
.create_super_stream(super_stream, 3, None)
19+
.await;
20+
21+
if let Err(e) = create_response {
22+
if let StreamCreateError::Create { stream, status } = e {
23+
match status {
24+
// we can ignore this error because the stream already exists
25+
ResponseCode::StreamAlreadyExists => {}
26+
err => {
27+
println!("Error creating stream: {:?} {:?}", stream, err);
28+
}
29+
}
30+
}
31+
}
32+
println!(
33+
"Super stream consumer example, consuming messages from the super stream {}",
34+
super_stream
35+
);
36+
37+
let mut super_stream_consumer: SuperStreamConsumer = environment
38+
.super_stream_consumer()
39+
// Mandatory if sac is enabled
40+
.name("consumer-group-1")
41+
.offset(OffsetSpecification::First)
42+
.enable_single_active_consumer(true)
43+
.client_provided_name("my super stream consumer for hello rust")
44+
.consumer_update(move |active, message_context| async move {
45+
let name = message_context.name();
46+
let stream = message_context.stream();
47+
let client = message_context.client();
48+
49+
println!(
50+
"single active consumer: is active: {} on stream: {} with consumer_name: {}",
51+
active, stream, name
52+
);
53+
let stored_offset = client.query_offset(name, stream.as_str()).await;
54+
55+
if let Err(e) = stored_offset {
56+
return OffsetSpecification::First;
57+
}
58+
let stored_offset_u = stored_offset.unwrap();
59+
println!("stored_offset_u {}", stored_offset_u.clone());
60+
OffsetSpecification::Offset(stored_offset_u)
61+
62+
})
63+
.build(super_stream)
64+
.await
65+
.unwrap();
66+
67+
for _ in 0..message_count {
68+
let delivery = super_stream_consumer.next().await.unwrap();
69+
{
70+
let delivery = delivery.unwrap();
71+
println!(
72+
"Got message: {:#?} from stream: {} with offset: {}",
73+
delivery
74+
.message()
75+
.data()
76+
.map(|data| String::from_utf8(data.to_vec()).unwrap())
77+
.unwrap(),
78+
delivery.stream(),
79+
delivery.offset()
80+
);
81+
82+
// Store an offset for every consumer
83+
if delivery.consumer_name().is_some() && delivery.offset() == 1000 {
84+
super_stream_consumer.client().store_offset(delivery.consumer_name().unwrap().as_str(), delivery.stream().as_str(), delivery.offset()).await;
85+
}
86+
87+
}
88+
}
89+
90+
println!("Stopping super stream consumer...");
91+
let _ = super_stream_consumer.handle().close().await;
92+
println!("Super stream consumer stopped");
93+
Ok(())
94+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
use std::io::Write;
2+
3+
#[cfg(test)]
4+
use fake::Fake;
5+
6+
use crate::{
7+
codec::{Decoder, Encoder},
8+
error::{DecodeError, EncodeError},
9+
protocol::commands::COMMAND_CONSUMER_UPDATE,
10+
};
11+
12+
use super::Command;
13+
14+
#[cfg_attr(test, derive(fake::Dummy))]
15+
#[derive(PartialEq, Eq, Debug)]
16+
pub struct ConsumerUpdateCommand {
17+
pub(crate) correlation_id: u32,
18+
subscription_id: u8,
19+
active: u8,
20+
}
21+
22+
impl ConsumerUpdateCommand {
23+
pub fn new(correlation_id: u32, subscription_id: u8, active: u8) -> Self {
24+
Self {
25+
correlation_id,
26+
subscription_id,
27+
active,
28+
}
29+
}
30+
31+
pub fn get_correlation_id(&self) -> u32 {
32+
self.correlation_id
33+
}
34+
35+
pub fn is_active(&self) -> u8 {
36+
self.active
37+
}
38+
}
39+
40+
impl Encoder for ConsumerUpdateCommand {
41+
fn encoded_size(&self) -> u32 {
42+
self.correlation_id.encoded_size()
43+
+ self.subscription_id.encoded_size()
44+
+ self.active.encoded_size()
45+
}
46+
47+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
48+
self.correlation_id.encode(writer)?;
49+
self.subscription_id.encode(writer)?;
50+
self.active.encode(writer)?;
51+
Ok(())
52+
}
53+
}
54+
55+
impl Decoder for ConsumerUpdateCommand {
56+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
57+
let (input, correlation_id) = u32::decode(input)?;
58+
let (input, subscription_id) = u8::decode(input)?;
59+
let (input, active) = u8::decode(input)?;
60+
61+
Ok((
62+
input,
63+
ConsumerUpdateCommand {
64+
correlation_id,
65+
subscription_id,
66+
active,
67+
},
68+
))
69+
}
70+
}
71+
72+
impl Command for ConsumerUpdateCommand {
73+
fn key(&self) -> u16 {
74+
COMMAND_CONSUMER_UPDATE
75+
}
76+
}
77+
78+
#[cfg(test)]
79+
mod tests {
80+
use crate::commands::tests::command_encode_decode_test;
81+
82+
use super::ConsumerUpdateCommand;
83+
84+
#[test]
85+
fn consumer_update_response_test() {
86+
command_encode_decode_test::<ConsumerUpdateCommand>();
87+
}
88+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
use std::io::Write;
2+
3+
#[cfg(test)]
4+
use fake::Fake;
5+
6+
use crate::{
7+
codec::{Decoder, Encoder},
8+
error::{DecodeError, EncodeError},
9+
protocol::commands::COMMAND_CONSUMER_UPDATE_REQUEST,
10+
};
11+
12+
use crate::commands::subscribe::OffsetSpecification;
13+
14+
use super::Command;
15+
16+
#[cfg_attr(test, derive(fake::Dummy))]
17+
#[derive(PartialEq, Eq, Debug)]
18+
pub struct ConsumerUpdateRequestCommand {
19+
pub(crate) correlation_id: u32,
20+
response_code: u16,
21+
offset_specification: OffsetSpecification,
22+
}
23+
24+
impl ConsumerUpdateRequestCommand {
25+
pub fn new(
26+
correlation_id: u32,
27+
response_code: u16,
28+
offset_specification: OffsetSpecification,
29+
) -> Self {
30+
Self {
31+
correlation_id,
32+
response_code,
33+
offset_specification,
34+
}
35+
}
36+
}
37+
38+
impl Encoder for ConsumerUpdateRequestCommand {
39+
fn encoded_size(&self) -> u32 {
40+
self.correlation_id.encoded_size()
41+
+ self.response_code.encoded_size()
42+
+ self.offset_specification.encoded_size()
43+
}
44+
45+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
46+
self.correlation_id.encode(writer)?;
47+
self.response_code.encode(writer)?;
48+
self.offset_specification.encode(writer)?;
49+
Ok(())
50+
}
51+
}
52+
53+
impl Decoder for ConsumerUpdateRequestCommand {
54+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
55+
let (input, correlation_id) = u32::decode(input)?;
56+
let (input, response_code) = u16::decode(input)?;
57+
let (input, offset_specification) = OffsetSpecification::decode(input)?;
58+
59+
Ok((
60+
input,
61+
ConsumerUpdateRequestCommand {
62+
correlation_id,
63+
response_code,
64+
offset_specification,
65+
},
66+
))
67+
}
68+
}
69+
70+
impl Command for ConsumerUpdateRequestCommand {
71+
fn key(&self) -> u16 {
72+
COMMAND_CONSUMER_UPDATE_REQUEST
73+
}
74+
}
75+
76+
#[cfg(test)]
77+
mod tests {
78+
use crate::commands::tests::command_encode_decode_test;
79+
80+
use super::ConsumerUpdateRequestCommand;
81+
82+
#[test]
83+
fn consumer_update_request_test() {
84+
command_encode_decode_test::<ConsumerUpdateRequestCommand>();
85+
}
86+
}

0 commit comments

Comments
 (0)