Skip to content

Conversation

@Cyb3rWard0g
Copy link
Collaborator

This PR Depends on #232

Overview

This PR introduces a new @message_router decorator that enables native Dapr workflow orchestration triggered directly from Pub/Sub messages without relying on legacy Dapr Agents abstractions. Developers now have full control over the Dapr Workflow runtime and Pub/Sub client, allowing workflows and LLM/Agent activities to be registered, composed, and executed using the official Dapr APIs.

This update modernizes the message routing layer to align with Dapr’s workflow primitives, improving transparency, flexibility, and maintainability.

Key Changes

  • Added decorators/routers.py
    • Implements a new @message_router decorator for schema-aware message routing
    • Fully compatible with native WorkflowRuntime and DaprClient lifecycle
    • Supports Union types, forward references, and robust model validation
    • Designed for explicit developer control rather than hidden orchestration
  • Added supporting utilities
    • utils/routers.py: message extraction, validation, and CloudEvent parsing helpers
    • utils/registration.py: register_message_handlers() for runtime subscription via the active DaprClient
  • Deprecated legacy message router
    • Added deprecation warnings to decorators/messaging.py
    • Directs users to migrate to the new Dapr-native decorator
  • Added new Message Router Workflow Quickstart
    • Demonstrates event-driven orchestration using Pub/Sub → Workflow integration
    • Includes LLM-powered create_outline and write_post workflow activities
    • Adds message_client.py publisher and updated README with full configuration
  • Updated quickstart index
    • Added Message Router Workflow alongside LLM- and Agent-based patterns

Check the new quickstart quickstarts/04-agent-based-workflows/README.md

Copy link
Contributor

@sicoyle sicoyle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a great addition to simplify things and support a more python-native decorator approach to handle pubsub messages and trigger workflows!

Comment on lines +11 to +17
_MESSAGE_ROUTER_DEPRECATION_MESSAGE = (
"@message_router (legacy version from dapr_agents.workflow.decorators.messaging) "
"is deprecated and will be removed in a future release. "
"Please migrate to the updated decorator in "
"`dapr_agents.workflow.decorators.routers`, which supports "
"Union types, forward references, and explicit Dapr workflow integration."
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i vote we just remove since we are not v1.0 and beyond, we can make breaking changes and remove the message_router as long as we let folks know in the next release announcement :)

