Skip to content

Commit dc6e4f5

Browse files
Message queue optimizations (Fixes #1240)
1 parent c419fc5 commit dc6e4f5

File tree

4 files changed

+204
-96
lines changed

4 files changed

+204
-96
lines changed

src/socketio/asyncio_pubsub_manager.py

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,12 @@ async def emit(self, event, data, namespace=None, room=None, skip_sid=None,
6464
callback = (room, namespace, id)
6565
else:
6666
callback = None
67-
await self._publish({'method': 'emit', 'event': event, 'data': data,
68-
'namespace': namespace, 'room': room,
69-
'skip_sid': skip_sid, 'callback': callback,
70-
'host_id': self.host_id})
67+
message = {'method': 'emit', 'event': event, 'data': data,
68+
'namespace': namespace, 'room': room,
69+
'skip_sid': skip_sid, 'callback': callback,
70+
'host_id': self.host_id}
71+
await self._handle_emit(message) # handle in this host
72+
await self._publish(message) # notify other hosts
7173

7274
async def can_disconnect(self, sid, namespace):
7375
if self.is_connected(sid, namespace):
@@ -76,18 +78,23 @@ async def can_disconnect(self, sid, namespace):
7678
else:
7779
# client is in another server, so we post request to the queue
7880
await self._publish({'method': 'disconnect', 'sid': sid,
79-
'namespace': namespace or '/'})
81+
'namespace': namespace or '/',
82+
'host_id': self.host_id})
8083

8184
async def disconnect(self, sid, namespace, **kwargs):
8285
if kwargs.get('ignore_queue'):
8386
return await super(AsyncPubSubManager, self).disconnect(
8487
sid, namespace=namespace)
85-
await self._publish({'method': 'disconnect', 'sid': sid,
86-
'namespace': namespace or '/'})
88+
message = {'method': 'disconnect', 'sid': sid,
89+
'namespace': namespace or '/', 'host_id': self.host_id}
90+
await self._handle_disconnect(message) # handle in this host
91+
await self._publish(message) # notify other hosts
8792

8893
async def close_room(self, room, namespace=None):
89-
await self._publish({'method': 'close_room', 'room': room,
90-
'namespace': namespace or '/'})
94+
message = {'method': 'close_room', 'room': room,
95+
'namespace': namespace or '/', 'host_id': self.host_id}
96+
await self._handle_close_room(message) # handle in this host
97+
await self._publish(message) # notify other hosts
9198

