diff --git a/imcpy/actors/base.py b/imcpy/actors/base.py index bce7694..3810df1 100644 --- a/imcpy/actors/base.py +++ b/imcpy/actors/base.py @@ -328,6 +328,11 @@ def send_static(self, msg, set_timestamp=True): with IMCSenderUDP(svc.ip) as s: s.send(message=msg, port=svc.port) + def send_backseat(self, msg, set_timestamp=True): + # Send to connected backseat (if any) + if self._backseat_server is not None and self._loop is not None: + asyncio.ensure_future(self._backseat_server.write_message(msg), loop=self._loop) + def send(self, node_id, msg, set_timestamp=True): """ Send an imc message to the specified imc node. The node can be specified through it's imc address, system name @@ -351,8 +356,7 @@ def send(self, node_id, msg, set_timestamp=True): self.send_static(msg) # Send to connected backseat (if any) - if self._backseat_server is not None and self._loop is not None: - asyncio.ensure_future(self._backseat_server.write_message(msg), loop=self._loop) + self.send_backseat(msg) def on_exception(self, loc, exc): """ diff --git a/imcpy/network/tcp.py b/imcpy/network/tcp.py index eff839b..a3cf41b 100644 --- a/imcpy/network/tcp.py +++ b/imcpy/network/tcp.py @@ -13,13 +13,20 @@ def __init__(self, name: str, reader: asyncio.StreamReader, writer: asyncio.Stre self.writer = writer self._parser = imcpy.Parser() + def is_closing(self): + return self.writer.is_closing() + async def write_bytes(self, data: bytes): """Write the bytes to the connected clients and flush buffer if necessary. :param data: The serialized IMC message to write. """ - self.writer.write(data) - await self.writer.drain() + try: + self.writer.write(data) + await self.writer.drain() + except ConnectionError as e: + logger.error(f'Connection error ({self.name}): {e}') + await self.close() async def close(self): self.writer.close() @@ -56,11 +63,14 @@ def __init__(self, instance) -> None: self._clients = set() async def on_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - logger.info(f'New connection from {writer.get_extra_info("peername")}') - client = IMCProtocolTCPClientConnection(writer.get_extra_info('peername'), reader, writer) - self._clients.add(client) - await client.handle_data(self.instance) - self._clients.remove(client) + try: + logger.info(f'New connection from {writer.get_extra_info("peername")}') + client = IMCProtocolTCPClientConnection(writer.get_extra_info('peername'), reader, writer) + self._clients.add(client) + await client.handle_data(self.instance) + self._clients.remove(client) + except Exception as e: + logger.error(f'Connnection terminated with error ({e})') async def write_message(self, msg: imcpy.Message): """Write message to all connected clients. @@ -69,4 +79,5 @@ async def write_message(self, msg: imcpy.Message): """ b = msg.serialize() for client in self._clients: - await client.write_bytes(b) + if not client.is_closing(): + await client.write_bytes(b)