4 changes: 4 additions & 0 deletions README.md
@@ -30,6 +30,7 @@ This repository captures Architecture, Design Specifications and Feature Guidance
|Index|Tags|Description|
|-----|----|-----------|
|[ADR-50](adr/ADR-50.md)|jetstream, server, client, 2.12, 2.14|JetStream Batch Publishing|
|[ADR-57](adr/ADR-57.md)|jetstream, client, server, 2.14|JetStream durable stream sourcing/mirroring|

## Client

@@ -65,6 +66,7 @@ This repository captures Architecture, Design Specifications and Feature Guidance
|[ADR-48](adr/ADR-48.md)|jetstream, client, kv, refinement, 2.11|TTL Support for Key-Value Buckets (updating [ADR-8](adr/ADR-8.md))|
|[ADR-50](adr/ADR-50.md)|jetstream, server, client, 2.12, 2.14|JetStream Batch Publishing|
|[ADR-54](adr/ADR-54.md)|jetstream, client, spec, orbit, kv|KV Codecs (updating [ADR-8](adr/ADR-8.md))|
|[ADR-57](adr/ADR-57.md)|jetstream, client, server, 2.14|JetStream durable stream sourcing/mirroring|

## Jetstream

@@ -97,6 +99,7 @@ This repository captures Architecture, Design Specifications and Feature Guidance
|[ADR-50](adr/ADR-50.md)|jetstream, server, client, 2.12, 2.14|JetStream Batch Publishing|
|[ADR-51](adr/ADR-51.md)|jetstream, 2.12|JetStream Message Scheduler|
|[ADR-54](adr/ADR-54.md)|jetstream, client, spec, orbit, kv|KV Codecs (updating [ADR-8](adr/ADR-8.md))|
|[ADR-57](adr/ADR-57.md)|jetstream, client, server, 2.14|JetStream durable stream sourcing/mirroring|

## Kv

@@ -173,6 +176,7 @@ This repository captures Architecture, Design Specifications and Feature Guidance
|[ADR-50](adr/ADR-50.md)|jetstream, server, client, 2.12, 2.14|JetStream Batch Publishing|
|[ADR-55](adr/ADR-55.md)|server, 2.12|Trusted Protocol Aware Proxies|
|[ADR-56](adr/ADR-56.md)|server, 2.12|JetStream Consistency Models|
|[ADR-57](adr/ADR-57.md)|jetstream, client, server, 2.14|JetStream durable stream sourcing/mirroring|

## Spec

128 changes: 128 additions & 0 deletions adr/ADR-57.md
@@ -0,0 +1,128 @@
# JetStream durable stream sourcing/mirroring

| Metadata | Value |
|----------|---------------------------------|
| Date | 2025-11-06 |
| Author | @MauriceVanVeen |
| Status | Proposed |
| Tags | jetstream, client, server, 2.14 |

| Revision | Date | Author | Info |
|----------|------------|-----------------|----------------|
| 1 | 2025-11-06 | @MauriceVanVeen | Initial design |

## Context and Problem Statement

JetStream streams can be mirrored or sourced from another stream. Usually this is done across separate servers, for
example loosely connected as a leaf node. It is achieved by the server creating an ephemeral ordered push consumer
using `AckNone`. This is reliable as long as the stream being mirrored or sourced is a Limits stream: if the server
detects a gap, it recreates the consumer at the sequence it missed, and because the stream is a Limits stream the
messages are still present, so the gap can be recovered.

However, if the stream is a WorkQueue or Interest stream, then the use of an ephemeral `AckNone` consumer is problematic
for two reasons:

- For both WorkQueue and Interest streams, any delivered message is immediately acknowledged and removed. If such a
  message is not received on the other end, it is lost.
- Additionally, for an Interest stream, since the consumer is ephemeral, interest is lost while there is no active
  connection between the two servers. This also results in messages being lost.

