Skip to content

Commit 481d0f1

Browse files
reviewing sac example (#258)
* reviewing sac example * updating README
1 parent 30337e5 commit 481d0f1

File tree

7 files changed

+280
-133
lines changed

7 files changed

+280
-133
lines changed

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ Welcome to the documentation for the RabbitMQ Stream Rust Client. This guide pro
4545
- [Publishing Messages](#publishing-messages)
4646
- [Consuming Messages](#consuming-messages)
4747
- [Super Stream](#super-stream)
48+
- [Single Active Consumer](#single-active-consumer)
4849
- [Filtering](#filtering)
4950
5. [Examples](#examples)
5051
6. [Development](#development)
@@ -171,6 +172,19 @@ See the [Super Stream Producer Example using Routing key mode](./examples/supers
171172

172173
See the [Super Stream Consumer Example](./examples/superstreams/receive_super_stream.rs)
173174

175+
## Single active consumer
176+
177+
The client supports the single-active-consumer feature:
178+
179+
[single-active-consumer feature](https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams)
180+
181+
See the Java doc for further information (Same concepts apply here):
182+
183+
[Single-Active-Consumer Java doc](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#single-active-consumer)
184+
185+
See the Rust full example here:
186+
187+
[Single-Active-Consumer-Full-Example](/examples/single_active_consumer)
174188

175189
## Filtering
176190

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[workspace]
2+
3+
[package]
4+
name = "single_active_consumer"
5+
version = "0.1.0"
6+
edition = "2021"
7+
8+
[dependencies]
9+
futures = "0.3.31"
10+
tokio = "1.41.1"
11+
rabbitmq-stream-client = { path = "../../" }

examples/single_active_consumer/README.md

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,79 @@
1-
Single active consumer
1+
Super stream example
22
---
33

4-
This is an example to enable single active consumer functionality for superstream:
5-
https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams
6-
https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams
4+
[Super Streams Documentation](https://www.rabbitmq.com/streams.html#super-streams) for more details.
5+
[Super Streams blog post](https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams)
76

8-
This folder contains a consumer and a super-stream consumer configured to enable it.
9-
You can use the example in the super-stream folder to produce messages for a super-stream.
107

11-
You can then run the single_active_consumer_super_stream.rs in this folder.
12-
Assuming the super-stream is composed by three streams, you can see that the Consumer will consume messages from all the streams part of the superstream.
8+
This example shows how to use the Super Stream feature in RabbitMQ 3.11.0.
139

14-
You can then run another consumer in parallel.
15-
now you'll see that one of the two consumers will consume from 2 streams while the other on one stream.
10+
Then run the producer in one terminal:
1611

17-
If you run another you'll see that every Consumer will read from a single stream.
12+
$ cargo run --release -- --producer
1813

19-
If you then stop one of the Consumer you'll notice that the related stream is now read from on the Consumer which is still running.
14+
15+
And the consumer in another terminal:
16+
17+
$ cargo run --release -- --consumer
18+
19+
You should see the consumer receiving the messages from the producer.
20+
21+
It would be something like:
22+
```bash
23+
$ cargo run -- --producer
24+
Starting SuperStream Producer
25+
Super Stream Producer connected to RabbitMQ
26+
Super Stream Producer sent 0 messages to invoices
27+
Super Stream Producer sent 1 messages to invoices
28+
Super Stream Producer sent 2 messages to invoices
29+
Super Stream Producer sent 3 messages to invoices
30+
```
31+
32+
```bash
33+
$ cargo run --release -- --consumer my_first_consumer
34+
Starting SuperStream Consumer my_first_consumer
35+
Super Stream Consumer connected to RabbitMQ. ConsumerName my_first_consumer
36+
Consumer Name my_first_consumer: Got message: super_stream_message_1 from stream: invoices-1 with offset: 33
37+
Consumer Name my_first_consumer: Got message: super_stream_message_2 from stream: invoices-2 with offset: 34
38+
Consumer Name my_first_consumer: Got message: super_stream_message_3 from stream: invoices-0 with offset: 37
39+
Consumer Name my_first_consumer: Got message: super_stream_message_4 from stream: invoices-0 with offset: 36
40+
Consumer Name my_first_consumer: Got message: super_stream_message_5 from stream: invoices-1 with offset: 39
41+
Consumer Name my_first_consumer: Got message: super_stream_message_6 from stream: invoices-2 with offset: 40
42+
Consumer Name my_first_consumer: Got message: super_stream_message_7 from stream: invoices-0 with offset: 41
43+
Consumer Name my_first_consumer: Got message: super_stream_message_8 from stream: invoices-1 with offset: 42
44+
Consumer Name my_first_consumer: Got message: super_stream_message_9 from stream: invoices-2 with offset: 43
45+
Consumer Name my_first_consumer: Got message: super_stream_message_10 from stream: invoices-1 with offset: 44
46+
```
47+
48+
To see the Single active consumer in action, run another consumer:
49+
50+
$ cargo run --release -- --consumer my_second_consumer
51+
52+
You should see the second consumer receiving the part of the messages from the producer. In thi case only the messages coming from the `invoices-1`.
53+
54+
It should be something like:
55+
```bash
56+
$ cargo run --release -- --consumer my_second_consumer
57+
Starting SuperStream Consumer my_second_consumer
58+
Super Stream Consumer connected to RabbitMQ. ConsumerName my_second_consumer
59+
Consumer Name my_second_consumer: Got message: super_stream_message_64 from stream: invoices-1 with offset: 86
60+
Consumer Name my_second_consumer: Got message: super_stream_message_65 from stream: invoices-1 with offset: 87
61+
Consumer Name my_second_consumer: Got message: super_stream_message_66 from stream: invoices-1 with offset: 88
62+
Consumer Name my_second_consumer: Got message: super_stream_message_67 from stream: invoices-1 with offset: 89
63+
Consumer Name my_second_consumer: Got message: super_stream_message_68 from stream: invoices-1 with offset: 90
64+
Consumer Name my_second_consumer: Got message: super_stream_message_69 from stream: invoices-1 with offset: 90
65+
Consumer Name my_second_consumer: Got message: super_stream_message_70 from stream: invoices-1 with offset: 90
66+
```
67+
and the first consumer should be receiving the rest of the messages:
68+
```bash
69+
Consumer Name my_first_consumer: Got message: super_stream_message_88 from stream: invoices-0 with offset: 92
70+
Consumer Name my_first_consumer: Got message: super_stream_message_87 from stream: invoices-2 with offset: 93
71+
Consumer Name my_first_consumer: Got message: super_stream_message_89 from stream: invoices-2 with offset: 95
72+
Consumer Name my_first_consumer: Got message: super_stream_message_90 from stream: invoices-0 with offset: 97
73+
Consumer Name my_first_consumer: Got message: super_stream_message_91 from stream: invoices-0 with offset: 96
74+
Consumer Name my_first_consumer: Got message: super_stream_message_92 from stream: invoices-2 with offset: 99
75+
Consumer Name my_first_consumer: Got message: super_stream_message_93 from stream: invoices-2 with offset: 101
76+
```
2077

2178

2279

examples/single_active_consumer/single_active_consumer.rs

Lines changed: 0 additions & 97 deletions
This file was deleted.
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
mod send_super_stream;
2+
mod single_active_consumer_super_stream;
3+
4+
use std::env;
5+
6+
static SUPER_STREAM: &str = "invoices";
7+
8+
#[tokio::main]
9+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
10+
let mut args = env::args().skip(1);
11+
while let Some(arg) = args.next() {
12+
match &arg[..] {
13+
"-h" | "--help" => help(),
14+
"--consumer" => {
15+
let mut consumer_name = String::from("");
16+
let next = args.next().take();
17+
if next.is_some() {
18+
println!("is some");
19+
consumer_name = next.clone().take().unwrap();
20+
}
21+
println!("Starting SuperStream Consumer {}", consumer_name);
22+
single_active_consumer_super_stream::start_consumer(consumer_name).await?;
23+
}
24+
25+
"--producer" => {
26+
println!("Starting SuperStream Producer");
27+
send_super_stream::start_producer().await?
28+
},
29+
30+
arg if arg.starts_with("-") => {
31+
eprintln!("Unknown argument: {}", arg);
32+
}
33+
34+
_ => {
35+
eprintln!("Unknown argument: {}", arg);
36+
help();
37+
}
38+
}
39+
}
40+
Ok(())
41+
}
42+
43+
fn help() {
44+
println!("--consumer or --producer")
45+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
use rabbitmq_stream_client::error::StreamCreateError;
2+
use rabbitmq_stream_client::types::{
3+
ByteCapacity, HashRoutingMurmurStrategy, Message, ResponseCode, RoutingStrategy,
4+
};
5+
use std::convert::TryInto;
6+
use std::sync::atomic::{AtomicU32, Ordering};
7+
use std::sync::Arc;
8+
use tokio::sync::Notify;
9+
use tokio::time;
10+
11+
fn hash_strategy_value_extractor(message: &Message) -> String {
12+
message
13+
.application_properties()
14+
.unwrap()
15+
.get("id")
16+
.unwrap()
17+
.clone()
18+
.try_into()
19+
.unwrap()
20+
}
21+
22+
pub async fn start_producer() -> Result<(), Box<dyn std::error::Error>> {
23+
use rabbitmq_stream_client::Environment;
24+
let environment = Environment::builder().build().await?;
25+
println!("Super Stream Producer connected to RabbitMQ");
26+
let confirmed_messages = Arc::new(AtomicU32::new(0));
27+
let notify_on_send = Arc::new(Notify::new());
28+
let _ = environment
29+
.stream_creator()
30+
.max_length(ByteCapacity::GB(5))
31+
.create_super_stream(crate::SUPER_STREAM, 3, None)
32+
.await;
33+
34+
let create_response = environment
35+
.stream_creator()
36+
.max_length(ByteCapacity::GB(5))
37+
.create_super_stream(crate::SUPER_STREAM, 3, None)
38+
.await;
39+
40+
if let Err(e) = create_response {
41+
if let StreamCreateError::Create { stream, status } = e {
42+
match status {
43+
// we can ignore this error because the stream already exists
44+
ResponseCode::StreamAlreadyExists => {}
45+
err => {
46+
println!(
47+
"[Super Stream Producer] Error creating stream: {:?} {:?}",
48+
stream, err
49+
);
50+
}
51+
}
52+
}
53+
}
54+
55+
let super_stream_producer = environment
56+
.super_stream_producer(RoutingStrategy::HashRoutingStrategy(
57+
HashRoutingMurmurStrategy {
58+
routing_extractor: &hash_strategy_value_extractor,
59+
},
60+
))
61+
.client_provided_name("rust stream producer - sac example")
62+
.build(crate::SUPER_STREAM)
63+
.await;
64+
65+
match super_stream_producer {
66+
Ok(mut producer) => {
67+
println!("[Super Stream Producer] Successfully created super stream producer");
68+
let mut idx = 0;
69+
loop {
70+
let counter = confirmed_messages.clone();
71+
let notifier = notify_on_send.clone();
72+
let msg = Message::builder()
73+
.body(format!("super stream message_{}", idx))
74+
.application_properties()
75+
.insert("id", idx.to_string())
76+
.message_builder()
77+
.build();
78+
79+
let send_result = producer
80+
.send(msg, move |_| {
81+
let inner_counter = counter.clone();
82+
let inner_notifier = notifier.clone();
83+
async move {
84+
if inner_counter.fetch_add(1, Ordering::Relaxed) == idx - 1 {
85+
inner_notifier.notify_one();
86+
}
87+
}
88+
})
89+
.await;
90+
91+
match send_result {
92+
Ok(_) => {
93+
idx += 1;
94+
println!(
95+
"[Super Stream Producer] Message {} sent to {}",
96+
idx,
97+
crate::SUPER_STREAM
98+
);
99+
}
100+
Err(err) => {
101+
println!(
102+
"[Super Stream Producer] Failed to send message. error: {}",
103+
err
104+
);
105+
}
106+
}
107+
108+
time::sleep(time::Duration::from_millis(1_000)).await;
109+
}
110+
}
111+
Err(err) => {
112+
println!("Failed to create super stream producer. error {}", err);
113+
Ok(())
114+
}
115+
}
116+
}

0 commit comments

Comments
 (0)