diff --git a/asyncio_paho/client.py b/asyncio_paho/client.py index a8147eb..41a66ec 100644 --- a/asyncio_paho/client.py +++ b/asyncio_paho/client.py @@ -270,6 +270,37 @@ async def on_subscribe(*args: Any) -> None: finally: unsubscribe() + async def asyncio_unsubscribe( + self, + topic: str | Tuple[str], + properties: paho.Properties | None = None, + ) -> Optional[Tuple[int, int]]: + """Unsubscribe the client from one or more topics.""" + unsubscribed_future = self._event_loop.create_future() + result: tuple[int, int] + + async def on_unsubscribe(*args: Any) -> None: + # pylint: disable=unused-argument + nonlocal result + print("on_unsubscribe", args, result) + if result[1] == args[2]: + nonlocal unsubscribed_future + unsubscribed_future.set_result(None) + + unsubscribe = self.asyncio_listeners.add_on_unsubscribe( + on_unsubscribe, is_high_pri=True + ) + try: + result = super().unsubscribe(topic, properties) + + if result[0] == paho.MQTT_ERR_NO_CONN: + return result + + await unsubscribed_future + return result + finally: + unsubscribe() + def user_data_set(self, userdata: Any) -> None: """Set the user data variable passed to callbacks. May be any data type.""" self._userdata = userdata @@ -415,6 +446,7 @@ class _EventType(Enum): ON_CONNECT_FAILED = auto() ON_MESSAGE = auto() ON_SUBSCRIBE = auto() + ON_UNSUBSCRIBE = auto() ON_PUBLISH = auto() @@ -549,6 +581,22 @@ def forwarder(*args: Any) -> None: paho.Client.on_subscribe.fset(self._client, forwarder) return self._add_async_listener(_EventType.ON_SUBSCRIBE, callback, is_high_pri) + + def add_on_unsubscribe( + self, + callback: Callable[[paho.Client, Any, int, tuple[int, ...]], Awaitable[None]] + | Callable[ + [paho.Client, Any, int, list[int], paho.Properties], Awaitable[None] + ], + is_high_pri: bool = False, + ) -> Callable[[], None]: + """Add on_unsubscribe async listener.""" + + def forwarder(*args: Any) -> None: + self._async_forwarder(_EventType.ON_UNSUBSCRIBE, *args) + + paho.Client.on_unsubscribe.fset(self._client, forwarder) + return self._add_async_listener(_EventType.ON_UNSUBSCRIBE, callback, is_high_pri) def add_on_publish( self,