Skip to content

Commit 4f26871

Browse files
authored
Merge pull request #91 from ezmsg-org/dev
Major refactor of signal processing modules using OOP instead of generator methods
2 parents 54ae431 + 9273306 commit 4f26871

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+9904
-3879
lines changed

docs/ProcessorsBase.md

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
## Processor Base Classes
2+
3+
The `ezmsg.sigproc.base` module contains the base classes for the signal processors. The base classes are designed to allow users to create custom signal processors with minimal errors and minimal repetition of boilerplate code.
4+
5+
> The information below was written at the time of a major refactor to `ezmsg.sigproc.base` to help collate the design decisions and to help with future refactoring. However, it may be outdated or incomplete. Please refer to the source code for the most accurate information.
6+
7+
> Updates: Added CompositeProducer, BaseProcessorUnit.
8+
9+
10+
### Generic TypeVars
11+
12+
| Idx | Class | Description |
13+
|-----|-----------------------|----------------------------------------------------------------------------|
14+
| 1 | `MessageInType` (Mi) | for messages passed to a consumer, processor, or transformer |
15+
| 2 | `MessageOutType` (Mo) | for messages returned by a producer, processor, or transformer |
16+
| 3 | `SettingsType` | bound to ez.Settings |
17+
| 4 | `StateType` (St) | bound to ProcessorState which is simply ez.State with a `hash: int` field. |
18+
19+
20+
### Protocols
21+
22+
| Idx | Class | Parent | State | `__call__` sig | @state | partial_fit |
23+
|-----|-----------------------|--------|-------|--------------------------|--------|-------------|
24+
| 1 | `Processor` | - | No | Any -> Any | - | - |
25+
| 2 | `Producer` | - | No | None -> Mo | - | - |
26+
| 3 | `Consumer` | 1 | No | Mi -> None | - | - |
27+
| 4 | `Transformer` | 1 | No | Mi -> Mo | - | - |
28+
| 5 | `StatefulProcessor` | - | Yes | Any -> Any | Y | - |
29+
| 6 | `StatefulProducer` | - | Yes | None -> Mo | Y | - |
30+
| 7 | `StatefulConsumer` | 5 | Yes | Mi -> None | Y | - |
31+
| 8 | `StatefulTransformer` | 5 | Yes | Mi -> Mo | Y | - |
32+
| 9 | `AdaptiveTransformer` | 8 | Yes | Mi -> Mo | Y | Y |
33+
34+
Note: `__call__` and `partial_fit` both have asynchronous alternatives: `__acall__` and `apartial_fit` respectively.
35+
36+
37+
### Abstract implementations (Base Classes) for standalone processors
38+
39+
| Idx | Class | Parent | Protocol | Features |
40+
|-----|---------------------------|--------|----------|--------------------------------------------------------------------------------------------|
41+
| 1 | `BaseProcessor` | - | 1 | `__init__` for settings; `__call__` (alias: `send`) wraps abstract `_process`. |
42+
| 2 | `BaseProducer` | - | 2 | Similar to `BaseProcessor`; `next`/`anext` instead of `send`/`asend` aliases. async first! |
43+
| 3 | `BaseConsumer` | 1 | 3 | Overrides return type to None |
44+
| 4 | `BaseTransformer` | 1 | 4 | Overrides input and return types |
45+
| 5 | `BaseStatefulProcessor` | 1 | 5 | `state` setter unpickles arg; `stateful_op` wraps `__call__` |
46+
| 6 | `BaseStatefulProducer` | 2 | 6 | `state` setter and getter; `stateful_op` wraps `__call__` which runs `__acall__`. |
47+
| 7 | `BaseStatefulConsumer` | 5 | 7 | Overrides return type to None |
48+
| 8 | `BaseStatefulTransformer` | 5 | 8 | Overrides input and return types |
49+
| 9 | `BaseAdaptiveTransformer` | 8 | 9 | Implements `partial_fit`. `__call__` may call `partial_fit` if message has `.trigger`. |
50+
| 10 | `BaseAsyncTransformer` | 8 | 8 | `__acall__` wraps abstract `_aprocess`; `__call__` runs `__acall__`. |
51+
| 11 | `CompositeProcessor` | 1 | 5 | Methods iterate over sequence of processors created in `_initialize_processors`. |
52+
| 12 | `CompositeProducer` | 2 | 6 | Similar to `CompositeProcessor`, but first processor must be a producer. |
53+
54+
NOTES:
55+
1. Producers do not inherit from `BaseProcessor`, so concrete implementations should subclass `BaseProducer` or `BaseStatefulProducer`.
56+
2. For concrete implementations of non-producer processors, inherit from the base subclasses of `BaseProcessor` (eg. `BaseConsumer`, `BaseTransformer`) and from base subclasses of `BaseStatefulProcessor`. These two processor classes are primarily used for efficient abstract base class construction.
57+
3. For most base classes, the async methods simply call the synchronous methods where the processor logic is expected. Exceptions are `BaseProducer` (and its children) and `BaseAsyncTransformer` which are async-first and should be strongly considered for operations that are I/O bound.
58+
4. For async-first classes, the logic is implemented in the async methods and the sync methods are thin wrappers around the async methods. The wrapper uses a helper method called `run_coroutine_sync` to run the async method in a synchronous context, but this adds some noticeable processing overhead.
59+
5. If you need to call your processor outside ezmsg (which uses async), and you cannot easily add an async context* in your processing, then you might want to consider duplicating the processor logic in the sync methods. __Note__: Jupyter notebooks are async by default, so you can await async code in a notebook without any extra setup.
60+
6. `CompositeProcessor` and `CompositeProducer` are stateful, and structurally subclass the `StatefulProcessor` and `StatefulProducer` protocols, but they
61+
do not inherit from `BaseStatefulProcessor` and `BaseStatefulProducer`. They accomplish statefulness by inheriting from the mixin abstract base class `CompositeStateful`, which implements the state related methods: `get_state_type`, `state.setter`, `state.getter`, `_hash_message`, `_reset_state`, and `stateful_op` (as well as composite processor chain related methods). However, `BaseStatefulProcessor`, `BaseStatefulProducer` implement `stateful_op` method for a single processor in an incompatible way to what is required for composite chains of processors.
62+
63+
64+
### Generic TypeVars for ezmsg Units
65+
66+
| Idx | Class | Description |
67+
|-----|---------------------------|------------------------------------------------------------------------------------------------------------------|
68+
| 5 | `ProducerType` | bound to `BaseProducer` (hence, also `BaseStatefulProducer`, `CompositeProducer`) |
69+
| 6 | `ConsumerType` | bound to `BaseConsumer`, `BaseStatefulConsumer` |
70+
| 7 | `TransformerType` | bound to `BaseTransformer`, `BaseStatefulTransformer`, `CompositeProcessor` (hence, also `BaseAsyncTransformer`) |
71+
| 8 | `AdaptiveTransformerType` | bound to `BaseAdaptiveTransformer` |
72+
73+
74+
### Abstract implementations (Base Classes) for ezmsg Units using processors:
75+
76+
| Idx | Class | Parents | Expected TypeVars |
77+
|-----|-------------------------------|---------|---------------------------|
78+
| 1 | `BaseProcessorUnit` | - | - |
79+
| 2 | `BaseProducerUnit` | - | `ProducerType` |
80+
| 3 | `BaseConsumerUnit` | 1 | `ConsumerType` |
81+
| 4 | `BaseTransformerUnit` | 1 | `TransformerType` |
82+
| 5 | `BaseAdaptiveTransformerUnit` | 1 | `AdaptiveTransformerType` |
83+
84+
Note, it is strongly recommended to use `BaseConsumerUnit`, `BaseTransformerUnit`, or `BaseAdaptiveTransformerUnit` for implementing concrete subclasses rather than `BaseProcessorUnit`.
85+
86+
87+
## Implementing a custom standalone processor
88+
89+
1. Create a new settings dataclass: `class MySettings(ez.Settings):`
90+
2. Create a new state dataclass:
91+
```
92+
@processor_state
93+
class MyState:
94+
```
95+
3. Decide on your base processor class, considering the protocol, whether it should be async-first, and other factors.
96+
97+
```mermaid
98+
flowchart TD
99+
AMP{Multiple Processors?};
100+
AMP -->|no| ARI{Receives Input?};
101+
AMP -->|yes| ACB{Single Chain / Branching?}
102+
ARI -->|no| P(Producer);
103+
ARI -->|yes| APO{Produces Output?};
104+
ACB -->|branching| NBC[no base class];
105+
ACB -->|single chain| ACRI{Receives Input?};
106+
P --> PS{Stateful?};
107+
APO -->|no| C(Consumer);
108+
APO -->|yes| T(Transformer);
109+
ACRI -->|no| CompositeProducer;
110+
ACRI -->|yes| CompositeProcessor;
111+
PS -->|no| BaseProducer;
112+
PS -->|yes| BaseStatefulProducer;
113+
C --> CS{Stateful?};
114+
T --> TS{Stateful?};
115+
CS -->|no| BaseConsumer;
116+
CS -->|yes| BaseStatefulConsumer;
117+
TS -->|no| BaseTransformer;
118+
TS -->|yes| TSA{Adaptive?};
119+
TSA -->|no| TSAF{Async First?};
120+
TSA -->|yes| BaseAdaptiveTransformer;
121+
TSAF -->|no| BaseStatefulTransformer;
122+
TSAF -->|yes| BaseAsyncTransformer;
123+
```
124+
125+
4. Implement the child class.
126+
* The minimum implementation is `_process` for sync processors, `_aprocess` for async processors, and `_produce` for producers.
127+
* For any stateful processor, implement `_reset_state`.
128+
* For stateful processors that need to respond to a change in the incoming data, implement `_hash_message`.
129+
* For adaptive transformers, implement `partial_fit`.
130+
* For chains of processors (`CompositeProcessor`/ `CompositeProducer`), need to implement `_initialize_processors`.
131+
* See processors in `ezmsg.sigproc` for examples.
132+
5. Override non-abstract methods if you need special behaviour. For example:
133+
* `WindowTransformer` overrides `__init__` to do some sanity checks on the provided settings.
134+
* `TransposeTransformer` and `WindowTransformer` override `__call__` to provide a passthrough shortcut when the settings allow for it.
135+
* `ClockProducer` overrides `__call__` in order to provide a synchronous call bypassing the default async behaviour.
136+
137+
138+
## Implementing a custom ezmsg Unit
139+
140+
1. Create and test custom standalone processor as above.
141+
2. Decide which base unit to implement.
142+
* Use the "Generic TypeVars for ezmsg Units" table above to determine the expected TypeVar.
143+
* Fine the Expected TypeVar in the "ezmsg Units" table.
144+
3. Create the derived class.
145+
146+
Often, all that is required is the following (e.g., for a custom transformer):
147+
148+
```Python
149+
import ezmsg.core as ez
150+
from ezmsg.util.messages.axisarray import AxisArray
151+
from ezmsg.sigproc.base import BaseTransformer, BaseTransformerUnit
152+
153+
154+
class CustomTransformerSettings(ez.Settings):
155+
...
156+
157+
158+
class CustomTransformer(BaseTransformer[CustomTransformerSettings, AxisArray, AxisArray]):
159+
def _process(self, message: AxisArray) -> AxisArray:
160+
# Your processing code here...
161+
return message
162+
163+
164+
class CustomUnit(BaseTransformerUnit[
165+
CustomTransformerSettings, # SettingsType
166+
AxisArray, # MessageInType
167+
AxisArray, # MessageOutType
168+
CustomTransformer, # TransformerType
169+
]):
170+
SETTINGS = CustomTransformerSettings
171+
```
172+
173+
__Note__, the type of ProcessorUnit is based on the internal processor and not the input or output of the unit. Input streams are allowed in ProducerUnits and output streams in ConsumerUnits. For an example of such a use case, see `BaseCounterFirstProducerUnit` and its subclasses. `BaseCounterFirstProducerUnit` has an input stream that receives a flag signal from a clock that triggers a call to the internal producer.

