Skip to content

Commit f2240db

Browse files
authored
Merge pull request containerd#272 from teawater/exp_server_send
examples: Add example for server_send_stream
2 parents 8610b15 + 28fa4d3 commit f2240db

File tree

3 files changed

+40
-2
lines changed

3 files changed

+40
-2
lines changed

example/async-stream-client.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,12 @@ async fn main() {
4444
let sc1 = sc.clone();
4545
let t6 = tokio::spawn(echo_null_stream(sc1));
4646

47-
let t7 = tokio::spawn(echo_default_value(sc));
47+
let sc1 = sc.clone();
48+
let t7 = tokio::spawn(echo_default_value(sc1));
49+
50+
let t8 = tokio::spawn(server_send_stream(sc));
4851

49-
let _ = tokio::join!(t1, t2, t3, t4, t5, t6, t7);
52+
let _ = tokio::join!(t1, t2, t3, t4, t5, t6, t7, t8);
5053
}
5154

5255
fn default_ctx() -> Context {
@@ -201,3 +204,18 @@ async fn echo_default_value(cli: streaming_ttrpc::StreamingClient) {
201204
assert_eq!(received.seq, 0);
202205
assert_eq!(received.msg, "");
203206
}
207+
208+
#[cfg(unix)]
209+
async fn server_send_stream(cli: streaming_ttrpc::StreamingClient) {
210+
let mut stream = cli
211+
.server_send_stream(default_ctx(), &Default::default())
212+
.await
213+
.unwrap();
214+
215+
let mut seq = 0;
216+
while let Some(received) = stream.recv().await.unwrap() {
217+
assert_eq!(received.seq, seq);
218+
assert_eq!(received.msg, "hello");
219+
seq += 1;
220+
}
221+
}

example/async-stream-server.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,25 @@ impl streaming_ttrpc::Streaming for StreamingService {
152152

153153
Ok(())
154154
}
155+
156+
async fn server_send_stream(
157+
&self,
158+
_ctx: &::ttrpc::r#async::TtrpcContext,
159+
_: empty::Empty,
160+
s: ::ttrpc::r#async::ServerStreamSender<streaming::EchoPayload>,
161+
) -> ::ttrpc::Result<()> {
162+
let mut seq = 0;
163+
while seq < 10 {
164+
sleep(std::time::Duration::from_millis(100)).await;
165+
let mut e = streaming::EchoPayload::new();
166+
e.seq = seq;
167+
e.msg = format!("hello");
168+
s.send(&e).await.unwrap();
169+
seq += 1;
170+
}
171+
172+
Ok(())
173+
}
155174
}
156175

157176
#[cfg(windows)]

example/protocols/protos/streaming.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ service Streaming {
3333
rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty);
3434
rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty);
3535
rpc EchoDefaultValue(EchoPayload) returns (stream EchoPayload);
36+
rpc ServerSendStream(google.protobuf.Empty) returns (stream EchoPayload);
3637
}
3738

3839
message EchoPayload {

0 commit comments

Comments
 (0)