Skip to content

Commit 1659da2

Browse files
committed
Move AssetDetails and MessageHandlers into separate file
1 parent c2174dd commit 1659da2

File tree

3 files changed

+188
-156
lines changed

3 files changed

+188
-156
lines changed

src/s2python/s2_asset_details.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import logging
2+
import uuid
3+
from dataclasses import dataclass
4+
from typing import Optional, List
5+
6+
from s2python.common import (
7+
Role,
8+
ResourceManagerDetails,
9+
Duration,
10+
Currency,
11+
)
12+
from s2python.generated.gen_s2 import CommodityQuantity
13+
from s2python.s2_control_type import S2ControlType
14+
15+
logger = logging.getLogger("s2python")
16+
17+
18+
@dataclass
19+
class AssetDetails: # pylint: disable=too-many-instance-attributes
20+
resource_id: uuid.UUID
21+
22+
provides_forecast: bool
23+
provides_power_measurements: List[CommodityQuantity]
24+
25+
instruction_processing_delay: Duration
26+
roles: List[Role]
27+
currency: Optional[Currency] = None
28+
29+
name: Optional[str] = None
30+
manufacturer: Optional[str] = None
31+
model: Optional[str] = None
32+
firmware_version: Optional[str] = None
33+
serial_number: Optional[str] = None
34+
35+
def to_resource_manager_details(
36+
self, control_types: List[S2ControlType]
37+
) -> ResourceManagerDetails:
38+
return ResourceManagerDetails(
39+
available_control_types=[
40+
control_type.get_protocol_control_type()
41+
for control_type in control_types
42+
],
43+
currency=self.currency,
44+
firmware_version=self.firmware_version,
45+
instruction_processing_delay=self.instruction_processing_delay,
46+
manufacturer=self.manufacturer,
47+
message_id=uuid.uuid4(),
48+
model=self.model,
49+
name=self.name,
50+
provides_forecast=self.provides_forecast,
51+
provides_power_measurement_types=self.provides_power_measurements,
52+
resource_id=self.resource_id,
53+
roles=self.roles,
54+
serial_number=self.serial_number,
55+
)

src/s2python/s2_connection.py

Lines changed: 11 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
__all__ = [
2+
"AssetDetails",
3+
"S2MessageHandler",
4+
"SendOkay",
5+
"MessageHandlers",
6+
"S2Connection"
7+
] # re-export for backward compatibility
8+
19
try:
210
import websockets
311
except ImportError as exc:
@@ -12,8 +20,7 @@
1220
import threading
1321
import uuid
1422
import ssl
15-
from dataclasses import dataclass
16-
from typing import Any, Optional, List, Type, Dict, Callable, Awaitable, Union
23+
from typing import Any, Optional, List, Dict, Awaitable
1724

1825
from websockets.asyncio.client import (
1926
ClientConnection as WSConnection,
@@ -25,173 +32,21 @@
2532
ReceptionStatus,
2633
Handshake,
2734
EnergyManagementRole,
28-
Role,
2935
HandshakeResponse,
30-
ResourceManagerDetails,
31-
Duration,
32-
Currency,
3336
SelectControlType,
3437
)
35-
from s2python.generated.gen_s2 import CommodityQuantity
3638
from s2python.reception_status_awaiter import ReceptionStatusAwaiter
3739
from s2python.s2_control_type import S2ControlType
3840
from s2python.s2_parser import S2Parser
3941
from s2python.s2_validation_error import S2ValidationError
42+
from s2python.s2_asset_details import AssetDetails
43+
from s2python.s2_message_handlers import S2MessageHandler, SendOkay, MessageHandlers
4044
from s2python.message import S2Message
4145
from s2python.version import S2_VERSION
4246

4347
logger = logging.getLogger("s2python")
4448

4549