pyproject.toml

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,29 @@ description = "Timeseries signal processing implementations in ezmsg"
44
authors = [
55
{ name = "Griffin Milsap", email = "[email protected]" },
66
{ name = "Preston Peranich", email = "[email protected]" },
7-
{ name = "Chadwick Boulay", email = "[email protected]" }
7+
{ name = "Chadwick Boulay", email = "[email protected]" },
88
]
99
license = "MIT"
1010
readme = "README.md"
1111
requires-python = ">=3.10.15"
1212
dynamic = ["version"]
1313
dependencies = [
14-
"ezmsg>=3.6.0",
15-
"numba>=0.61.0",
16-
"numpy>=1.26.0",
17-
"pywavelets>=1.6.0",
18-
"scipy>=1.13.1",
19-
"sparse>=0.15.4",
14+
"array-api-compat>=1.11.1",
15+
"ezmsg>=3.6.0",
16+
"numba>=0.61.0",
17+
"numpy>=1.26.0",
18+
"pywavelets>=1.6.0",
19+
"scipy>=1.13.1",
20+
"sparse>=0.15.4",
2021
]
2122

2223
[project.optional-dependencies]
2324
test = [
24-
"flake8>=7.1.1",
25-
"frozendict>=2.4.4",
26-
"pytest-asyncio>=0.24.0",
27-
"pytest-cov>=5.0.0",
28-
"pytest>=8.3.3",
25+
"flake8>=7.1.1",
26+
"frozendict>=2.4.4",
27+
"pytest-asyncio>=0.24.0",
28+
"pytest-cov>=5.0.0",
29+
"pytest>=8.3.3",
2930
]
3031

