JetStream Extensions is a set of utilities providing additional features to jetstream package in nats.go client.
go get github.com/synadia-io/orbit.go/jetstreamextGetBatch and GetLastMsgsFor are utilities that allow you to fetch multiple messages from a JetStream stream.
Responses are returned in an iterator, which you can range over to receive messages.
GetBatch fetches a batch of messages from a provided stream, starting from
either the lowest matching sequence, from the provided sequence, or from the
given time. It can be configured to fetch messages from matching subject (which
may contain wildcards) and up to a maximum byte limit.
Examples:
- fetching 10 messages from the beginning of the stream:
msgs, err := jetstreamext.GetBatch(ctx, js, "stream", 10)
if err != nil {
// handle error
}
for msg, err := range msgs {
if err != nil {
// handle error
}
fmt.Println(string(msg.Data))
}- fetching 10 messages from the stream starting from sequence 100 and matching subject:
msgs, err := jetstreamext.GetBatch(ctx, js, "stream", 10, jetstreamext.GetBatchSeq(100), jetstreamext.GetBatchSubject("foo"))
if err != nil {
// handle error
}
// process msgs- fetching 10 messages from the stream starting from time 1 hour ago:
msgs, err := jetstreamext.GetBatch(ctx, js, "stream", 10, jetstreamext.GetBatchStartTime(time.Now().Add(-time.Hour)))
if err != nil {
// handle error
}
// process msgs- fetching 10 messages or up to provided byte limit:
msgs, err := jetstreamext.GetBatch(ctx, js, "stream", 10, jetstreamext.GetBatchMaxBytes(1024))
if err != nil {
// handle error
}
// process msgsGetLastMsgsFor fetches the last messages for the specified subjects from the specified stream. It can be optionally configured to fetch messages up to the provided sequence (or time), rather than the latest messages available. It can also be configured to fetch messages up to a provided batch size.
The provided subjects may contain wildcards, however it is important to note that the NATS server will match a maximum of 1024 subjects.
Responses are returned in an iterator, which you can range over to receive messages.
Examples:
- fetching last messages from the stream for the provided subjects:
msgs, err := jetstreamext.GetLastMsgsFor(ctx, js, "stream", []string{"foo", "bar"})
if err != nil {
// handle error
}
for msg, err := range msgs {
if err != nil {
// handle error
}
fmt.Println(string(msg.Data))
}- fetching last messages from the stream for the provided subjects up to stream sequence 100:
msgs, err := jetstreamext.GetLastMsgsFor(ctx, js, "stream", []string{"foo", "bar"}, jetstreamext.GetLastMsgsUpToSeq(100))
if err != nil {
// handle error
}
// process msgs- fetching last messages from the stream for the provided subjects up to time 1 hour ago:
msgs, err := jetstreamext.GetLastMsgsFor(ctx, js, "stream", []string{"foo", "bar"}, jetstreamext.GetLastMsgsUpToTime(time.Now().Add(-time.Hour)))
if err != nil {
// handle error
}
// process msgs- fetching last messages from the stream for the provided subjects up to a batch size of 10:
msgs, err := jetstreamext.GetLastMsgsFor(ctx, js, "stream", []string{"foo.*"}, jetstreamext.GetLastMsgsBatchSize(10))
if err != nil {
// handle error
}
// process msgsPublishMsgBatch and BatchPublisher provide atomic batch publishing to JetStream streams with configurable flow control.
A batch publish is an atomic operation - either all messages in the batch are persisted, or none are, depending on the result of the commit.
In order to use this feature, stream has to be configured with AllowAtomicPublish enabled.
Note: This module requires nats-server v2.12.0 o later.
BatchPublisher allows you to create a publisher that publishes messages in streaming-like fashion, where each message is published individually, but the commit is done for the entire batch.
It can be configured with options for flow control and supports publish consistency checks.
Adding messages to the batch is an IO operation, and messages are published immediately and persisted upon commit.
A commit is done when the Commit method is called, which returns a BatchAck containing the results of the publish.
// Create a stream with batch publishing enabled
// stream has to be created with AllowAtomicPublish enabled
cfg := jetstream.StreamConfig{
Name: "FOO",
Subjects: []string{"foo.>"},
AllowAtomicPublish: true,
}
stream, err := js.CreateStream(ctx, cfg)
if err != nil {
// handle error
}
// Create a batch publisher
batch, err := jetstreamext.NewBatchPublisher(js)
if err != nil {
// handle error
}
// Add message to the batch
err := batch.AddMsg("foo.A", &nats.Msg{
Subject: "test.A",
Data: []byte("hello"),
})
if err != nil {
// handle error
}
// Commit the batch
ack, err := batch.Commit(ctx, "test.A", []byte("commit msg"))
if err != nil {
// handle error
}By default, BatchPublisher waits for an ack from the server for the first message in the batch and for the commit.
This can be configured with options, for example to wait for an ack for every 10 messages.
batch, err := jetstreamext.NewBatchPublisher(js, jetstreamext.BatchFlowControl{
AckEvery: 10,
AckTimeout: 5 * time.Second,
})
if err != nil {
// handle error
}PublishMsgBatch allows you to atomically publish a slice of messages to a stream and wait for an ack for the commit.
It can be configured with options for flow control. For consistency checks, relevant headers can be set on individual messages.
msgs := make([]*nats.Msg, 0, count)
for range count {
messages = append(messages, &nats.Msg{
Subject: "foo",
Data: []byte("message"),
})
}
ack, err := jetstreamext.PublishMsgBatch(ctx, js, messages)
if err != nil {
// handle error
}FastPublisher provides high-throughput batch publishing to JetStream streams
using the fast-ingest nats-server feature. Unlike BatchPublisher, which
atomically publishes messages and only persists them in stream upon commit,
FastPublisher persists messages as they are added to the batch, and the commit
is used to signal the end of the batch and return an ack with the last persisted
message.
A FastPublisher is not safe for concurrent use — all calls to Add,
AddMsg, Commit, CommitMsg, and Close must be made from a single
goroutine.
Note: Fast publishing requires nats-server v2.14.0 or later.
// Create a fast publisher
fp, err := jetstreamext.NewFastPublisher(js)
if err != nil {
// handle error
}
// Add messages to the batch
for i := range 1000 {
_, err := fp.Add("foo.bar", []byte(fmt.Sprintf("message %d", i)))
if err != nil {
// handle error
}
}
// Commit the batch with a final message
ack, err := fp.Commit(ctx, "foo.bar", []byte("final message"))
if err != nil {
// handle error
}
fmt.Printf("Batch committed: stream=%s, sequence=%d\n", ack.Stream, ack.Sequence)Flow control can be configured to tune throughput and back-pressure behavior. The server may dynamically adjust the ack frequency based on its own load. Note that the server may send acks more frequently than the configured flow, but not less.
fp, err := jetstreamext.NewFastPublisher(js, jetstreamext.FastPublishFlowControl{
Flow: 200, // initial ack frequency (default: 100)
MaxOutstandingAcks: 3, // max unacked batches before stalling (default: 2)
AckTimeout: 10 * time.Second, // timeout waiting for acks
})By default, if the server detects a gap (lost messages), the batch is abandoned. You can configure the publisher to continue on gaps and be notified via an error handler:
fp, err := jetstreamext.NewFastPublisher(js,
jetstreamext.WithFastPublisherContinueOnGap(true),
jetstreamext.WithFastPublisherErrorHandler(func(err error) {
log.Printf("fast publish error: %v", err)
}),
)If you don't need to add a final message on commit, use Close to send an end-of-batch signal:
ack, err := fp.Close(ctx)
if err != nil {
// handle error
}