└─ message_client.py # publishes a test message to the topic
```

## Files
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it might be nice moving forward for us to not put the code in the readmes and just reference/link the files/components. Thoughts?

Comment on lines +20 to +44
func: Optional[Callable[..., Any]] = None,
*,
pubsub: Optional[str] = None,
topic: Optional[str] = None,
dead_letter_topic: Optional[str] = None,
broadcast: bool = False,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""
Decorate a message handler with routing metadata.
The handler must accept a parameter named `message`. Its type hint defines the
expected payload model(s), e.g.:
@message_router(pubsub="pubsub", topic="orders")
def on_order(message: OrderCreated): ...
@message_router(pubsub="pubsub", topic="events")
def on_event(message: Union[Foo, Bar]): ...
Args:
func: (optional) bare-decorator form support.
pubsub: Name of the Dapr pub/sub component (required when used with args).
topic: Topic name to subscribe to (required when used with args).
dead_letter_topic: Optional dead-letter topic (defaults to f"{topic}_DEAD").
broadcast: Optional flag you can use downstream for fan-out semantics.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some of these say required in the docs string but the code shows optional. Are func, pubsub, topic the ones that we require?

Copy link
Collaborator

@bibryam bibryam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. IMO, In addition to declarative, programatic, and streaming subscriptions, the Message Router is another (4th) type of subscribing to a topic. It is probably the most pythonic version among all.

  2. Is there anything specific to DaprAgents or can message router be even at python sdk?

  3. I love this feature, just not sure where it fits best (DA or Python SDK) and what to call it? There is already the concept of subscription, and @app.route in python sdk

@Cyb3rWard0g
Copy link
Collaborator Author

  1. IMO, In addition to declarative, programatic, and streaming subscriptions, the Message Router is another (4th) type of subscribing to a topic. It is probably the most pythonic version among all.
  2. Is there anything specific to DaprAgents or can message router be even at python sdk?
  3. I love this feature, just not sure where it fits best (DA or Python SDK) and what to call it? There is already the concept of subscription, and @app.route in python sdk
  1. I believe it can be part of the python SDK. I just like to simplify how we could register those subscriptions, and since we do not have a class in the Python SDK that makes it easy to define streaming subscriptions (https://docs.dapr.io/developing-applications/building-blocks/pubsub/subscription-methods/#streaming-subscriptions) without relying on HTTP endpoints, I created those utils.
  2. How long would it take to add something like that to the Python SDK. Once again, streaming pubsub subscriptions without HTTP endpoints. app.route relies on an HTTP endpoint right?

@bibryam
Copy link
Collaborator

bibryam commented Oct 23, 2025

  1. I believe it can be part of the python SDK. I just like to simplify how we could register those subscriptions, and since we do not have a class in the Python SDK that makes it easy to define streaming subscriptions (https://docs.dapr.io/developing-applications/building-blocks/pubsub/subscription-methods/#streaming-subscriptions) without relying on HTTP endpoints, I created those utils.

Isn't this the way to create a streaming consumer?

        subscription = client.subscribe(
            pubsub_name='pubsub', topic='orders', dead_letter_topic='orders_dead'
        )
  1. How long would it take to add something like that to the Python SDK. Once again, streaming pubsub subscriptions without HTTP endpoints. app.route relies on an HTTP endpoint right?

There is no extra time requirement, but we have to make sure:

  • it is documented in th SDK docs how it works
  • when to use this instead of other ways
  • has example (that is in the sdk repo)
  • aligned with existing naming convention
    Once these are in place (btw we would need these even if it is in the dapr agents repo), the review and merge could be pretty quick.

@Cyb3rWard0g
Copy link
Collaborator Author

  1. I believe it can be part of the python SDK. I just like to simplify how we could register those subscriptions, and since we do not have a class in the Python SDK that makes it easy to define streaming subscriptions (https://docs.dapr.io/developing-applications/building-blocks/pubsub/subscription-methods/#streaming-subscriptions) without relying on HTTP endpoints, I created those utils.

Isn't this the way to create a streaming consumer?

        subscription = client.subscribe(
            pubsub_name='pubsub', topic='orders', dead_letter_topic='orders_dead'
        )
  1. How long would it take to add something like that to the Python SDK. Once again, streaming pubsub subscriptions without HTTP endpoints. app.route relies on an HTTP endpoint right?

There is no extra time requirement, but we have to make sure:

  • it is documented in th SDK docs how it works
  • when to use this instead of other ways
  • has example (that is in the sdk repo)
  • aligned with existing naming convention
    Once these are in place (btw we would need these even if it is in the dapr agents repo), the review and merge could be pretty quick.

that is not enough

subscription = client.subscribe(
    pubsub_name='pubsub', topic='orders', dead_letter_topic='orders_dead'
)

We need to map it to a handler which can be done with

close_fn = client.subscribe_with_handler(
      pubsub_name='pubsub', topic='orders', handler_fn=process_message,
      dead_letter_topic='orders_dead'
  )

But once again, that is not enough, the "handler_fun" in our project would point to a Dapr Workflow script which would need to be run using a dapr workflow client. Therefore, we need to create that abstraction so that we can use it on our DurableAgent or Orchestrator class which rely on a message_router decorator. For users not using DurableAgent or Orchestrator classes but writing their own Dapr Workflows, they can still use message utils that I have added in this PR to make it easy to subscribe a handler that is a dapr workflow function.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants