Skip to content

Commit 34a3948

Browse files
committed
Move AssetDetails and MessageHandlers into separate file
1 parent c2174dd commit 34a3948

File tree

3 files changed

+180
-156
lines changed

3 files changed

+180
-156
lines changed

src/s2python/s2_asset_details.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import logging
2+
import uuid
3+
from dataclasses import dataclass
4+
from typing import Optional, List
5+
6+
from s2python.common import (
7+
Role,
8+
ResourceManagerDetails,
9+
Duration,
10+
Currency,
11+
)
12+
from s2python.generated.gen_s2 import CommodityQuantity
13+
from s2python.s2_control_type import S2ControlType
14+
15+
logger = logging.getLogger("s2python")
16+
17+
18+
@dataclass
19+
class AssetDetails: # pylint: disable=too-many-instance-attributes
20+
resource_id: uuid.UUID
21+
22+
provides_forecast: bool
23+
provides_power_measurements: List[CommodityQuantity]
24+
25+
instruction_processing_delay: Duration
26+
roles: List[Role]
27+
currency: Optional[Currency] = None
28+
29+
name: Optional[str] = None
30+
manufacturer: Optional[str] = None
31+
model: Optional[str] = None
32+
firmware_version: Optional[str] = None
33+
serial_number: Optional[str] = None
34+
35+
def to_resource_manager_details(
36+
self, control_types: List[S2ControlType]
37+
) -> ResourceManagerDetails:
38+
return ResourceManagerDetails(
39+
available_control_types=[
40+
control_type.get_protocol_control_type()
41+
for control_type in control_types
42+
],
43+
currency=self.currency,
44+
firmware_version=self.firmware_version,
45+
instruction_processing_delay=self.instruction_processing_delay,
46+
manufacturer=self.manufacturer,
47+
message_id=uuid.uuid4(),
48+
model=self.model,
49+
name=self.name,
50+
provides_forecast=self.provides_forecast,
51+
provides_power_measurement_types=self.provides_power_measurements,
52+
resource_id=self.resource_id,
53+
roles=self.roles,
54+
serial_number=self.serial_number,
55+
)

src/s2python/s2_connection.py

Lines changed: 6 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
from s2python.s2_asset_details import AssetDetails
2+
from s2python.s2_message_handlers import S2MessageHandler, SendOkay, MessageHandlers
3+
4+
__all__ = ["AssetDetails", "S2MessageHandler", "SendOkay", "MessageHandlers", "S2Connection"] # re-export for backward compatibility
5+
16
try:
27
import websockets
38
except ImportError as exc:
@@ -12,8 +17,7 @@
1217
import threading
1318
import uuid
1419
import ssl
15-
from dataclasses import dataclass
16-
from typing import Any, Optional, List, Type, Dict, Callable, Awaitable, Union
20+
from typing import Any, Optional, List, Dict, Awaitable
1721

1822
from websockets.asyncio.client import (
1923
ClientConnection as WSConnection,
@@ -25,14 +29,9 @@
2529
ReceptionStatus,
2630
Handshake,
2731
EnergyManagementRole,
28-
Role,
2932
HandshakeResponse,
30-
ResourceManagerDetails,
31-
Duration,
32-
Currency,
3333
SelectControlType,
3434
)
35-
from s2python.generated.gen_s2 import CommodityQuantity
3635
from s2python.reception_status_awaiter import ReceptionStatusAwaiter
3736
from s2python.s2_control_type import S2ControlType
3837
from s2python.s2_parser import S2Parser
@@ -43,155 +42,6 @@
4342
logger = logging.getLogger("s2python")
4443

4544