Reliable stream mirroring/sourcing is required for use cases where WorkQueue or Interest streams are used or desired.

## Design

### Pre-created durable consumer

Instead of the server creating and managing ephemeral consumers for stream sourcing, the user creates a durable consumer
that the server will use.

The benefits of using a durable consumer are that it is visible to the user and can be monitored. This eases control
(and the security implications) of the consumer configuration, as the consumer is manually created on the server
containing the data to be sourced. Additionally, the consumer can be paused and resumed, allowing the sourcing to
temporarily stop if desired.

Some additional tooling will be required to create the durable consumer with the proper configuration. But through the
use of a new `AckPolicy=AckFlowControl` value on the consumer configuration, the server will be able to help enforce
the correct configuration.
> **Reviewer (Contributor):** This is too vague. Which fields? What will be enforced? We need to at least reference where the details are specified.
>
> **Author (@MauriceVanVeen):** Updated to: "... But through the use of a new `AckPolicy=AckFlowControl` field, the server will be able to help enforce the correct configuration." The enforcement is then described in the "Consumer configuration" section, specifically: "Requires `FlowControl` and `Heartbeat` to be set."

### Performance / Consumer configuration

The durable consumer used for stream sourcing/mirroring will need to be just as performant as the current ephemeral
variant. The current ephemeral consumer configuration uses `AckNone`, which is problematic for WorkQueue and Interest
streams. A different `AckPolicy` is needed to ensure that messages are not lost.
> **Reviewer (Contributor):** A new `AckPolicy=AckFlowControl` will be introduced.
>
> **Author (@MauriceVanVeen):** This is just introductory text explaining `AckNone` can't be used; the new field is mentioned below: "Uses `AckPolicy=AckFlowControl` instead of `AckNone`."

The consumer configuration will closely resemble the ephemeral push consumer variant:

- The consumer will still act as an "ordered push consumer" but it will be durable.
- Requires `FlowControl` and `Heartbeat` to be set.
- Uses `AckPolicy=AckFlowControl` instead of `AckNone`.
- `AckPolicy=AckFlowControl` will function like `AckAll`, although the server will not use the ack reply on the
  received messages.
- The receiving server responds to the flow control messages, which include the stream sequence (`Nats-Last-Stream`)
  and delivery sequence (`Nats-Last-Consumer`) as headers to signal which messages have been successfully stored.
> **Reviewer (Contributor):** The receiving stream responds with a flow control message, which includes the stream sequence (`Nats-Last-Stream`) and delivery sequence (`Nats-Last-Consumer`) as headers, signalling which messages have been successfully stored.
>
> **Author (@MauriceVanVeen):** Slightly updated to: "The receiving server responds to the flow control messages, which include the stream sequence (`Nats-Last-Stream`) and delivery sequence (`Nats-Last-Consumer`) as headers to signal which messages have been successfully stored." Specifically, the server that receives the flow control message responds to it; it does not send flow control messages itself.

- The server receiving the flow control response will ack messages based on these stream/delivery sequences. For
  WorkQueue and Interest streams this may result in message deletion.
> **Reviewer (Contributor):** The server receiving the flow control response will internally acknowledge all prior messages in the consumer according to its retention policy. For WorkQueue and Interest based streams this may result in message deletion.
>
> **Author (@MauriceVanVeen):** Have kept the addition of "For WorkQueue and Interest streams this may result in message deletion", but am inclined to not change the former. A response to a flow control message does not mean ALL prior messages are to be acknowledged; only those messages below the stream/delivery sequences are acked.
- Acknowledgements happen based on flow control during active message flow, but if the stream is idle the
`Heartbeat` will also trigger a flow control message to move the acknowledgement floor up.

The stream configuration will be extended to include the consumer name as well as the delivery subject used for stream
sourcing/mirroring.