9299
async def _publish(self, data):
93100
"""Publish a message on the Socket.IO channel.
@@ -139,18 +146,21 @@ async def _return_callback(self, host_id, sid, namespace, callback_id,
139146
*args):
140147
# When an event callback is received, the callback is returned back
141148
# the sender, which is identified by the host_id
142-
await self._publish({'method': 'callback', 'host_id': host_id,
143-
'sid': sid, 'namespace': namespace,
144-
'id': callback_id, 'args': args})
149+
if host_id == self.host_id:
150+
await self.trigger_callback(sid, callback_id, args)
151+
else:
152+
await self._publish({'method': 'callback', 'host_id': host_id,
153+
'sid': sid, 'namespace': namespace,
154+
'id': callback_id, 'args': args})
145155

146156
async def _handle_disconnect(self, message):
147157
await self.server.disconnect(sid=message.get('sid'),
148158
namespace=message.get('namespace'),
149159
ignore_queue=True)
150160

151161
async def _handle_close_room(self, message):
152-
await super().close_room(
153-
room=message.get('room'), namespace=message.get('namespace'))
162+
await super().close_room(room=message.get('room'),
163+
namespace=message.get('namespace'))
154164

155165
async def _thread(self):
156166
while True:
@@ -171,17 +181,18 @@ async def _thread(self):
171181
except:
172182
pass
173183
if data and 'method' in data:
174-
self._get_logger().info('pubsub message: {}'.format(
184+
self._get_logger().debug('pubsub message: {}'.format(
175185
data['method']))
176186
try:
177-
if data['method'] == 'emit':
178-
await self._handle_emit(data)
179-
elif data['method'] == 'callback':
187+
if data['method'] == 'callback':
180188
await self._handle_callback(data)
181-
elif data['method'] == 'disconnect':
182-
await self._handle_disconnect(data)
183-
elif data['method'] == 'close_room':
184-
await self._handle_close_room(data)
189+
elif data.get('host_id') != self.host_id:
190+
if data['method'] == 'emit':
191+
await self._handle_emit(data)
192+
elif data['method'] == 'disconnect':
193+
await self._handle_disconnect(data)
194+
elif data['method'] == 'close_room':
195+
await self._handle_close_room(data)
185196
except asyncio.CancelledError:
186197
raise # let the outer try/except handle it
187198
except:

src/socketio/pubsub_manager.py

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -61,30 +61,38 @@ def emit(self, event, data, namespace=None, room=None, skip_sid=None,
6161
callback = (room, namespace, id)
6262
else:
6363
callback = None
64-
self._publish({'method': 'emit', 'event': event, 'data': data,
65-
'namespace': namespace, 'room': room,
66-
'skip_sid': skip_sid, 'callback': callback,
67-
'host_id': self.host_id})
64+
message = {'method': 'emit', 'event': event, 'data': data,
65+
'namespace': namespace, 'room': room,
66+
'skip_sid': skip_sid, 'callback': callback,
67+
'host_id': self.host_id}
68+
self._handle_emit(message) # handle in this host
69+
self._publish(message) # notify other hosts
6870

6971
def can_disconnect(self, sid, namespace):
7072
if self.is_connected(sid, namespace):
7173
# client is in this server, so we can disconnect directly
7274
return super().can_disconnect(sid, namespace)
7375
else:
7476
# client is in another server, so we post request to the queue
75-
self._publish({'method': 'disconnect', 'sid': sid,
76-
'namespace': namespace or '/'})
77+
message = {'method': 'disconnect', 'sid': sid,
78+
'namespace': namespace or '/', 'host_id': self.host_id}
79+
self._handle_disconnect(message) # handle in this host
80+
self._publish(message) # notify other hosts
7781

7882
def disconnect(self, sid, namespace=None, **kwargs):
7983
if kwargs.get('ignore_queue'):
8084
return super(PubSubManager, self).disconnect(
8185
sid, namespace=namespace)
82-
self._publish({'method': 'disconnect', 'sid': sid,
83-
'namespace': namespace or '/'})
86+
message = {'method': 'disconnect', 'sid': sid,
87+
'namespace': namespace or '/', 'host_id': self.host_id}
88+
self._handle_disconnect(message) # handle in this host
89+
self._publish(message) # notify other hosts
8490

8591
def close_room(self, room, namespace=None):
86-
self._publish({'method': 'close_room', 'room': room,
87-
'namespace': namespace or '/'})
92+
message = {'method': 'close_room', 'room': room,
93+
'namespace': namespace or '/', 'host_id': self.host_id}
94+
self._handle_close_room(message) # handle in this host
95+
self._publish(message) # notify other hosts
8896

8997
def _publish(self, data):
9098
"""Publish a message on the Socket.IO channel.
@@ -116,11 +124,10 @@ def _handle_emit(self, message):
116124
*remote_callback)
117125
else:
118126
callback = None
119-
super(PubSubManager, self).emit(message['event'], message['data'],
120-
namespace=message.get('namespace'),
121-
room=message.get('room'),
122-
skip_sid=message.get('skip_sid'),
123-
callback=callback)
127+
super().emit(message['event'], message['data'],
128+
namespace=message.get('namespace'),
129+
room=message.get('room'),
130+
skip_sid=message.get('skip_sid'), callback=callback)
124131

125132
def _handle_callback(self, message):
126133
if self.host_id == message.get('host_id'):
@@ -135,18 +142,21 @@ def _handle_callback(self, message):
135142
def _return_callback(self, host_id, sid, namespace, callback_id, *args):
136143
# When an event callback is received, the callback is returned back
137144
# to the sender, which is identified by the host_id
138-
self._publish({'method': 'callback', 'host_id': host_id,
139-
'sid': sid, 'namespace': namespace, 'id': callback_id,
140-
'args': args})
145+
if host_id == self.host_id:
146+
self.trigger_callback(sid, callback_id, args)
147+
else:
148+
self._publish({'method': 'callback', 'host_id': host_id,
149+
'sid': sid, 'namespace': namespace,
150+
'id': callback_id, 'args': args})
141151

142152
def _handle_disconnect(self, message):
143153
self.server.disconnect(sid=message.get('sid'),
144154
namespace=message.get('namespace'),
145155
ignore_queue=True)
146156

147157
def _handle_close_room(self, message):
148-
super(PubSubManager, self).close_room(
149-
room=message.get('room'), namespace=message.get('namespace'))
158+
super().close_room(room=message.get('room'),
159+
namespace=message.get('namespace'))
150160