46-
@dataclass
47-
class AssetDetails: # pylint: disable=too-many-instance-attributes
48-
resource_id: uuid.UUID
49-
50-
provides_forecast: bool
51-
provides_power_measurements: List[CommodityQuantity]
52-
53-
instruction_processing_delay: Duration
54-
roles: List[Role]
55-
currency: Optional[Currency] = None
56-
57-
name: Optional[str] = None
58-
manufacturer: Optional[str] = None
59-
model: Optional[str] = None
60-
firmware_version: Optional[str] = None
61-
serial_number: Optional[str] = None
62-
63-
def to_resource_manager_details(
64-
self, control_types: List[S2ControlType]
65-
) -> ResourceManagerDetails:
66-
return ResourceManagerDetails(
67-
available_control_types=[
68-
control_type.get_protocol_control_type()
69-
for control_type in control_types
70-
],
71-
currency=self.currency,
72-
firmware_version=self.firmware_version,
73-
instruction_processing_delay=self.instruction_processing_delay,
74-
manufacturer=self.manufacturer,
75-
message_id=uuid.uuid4(),
76-
model=self.model,
77-
name=self.name,
78-
provides_forecast=self.provides_forecast,
79-
provides_power_measurement_types=self.provides_power_measurements,
80-
resource_id=self.resource_id,
81-
roles=self.roles,
82-
serial_number=self.serial_number,
83-
)
84-
85-
86-
S2MessageHandler = Union[
87-
Callable[["S2Connection", S2Message, Callable[[], None]], None],
88-
Callable[["S2Connection", S2Message, Awaitable[None]], Awaitable[None]],
89-
]
90-
91-
92-
class SendOkay:
93-
status_is_send: threading.Event
94-
connection: "S2Connection"
95-
subject_message_id: uuid.UUID
96-
97-
def __init__(self, connection: "S2Connection", subject_message_id: uuid.UUID):
98-
self.status_is_send = threading.Event()
99-
self.connection = connection
100-
self.subject_message_id = subject_message_id
101-
102-
async def run_async(self) -> None:
103-
self.status_is_send.set()
104-
105-
await self.connection._respond_with_reception_status( # pylint: disable=protected-access
106-
subject_message_id=self.subject_message_id,
107-
status=ReceptionStatusValues.OK,
108-
diagnostic_label="Processed okay.",
109-
)
110-
111-
def run_sync(self) -> None:
112-
self.status_is_send.set()
113-
114-
self.connection.respond_with_reception_status_sync(
115-
subject_message_id=self.subject_message_id,
116-
status=ReceptionStatusValues.OK,
117-
diagnostic_label="Processed okay.",
118-
)
119-
120-
async def ensure_send_async(self, type_msg: Type[S2Message]) -> None:
121-
if not self.status_is_send.is_set():
122-
logger.warning(
123-
"Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. "
124-
"Sending it now.",
125-
type_msg,
126-
self.subject_message_id,
127-
)
128-
await self.run_async()
129-
130-
def ensure_send_sync(self, type_msg: Type[S2Message]) -> None:
131-
if not self.status_is_send.is_set():
132-
logger.warning(
133-
"Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. "
134-
"Sending it now.",
135-
type_msg,
136-
self.subject_message_id,
137-
)
138-
self.run_sync()
139-
140-
141-
class MessageHandlers:
142-
handlers: Dict[Type[S2Message], S2MessageHandler]
143-
144-
def __init__(self) -> None:
145-
self.handlers = {}
146-
147-
async def handle_message(self, connection: "S2Connection", msg: S2Message) -> None:
148-
"""Handle the S2 message using the registered handler.
149-
150-
:param connection: The S2 conncetion the `msg` is received from.
151-
:param msg: The S2 message
152-
"""
153-
handler = self.handlers.get(type(msg))
154-
if handler is not None:
155-
send_okay = SendOkay(connection, msg.message_id) # type: ignore[attr-defined, union-attr]
156-
157-
try:
158-
if asyncio.iscoroutinefunction(handler):
159-
await handler(connection, msg, send_okay.run_async()) # type: ignore[arg-type]
160-
await send_okay.ensure_send_async(type(msg))
161-
else:
162-
163-
def do_message() -> None:
164-
handler(connection, msg, send_okay.run_sync) # type: ignore[arg-type]
165-
send_okay.ensure_send_sync(type(msg))
166-
167-
eventloop = asyncio.get_event_loop()
168-
await eventloop.run_in_executor(executor=None, func=do_message)
169-
except Exception:
170-
if not send_okay.status_is_send.is_set():
171-
await connection._respond_with_reception_status( # pylint: disable=protected-access
172-
subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr]
173-
status=ReceptionStatusValues.PERMANENT_ERROR,
174-
diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long
175-
f"an unrecoverable error occurred.",
176-
)
177-
raise
178-
else:
179-
logger.warning(
180-
"Received a message of type %s but no handler is registered. Ignoring the message.",
181-
type(msg),
182-
)
183-
184-
def register_handler(
185-
self, msg_type: Type[S2Message], handler: S2MessageHandler
186-
) -> None:
187-
"""Register a coroutine function or a normal function as the handler for a specific S2 message type.
188-
189-
:param msg_type: The S2 message type to attach the handler to.
190-
:param handler: The function (asynchronuous or normal) which should handle the S2 message.
191-
"""
192-
self.handlers[msg_type] = handler
193-
194-
19545
class S2Connection: # pylint: disable=too-many-instance-attributes
19646
url: str
19747
reconnect: bool
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import asyncio
2+
import logging
3+
import threading
4+
import uuid
5+
from typing import Type, Dict, Callable, Awaitable, Union
6+
7+
from s2python.common import ReceptionStatusValues
8+
from s2python.message import S2Message
9+
10+
logger = logging.getLogger("s2python")
11+
12+
13+
S2MessageHandler = Union[
14+
Callable[["S2Connection", S2Message, Callable[[], None]], None],
15+
Callable[["S2Connection", S2Message, Awaitable[None]], Awaitable[None]],
16+
]
17+
18+
19+
class SendOkay:
20+
status_is_send: threading.Event
21+
connection: "S2Connection"
22+
subject_message_id: uuid.UUID
23+
24+
def __init__(self, connection: "S2Connection", subject_message_id: uuid.UUID):
25+
self.status_is_send = threading.Event()
26+
self.connection = connection
27+
self.subject_message_id = subject_message_id
28+
29+
async def run_async(self) -> None:
30+
self.status_is_send.set()
31+
32+
await self.connection._respond_with_reception_status( # pylint: disable=protected-access
33+
subject_message_id=self.subject_message_id,
34+
status=ReceptionStatusValues.OK,
35+
diagnostic_label="Processed okay.",
36+
)
37+
38+
def run_sync(self) -> None:
39+
self.status_is_send.set()
40+
41+
self.connection.respond_with_reception_status_sync(
42+
subject_message_id=self.subject_message_id,
43+
status=ReceptionStatusValues.OK,
44+
diagnostic_label="Processed okay.",
45+
)
46+
47+
async def ensure_send_async(self, type_msg: Type[S2Message]) -> None:
48+
if not self.status_is_send.is_set():
49+
logger.warning(
50+
"Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. "
51+
"Sending it now.",
52+
type_msg,
53+
self.subject_message_id,
54+
)
55+
await self.run_async()
56+
57+
def ensure_send_sync(self, type_msg: Type[S2Message]) -> None:
58+
if not self.status_is_send.is_set():
59+
logger.warning(
60+
"Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. "
61+
"Sending it now.",
62+
type_msg,
63+
self.subject_message_id,
64+
)
65+
self.run_sync()
66+
67+
68+
class MessageHandlers:
69+
handlers: Dict[Type[S2Message], S2MessageHandler]
70+
71+
def __init__(self) -> None:
72+
self.handlers = {}
73+
74+
async def handle_message(self, connection: "S2Connection", msg: S2Message) -> None:
75+
"""Handle the S2 message using the registered handler.
76+
77+
:param connection: The S2 conncetion the `msg` is received from.
78+
:param msg: The S2 message
79+
"""
80+
handler = self.handlers.get(type(msg))
81+
if handler is not None:
82+
send_okay = SendOkay(connection, msg.message_id) # type: ignore[attr-defined, union-attr]
83+
84+
try:
85+
if asyncio.iscoroutinefunction(handler):
86+
await handler(connection, msg, send_okay.run_async()) # type: ignore[arg-type]
87+
await send_okay.ensure_send_async(type(msg))
88+
else:
89+
90+
def do_message() -> None:
91+
handler(connection, msg, send_okay.run_sync) # type: ignore[arg-type]
92+
send_okay.ensure_send_sync(type(msg))
93+
94+
eventloop = asyncio.get_event_loop()
95+
await eventloop.run_in_executor(executor=None, func=do_message)
96+
except Exception:
97+
if not send_okay.status_is_send.is_set():
98+
await connection._respond_with_reception_status( # pylint: disable=protected-access
99+
subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr]
100+
status=ReceptionStatusValues.PERMANENT_ERROR,
101+
diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long
102+
f"an unrecoverable error occurred.",
103+
)
104+
raise
105+
else:
106+
logger.warning(
107+
"Received a message of type %s but no handler is registered. Ignoring the message.",
108+
type(msg),
109+
)
110+
111+
def register_handler(
112+
self, msg_type: Type[S2Message], handler: S2MessageHandler
113+
) -> None:
114+
"""Register a coroutine function or a normal function as the handler for a specific S2 message type.
115+
116+
:param msg_type: The S2 message type to attach the handler to.
117+
:param handler: The function (asynchronuous or normal) which should handle the S2 message.
118+
"""
119+
self.handlers[msg_type] = handler

0 commit comments

Comments
 (0)