1- """Message flow implementation for type-safe routing.
2-
3- Provides an explicit routing API for building message-driven workflows where
4- messages are routed based on their types. Uses strategic type erasure at
5- routing boundaries to handle union types while maintaining type safety at
6- flow input/output boundaries.
7- """
1+ """Message flow implementation for type-safe routing."""
82
93import types
104from dataclasses import dataclass
@@ -159,51 +153,21 @@ def _validate_terminal_type_not_routed(
159153
160154@final
161155class _Flow [TStartIn : Message , TEnd : Message ](Node [TStartIn , TEnd ]):
162- """Executable AI workflow that routes messages through nodes based on their types.
163-
164- Orchestrates message flow through a graph of AI operations, routing based on
165- message types. Each node performs a specific AI task (LLM calls, vector search,
166- validation) and produces typed outputs that determine the next step.
167-
168- Why _Flow is internal:
169- - Complex type erasure patterns needed for union type routing
170- - Internal optimization strategies for message dispatch
171- - Separation of builder API from execution mechanics
172- - Maintains simpler public API surface
173-
174- The flow provides:
175- - Type-safe message routing at runtime
176- - Automatic causality chain preservation
177- - Observer hooks for monitoring AI decisions
178- - Single termination enforcement for clear workflow completion
179-
180- """
156+ """Executable workflow that routes messages through nodes based on their types."""
181157
182- starting_node : NodeInterface [Message , Message ] = Field (
183- description = "First node to process incoming messages, typically parsing or validation"
184- )
158+ starting_node : NodeInterface [Message , Message ] = Field (description = "First node to process incoming messages" )
185159 routes : tuple [RouteEntry , ...] = Field (
186160 description = "Routing table mapping (source_node, message_type) pairs to destination nodes"
187161 )
188- terminal_type : type [Message ] = Field (
189- description = "Message type that immediately completes the flow when produced by any node"
190- )
191- callbacks : CallbackHandler | None = Field (
192- default = None , description = "Optional handler for observer callbacks to monitor flow execution events"
193- )
162+ terminal_type : type [Message ] = Field (description = "Message type that completes the flow when produced" )
163+ callbacks : CallbackHandler | None = Field (default = None , description = "Optional handler for observer callbacks" )
194164
195165 async def _safe_callback (self , method : str , * args : str | Message | Exception | None ) -> None :
196166 """Execute callback safely without affecting flow.
197167
198- REQ-016: Zero overhead when no callbacks
199- REQ-017: Async execution (non-blocking)
200-
201- CallbackHandler internally handles all errors (REQ-005, REQ-006) so we don't
202- need additional error handling here.
203-
204168 Args:
205169 method: Name of callback method to invoke
206- *args: Arguments to pass to the callback method (flow_name, node_name, message, error)
170+ *args: Arguments to pass to the callback method
207171
208172 """
209173 if not self .callbacks : # REQ-016: Zero overhead when no callbacks
@@ -311,20 +275,11 @@ async def process(self, message: TStartIn) -> TEnd:
311275@final
312276@dataclass (frozen = True )
313277class _FlowBuilder [TStartIn : Message , TStartOut : Message ](FlowBuilder [TStartIn , TStartOut ]):
314- """Module private builder for composing message routes with explicit source nodes.
315-
316- Uses the pattern from the original flow API where each route explicitly
317- specifies: from_node -> outcome -> to_node. This enables sequential thinking
318- about workflow construction.
278+ """Internal builder for composing message routes with explicit source nodes.
319279
320280 Type parameters:
321281 TStartIn: The input message type the flow accepts
322- TStartOut: The output type of the start node (remains constant throughout builder chain)
323-
324- The builder maintains stable type parameters throughout the chain, unlike tracking
325- current message types, because type erasure makes intermediate types meaningless.
326-
327- Call end_flow() to specify where the flow terminates and get the completed flow.
282+ TStartOut: The output type of the start node
328283 """
329284
330285 name : str
@@ -399,9 +354,6 @@ def _validate_and_create_route[TFromIn: Message, TFromOut: Message, TToIn: Messa
399354 def observe (self , * observers : Observer ) -> "_FlowBuilder[TStartIn, TStartOut]" :
400355 """Attach observers to the flow.
401356
402- REQ-009: MessageFlow accepts optional observers
403- REQ-016: Zero overhead when no observers
404-
405357 Args:
406358 *observers: Observer instances to monitor flow execution
407359
@@ -428,17 +380,6 @@ def route[TFromIn: Message, TFromOut: Message, TToIn: Message, TToOut: Message](
428380 ) -> "_FlowBuilder[TStartIn, TStartOut]" :
429381 """Route specific message type from source node to destination.
430382
431- Explicitly specifies that when `from_node` produces a message of type `outcome`,
432- route it to `to_node`. This matches the original flow API pattern for clarity.
433-
434- Type Erasure Rationale:
435- Python's type system cannot express "route only this specific type from
436- a union to the next node." For example, if a node outputs
437- UserMessage | SystemMessage, we cannot type-check at compile time that
438- only UserMessage goes to a specific handler. We use type erasure
439- (outcome: type[Message]) to allow this flexibility while maintaining
440- runtime validation.
441-
442383 Args:
443384 from_node: Source node that may emit the outcome message type
444385 outcome: Specific message type that triggers this route
@@ -472,15 +413,11 @@ def end_flow[TEnd: Message](
472413 ) -> Node [TStartIn , TEnd ]:
473414 """Declare the message type that completes this flow.
474415
475- When any node in the flow produces an instance of the terminal type,
476- the flow immediately terminates and returns that message. The terminal
477- type cannot be routed between nodes - it always ends the flow.
478-
479416 Args:
480417 terminal_type: The message type that completes the flow
481418
482419 Returns:
483- A Node that represents the complete flow with single terminal type
420+ A Node that represents the complete flow
484421
485422 """
486423 # Validate that terminal type is not already routed
@@ -501,11 +438,8 @@ def create_flow[TStartIn: Message, TStartOut: Message](
501438) -> FlowBuilder [TStartIn , TStartOut ]:
502439 """Create a flow with type-safe routing.
503440
504- This is the entry point for building message-driven workflows. The flow
505- starts at the given node and routes messages based on their types.
506-
507441 Args:
508- name: The name of the flow for identification and debugging
442+ name: The name of the flow
509443 starting_node: The starting node that processes TStartIn
510444
511445 Returns:
0 commit comments