Skip to content

Commit 23a141f

Browse files
authored
Cleanup and modularization of Power Manager, Power Distributor and DataPipeline (#881)
2 parents d34512e + f957170 commit 23a141f

File tree

11 files changed

+288
-214
lines changed

11 files changed

+288
-214
lines changed

src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
from frequenz.channels.util import SkipMissedAndDrift, Timer, select, selected_from
1515
from typing_extensions import override
1616

17-
from ...timeseries._base_types import PoolType, SystemBounds
17+
from ...microgrid.component import ComponentCategory
18+
from ...timeseries._base_types import SystemBounds
1819
from .._actor import Actor
1920
from .._channel_registry import ChannelRegistry
2021
from ._base_classes import Algorithm, BaseAlgorithm, Proposal, ReportRequest, _Report
@@ -31,7 +32,7 @@ class PowerManagingActor(Actor):
3132

3233
def __init__( # pylint: disable=too-many-arguments
3334
self,
34-
pool_type: PoolType,
35+
component_category: ComponentCategory,
3536
proposals_receiver: Receiver[Proposal],
3637
bounds_subscription_receiver: Receiver[ReportRequest],
3738
power_distributing_requests_sender: Sender[power_distributing.Request],
@@ -44,8 +45,8 @@ def __init__( # pylint: disable=too-many-arguments
4445
"""Create a new instance of the power manager.
4546
4647
Args:
47-
pool_type: The type of the component pool this power manager instance is
48-
going to support.
48+
component_category: The category of the component this power manager
49+
instance is going to support.
4950
proposals_receiver: The receiver for proposals.
5051
bounds_subscription_receiver: The receiver for bounds subscriptions.
5152
power_distributing_requests_sender: The sender for power distribution
@@ -63,7 +64,7 @@ def __init__( # pylint: disable=too-many-arguments
6364
f"PowerManagingActor: Unknown algorithm: {algorithm}"
6465
)
6566

66-
self._pool_type = pool_type
67+
self._component_category = component_category
6768
self._bounds_subscription_receiver = bounds_subscription_receiver
6869
self._power_distributing_requests_sender = power_distributing_requests_sender
6970
self._power_distributing_results_receiver = power_distributing_results_receiver
@@ -132,8 +133,11 @@ def _add_bounds_tracker(self, component_ids: frozenset[int]) -> None:
132133
microgrid,
133134
)
134135

135-
if self._pool_type is not PoolType.BATTERY_POOL:
136-
err = f"PowerManagingActor: Unsupported pool type: {self._pool_type}"
136+
if self._component_category is not ComponentCategory.BATTERY:
137+
err = (
138+
"PowerManagingActor: Unsupported component category: "
139+
f"{self._component_category}"
140+
)
137141
_logger.error(err)
138142
raise NotImplementedError(err)
139143
battery_pool = microgrid.battery_pool(component_ids)

src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import grpc
1414
from frequenz.channels import Receiver, Sender
15+
from typing_extensions import override
1516

1617
from .... import microgrid
1718
from ...._internal._channels import LatestValueCache
@@ -122,6 +123,7 @@ def _add(key: str, value: dict[int, set[int]] | None) -> None:
122123
class BatteryManager(ComponentManager):
123124
"""Class to manage the data streams for batteries."""
124125

126+
@override
125127
def __init__(
126128
self,
127129
component_pool_status_sender: Sender[ComponentPoolStatus],
@@ -163,18 +165,22 @@ def __init__(
163165
)
164166
"""The distribution algorithm used to distribute power between batteries."""
165167

168+
@override
166169
def component_ids(self) -> collections.abc.Set[int]:
167170
"""Return the set of component ids."""
168171
return self._battery_ids
169172

173+
@override
170174
async def start(self) -> None:
171175
"""Start the battery data manager."""
172176
await self._create_channels()
173177

178+
@override
174179
async def stop(self) -> None:
175180
"""Stop the battery data manager."""
176181
await self._component_pool_status_tracker.stop()
177182

183+
@override
178184
async def distribute_power(self, request: Request) -> Result:
179185
"""Distribute the requested power to the components.
180186

src/frequenz/sdk/actor/power_distributing/_component_status/_battery_status_tracker.py

Lines changed: 16 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,19 @@
11
# License: MIT
22
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3-
"""Class to return battery status."""
43

4+
"""Background service that tracks the status of a battery.
5+
6+
A battery is consider to be WORKING if both the battery and the adjacent inverter are
7+
sending data that shows that they are working.
8+
9+
If either of them stops sending data, or if the data shows that they are not working,
10+
then the battery is considered to be NOT_WORKING.
11+
12+
If a battery and its adjacent inverter are WORKING, but the last request to the battery
13+
failed, then the battery's status is considered to be UNCERTAIN. In this case, the
14+
battery is blocked for a short time, and it is not recommended to use it unless it is
15+
necessary.
16+
"""
517

618
import asyncio
719
import logging
@@ -28,6 +40,7 @@
2840
InverterData,
2941
)
3042
from ..._background_service import BackgroundService
43+
from ._blocking_status import BlockingStatus
3144
from ._component_status import (
3245
ComponentStatus,
3346
ComponentStatusEnum,
@@ -53,79 +66,6 @@ class _ComponentStreamStatus:
5366
"""Flag whether last message was correct or not."""
5467

5568

56-
@dataclass
57-
class _BlockingStatus:
58-
min_duration: timedelta
59-
"""The minimum blocking duration."""
60-
61-
max_duration: timedelta
62-
"""The maximum blocking duration."""
63-
64-
last_blocking_duration: timedelta = timedelta(seconds=0.0)
65-
"""Last blocking duration."""
66-
67-
blocked_until: datetime | None = None
68-
"""Until when the battery is blocked."""
69-
70-
def __post_init__(self) -> None:
71-
assert self.min_duration <= self.max_duration, (
72-
f"Minimum blocking duration ({self.min_duration}) cannot be greater "
73-
f"than maximum blocking duration ({self.max_duration})"
74-
)
75-
self.last_blocking_duration = self.min_duration
76-
self._timedelta_zero = timedelta(seconds=0.0)
77-
78-
def block(self) -> timedelta:
79-
"""Block battery.
80-
81-
Battery can be unblocked using `self.unblock()` method.
82-
83-
Returns:
84-
The duration for which the battery is blocked.
85-
"""
86-
now = datetime.now(tz=timezone.utc)
87-
88-
# If is not blocked
89-
if self.blocked_until is None:
90-
self.last_blocking_duration = self.min_duration
91-
self.blocked_until = now + self.last_blocking_duration
92-
return self.last_blocking_duration
93-
94-
# If still blocked, then do nothing
95-
if self.blocked_until > now:
96-
return self._timedelta_zero
97-
98-
# If previous blocking time expired, then blocked it once again.
99-
# Increase last blocking time, unless it reach the maximum.
100-
self.last_blocking_duration = min(
101-
2 * self.last_blocking_duration, self.max_duration
102-
)
103-
self.blocked_until = now + self.last_blocking_duration
104-
105-
return self.last_blocking_duration
106-
107-
def unblock(self) -> None:
108-
"""Unblock battery.
109-
110-
This will reset duration of the next blocking timeout.
111-
112-
Battery can be blocked using `self.block()` method.
113-
"""
114-
self.blocked_until = None
115-
116-
def is_blocked(self) -> bool:
117-
"""Return if battery is blocked.
118-
119-
Battery can be blocked if last request for that battery failed.
120-
121-
Returns:
122-
True if battery is blocked, False otherwise.
123-
"""
124-
if self.blocked_until is None:
125-
return False
126-
return self.blocked_until > datetime.now(tz=timezone.utc)
127-
128-
12969
class BatteryStatusTracker(ComponentStatusTracker, BackgroundService):
13070
"""Class for tracking if battery is working.
13171
@@ -194,8 +134,8 @@ def __init__( # pylint: disable=too-many-arguments
194134
# First battery is considered as not working.
195135
# Change status after first messages are received.
196136
self._last_status: ComponentStatusEnum = ComponentStatusEnum.NOT_WORKING
197-
self._blocking_status: _BlockingStatus = _BlockingStatus(
198-
timedelta(seconds=1.0), max_blocking_duration
137+
self._blocking_status: BlockingStatus = BlockingStatus(
138+
min_duration=timedelta(seconds=1.0), max_duration=max_blocking_duration
199139
)
200140
self._timedelta_zero = timedelta(seconds=0.0)
201141

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tracking of the blocking status of a component."""
5+
6+
from dataclasses import dataclass
7+
from datetime import datetime, timedelta, timezone
8+
9+
10+
@dataclass(kw_only=True)
11+
class BlockingStatus:
12+
"""Tracking of the blocking status of a component."""
13+
14+
min_duration: timedelta
15+
"""The minimum blocking duration."""
16+
17+
max_duration: timedelta
18+
"""The maximum blocking duration."""
19+
20+
last_blocking_duration: timedelta = timedelta(seconds=0.0)
21+
"""Last blocking duration."""
22+
23+
blocked_until: datetime | None = None
24+
"""Time until which the component is blocked."""
25+
26+
def __post_init__(self) -> None:
27+
"""Validate the blocking duration."""
28+
assert self.min_duration <= self.max_duration, (
29+
f"Minimum blocking duration ({self.min_duration}) cannot be greater "
30+
f"than maximum blocking duration ({self.max_duration})"
31+
)
32+
self.last_blocking_duration = self.min_duration
33+
self._timedelta_zero = timedelta(seconds=0.0)
34+
35+
def block(self) -> timedelta:
36+
"""Set the component as blocked.
37+
38+
Returns:
39+
The duration for which the component is blocked.
40+
"""
41+
now = datetime.now(tz=timezone.utc)
42+
43+
# If is not blocked
44+
if self.blocked_until is None:
45+
self.last_blocking_duration = self.min_duration
46+
self.blocked_until = now + self.last_blocking_duration
47+
return self.last_blocking_duration
48+
49+
# If still blocked, then do nothing
50+
if self.blocked_until > now:
51+
return self._timedelta_zero
52+
53+
# If previous blocking time expired, then blocked it once again.
54+
# Increase last blocking time, unless it reach the maximum.
55+
self.last_blocking_duration = min(
56+
2 * self.last_blocking_duration, self.max_duration
57+
)
58+
self.blocked_until = now + self.last_blocking_duration
59+
60+
return self.last_blocking_duration
61+
62+
def unblock(self) -> None:
63+
"""Set the component as unblocked."""
64+
self.blocked_until = None
65+
66+
def is_blocked(self) -> bool:
67+
"""Check if the component is blocked.
68+
69+
Returns:
70+
True if battery is blocked, False otherwise.
71+
"""
72+
if self.blocked_until is None:
73+
return False
74+
return self.blocked_until > datetime.now(tz=timezone.utc)

0 commit comments

Comments
 (0)