From 88223eeb98aba80ccf1de2839661e9b8ba8a1473 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 6 Nov 2025 12:01:19 +0100 Subject: [PATCH 1/5] ADR-57: JetStream durable stream sourcing/mirroring Signed-off-by: Maurice van Veen --- README.md | 4 ++ adr/ADR-57.md | 128 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+) create mode 100644 adr/ADR-57.md diff --git a/README.md b/README.md index a3a6e0e..8f7493b 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ This repository captures Architecture, Design Specifications and Feature Guidanc |Index|Tags|Description| |-----|----|-----------| |[ADR-50](adr/ADR-50.md)|jetstream, server, client, 2.12, 2.14|JetStream Batch Publishing| +|[ADR-57](adr/ADR-57.md)|jetstream, client, server, 2.14|JetStream durable stream sourcing/mirroring| ## Client @@ -65,6 +66,7 @@ This repository captures Architecture, Design Specifications and Feature Guidanc |[ADR-48](adr/ADR-48.md)|jetstream, client, kv, refinement, 2.11|TTL Support for Key-Value Buckets (updating [ADR-8](adr/ADR-8.md))| |[ADR-50](adr/ADR-50.md)|jetstream, server, client, 2.12, 2.14|JetStream Batch Publishing| |[ADR-54](adr/ADR-54.md)|jetstream, client, spec, orbit, kv|KV Codecs (updating [ADR-8](adr/ADR-8.md))| +|[ADR-57](adr/ADR-57.md)|jetstream, client, server, 2.14|JetStream durable stream sourcing/mirroring| ## Jetstream @@ -97,6 +99,7 @@ This repository captures Architecture, Design Specifications and Feature Guidanc |[ADR-50](adr/ADR-50.md)|jetstream, server, client, 2.12, 2.14|JetStream Batch Publishing| |[ADR-51](adr/ADR-51.md)|jetstream, 2.12|JetStream Message Scheduler| |[ADR-54](adr/ADR-54.md)|jetstream, client, spec, orbit, kv|KV Codecs (updating [ADR-8](adr/ADR-8.md))| +|[ADR-57](adr/ADR-57.md)|jetstream, client, server, 2.14|JetStream durable stream sourcing/mirroring| ## Kv @@ -173,6 +176,7 @@ This repository captures Architecture, Design Specifications and Feature Guidanc |[ADR-50](adr/ADR-50.md)|jetstream, server, client, 2.12, 2.14|JetStream Batch Publishing| |[ADR-55](adr/ADR-55.md)|server, 2.12|Trusted Protocol Aware Proxies| |[ADR-56](adr/ADR-56.md)|server, 2.12|JetStream Consistency Models| +|[ADR-57](adr/ADR-57.md)|jetstream, client, server, 2.14|JetStream durable stream sourcing/mirroring| ## Spec diff --git a/adr/ADR-57.md b/adr/ADR-57.md new file mode 100644 index 0000000..bef60d6 --- /dev/null +++ b/adr/ADR-57.md @@ -0,0 +1,128 @@ +# JetStream durable stream sourcing/mirroring + +| Metadata | Value | +|----------|---------------------------------| +| Date | 2025-11-06 | +| Author | @MauriceVanVeen | +| Status | Proposed | +| Tags | jetstream, client, server, 2.14 | + +| Revision | Date | Author | Info | +|----------|------------|-----------------|----------------| +| 1 | 2025-11-06 | @MauriceVanVeen | Initial design | + +## Context and Problem Statement + +JetStream streams can be mirrored or sourced from another stream. Usually this is done on separate servers, for example, +loosely connected as a leaf node. This is achieved by the server creating an ephemeral ordered push consumer using +`AckNone`. This is really reliable if the stream that's being mirrored/sourcing is a Limits stream. If the server +detects a gap, it recreates the consumer at the sequence it missed. And since the stream is a Limits stream, it will be +able to recover from the gap since the messages will still be in the stream. + +However, if the stream is a WorkQueue or Interest stream, then the use of an ephemeral `AckNone` consumer is problematic +for two reasons: + +- For both WorkQueue and Interest streams any messages that are sent are immediately acknowledged and removed. If this + message is not received on the other end, this message will be lost. +- Additionally, for an Interest stream since the consumer is ephemeral, interest will be lost while there's no active + connection between the two servers. This also results in messages being lost. + +Reliable stream mirroring/sourcing is required for use cases where WorkQueue or Interest streams are used or desired. + +## Design + +### Pre-created durable consumer + +Instead of the server creating and managing ephemeral consumers for stream sourcing, the user creates a durable consumer +that the server will use. + +The benefits of using a durable consumer are that these will be visible to the user and can be monitored. This eases the +control (and security implications) of the consumer configuration as this consumer will be manually created on the +server containing the data that will be sourced. Additionally, the consumer can be paused and resumed, allowing the +sourcing to temporarily stop if desired. + +Some additional tooling will be required to create the durable consumer with the proper configuration. But through the +use of new fields/values on the consumer configuration, the server will be able to help enforce the correct +configuration. + +### Performance / Consumer configuration + +The durable consumer used for stream sourcing/mirroring will need to be just as performant as the current ephemeral +variant. The current ephemeral consumer configuration uses `AckNone` which is problematic for WorkQueue and Interest +streams. A different `AckPolicy` will need to be used to ensure that messages are not lost. + +The consumer configuration will closely resemble the ephemeral push consumer variant: + +- The consumer will still act as an "ordered push consumer" but it will be durable. +- Requires `FlowControl` and `Heartbeat` to be set. +- Uses `AckPolicy=AckFlowControl` instead of `AckNone`. +- `AckPolicy=AckFlowControl` will function like `AckAll` although the server will not use the ack reply on the received + messages. +- The server responds to the flow control messages, including the stream sequence (`Nats-Last-Stream`) and delivery + sequence (`Nats-Last-Consumer`) as headers to specify which messages have been successfully stored. +- The server receiving the flow control response will ack messages based on these stream/delivery sequences. +- Acknowledgements happen based on flow control during active message flow, but if the stream is idle the + `Heartbeat` will also trigger a flow control message to move the acknowledgement floor up. + +The stream configuration will be extended to include the consumer name as well as the delivery subject used for stream +sourcing/mirroring. + +```go +type StreamSource struct { + Name string `json:"name"` + ConsumerName string `json:"consumer_name,omitempty"` + ConsumerDeliverSubject string `json:"consumer_deliver_subject,omitempty"` +} +``` + +### Consumer delivery state reset API + +The ordered consumer implementation relies on the consumer's delivery sequence to start at 1 and increment by 1 for +every delivered message. Gaps are detected by ensuring this delivery sequence increments monotonically. If a gap is +detected, the consumer delivery state will need to be reset such that the delivery sequence starts at 1 again. + +This is a non-issue for the ephemeral ordered push consumer variant as it creates a new consumer starting at the +expected sequence if a gap is detected. However, the durable consumer must not be deleted and recreated, since that will +result in losing interest on an Interest stream and subsequently losing messages. + +Therefore, the server will provide an API to reset the consumer delivery state. When the server detects a gap, it will +call this API to reset the consumer delivery state. This ensures the delivery sequence restarts at 1 and any gaps are +reliably handled. + +This reset API, `$JS.API.CONSUMER.RESET..`, will have the following functionality: + +- The consumer will be reset, resembling the delivery state of creating a new consumer with `opt_start_seq` set to the + specified sequence. +- The pending and redelivered messages will always be reset. +- The delivered stream/consumer sequences will always be reset. +- The ack floor consumer sequence will always be reset. +- The ack floor stream sequence will be updated depending on the payload. The next message to be delivered is above this + new ack floor. +- An empty payload will reset the consumer's state, but the ack floor stream sequence will remain the same. (This will + be used for the durable sourcing consumer after detecting a gap.) +- A payload of `{"seq":}` (with `seq>0`) will update the ack floor stream sequence to be one below the provided + sequence. The next message to be delivered has a sequence of `msg.seq >= reset.seq`. A zero-sequence is invalid. +- Resetting a consumer to a specific sequence will only be allowed on specific consumer configurations. + - Only allowed on `DeliverPolicy=all,by_start_sequence,by_start_time`. + - If `DeliverPolicy=all`, the reset will always be successful and allow to move forward or backward arbitrarily. + - If `DeliverPolicy=by_start_sequence,by_start_time`, the reset will only be successful if + `reset.seq >= opt_start_seq`, or if `loadNextMsg(reset.seq).start_time >= opt_start_time`. This is a safety + measure to prevent the consumer from being reset to a sequence before what was allowed by the consumer + configuration. +- The response to the reset API call will follow standard JS API conventions. Specifically, returning a "consumer + create" response to not only reset the consumer, but also expose the current configuration and updated delivery state. + This is useful for the durable sourcing consumer to confirm the proper configuration is used before allowing the + sourcing to happen. As well as generally looking as if the consumer was recreated, this response can then also be kept + by clients if they need to keep a cached consumer response. + +## Decision + +[Maybe this was just an architectural decision...] + +## Consequences + +Clients will need to add support for the `$JS.API.CONSUMER.RESET..` reset API. Clients should not rely +on this call only happening within the same process, but it potentially being called by another process, by the CLI for +example. Importantly, clients should normally not need to rely on the consumer delivery sequence being monotonic, except +when needed for the "ordered consumer" implementations. If the reset API is called for an ordered consumer, the client +should detect a gap as it would normally and simply recreate the consumer. From f6f720da1e4e5f45e0908627310dec84e4f10c90 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 11 Nov 2025 15:52:39 +0100 Subject: [PATCH 2/5] ADR-57: Feedback Signed-off-by: Maurice van Veen --- adr/ADR-57.md | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/adr/ADR-57.md b/adr/ADR-57.md index bef60d6..37de351 100644 --- a/adr/ADR-57.md +++ b/adr/ADR-57.md @@ -42,8 +42,7 @@ server containing the data that will be sourced. Additionally, the consumer can sourcing to temporarily stop if desired. Some additional tooling will be required to create the durable consumer with the proper configuration. But through the -use of new fields/values on the consumer configuration, the server will be able to help enforce the correct -configuration. +use of a new `AckPolicy=AckFlowControl` field, the server will be able to help enforce the correct configuration. ### Performance / Consumer configuration @@ -56,12 +55,13 @@ The consumer configuration will closely resemble the ephemeral push consumer var - The consumer will still act as an "ordered push consumer" but it will be durable. - Requires `FlowControl` and `Heartbeat` to be set. - Uses `AckPolicy=AckFlowControl` instead of `AckNone`. -- `AckPolicy=AckFlowControl` will function like `AckAll` although the server will not use the ack reply on the received - messages. -- The server responds to the flow control messages, including the stream sequence (`Nats-Last-Stream`) and delivery - sequence (`Nats-Last-Consumer`) as headers to specify which messages have been successfully stored. -- The server receiving the flow control response will ack messages based on these stream/delivery sequences. -- Acknowledgements happen based on flow control during active message flow, but if the stream is idle the +- `AckPolicy=AckFlowControl` will function like `AckAll` although the receiving server will not use the current ack + reply format and ack individual messages. +- The receiving server responds to the flow control messages, which includes the stream sequence (`Nats-Last-Stream`) + and delivery sequence (`Nats-Last-Consumer`) as headers to signal which messages have been successfully stored. +- The server receiving the flow control response will ack messages based on these stream/delivery sequences. For + WorkQueue and Interest streams this may result in messages deletion. +- Acknowledgements happen based on flow control limits, usually a data window size. But if the stream is idle the `Heartbeat` will also trigger a flow control message to move the acknowledgement floor up. The stream configuration will be extended to include the consumer name as well as the delivery subject used for stream @@ -83,18 +83,19 @@ detected, the consumer delivery state will need to be reset such that the delive This is a non-issue for the ephemeral ordered push consumer variant as it creates a new consumer starting at the expected sequence if a gap is detected. However, the durable consumer must not be deleted and recreated, since that will -result in losing interest on an Interest stream and subsequently losing messages. +result in losing interest on an Interest stream and subsequently losing messages. Waiting for re-delivery is also not an +option as this will result in out of order delivery. Therefore, the server will provide an API to reset the consumer delivery state. When the server detects a gap, it will -call this API to reset the consumer delivery state. This ensures the delivery sequence restarts at 1 and any gaps are -reliably handled. +call this API to reset the consumer delivery state. The consumer delivery sequence restarts at 1 and re-delivers pending +messages. Optionally re-delivery will start from a specified stream sequence. This reset API, `$JS.API.CONSUMER.RESET..`, will have the following functionality: - The consumer will be reset, resembling the delivery state of creating a new consumer with `opt_start_seq` set to the specified sequence. - The pending and redelivered messages will always be reset. -- The delivered stream/consumer sequences will always be reset. +- The delivered stream and consumer sequences will always be reset. - The ack floor consumer sequence will always be reset. - The ack floor stream sequence will be updated depending on the payload. The next message to be delivered is above this new ack floor. @@ -110,10 +111,10 @@ This reset API, `$JS.API.CONSUMER.RESET..`, will have the foll measure to prevent the consumer from being reset to a sequence before what was allowed by the consumer configuration. - The response to the reset API call will follow standard JS API conventions. Specifically, returning a "consumer - create" response to not only reset the consumer, but also expose the current configuration and updated delivery state. - This is useful for the durable sourcing consumer to confirm the proper configuration is used before allowing the - sourcing to happen. As well as generally looking as if the consumer was recreated, this response can then also be kept - by clients if they need to keep a cached consumer response. + reset" response to not only reset the consumer, but also expose the current configuration and updated delivery state + like a "consumer create" response. This is useful for the durable sourcing consumer to confirm the proper + configuration is used before allowing the sourcing to happen. As well as generally looking as if the consumer was + recreated, this response can then also be kept by clients if they need to keep a cached consumer response. ## Decision @@ -121,8 +122,8 @@ This reset API, `$JS.API.CONSUMER.RESET..`, will have the foll ## Consequences -Clients will need to add support for the `$JS.API.CONSUMER.RESET..` reset API. Clients should not rely -on this call only happening within the same process, but it potentially being called by another process, by the CLI for -example. Importantly, clients should normally not need to rely on the consumer delivery sequence being monotonic, except -when needed for the "ordered consumer" implementations. If the reset API is called for an ordered consumer, the client -should detect a gap as it would normally and simply recreate the consumer. +Client should support the `$JS.API.CONSUMER.RESET..` reset API. Clients should not rely on this call +to be initiated by the client process, but it potentially being called by another process, by the CLI for example. +Importantly, clients should not fail when the consumer delivery sequence being monotonic, except when needed for the +"ordered consumer" implementations. If the reset API is called for an ordered consumer, the client should detect a gap +as it would normally and simply recreate the consumer. From c84d5cad38c37e488351a6250847d95f933038ae Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 12 Nov 2025 09:16:01 +0100 Subject: [PATCH 3/5] ADR-57: Feedback, expose ResetSeq Signed-off-by: Maurice van Veen --- adr/ADR-57.md | 1 + 1 file changed, 1 insertion(+) diff --git a/adr/ADR-57.md b/adr/ADR-57.md index 37de351..d996a80 100644 --- a/adr/ADR-57.md +++ b/adr/ADR-57.md @@ -115,6 +115,7 @@ This reset API, `$JS.API.CONSUMER.RESET..`, will have the foll like a "consumer create" response. This is useful for the durable sourcing consumer to confirm the proper configuration is used before allowing the sourcing to happen. As well as generally looking as if the consumer was recreated, this response can then also be kept by clients if they need to keep a cached consumer response. +- Additionally, the response will contain the `ResetSeq` that the consumer is reset to. ## Decision From ed4847e7e71c44c610521745413c82ddfd4f7223 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 5 Dec 2025 09:41:28 +0100 Subject: [PATCH 4/5] ADR-57: Refinement after initial implementation Signed-off-by: Maurice van Veen --- adr/ADR-57.md | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/adr/ADR-57.md b/adr/ADR-57.md index d996a80..c5e3aca 100644 --- a/adr/ADR-57.md +++ b/adr/ADR-57.md @@ -7,17 +7,18 @@ | Status | Proposed | | Tags | jetstream, client, server, 2.14 | -| Revision | Date | Author | Info | -|----------|------------|-----------------|----------------| -| 1 | 2025-11-06 | @MauriceVanVeen | Initial design | +| Revision | Date | Author | Info | +|----------|------------|-----------------|-----------------------------------------| +| 1 | 2025-11-06 | @MauriceVanVeen | Initial design | +| 2 | 2025-12-05 | @MauriceVanVeen | Refinement after initial implementation | ## Context and Problem Statement JetStream streams can be mirrored or sourced from another stream. Usually this is done on separate servers, for example, loosely connected as a leaf node. This is achieved by the server creating an ephemeral ordered push consumer using -`AckNone`. This is really reliable if the stream that's being mirrored/sourcing is a Limits stream. If the server -detects a gap, it recreates the consumer at the sequence it missed. And since the stream is a Limits stream, it will be -able to recover from the gap since the messages will still be in the stream. +`AckNone`. This is really reliable if the stream that's being mirrored/sourced is a Limits stream. If the server detects +a gap, it recreates the consumer at the sequence it missed. And since the stream is a Limits stream, it will be able to +recover from the gap since the messages will still be in the stream. However, if the stream is a WorkQueue or Interest stream, then the use of an ephemeral `AckNone` consumer is problematic for two reasons: @@ -48,7 +49,7 @@ use of a new `AckPolicy=AckFlowControl` field, the server will be able to help e The durable consumer used for stream sourcing/mirroring will need to be just as performant as the current ephemeral variant. The current ephemeral consumer configuration uses `AckNone` which is problematic for WorkQueue and Interest -streams. A different `AckPolicy` will need to be used to ensure that messages are not lost. +streams. A different `AckPolicy` (`AckFlowControl`) will need to be used to ensure that messages are not lost. The consumer configuration will closely resemble the ephemeral push consumer variant: @@ -56,13 +57,21 @@ The consumer configuration will closely resemble the ephemeral push consumer var - Requires `FlowControl` and `Heartbeat` to be set. - Uses `AckPolicy=AckFlowControl` instead of `AckNone`. - `AckPolicy=AckFlowControl` will function like `AckAll` although the receiving server will not use the current ack - reply format and ack individual messages. + reply format by acknowledging individual messages. - The receiving server responds to the flow control messages, which includes the stream sequence (`Nats-Last-Stream`) and delivery sequence (`Nats-Last-Consumer`) as headers to signal which messages have been successfully stored. - The server receiving the flow control response will ack messages based on these stream/delivery sequences. For WorkQueue and Interest streams this may result in messages deletion. - Acknowledgements happen based on flow control limits, usually a data window size. But if the stream is idle the `Heartbeat` will also trigger a flow control message to move the acknowledgement floor up. +- Flow control messages will happen automatically after a certain data size is reached, but can be controlled using the + `MaxAckPending` setting. `MaxAckPending` determines the maximum number of pending messages that can be sent before the + sourcing pauses. A flow control message will be automatically sent (no need to wait for a `Heartbeat`) so these + messages are acknowledged and new messages can be sent as soon as possible. +- Since acknowledgements happen based on dynamic flow control, it being determined either by data size, `MaxAckPending` + or `Heartbeat`, the consumer cannot have an `AckWait` or `BackOff` setting. These fields need to be unset. +- Additionally, `MaxDeliver` must be set to `-1` (infinite) to ensure if some messages are lost in transit, they can + still be reliably redelivered. The stream configuration will be extended to include the consumer name as well as the delivery subject used for stream sourcing/mirroring. @@ -125,6 +134,6 @@ This reset API, `$JS.API.CONSUMER.RESET..`, will have the foll Client should support the `$JS.API.CONSUMER.RESET..` reset API. Clients should not rely on this call to be initiated by the client process, but it potentially being called by another process, by the CLI for example. -Importantly, clients should not fail when the consumer delivery sequence being monotonic, except when needed for the +Importantly, clients should not fail when the consumer delivery sequence is not monotonic, except when needed for the "ordered consumer" implementations. If the reset API is called for an ordered consumer, the client should detect a gap as it would normally and simply recreate the consumer. From 06e8ab3b46c05daa0e05b5c64ed164e096ead5a2 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 8 Dec 2025 11:37:30 +0100 Subject: [PATCH 5/5] ADR-57: Comment about WQ & manually resetting source consumer Signed-off-by: Maurice van Veen --- adr/ADR-57.md | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/adr/ADR-57.md b/adr/ADR-57.md index c5e3aca..dbad15c 100644 --- a/adr/ADR-57.md +++ b/adr/ADR-57.md @@ -42,6 +42,10 @@ control (and security implications) of the consumer configuration as this consum server containing the data that will be sourced. Additionally, the consumer can be paused and resumed, allowing the sourcing to temporarily stop if desired. +WorkQueue streams don't allow having multiple consumers with overlapping filter subjects. This means that a durable +consumer used for mirroring/sourcing of a WorkQueue stream, would not allow another overlapping consumer to be created +used for a different purpose. In that case, an Interest or Limits stream should be used. + Some additional tooling will be required to create the durable consumer with the proper configuration. But through the use of a new `AckPolicy=AckFlowControl` field, the server will be able to help enforce the correct configuration. @@ -126,9 +130,11 @@ This reset API, `$JS.API.CONSUMER.RESET..`, will have the foll recreated, this response can then also be kept by clients if they need to keep a cached consumer response. - Additionally, the response will contain the `ResetSeq` that the consumer is reset to. -## Decision - -[Maybe this was just an architectural decision...] +Importantly, the server should also handle the case where a user manually resets the consumer that's used for sourcing. +The server should handle this gracefully and ensure no messages are lost. However, the user could also reset the +consumer such that it moves ahead in the stream. The server should also handle this by properly skipping over those +messages. If instead the user manually resets the consumer to go backward, the server should guarantee that mirrored +messages are not duplicated. ## Consequences