Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 140 additions & 87 deletions node.js/messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,47 @@ status: released
<!--- % include links-for-node.md %} -->
<!--- % include _chapters toc="2,3" %} -->

## Overview

Messaging enables decoupled communication between services using events.
CAP distinguishes between the logical and technical messaging layers, separating business concerns from technical infrastructure.

The **logical layer** consists of three primary components:

**Modeled Events**: Events are defined in CDS models with typed schemas, providing compile-time validation and IDE support. These events represent business occurrences like `'orderProcessed'`, or `'stockUpdated'`.

**Event Topics**: Topics organize events into logical channels and are responsible for event routing. Topics can be explicitly defined as annotation or derived from service and event names.

**CAP Services**: Services act as event producers or consumers, using simple APIs like `srv.emit('reviewed', data)` or `srv.on('orderProcessed', handler)`. Services communicate using logical event names without needing to know the underlying infrastructure details.

The **technical layer** handles the actual message transport and delivery:

**CAP Messaging Service**: Acts as the translation layer between logical events and technical infrastructure. It manages topic resolution, message serialization, and routing logic.
For topic resolution the logical events are delegated to the messaging service, the corresponding event name on the technical service is either the fully qualified event name or the value of the @topic annotation if given.

**Message Brokers**: Form the core of the technical infrastructure, handling message persistence, delivery guarantees, and cross-service communication. Examples include SAP Event Mesh, Apache Kafka, or Redis Streams.

The message flow follows a clear path through both layers:

**Outbound Flow (Publisher)**: A CAP service calls `srv.emit('reviewed', data)` → CAP Messaging Service resolves the event name to a fully qualified topic (e.g., `OrderSrv.reviewed`) → Message is serialized and sent to the Event Broker → Broker stores and distributes the message to all subscribers.

**Inbound Flow (Subscriber)**: Event Broker delivers message from subscribed topic → CAP Messaging Service receives the message → Service name and event name are resolved from the topic → Message is routed to the appropriate CAP service handler via `srv.on('reviewed', handler)`. Registering a modeled `srv.on(...)` event handler causes the broker to listen to those events, e.g. creates a subscription for Event Mesh.

**Alternatively** custom handlers can bypass the service layer and work directly with the messaging service.

### Summary Table


| CDS Event Declaration | Emitting via `srv.emit` | Emitting via `messaging.emit` | Broker Topic | Receiving via `srv.on` | Receiving via `messaging.on` |
|------------------------------|-------------------------|-------------------------------|----------------------|------------------------|------------------------------|
| No `@topic` | `'reviewed'` | `'OrderSrv.reviewed'` | `OrderSrv.reviewed` | `'reviewed'` | `'OrderSrv.reviewed'` |
| With `@topic: 'foo.bar'` | `'reviewed'` | `'foo.bar'` | `foo.bar` | `'reviewed'` | `'foo.bar'` |


## cds.**MessagingService** <i> class </i>

Class `cds.MessagingService` and subclasses thereof are technical services representing asynchronous messaging channels.
They can be used directly/low-level, or behind the scenes on higher-level service-to-service eventing.
Class `cds.MessagingService` and subclasses thereof are technical services representing asynchronous messaging channels.
They can be used directly/low-level, or behind the scenes on higher-level service-to-service eventing.

### class cds.**MessagingService** <i> extends cds.Service </i>

Expand Down Expand Up @@ -55,7 +91,7 @@ In _srv/external/external.cds_:
service ExternalService {
event ExternalEvent {
ID: UUID;
name: String;
rating: Decimal;
}
}
```
Expand All @@ -66,12 +102,14 @@ In _srv/own.cds_:
service OwnService {
event OwnEvent {
ID: UUID;
name: String;
rating: Decimal;
}
}
```

In _srv/own.js_:
The implementation can use CAP application services or bypass them and work directly with the messaging service.

In _srv/own.js_ (CAP Application Services):

```js
module.exports = async srv => {
Expand All @@ -82,7 +120,18 @@ module.exports = async srv => {
}
```

#### Custom Topics with Declared Events
In _srv/own.js_ (CAP Messaging Services):
```js
module.exports = async srv => {
const externalService = await cds.connect.to('messaging')
messaging.on('ExternalService.ExternalEvent', async msg => {
await srv.emit('OwnService.OwnEvent', msg.data)
})
}
```


### Custom Topics with Declared Events

You can specify topics to modeled events using the `@topic` annotation.
::: tip
Expand All @@ -94,87 +143,7 @@ Example:
```cds
service OwnService {
@topic: 'my.custom.topic'
event OwnEvent { ID: UUID; name: String; }
}
```


## Emitting Events

To send a message to the message broker, you can use the `emit` method on a transaction for the connected service.

Example:

