Skip to content

Commit

Permalink
Add try-catch for tcp connections and add send_backseat as separate f…
Browse files Browse the repository at this point in the history
…unction
  • Loading branch information
oysstu committed Sep 3, 2024
1 parent 6622a02 commit dc57c3a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
8 changes: 6 additions & 2 deletions imcpy/actors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ def send_static(self, msg, set_timestamp=True):
with IMCSenderUDP(svc.ip) as s:
s.send(message=msg, port=svc.port)

def send_backseat(self, msg, set_timestamp=True):
# Send to connected backseat (if any)
if self._backseat_server is not None and self._loop is not None:
asyncio.ensure_future(self._backseat_server.write_message(msg), loop=self._loop)

def send(self, node_id, msg, set_timestamp=True):
"""
Send an imc message to the specified imc node. The node can be specified through it's imc address, system name
Expand All @@ -351,8 +356,7 @@ def send(self, node_id, msg, set_timestamp=True):
self.send_static(msg)

# Send to connected backseat (if any)
if self._backseat_server is not None and self._loop is not None:
asyncio.ensure_future(self._backseat_server.write_message(msg), loop=self._loop)
self.send_backseat(msg)

def on_exception(self, loc, exc):
"""
Expand Down
27 changes: 19 additions & 8 deletions imcpy/network/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,20 @@ def __init__(self, name: str, reader: asyncio.StreamReader, writer: asyncio.Stre
self.writer = writer
self._parser = imcpy.Parser()

def is_closing(self):
return self.writer.is_closing()

async def write_bytes(self, data: bytes):
"""Write the bytes to the connected clients and flush buffer if necessary.
:param data: The serialized IMC message to write.
"""
self.writer.write(data)
await self.writer.drain()
try:
self.writer.write(data)
await self.writer.drain()
except ConnectionError as e:
logger.error(f'Connection error ({self.name}): {e}')
await self.close()

async def close(self):
self.writer.close()
Expand Down Expand Up @@ -56,11 +63,14 @@ def __init__(self, instance) -> None:
self._clients = set()

async def on_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
logger.info(f'New connection from {writer.get_extra_info("peername")}')
client = IMCProtocolTCPClientConnection(writer.get_extra_info('peername'), reader, writer)
self._clients.add(client)
await client.handle_data(self.instance)
self._clients.remove(client)
try:
logger.info(f'New connection from {writer.get_extra_info("peername")}')
client = IMCProtocolTCPClientConnection(writer.get_extra_info('peername'), reader, writer)
self._clients.add(client)
await client.handle_data(self.instance)
self._clients.remove(client)
except Exception as e:
logger.error(f'Connnection terminated with error ({e})')

async def write_message(self, msg: imcpy.Message):
"""Write message to all connected clients.
Expand All @@ -69,4 +79,5 @@ async def write_message(self, msg: imcpy.Message):
"""
b = msg.serialize()
for client in self._clients:
await client.write_bytes(b)
if not client.is_closing():
await client.write_bytes(b)

0 comments on commit dc57c3a

Please sign in to comment.