151161
def _thread(self):
152162
for message in self._listen():
@@ -165,17 +175,18 @@ def _thread(self):
165175
except:
166176
pass
167177
if data and 'method' in data:
168-
self._get_logger().info('pubsub message: {}'.format(
178+
self._get_logger().debug('pubsub message: {}'.format(
169179
data['method']))
170180
try:
171-
if data['method'] == 'emit':
172-
self._handle_emit(data)
173-
elif data['method'] == 'callback':
181+
if data['method'] == 'callback':
174182
self._handle_callback(data)
175-
elif data['method'] == 'disconnect':
176-
self._handle_disconnect(data)
177-
elif data['method'] == 'close_room':
178-
self._handle_close_room(data)
183+
elif data.get('host_id') != self.host_id:
184+
if data['method'] == 'emit':
185+
self._handle_emit(data)
186+
elif data['method'] == 'disconnect':
187+
self._handle_disconnect(data)
188+
elif data['method'] == 'close_room':
189+
self._handle_close_room(data)
179190
except:
180191
self.server.logger.exception(
181192
'Unknown error in pubsub listening thread')

tests/asyncio/test_asyncio_pubsub_manager.py

Lines changed: 68 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,15 @@ def test_can_disconnect(self):
163163
assert _run(self.pm.can_disconnect(sid, '/')) is True
164164
_run(self.pm.can_disconnect(sid, '/foo'))
165165
self.pm._publish.mock.assert_called_once_with(
166-
{'method': 'disconnect', 'sid': sid, 'namespace': '/foo'}
166+
{'method': 'disconnect', 'sid': sid, 'namespace': '/foo',
167+
'host_id': '123456'}
167168
)
168169

169170
def test_disconnect(self):
170171
_run(self.pm.disconnect('foo', '/'))
171172
self.pm._publish.mock.assert_called_once_with(
172-
{'method': 'disconnect', 'sid': 'foo', 'namespace': '/'}
173+
{'method': 'disconnect', 'sid': 'foo', 'namespace': '/',
174+
'host_id': '123456'}
173175
)
174176

175177
def test_disconnect_ignore_queue(self):
@@ -182,13 +184,15 @@ def test_disconnect_ignore_queue(self):
182184
def test_close_room(self):
183185
_run(self.pm.close_room('foo'))
184186
self.pm._publish.mock.assert_called_once_with(
185-
{'method': 'close_room', 'room': 'foo', 'namespace': '/'}
187+
{'method': 'close_room', 'room': 'foo', 'namespace': '/',
188+
'host_id': '123456'}
186189
)
187190

188191
def test_close_room_with_namespace(self):
189192
_run(self.pm.close_room('foo', '/bar'))
190193
self.pm._publish.mock.assert_called_once_with(
191-
{'method': 'close_room', 'room': 'foo', 'namespace': '/bar'}
194+
{'method': 'close_room', 'room': 'foo', 'namespace': '/bar',
195+
'host_id': '123456'}
192196
)
193197

194198
def test_handle_emit(self):
@@ -263,8 +267,7 @@ def test_handle_emit_with_skip_sid(self):
263267
callback=None,
264268
)
265269

266-
def test_handle_emit_with_callback(self):
267-
host_id = self.pm.host_id
270+
def test_handle_emit_with_remote_callback(self):
268271
with mock.patch.object(
269272
asyncio_manager.AsyncManager, 'emit', new=AsyncMock()
270273
) as super_emit:
@@ -275,7 +278,7 @@ def test_handle_emit_with_callback(self):
275278
'data': 'bar',
276279
'namespace': '/baz',
277280
'callback': ('sid', '/baz', 123),
278-
'host_id': '123456',
281+
'host_id': 'x',
279282
}
280283
)
281284
)
@@ -291,14 +294,40 @@ def test_handle_emit_with_callback(self):
291294
self.pm._publish.mock.assert_called_once_with(
292295
{
293296
'method': 'callback',
294-
'host_id': host_id,
297+
'host_id': 'x',
295298
'sid': 'sid',
296299
'namespace': '/baz',
297300
'id': 123,
298301
'args': ('one', 2, 'three'),
299302
}
300303
)
301304