```js
const messaging = await cds.connect.to('messaging')

this.after(['CREATE', 'UPDATE', 'DELETE'], 'Reviews', async (_, req) => {
const { subject } = req.data
const { rating } = await cds.run(
SELECT.one(['round(avg(rating),2) as rating'])
.from(Reviews)
.where({ subject }))

// send to a topic
await messaging.emit('cap/msg/system/review/reviewed',
{ subject, rating })

// alternative if you want to send custom headers
await messaging.emit({ event: 'cap/msg/system/review/reviewed',
data: { subject, rating },
headers: { 'X-Correlation-ID': req.headers['X-Correlation-ID'] }})
})
```
::: tip
The messages are sent once the transaction is successful.
Per default, a persistent queue is used. See [Messaging - Queue](./queue) for more information.
:::

## Receiving Events

To listen to messages from a message broker, you can use the `on` method on the connected service.
This also creates the necessary topic subscriptions.

Example:

```js
const messaging = await cds.connect.to('messaging')

// listen to a topic
messaging.on('cap/msg/system/review/reviewed', msg => {
const { subject, rating } = msg.data
return cds.run(UPDATE(Books, subject).with({ rating }))
})
```

Once all handlers are executed successfully, the message is acknowledged.
If one handler throws an error, the message broker will be informed that the message couldn't be consumed properly and might send the message again. To avoid endless cycles, consider catching all errors.

If you want to receive all messages without creating topic subscriptions, you can register on `'*'`. This is useful when consuming messages from a dead letter queue.

```js
messaging.on('*', async msg => { /*...*/ })
```

::: tip
In general, messages do not contain user information but operate with a technical user. As a consequence, the user of the message processing context (`cds.context.user`) is set to [`cds.User.privileged`](authentication#privileged-user) and, hence, any necessary authorization checks must be done in custom handlers.
:::

### Inbox <Beta />

You can store received messages in an inbox before they're processed. Under the hood, it uses the [task queue](./queue) for reliable asynchronous processing.
Enable it by setting the `inboxed` option to `true`, for example:

```js
{
cds: {
requires: {
messaging: {
kind: 'enterprise-messaging',
inboxed: true
}
}
}
event OwnEvent { ID: UUID; rating: Decimal; }
}
```

Expand Down Expand Up @@ -263,6 +232,90 @@ Examples:
| `my/own.namespace/-/ce/` | `/my/own.namespace` |


## Emitting Events

To send a message to the message broker, you can use the `emit` method on a transaction for the connected service.

Example:

```js
const messaging = await cds.connect.to('messaging')

this.after(['CREATE', 'UPDATE', 'DELETE'], 'Reviews', async (_, req) => {
const { ID } = req.data
const { rating } = await cds.run(
SELECT.one(['round(avg(rating),2) as rating'])
.from(Reviews)
.where({ ID }))

// send to a topic
await messaging.emit('my/custom/topic',
{ ID, rating })

// alternative if you want to send custom headers
await messaging.emit('my/custom/topic',
{ ID, rating },
{ 'X-Correlation-ID': req.headers['X-Correlation-ID'] })

// or use the object parameter
await messaging.emit({ event: 'my/custom/topic',
data: { ID, rating },
headers: { 'X-Correlation-ID': req.headers['X-Correlation-ID'] }})
})
```
::: tip
The messages are sent once the transaction is successful.
Per default, a persistent queue is used. See [Messaging - Queue](./queue) for more information.
:::

## Receiving Events

To listen to messages from a message broker, you can use the `on` method on the connected service.
This also creates the necessary topic subscriptions.

Example:

```js
const messaging = await cds.connect.to('messaging')

// listen to a topic
messaging.on('my/custom/topic', msg => {
const { ID, rating } = msg.data
return cds.run(UPDATE(Books, ID).with({ rating }))
})
```

Once all handlers are executed successfully, the message is acknowledged.
If one handler throws an error, the message broker will be informed that the message couldn't be consumed properly and might send the message again. To avoid endless cycles, consider catching all errors.

If you want to receive all messages without creating topic subscriptions, you can register on `'*'`. This is useful when consuming messages from a dead letter queue.

```js
messaging.on('*', async msg => { /*...*/ })
```

::: tip
In general, messages do not contain user information but operate with a technical user. As a consequence, the user of the message processing context (`cds.context.user`) is set to [`cds.User.privileged`](/node.js/authentication#privileged-user) and, hence, any necessary authorization checks must be done in custom handlers.
:::

### Inbox <Beta />

You can store received messages in an inbox before they're processed. Under the hood, it uses the [task queue](./queue) for reliable asynchronous processing.
Enable it by setting the `inboxed` option to `true`, for example:

```js
{
cds: {
requires: {
messaging: {
kind: 'enterprise-messaging',
inboxed: true
}
}
}
}
```

## Message Brokers

To safely send and receive messages between applications, you need a message broker in-between where you can create queues that listen to topics. All relevant incoming messages are first stored in those queues before they're consumed. This way messages aren't lost when the consuming application isn't available.
Expand Down
Loading