Skip to content

Commit

Permalink
refactoring project and publish/send API
Browse files Browse the repository at this point in the history
  • Loading branch information
jkyberneees committed Oct 20, 2024
1 parent e6c537a commit a4f1787
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 220 deletions.
8 changes: 2 additions & 6 deletions demos/publish_and_reply.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ async def handle_session_started(message):
await client.wait_for('session.started')

# Send a message
wait_for = await client.send('Hello, world!', {
'messageType': 'text-message'
})
wait_for = await client.send('Hello, world!', message_type='text-message')
await wait_for.wait_for_ack()

# Define a message handler
Expand All @@ -59,9 +57,7 @@ def handle_message(message, reply_fn):
# Subscribe to chat.text-message events
client.on('chat.text-message', handle_message)

wait_for = await client.publish('chat', 'Hello out there!', {
'messageType': 'text-message'
})
wait_for = await client.publish('chat', 'Hello out there!', message_type='text-message')
response = await wait_for.wait_for_reply()
print('Reply:', response)

Expand Down
4 changes: 1 addition & 3 deletions demos/rpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ async def get_url():
# Define a message handler
async def handle_session_started(message):
client.logger.info('Requesting server time...')
waiter = await client.send('', {
'messageType': 'gettime'
})
waiter = await client.send('', message_type='gettime')

response, = await waiter.wait_for_reply(timeout=5)
client.logger.info(f"Server time: {response['data']['time']}")
Expand Down
225 changes: 27 additions & 198 deletions realtime_pubsub_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,129 +15,11 @@
import json # JSON encoding and decoding
import secrets # Generate secure random numbers
import logging # Logging library
from typing import Callable # Type hinting support
from websockets import connect, exceptions


class EventEmitter:
"""
A simple event emitter class that allows registering and emitting events.
Supports wildcard events using '*' and '**' with '.' as the separator.
"""

def __init__(self):
"""
Initialize an `EventEmitter` instance with an empty events dictionary.
"""
self._events = {}

def on(self, event: str, listener: Callable):
"""
Register a listener for a specific event, with support for wildcards.
Args:
event (str): The name of the event, can include wildcards ('*' or '**').
listener (Callable): The function to call when the event is emitted.
"""
if event not in self._events:
self._events[event] = []
self._events[event].append(listener)

def off(self, event: str, listener: Callable):
"""
Remove a listener for a specific event.
Args:
event (str): The name of the event.
listener (Callable): The function to remove from the event listeners.
"""
if event in self._events:
try:
self._events[event].remove(listener)
if not self._events[event]:
del self._events[event]
except ValueError:
# Listener not found in the list
pass

def emit(self, event: str, *args, **kwargs):
"""
Trigger all listeners associated with an event, supporting wildcards.
Args:
event (str): The name of the event to emit.
*args: Positional arguments to pass to the event listeners.
**kwargs: Keyword arguments to pass to the event listeners.
"""
listeners = []
for event_pattern, event_listeners in self._events.items():
if self.event_matches(event_pattern, event):
listeners.extend(event_listeners)
for listener in listeners:
if asyncio.iscoroutinefunction(listener):
asyncio.create_task(listener(*args, **kwargs))
else:
listener(*args, **kwargs)

def once(self, event: str, listener: Callable):
"""
Register a listener for a specific event that will be called at most once.
Args:
event (str): The name of the event.
listener (Callable): The function to call when the event is emitted.
"""

def _once_listener(*args, **kwargs):
listener(*args, **kwargs)
self.off(event, _once_listener)

self.on(event, _once_listener)

@staticmethod
def event_matches(pattern: str, event_name: str) -> bool:
"""
Check if an event pattern matches an event name, supporting wildcards '*' and '**'.
Args:
pattern (str): The event pattern, may include wildcards '*' and '**'.
event_name (str): The event name to match against.

Returns:
bool: True if the pattern matches the event name, False otherwise.
"""

def match_segments(pattern_segments, event_segments):
i = j = 0
while i < len(pattern_segments) and j < len(event_segments):
if pattern_segments[i] == '**':
# '**' matches any number of segments, including zero
if i == len(pattern_segments) - 1:
# '**' at the end matches all remaining segments
return True
else:
# Try to match remaining pattern with any position in event_segments
for k in range(j, len(event_segments) + 1):
if match_segments(pattern_segments[i + 1:], event_segments[k:]):
return True
return False
elif pattern_segments[i] == '*':
# '*' matches exactly one segment
i += 1
j += 1
elif pattern_segments[i] == event_segments[j]:
# Exact match
i += 1
j += 1
else:
return False
while i < len(pattern_segments) and pattern_segments[i] == '**':
i += 1
return i == len(pattern_segments) and j == len(event_segments)
from websockets import connect, exceptions

pattern_segments = pattern.split('.')
event_segments = event_name.split('.')
return match_segments(pattern_segments, event_segments)
from realtime_pubsub_client.event_emitter import EventEmitter
from realtime_pubsub_client.wait_for import WaitFor


def reply(client, message):
Expand All @@ -158,14 +40,14 @@ def reply(client, message):
ValueError: If the connection ID is not available in the incoming message.
"""

def reply_function(data, status='ok', options=None):
def reply_function(data, status='ok', compress=False):
"""
Sends a reply message back to the sender of the original message.
Args:
data: The payload data to send in the reply.
status (str, optional): The status of the reply. Defaults to 'ok'.
options (dict, optional): Additional message options. Defaults to None.
compress (bool, optional): Whether to compress the reply payload. Defaults to False.
Returns:
WaitFor: An instance to wait for acknowledgments or replies.
Expand All @@ -181,69 +63,13 @@ def reply_function(data, status='ok', options=None):
'data': data,
'status': status,
'id': message['data'].get('id'),
},
{
'messageType': 'response',
'compress': options.get('compress', False) if options else False,
},
))
}, message_type='response', compress=compress, ))
else:
raise ValueError('Connection ID is not available in the message')

return reply_function


class WaitFor:
"""
Class representing a factory for waiting on acknowledgments or replies.
The `WaitFor` class provides methods to wait for acknowledgments from the Messaging Gateway
or replies from other subscribers or backend services. It is used in conjunction with
message publishing and sending methods to ensure reliable communication.
"""

def __init__(self, client, options):
"""
Initialize a new instance of the `WaitFor` class.
Args:
client (RealtimeClient): The `RealtimeClient` instance associated with this factory.
options (dict): The message options used for publishing or sending messages.
"""
self.client = client
self.options = options

async def wait_for_ack(self, timeout=5):
"""
Wait for an acknowledgment event with a timeout.
Args:
timeout (int, optional): The maximum time to wait for the acknowledgment in seconds. Defaults to 5.
Returns:
Any: The acknowledgment data received.
Raises:
TimeoutError: If the acknowledgment is not received within the timeout period.
"""
return await self.client.wait_for(f"ack.{self.options['id']}", timeout)

async def wait_for_reply(self, timeout=5):
"""
Wait for a reply event with a timeout.
Args:
timeout (int, optional): The maximum time to wait for the reply in seconds. Defaults to 5.
Returns:
Any: The reply data received.
Raises:
TimeoutError: If the reply is not received within the timeout period.
"""
return await self.client.wait_for(f"response.{self.options['id']}", timeout)


async def wait(ms):
"""
Wait for a specified duration before proceeding.
Expand Down Expand Up @@ -367,8 +193,7 @@ async def connect(self):
raise ValueError('WebSocket URL is not provided')

try:
max_float = float('inf')
self.ws = await connect(ws_url, max_size=None, ping_interval=None, ping_timeout=None)
self.ws = await connect(ws_url, max_size=None, ping_interval=None, ping_timeout=None)
self.logger.info(f'Connected to WebSocket URL: {ws_url[:80]}...') # Masking the URL for security
asyncio.ensure_future(self._receive_messages())

Expand All @@ -393,7 +218,7 @@ async def disconnect(self):
self.ws = None
self.logger.info('WebSocket connection closed.')

async def publish(self, topic, payload, options=None):
async def publish(self, topic, payload, message_type="broadcast", compress=False, message_id=None):
"""
Publish a message to a specified topic.
Expand All @@ -403,7 +228,9 @@ async def publish(self, topic, payload, options=None):
Args:
topic (str): The topic to publish the message to.
payload (str or dict): The message payload.
options (dict, optional): Optional message options, including `id`, `messageType`, and `compress`.
message_type (str, optional): The type of message being published. Defaults to "broadcast".
compress (bool, optional): Whether to compress the message payload. Defaults to False.
message_id (str, optional): The unique identifier for the message. Defaults to auto-generated value.
Returns:
WaitFor: An instance to wait for acknowledgments or replies.
Expand All @@ -415,26 +242,26 @@ async def publish(self, topic, payload, options=None):
self.logger.error('Attempted to publish without an active WebSocket connection.')
raise Exception('WebSocket connection is not established')

options = options or {}
options['id'] = options.get('id', get_random_id())
if message_id is None:
message_id = get_random_id()

message = json.dumps({
'type': 'publish',
'data': {
'topic': topic,
'messageType': options.get('messageType'),
'compress': options.get('compress', False),
'messageType': message_type,
'compress': bool(compress),
'payload': payload,
'id': options['id'],
'id': message_id,
},
})

self.logger.debug(f'Publishing message to topic {topic}: {payload}')
await self.ws.send(message)

return WaitFor(self, options)
return WaitFor(self, message_id)

async def send(self, payload, options=None):
async def send(self, payload, message_type="broadcast", compress=False, message_id=None):
"""
Send a message directly to the server.
Expand All @@ -443,7 +270,9 @@ async def send(self, payload, options=None):
Args:
payload (str or dict): The message payload.
options (dict, optional): Optional message options, including `id`, `messageType`, and `compress`.
message_type (str, optional): The type of message being sent. Defaults to "broadcast".
compress (bool, optional): Whether to compress the message payload. Defaults to False.
message_id (str, optional): The unique identifier for the message. Defaults to auto-generated value.
Returns:
WaitFor: An instance to wait for acknowledgments or replies.
Expand All @@ -455,23 +284,23 @@ async def send(self, payload, options=None):
self.logger.error('Attempted to send without an active WebSocket connection.')
raise Exception('WebSocket connection is not established')

options = options or {}
options['id'] = options.get('id', get_random_id())
if message_id is None:
message_id = get_random_id()

message = json.dumps({
'type': 'message',
'data': {
'messageType': options.get('messageType'),
'compress': options.get('compress', False),
'messageType': message_type,
'compress': bool(compress),
'payload': payload,
'id': options['id'],
'id': message_id,
},
})

self.logger.debug(f'Sending message: {payload}')
await self.ws.send(message)

return WaitFor(self, options)
return WaitFor(self, message_id)

async def subscribe_remote_topic(self, topic):
"""
Expand Down
Loading

0 comments on commit a4f1787

Please sign in to comment.