Skip to content

Commit e69af99

Browse files
committed
feat: Add ackable events
1 parent 85f14d8 commit e69af99

File tree

7 files changed

+344
-49
lines changed

7 files changed

+344
-49
lines changed

socketio/src/asynchronous/client/builder.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,51 @@ impl ClientBuilder {
196196
self
197197
}
198198

199+
/// Registers a new callback for a certain [`crate::event::Event`] that expects the client to
200+
/// ack. The event could either be one of the common events like `message`, `error`, `open`,
201+
/// `close` or a custom event defined by a string, e.g. `onPayment` or `foo`.
202+
///
203+
/// # Example
204+
/// ```rust
205+
/// use rust_socketio::{asynchronous::{ClientBuilder, Client}, i32, Payload};
206+
/// use futures_util::FutureExt;
207+
///
208+
/// #[tokio::main]
209+
/// async fn main() {
210+
/// let socket = ClientBuilder::new("http://localhost:4200/")
211+
/// .namespace("/admin")
212+
/// .on_with_ack("test", |payload: Payload, client: Client, ack: i32| {
213+
/// async move {
214+
/// match payload {
215+
/// Payload::Text(values) => println!("Received: {:#?}", values),
216+
/// Payload::Binary(bin_data) => println!("Received bytes: {:#?}", bin_data),
217+
/// // This is deprecated, use Payload::Text instead
218+
/// Payload::String(str) => println!("Received: {}", str),
219+
/// }
220+
/// client.ack(ack, "received").await;
221+
/// }
222+
/// .boxed()
223+
/// })
224+
/// .on("error", |err, _| async move { eprintln!("Error: {:#?}", err) }.boxed())
225+
/// .connect()
226+
/// .await;
227+
/// }
228+
///
229+
#[cfg(feature = "async-callbacks")]
230+
pub fn on_with_ack<T: Into<Event>, F>(mut self, event: T, callback: F) -> Self
231+
where
232+
F: for<'a> std::ops::FnMut(Payload, Client, i32) -> BoxFuture<'static, ()>
233+
+ 'static
234+
+ Send
235+
+ Sync,
236+
{
237+
self.on.insert(
238+
event.into(),
239+
Callback::<DynAsyncCallback>::new_with_ack(callback),
240+
);
241+
self
242+
}
243+
199244
/// Registers a callback for reconnect events. The event handler must return
200245
/// a [ReconnectSettings] struct with the settings that should be updated.
201246
///
@@ -262,6 +307,41 @@ impl ClientBuilder {
262307
self
263308
}
264309

310+
/// Registers a Callback for all [`crate::event::Event::Custom`] and
311+
/// [`crate::event::Event::Message`] that expect the client to ack.
312+
///
313+
/// # Example
314+
/// ```rust
315+
/// use rust_socketio::{asynchronous::ClientBuilder, Payload};
316+
/// use futures_util::future::FutureExt;
317+
///
318+
/// #[tokio::main]
319+
/// async fn main() {
320+
/// let client = ClientBuilder::new("http://localhost:4200/")
321+
/// .namespace("/admin")
322+
/// .on_any_with_ack(|event, payload, client, ack| {
323+
/// async {
324+
/// if let Payload::String(str) = payload {
325+
/// println!("{}: {}", String::from(event), str);
326+
/// }
327+
/// client.ack(ack, "received").await;
328+
/// }.boxed()
329+
/// })
330+
/// .connect()
331+
/// .await;
332+
/// }
333+
/// ```
334+
pub fn on_any_with_ack<F>(mut self, callback: F) -> Self
335+
where
336+
F: for<'a> FnMut(Event, Payload, Client, i32) -> BoxFuture<'static, ()>
337+
+ 'static
338+
+ Send
339+
+ Sync,
340+
{
341+
self.on_any = Some(Callback::<DynAsyncAnyCallback>::new_with_ack(callback));
342+
self
343+
}
344+
265345
/// Uses a preconfigured TLS connector for secure communication. This configures
266346
/// both the `polling` as well as the `websocket` transport type.
267347
/// # Example

socketio/src/asynchronous/client/callback.rs

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use futures_util::future::BoxFuture;
1+
use futures_util::{future::BoxFuture, FutureExt};
22
use std::{
33
fmt::Debug,
4+
future::Future,
45
ops::{Deref, DerefMut},
56
};
67

@@ -9,11 +10,18 @@ use crate::{Event, Payload};
910
use super::client::{Client, ReconnectSettings};
1011