> **Reviewer (Contributor):** The stream configuration will be extended to include the consumer name. (The delivery subject is not new.)
>
> **Author (@MauriceVanVeen, Nov 11, 2025):** The delivery subject is new, see also the `ConsumerDeliverSubject` below in the `StreamSource`. This means the consumer itself is a push consumer with a delivery subject. The consumer name needs to match that in the source, and its delivery subject must also match what's specified in the source (it may be different over JS domains, or with subject mapping).

```go
type StreamSource struct {
	Name                   string `json:"name"`
	ConsumerName           string `json:"consumer_name,omitempty"`
	ConsumerDeliverSubject string `json:"consumer_deliver_subject,omitempty"`
}
```

### Consumer delivery state reset API

The ordered consumer implementation relies on the consumer's delivery sequence to start at 1 and increment by 1 for
every delivered message. Gaps are detected by ensuring this delivery sequence increments monotonically. If a gap is
detected, the consumer delivery state will need to be reset such that the delivery sequence starts at 1 again.

This is a non-issue for the ephemeral ordered push consumer variant as it creates a new consumer starting at the
expected sequence if a gap is detected. However, the durable consumer must not be deleted and recreated, since that will
result in losing interest on an Interest stream and subsequently losing messages.

Therefore, the server will provide an API to reset the consumer delivery state. When the server detects a gap, it will
call this API to reset the consumer delivery state. This ensures the delivery sequence restarts at 1 and any gaps are
reliably handled.

This reset API, `$JS.API.CONSUMER.RESET.<STREAM>.<CONSUMER>`, will have the following functionality:

- The consumer will be reset, resembling the delivery state of creating a new consumer with `opt_start_seq` set to the
specified sequence.
- The pending and redelivered messages will always be reset.
- The delivered stream/consumer sequences will always be reset.
> **Reviewer (Contributor):** The delivered consumer sequence will always be reset.
>
> **Author (@MauriceVanVeen):** Have updated this to "delivered stream and consumer sequence". These are two independent sequences that need to be reset.

- The ack floor consumer sequence will always be reset.
- The ack floor stream sequence will be updated depending on the payload. The next message to be delivered is above this
new ack floor.
- An empty payload will reset the consumer's state, but the ack floor stream sequence will remain the same. (This will
be used for the durable sourcing consumer after detecting a gap.)
- A payload of `{"seq":<seq>}` (with `seq>0`) will update the ack floor stream sequence to be one below the provided
sequence. The next message to be delivered has a sequence of `msg.seq >= reset.seq`. A zero-sequence is invalid.
- Resetting a consumer to a specific sequence will only be allowed on specific consumer configurations.
- Only allowed on `DeliverPolicy=all,by_start_sequence,by_start_time`.
- If `DeliverPolicy=all`, the reset will always be successful and allow to move forward or backward arbitrarily.
- If `DeliverPolicy=by_start_sequence,by_start_time`, the reset will only be successful if
`reset.seq >= opt_start_seq`, or if `loadNextMsg(reset.seq).start_time >= opt_start_time`. This is a safety
measure to prevent the consumer from being reset to a sequence before what was allowed by the consumer
configuration.
- The response to the reset API call will follow standard JS API conventions. Specifically, it returns a "consumer
  create" response, which not only confirms the reset but also exposes the current configuration and updated delivery
  state. This is useful for the durable sourcing consumer to confirm the proper configuration is in place before
  allowing sourcing to happen. Since the response looks as if the consumer was recreated, clients can also keep it if
  they need a cached consumer response.

## Decision

[Maybe this was just an architectural decision...]

## Consequences

Clients will need to add support for the `$JS.API.CONSUMER.RESET.<STREAM>.<CONSUMER>` reset API. Clients should not
rely on this call only happening within their own process; it may also be called by another process, for example by the
CLI. Importantly, clients should normally not need to rely on the consumer delivery sequence being monotonic, except
when needed for "ordered consumer" implementations. If the reset API is called for an ordered consumer, the client
should detect a gap as it normally would and simply recreate the consumer.