3132
[build-system]
@@ -42,10 +43,7 @@ version-file = "src/ezmsg/sigproc/__version__.py"
4243
packages = ["src/ezmsg"]
4344

4445
[tool.uv]
45-
dev-dependencies = [
46-
"pre-commit>=3.8.0",
47-
"ruff>=0.6.7",
48-
]
46+
dev-dependencies = ["pre-commit>=3.8.0", "ruff>=0.6.7"]
4947

5048
[tool.pytest.ini_options]
5149
norecursedirs = "tests/helpers"

src/ezmsg/sigproc/activation.py

Lines changed: 36 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
1-
import typing
2-
3-
import numpy as np
41
import scipy.special
52
import ezmsg.core as ez
63
from ezmsg.util.messages.axisarray import AxisArray
74
from ezmsg.util.messages.util import replace
8-
from ezmsg.util.generator import consumer
95

106
from .spectral import OptionsEnum
11-
from .base import GenAxisArray
7+
from .base import BaseTransformer, BaseTransformerUnit
128

139

1410
class ActivationFunction(OptionsEnum):
@@ -39,10 +35,41 @@ class ActivationFunction(OptionsEnum):
3935
}
4036

4137

42-
@consumer
38+
class ActivationSettings(ez.Settings):
39+
function: str | ActivationFunction = ActivationFunction.NONE
40+
"""An enum value from ActivationFunction or a string representing the activation function.
41+
Possible values are: SIGMOID, EXPIT, LOGIT, LOGEXPIT, "sigmoid", "expit", "logit", "log_expit".
42+
SIGMOID and EXPIT are equivalent. See :obj:`scipy.special.expit` for more details."""
43+
44+
45+
class ActivationTransformer(BaseTransformer[ActivationSettings, AxisArray, AxisArray]):
46+
def _process(self, message: AxisArray) -> AxisArray:
47+
if type(self.settings.function) is ActivationFunction:
48+
func = ACTIVATIONS[self.settings.function]
49+
else:
50+
# str type handling
51+
function = self.settings.function.lower()
52+
if function not in ActivationFunction.options():
53+
raise ValueError(
54+
f"Unrecognized activation function {function}. Must be one of {ACTIVATIONS.keys()}"
55+
)
56+
function = list(ACTIVATIONS.keys())[
57+
ActivationFunction.options().index(function)
58+
]
59+
func = ACTIVATIONS[function]
60+
61+
return replace(message, data=func(message.data))
62+
63+
64+
class Activation(
65+
BaseTransformerUnit[ActivationSettings, AxisArray, AxisArray, ActivationTransformer]
66+
):
67+
SETTINGS = ActivationSettings
68+
69+
4370
def activation(
4471
function: str | ActivationFunction,
45-
) -> typing.Generator[AxisArray, AxisArray, None]:
72+
) -> ActivationTransformer:
4673
"""
4774
Transform the data with a simple activation function.
4875
@@ -51,37 +78,7 @@ def activation(
5178
Possible values are: SIGMOID, EXPIT, LOGIT, LOGEXPIT, "sigmoid", "expit", "logit", "log_expit".
5279
SIGMOID and EXPIT are equivalent. See :obj:`scipy.special.expit` for more details.
5380
54-
Returns: A primed generator that, when passed an input message via `.send(msg)`, yields an AxisArray
55-
with the data payload containing a transformed version of the input data.
81+
Returns: :obj:`ActivationTransformer`
5682
5783
"""
58-
if type(function) is ActivationFunction:
59-
func = ACTIVATIONS[function]
60-
else:
61-
# str type. There's probably an easier way to support either enum or str argument. Oh well this works.
62-
function: str = function.lower()
63-
if function not in ActivationFunction.options():
64-
raise ValueError(
65-
f"Unrecognized activation function {function}. Must be one of {ACTIVATIONS.keys()}"
66-
)
67-
function = list(ACTIVATIONS.keys())[
68-
ActivationFunction.options().index(function)
69-
]
70-
func = ACTIVATIONS[function]
71-
72-
msg_out = AxisArray(np.array([]), dims=[""])
73-
74-
while True:
75-
msg_in: AxisArray = yield msg_out
76-
msg_out = replace(msg_in, data=func(msg_in.data))
77-
78-
79-
class ActivationSettings(ez.Settings):
80-
function: str = ActivationFunction.NONE
81-
82-
83-
class Activation(GenAxisArray):
84-
SETTINGS = ActivationSettings
85-
86-
def construct_generator(self):
87-
self.STATE.gen = activation(function=self.SETTINGS.function)
84+
return ActivationTransformer(ActivationSettings(function=function))

0 commit comments

Comments
 (0)