305+
def test_handle_emit_with_local_callback(self):
306+
with mock.patch.object(
307+
asyncio_manager.AsyncManager, 'emit', new=AsyncMock()
308+
) as super_emit:
309+
_run(
310+
self.pm._handle_emit(
311+
{
312+
'event': 'foo',
313+
'data': 'bar',
314+
'namespace': '/baz',
315+
'callback': ('sid', '/baz', 123),
316+
'host_id': self.pm.host_id,
317+
}
318+
)
319+
)
320+
assert super_emit.mock.call_count == 1
321+
assert super_emit.mock.call_args[0] == (self.pm, 'foo', 'bar')
322+
assert super_emit.mock.call_args[1]['namespace'] == '/baz'
323+
assert super_emit.mock.call_args[1]['room'] is None
324+
assert super_emit.mock.call_args[1]['skip_sid'] is None
325+
assert isinstance(
326+
super_emit.mock.call_args[1]['callback'], functools.partial
327+
)
328+
_run(super_emit.mock.call_args[1]['callback']('one', 2, 'three'))
329+
self.pm._publish.mock.assert_not_called()
330+
302331
def test_handle_callback(self):
303332
host_id = self.pm.host_id
304333
with mock.patch.object(
@@ -419,50 +448,66 @@ def test_background_thread(self):
419448
self.pm._handle_callback = AsyncMock()
420449
self.pm._handle_disconnect = AsyncMock()
421450
self.pm._handle_close_room = AsyncMock()
451+
host_id = self.pm.host_id
422452

423453
async def messages():
424454
import pickle
425455

426-
yield {'method': 'emit', 'value': 'foo'}
427-
yield {'missing': 'method'}
428-
yield '{"method": "callback", "value": "bar"}'
429-
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
430-
yield {'method': 'bogus'}
431-
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
456+
yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'}
457+
yield {'missing': 'method', 'host_id': 'x'}
458+
yield '{"method": "callback", "value": "bar", "host_id": "x"}'
459+
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
460+
'host_id': 'x'}
461+
yield {'method': 'bogus', 'host_id': 'x'}
462+
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
463+
'host_id': 'x'})
432464
yield 'bad json'
433465
yield b'bad pickled'
466+
467+
# these should not publish anything on the queue, as they come from
468+
# the same host
469+
yield {'method': 'emit', 'value': 'foo', 'host_id': host_id}
470+
yield {'method': 'callback', 'value': 'bar', 'host_id': host_id}
471+
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
472+
'host_id': host_id}
473+
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
474+
'host_id': host_id})
434475
raise asyncio.CancelledError() # force the thread to exit
435476

436477
self.pm._listen = messages
437478
_run(self.pm._thread())
438479

439480
self.pm._handle_emit.mock.assert_called_once_with(
440-
{'method': 'emit', 'value': 'foo'}
481+
{'method': 'emit', 'value': 'foo', 'host_id': 'x'}
482+
)
483+
self.pm._handle_callback.mock.assert_any_call(
484+
{'method': 'callback', 'value': 'bar', 'host_id': 'x'}
441485
)
442-
self.pm._handle_callback.mock.assert_called_once_with(
443-
{'method': 'callback', 'value': 'bar'}
486+
self.pm._handle_callback.mock.assert_any_call(
487+
{'method': 'callback', 'value': 'bar', 'host_id': host_id}
444488
)
445489
self.pm._handle_disconnect.mock.assert_called_once_with(
446-
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
490+
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
491+
'host_id': 'x'}
447492
)
448493
self.pm._handle_close_room.mock.assert_called_once_with(
449-
{'method': 'close_room', 'value': 'baz'}
494+
{'method': 'close_room', 'value': 'baz', 'host_id': 'x'}
450495
)
451496

452497
def test_background_thread_exception(self):
453498
self.pm._handle_emit = AsyncMock(side_effect=[ValueError(),
454499
asyncio.CancelledError])
455500

456501
async def messages():
457-
yield {'method': 'emit', 'value': 'foo'}
458-
yield {'method': 'emit', 'value': 'bar'}
502+
yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'}
503+
yield {'method': 'emit', 'value': 'bar', 'host_id': 'x'}
459504

460505
self.pm._listen = messages
461506
_run(self.pm._thread())
462507

463508
self.pm._handle_emit.mock.assert_any_call(
464-
{'method': 'emit', 'value': 'foo'}
509+
{'method': 'emit', 'value': 'foo', 'host_id': 'x'}
465510
)
466511
self.pm._handle_emit.mock.assert_called_with(
467-
{'method': 'emit', 'value': 'bar'}
512+
{'method': 'emit', 'value': 'bar', 'host_id': 'x'}
468513
)

0 commit comments

Comments
 (0)