46-
@dataclass
47-
class AssetDetails: # pylint: disable=too-many-instance-attributes
48-
resource_id: uuid.UUID
49-
50-
provides_forecast: bool
51-
provides_power_measurements: List[CommodityQuantity]
52-
53-
instruction_processing_delay: Duration
54-
roles: List[Role]
55-
currency: Optional[Currency] = None
56-
57-
name: Optional[str] = None
58-
manufacturer: Optional[str] = None
59-
model: Optional[str] = None
60-
firmware_version: Optional[str] = None
61-
serial_number: Optional[str] = None
62-
63-
def to_resource_manager_details(
64-
self, control_types: List[S2ControlType]
65-
) -> ResourceManagerDetails:
66-
return ResourceManagerDetails(
67-
available_control_types=[
68-
control_type.get_protocol_control_type()
69-
for control_type in control_types
70-
],
71-
currency=self.currency,
72-
firmware_version=self.firmware_version,
73-
instruction_processing_delay=self.instruction_processing_delay,
74-
manufacturer=self.manufacturer,
75-
message_id=uuid.uuid4(),
76-
model=self.model,
77-
name=self.name,
78-
provides_forecast=self.provides_forecast,
79-
provides_power_measurement_types=self.provides_power_measurements,
80-
resource_id=self.resource_id,
81-
roles=self.roles,
82-
serial_number=self.serial_number,
83-
)
84-
85-
86-
S2MessageHandler = Union[
87-
Callable[["S2Connection", S2Message, Callable[[], None]], None],
88-
Callable[["S2Connection", S2Message, Awaitable[None]], Awaitable[None]],
89-
]
90-
91-
92-
class SendOkay:
93-
status_is_send: threading.Event
94-
connection: "S2Connection"
95-
subject_message_id: uuid.UUID
96-
97-
def __init__(self, connection: "S2Connection", subject_message_id: uuid.UUID):
98-
self.status_is_send = threading.Event()
99-
self.connection = connection
100-
self.subject_message_id = subject_message_id
101-
102-
async def run_async(self) -> None:
103-
self.status_is_send.set()
104-
105-
await self.connection._respond_with_reception_status( # pylint: disable=protected-access
106-
subject_message_id=self.subject_message_id,
107-
status=ReceptionStatusValues.OK,
108-
diagnostic_label="Processed okay.",
109-
)
110-
111-
def run_sync(self) -> None:
112-
self.status_is_send.set()
113-
114-
self.connection.respond_with_reception_status_sync(
115-
subject_message_id=self.subject_message_id,
116-
status=ReceptionStatusValues.OK,
117-
diagnostic_label="Processed okay.",
118-
)
119-
120-
async def ensure_send_async(self, type_msg: Type[S2Message]) -> None:
121-
if not self.status_is_send.is_set():
122-
logger.warning(
123-
"Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. "
124-
"Sending it now.",
125-
type_msg,
126-
self.subject_message_id,
127-
)
128-
await self.run_async()
129-
130-
def ensure_send_sync(self, type_msg: Type[S2Message]) -> None:
131-
if not self.status_is_send.is_set():
132-
logger.warning(
133-
"Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. "
134-
"Sending it now.",
135-
type_msg,
136-
self.subject_message_id,
137-
)
138-
self.run_sync()
139-
140-
141-
class MessageHandlers:
142-
handlers: Dict[Type[S2Message], S2MessageHandler]
143-
144-
def __init__(self) -> None:
145-
self.handlers = {}
146-
147-
async def handle_message(self, connection: "S2Connection", msg: S2Message) -> None:
148-
"""Handle the S2 message using the registered handler.
149-
150-
:param connection: The S2 conncetion the `msg` is received from.
151-
:param msg: The S2 message
152-
"""
153-
handler = self.handlers.get(type(msg))
154-
if handler is not None:
155-
send_okay = SendOkay(connection, msg.message_id) # type: ignore[attr-defined, union-attr]
156-
157-
try:
158-
if asyncio.iscoroutinefunction(handler):
159-
await handler(connection, msg, send_okay.run_async()) # type: ignore[arg-type]
160-
await send_okay.ensure_send_async(type(msg))
161-
else:
162-
163-
def do_message() -> None:
164-
handler(connection, msg, send_okay.run_sync) # type: ignore[arg-type]
165-
send_okay.ensure_send_sync(type(msg))
166-
167-
eventloop = asyncio.get_event_loop()
168-
await eventloop.run_in_executor(executor=None, func=do_message)
169-
except Exception:
170-
if not send_okay.status_is_send.is_set():
171-
await connection._respond_with_reception_status( # pylint: disable=protected-access
172-
subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr]
173-
status=ReceptionStatusValues.PERMANENT_ERROR,
174-
diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long
175-
f"an unrecoverable error occurred.",
176-
)
177-
raise
178-
else:
179-
logger.warning(
180-
"Received a message of type %s but no handler is registered. Ignoring the message.",
181-
type(msg),
182-
)
183-
184-
def register_handler(
185-
self, msg_type: Type[S2Message], handler: S2MessageHandler
186-
) -> None:
187-
"""Register a coroutine function or a normal function as the handler for a specific S2 message type.
188-
189-
:param msg_type: The S2 message type to attach the handler to.
190-
:param handler: The function (asynchronuous or normal) which should handle the S2 message.
191-
"""
192-
self.handlers[msg_type] = handler
193-
194-
19550
class S2Connection: # pylint: disable=too-many-instance-attributes
19651
url: str
19752
reconnect: bool
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import asyncio
2+
import logging
3+
import threading
4+
import uuid
5+
from typing import Type, Dict, Callable, Awaitable, Union, TYPE_CHECKING
6+
7+
from s2python.common import ReceptionStatusValues
8+
from s2python.message import S2Message
9+
10+
if TYPE_CHECKING:
11+
from s2python.s2_connection import S2Connection
12+
13+
logger = logging.getLogger("s2python")
14+
15+
16+
S2MessageHandler = Union[
17+
Callable[["S2Connection", S2Message, Callable[[], None]], None],
18+
Callable[["S2Connection", S2Message, Awaitable[None]], Awaitable[None]],
19+
]
20+
21+
22+
class SendOkay:
23+
status_is_send: threading.Event
24+
connection: "S2Connection"
25+
subject_message_id: uuid.UUID
26+
27+
def __init__(self, connection: "S2Connection", subject_message_id: uuid.UUID):
28+
self.status_is_send = threading.Event()
29+
self.connection = connection
30+
self.subject_message_id = subject_message_id
31+
32+
async def run_async(self) -> None:
33+
self.status_is_send.set()
34+
35+
await self.connection._respond_with_reception_status( # pylint: disable=protected-access
36+
subject_message_id=self.subject_message_id,
37+
status=ReceptionStatusValues.OK,
38+
diagnostic_label="Processed okay.",
39+
)
40+
41+
def run_sync(self) -> None:
42+
self.status_is_send.set()
43+
44+
self.connection.respond_with_reception_status_sync(
45+
subject_message_id=self.subject_message_id,
46+
status=ReceptionStatusValues.OK,
47+
diagnostic_label="Processed okay.",
48+
)
49+
50+
async def ensure_send_async(self, type_msg: Type[S2Message]) -> None:
51+
if not self.status_is_send.is_set():
52+
logger.warning(
53+
"Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. "
54+
"Sending it now.",
55+
type_msg,
56+
self.subject_message_id,
57+
)
58+
await self.run_async()
59+
60+
def ensure_send_sync(self, type_msg: Type[S2Message]) -> None:
61+
if not self.status_is_send.is_set():
62+
logger.warning(
63+
"Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. "
64+
"Sending it now.",
65+
type_msg,
66+
self.subject_message_id,
67+
)
68+
self.run_sync()
69+
70+
71+
class MessageHandlers:
72+
handlers: Dict[Type[S2Message], S2MessageHandler]
73+
74+
def __init__(self) -> None:
75+
self.handlers = {}
76+
77+
async def handle_message(self, connection: "S2Connection", msg: S2Message) -> None:
78+
"""Handle the S2 message using the registered handler.
79+
80+
:param connection: The S2 conncetion the `msg` is received from.
81+
:param msg: The S2 message
82+
"""
83+
handler = self.handlers.get(type(msg))
84+
if handler is not None:
85+
send_okay = SendOkay(connection, msg.message_id) # type: ignore[attr-defined, union-attr]
86+
87+
try:
88+
if asyncio.iscoroutinefunction(handler):
89+
await handler(connection, msg, send_okay.run_async()) # type: ignore[arg-type]
90+
await send_okay.ensure_send_async(type(msg))
91+
else:
92+
93+
def do_message() -> None:
94+
handler(connection, msg, send_okay.run_sync) # type: ignore[arg-type]
95+
send_okay.ensure_send_sync(type(msg))
96+
97+
eventloop = asyncio.get_event_loop()
98+
await eventloop.run_in_executor(executor=None, func=do_message)
99+
except Exception:
100+
if not send_okay.status_is_send.is_set():
101+
await connection._respond_with_reception_status( # pylint: disable=protected-access
102+
subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr]
103+
status=ReceptionStatusValues.PERMANENT_ERROR,
104+
diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long
105+
f"an unrecoverable error occurred.",
106+
)
107+
raise
108+
else:
109+
logger.warning(
110+
"Received a message of type %s but no handler is registered. Ignoring the message.",
111+
type(msg),
112+
)
113+
114+
def register_handler(
115+
self, msg_type: Type[S2Message], handler: S2MessageHandler
116+
) -> None:
117+
"""Register a coroutine function or a normal function as the handler for a specific S2 message type.
118+
119+
:param msg_type: The S2 message type to attach the handler to.
120+
:param handler: The function (asynchronuous or normal) which should handle the S2 message.
121+
"""
122+
self.handlers[msg_type] = handler

0 commit comments

Comments
 (0)