1112
/// Internal type, provides a way to store futures and return them in a boxed manner.
12-
pub(crate) type DynAsyncCallback =
13-
Box<dyn for<'a> FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync>;
13+
pub(crate) type DynAsyncCallback = Box<
14+
dyn for<'a> FnMut(Payload, Client, Option<i32>) -> BoxFuture<'static, ()>
15+
+ 'static
16+
+ Send
17+
+ Sync,
18+
>;
1419

1520
pub(crate) type DynAsyncAnyCallback = Box<
16-
dyn for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync,
21+
dyn for<'a> FnMut(Event, Payload, Client, Option<i32>) -> BoxFuture<'static, ()>
22+
+ 'static
23+
+ Send
24+
+ Sync,
1725
>;
1826

1927
pub(crate) type DynAsyncReconnectSettingsCallback =
@@ -30,8 +38,10 @@ impl<T> Debug for Callback<T> {
3038
}
3139

3240
impl Deref for Callback<DynAsyncCallback> {
33-
type Target =
34-
dyn for<'a> FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send;
41+
type Target = dyn for<'a> FnMut(Payload, Client, Option<i32>) -> BoxFuture<'static, ()>
42+
+ 'static
43+
+ Sync
44+
+ Send;
3545

3646
fn deref(&self) -> &Self::Target {
3747
self.inner.as_ref()
@@ -45,19 +55,34 @@ impl DerefMut for Callback<DynAsyncCallback> {
4555
}
4656

4757
impl Callback<DynAsyncCallback> {
48-
pub(crate) fn new<T>(callback: T) -> Self
58+
pub(crate) fn new_with_ack<T>(mut callback: T) -> Self
4959
where
50-
T: for<'a> FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send,
60+
T: for<'a> FnMut(Payload, Client, i32) -> BoxFuture<'static, ()> + 'static + Sync + Send,
5161
{
5262
Callback {
53-
inner: Box::new(callback),
63+
inner: Box::new(move |p, c, a| match a {
64+
Some(a) => callback(p, c, a).boxed(),
65+
None => std::future::ready(()).boxed(),
66+
}),
67+
}
68+
}
69+
70+
pub(crate) fn new<T, Fut>(mut callback: T) -> Self
71+
where
72+
T: FnMut(Payload, Client) -> Fut + Sync + Send + 'static,
73+
Fut: Future<Output = ()> + 'static + Send,
74+
{
75+
Callback {
76+
inner: Box::new(move |p, c, _a| callback(p, c).boxed()),
5477
}
5578
}
5679
}
5780

5881
impl Deref for Callback<DynAsyncAnyCallback> {
59-
type Target =
60-
dyn for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send;
82+
type Target = dyn for<'a> FnMut(Event, Payload, Client, Option<i32>) -> BoxFuture<'static, ()>
83+
+ 'static
84+
+ Sync
85+
+ Send;
6186

6287
fn deref(&self) -> &Self::Target {
6388
self.inner.as_ref()
@@ -71,12 +96,28 @@ impl DerefMut for Callback<DynAsyncAnyCallback> {
7196
}
7297

7398
impl Callback<DynAsyncAnyCallback> {
74-
pub(crate) fn new<T>(callback: T) -> Self
99+
pub(crate) fn new_with_ack<T>(mut callback: T) -> Self
75100
where
76-
T: for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send,
101+
T: for<'a> FnMut(Event, Payload, Client, i32) -> BoxFuture<'static, ()>
102+
+ 'static
103+
+ Sync
104+
+ Send,
77105
{
78106
Callback {
79-
inner: Box::new(callback),
107+
inner: Box::new(move |e, p, c, a| match a {
108+
Some(a) => callback(e, p, c, a).boxed(),
109+
None => std::future::ready(()).boxed(),
110+
}),
111+
}
112+
}
113+
114+
pub(crate) fn new<T, Fut>(mut callback: T) -> Self
115+
where
116+
T: FnMut(Event, Payload, Client) -> Fut + Sync + Send + 'static,
117+
Fut: Future<Output = ()> + 'static + Send,
118+
{
119+
Callback {
120+
inner: Box::new(move |e, p, c, _a| callback(e, p, c).boxed()),
80121
}
81122
}
82123
}

socketio/src/asynchronous/client/client.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -376,19 +376,33 @@ impl Client {
376376
self.socket.read().await.send(socket_packet).await
377377
}
378378

379-
async fn callback<P: Into<Payload>>(&self, event: &Event, payload: P) -> Result<()> {
379+
pub async fn ack<D>(&self, ack_id: i32, data: D) -> Result<()>
380+
where
381+
D: Into<Payload>,
382+
{
383+
let socket_packet = Packet::new_ack(data.into(), &self.nsp, ack_id);
384+
385+
self.socket.read().await.send(socket_packet).await
386+
}
387+
388+
async fn callback<P: Into<Payload>>(
389+
&self,
390+
event: &Event,
391+
payload: P,
392+
ack_id: Option<i32>,
393+
) -> Result<()> {
380394
let mut builder = self.builder.write().await;
381395
let payload = payload.into();
382396

383397
if let Some(callback) = builder.on.get_mut(event) {
384-
callback(payload.clone(), self.clone()).await;
398+
callback(payload.clone(), self.clone(), ack_id).await;
385399
}
386400

387401
// Call on_any for all common and custom events.
388402
match event {
389403
Event::Message | Event::Custom(_) => {
390404
if let Some(callback) = builder.on_any.as_mut() {
391-
callback(event.clone(), payload, self.clone()).await;
405+
callback(event.clone(), payload, self.clone(), ack_id).await;
392406
}
393407
}
394408
_ => (),
@@ -411,6 +425,7 @@ impl Client {
411425
ack.callback.deref_mut()(
412426
Payload::from(payload.to_owned()),
413427
self.clone(),
428+
None,
414429
)
415430
.await;
416431
}
@@ -419,6 +434,7 @@ impl Client {
419434
ack.callback.deref_mut()(
420435
Payload::Binary(payload.to_owned()),
421436
self.clone(),
437+
None,
422438
)
423439
.await;
424440
}
@@ -446,8 +462,12 @@ impl Client {
446462

447463
if let Some(attachments) = &packet.attachments {
448464
if let Some(binary_payload) = attachments.get(0) {
449-
self.callback(&event, Payload::Binary(binary_payload.to_owned()))
450-
.await?;
465+
self.callback(
466+
&event,
467+
Payload::Binary(binary_payload.to_owned()),
468+
packet.id,
469+
)
470+
.await?;
451471
}
452472
}
453473
Ok(())
@@ -480,7 +500,7 @@ impl Client {
480500
};
481501

482502
// call the correct callback
483-
self.callback(&event, payloads.to_vec()).await?;
503+
self.callback(&event, payloads.to_vec(), packet.id).await?;
484504
}
485505

486506
Ok(())
@@ -495,22 +515,22 @@ impl Client {
495515
match packet.packet_type {
496516
PacketId::Ack | PacketId::BinaryAck => {
497517
if let Err(err) = self.handle_ack(packet).await {
498-
self.callback(&Event::Error, err.to_string()).await?;
518+
self.callback(&Event::Error, err.to_string(), None).await?;
499519
return Err(err);
500520
}
501521
}
502522
PacketId::BinaryEvent => {
503523
if let Err(err) = self.handle_binary_event(packet).await {
504-
self.callback(&Event::Error, err.to_string()).await?;
524+
self.callback(&Event::Error, err.to_string(), None).await?;
505525
}
506526
}
507527
PacketId::Connect => {
508528
*(self.disconnect_reason.write().await) = DisconnectReason::default();
509-
self.callback(&Event::Connect, "").await?;
529+
self.callback(&Event::Connect, "", None).await?;
510530
}
511531
PacketId::Disconnect => {
512532
*(self.disconnect_reason.write().await) = DisconnectReason::Server;
513-
self.callback(&Event::Close, "").await?;
533+
self.callback(&Event::Close, "", None).await?;
514534
}
515535
PacketId::ConnectError => {
516536
self.callback(
@@ -520,12 +540,13 @@ impl Client {
520540
.data
521541
.as_ref()
522542
.unwrap_or(&String::from("\"No error message provided\"")),
543+
None,
523544
)
524545
.await?;
525546
}
526547
PacketId::Event => {
527548
if let Err(err) = self.handle_event(packet).await {
528-
self.callback(&Event::Error, err.to_string()).await?;
549+
self.callback(&Event::Error, err.to_string(), None).await?;
529550
}
530551
}
531552
}
@@ -547,7 +568,7 @@ impl Client {
547568
None => None,
548569
Some(Err(err)) => {
549570
// call the error callback
550-
match self.callback(&Event::Error, err.to_string()).await {
571+
match self.callback(&Event::Error, err.to_string(), None).await {
551572
Err(callback_err) => Some((Err(callback_err), socket)),
552573
Ok(_) => Some((Err(err), socket)),
553574
}

0 commit comments

Comments
 (0)