8 changes: 8 additions & 0 deletions .goreleaser.yml
@@ -38,6 +38,8 @@ builds:
- mips64le
- s390x
- ppc64le
# RISC-V currently only supported on Linux
- riscv64
goarm:
- 6
- 7
@@ -52,6 +54,12 @@ builds:
goarch: arm64
- goos: freebsd
goarch: 386
- goos: darwin
goarch: riscv64
- goos: windows
goarch: riscv64
- goos: freebsd
goarch: riscv64
mod_timestamp: "{{ .CommitTimestamp }}"

nfpms:
6 changes: 3 additions & 3 deletions go.mod
@@ -14,7 +14,7 @@ require (
github.com/nats-io/nkeys v0.4.11
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.42.0
golang.org/x/sys v0.36.0
golang.org/x/time v0.13.0
golang.org/x/crypto v0.43.0
golang.org/x/sys v0.37.0
golang.org/x/time v0.14.0
)
12 changes: 6 additions & 6 deletions go.sum
@@ -24,12 +24,12 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI=
golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
18 changes: 13 additions & 5 deletions server/jetstream.go
@@ -1348,8 +1348,12 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
var offlineReason string
if !supported {
apiLevel := getRequiredApiLevel(cfg.Metadata)
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
s.Warnf(" Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", a.Name, cfg.StreamConfig.Name, apiLevel)
if strictErr != nil {
offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: "))
} else {
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
}
s.Warnf(" Detected unsupported stream '%s > %s': %s", a.Name, cfg.StreamConfig.Name, offlineReason)
} else {
offlineReason = fmt.Sprintf("decoding error: %v", strictErr)
s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, strictErr)
@@ -1571,8 +1575,12 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
var offlineReason string
if !supported {
apiLevel := getRequiredApiLevel(cfg.Metadata)
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
s.Warnf(" Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", a.Name, e.mset.name(), cfg.Name, apiLevel)
if strictErr != nil {
offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: "))
} else {
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
}
s.Warnf(" Detected unsupported consumer '%s > %s > %s': %s", a.Name, e.mset.name(), cfg.Name, offlineReason)
} else {
offlineReason = fmt.Sprintf("decoding error: %v", strictErr)
s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr)
@@ -1582,7 +1590,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
if !e.mset.closed.Load() {
s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, e.mset.name())
e.mset.mu.Lock()
e.mset.offlineReason = "stopped"
e.mset.offlineReason = fmt.Sprintf("stopped - unsupported consumer %q", cfg.Name)
e.mset.mu.Unlock()
e.mset.stop(false, false)
}
53 changes: 32 additions & 21 deletions server/jetstream_cluster.go
@@ -162,9 +162,15 @@ type unsupportedStreamAssignment struct {
infoSub *subscription
}

func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment) *unsupportedStreamAssignment {
func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, err error) *unsupportedStreamAssignment {
reason := "stopped"
if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) {
if err != nil {
if errstr := err.Error(); strings.HasPrefix(errstr, "json:") {
reason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: "))
} else {
reason = fmt.Sprintf("stopped - %s", errstr)
}
} else if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) {
if req := getRequiredApiLevel(sa.Config.Metadata); req != _EMPTY_ {
reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", req, JSApiLevel)
}
@@ -240,9 +246,15 @@ type unsupportedConsumerAssignment struct {
infoSub *subscription
}

func newUnsupportedConsumerAssignment(ca *consumerAssignment) *unsupportedConsumerAssignment {
func newUnsupportedConsumerAssignment(ca *consumerAssignment, err error) *unsupportedConsumerAssignment {
reason := "stopped"
if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) {
if err != nil {
if errstr := err.Error(); strings.HasPrefix(errstr, "json:") {
reason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: "))
} else {
reason = fmt.Sprintf("stopped - %s", errstr)
}
} else if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) {
if req := getRequiredApiLevel(ca.Config.Metadata); req != _EMPTY_ {
reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", getRequiredApiLevel(ca.Config.Metadata), JSApiLevel)
}
@@ -4003,8 +4015,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) {
// If unsupported, we can't register any further.
if sa.unsupported != nil {
sa.unsupported.setupInfoSub(s, sa)
apiLevel := getRequiredApiLevel(sa.Config.Metadata)
s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel)
s.Warnf("Detected unsupported stream '%s > %s': %s", accName, stream, sa.unsupported.reason)
js.mu.Unlock()

// Need to stop the stream, we can't keep running with an old config.
@@ -4133,8 +4144,7 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
// If unsupported, we can't register any further.
if sa.unsupported != nil {
sa.unsupported.setupInfoSub(s, sa)
apiLevel := getRequiredApiLevel(sa.Config.Metadata)
s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel)
s.Warnf("Detected unsupported stream '%s > %s': %s", accName, stream, sa.unsupported.reason)
js.mu.Unlock()

// Need to stop the stream, we can't keep running with an old config.
@@ -4815,12 +4825,11 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
// If unsupported, we can't register any further.
if ca.unsupported != nil {
ca.unsupported.setupInfoSub(s, ca)
apiLevel := getRequiredApiLevel(ca.Config.Metadata)
s.Warnf("Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", accName, stream, ca.Name, apiLevel)
s.Warnf("Detected unsupported consumer '%s > %s > %s': %s", accName, stream, ca.Name, ca.unsupported.reason)

// Mark stream as unsupported as well
if sa.unsupported == nil {
sa.unsupported = newUnsupportedStreamAssignment(s, sa)
sa.unsupported = newUnsupportedStreamAssignment(s, sa, fmt.Errorf("unsupported consumer %q", ca.Name))
}
sa.unsupported.setupInfoSub(s, sa)
js.mu.Unlock()
@@ -8008,20 +8017,21 @@ func decodeStreamAssignment(s *Server, buf []byte) (*streamAssignment, error) {
func decodeStreamAssignmentConfig(s *Server, sa *streamAssignment) error {
var unsupported bool
var cfg StreamConfig
var err error
decoder := json.NewDecoder(bytes.NewReader(sa.ConfigJSON))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&cfg); err != nil {
if err = decoder.Decode(&cfg); err != nil {
unsupported = true
cfg = StreamConfig{}
if err = json.Unmarshal(sa.ConfigJSON, &cfg); err != nil {
return err
if err2 := json.Unmarshal(sa.ConfigJSON, &cfg); err2 != nil {
return err2
}
}
sa.Config = &cfg
fixCfgMirrorWithDedupWindow(sa.Config)

if unsupported || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) {
sa.unsupported = newUnsupportedStreamAssignment(s, sa)
if unsupported || err != nil || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) {
sa.unsupported = newUnsupportedStreamAssignment(s, sa, err)
}
return nil
}
@@ -8472,18 +8482,19 @@ func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) {
func decodeConsumerAssignmentConfig(ca *consumerAssignment) error {
var unsupported bool
var cfg ConsumerConfig
var err error
decoder := json.NewDecoder(bytes.NewReader(ca.ConfigJSON))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&cfg); err != nil {
if err = decoder.Decode(&cfg); err != nil {
unsupported = true
cfg = ConsumerConfig{}
if err = json.Unmarshal(ca.ConfigJSON, &cfg); err != nil {
return err
if err2 := json.Unmarshal(ca.ConfigJSON, &cfg); err2 != nil {
return err2
}
}
ca.Config = &cfg
if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) {
ca.unsupported = newUnsupportedConsumerAssignment(ca)
if unsupported || err != nil || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) {
ca.unsupported = newUnsupportedConsumerAssignment(ca, err)
}
return nil
}
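
For context, the strict-then-lenient decode pattern used in both decode functions above can be sketched in isolation. The following is a minimal standalone example, not the server's code: the `legacyConfig` type and its fields are hypothetical. `DisallowUnknownFields` makes the first pass fail on fields an older server does not know, and the lenient second pass still recovers the known fields so the asset can be registered and reported as unsupported rather than silently mis-parsed. The strict error string begins with "json: ", which is why the change above trims that prefix before embedding it in the offline reason.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// legacyConfig is a hypothetical stand-in for a config struct on an older
// server that does not know about newer fields.
type legacyConfig struct {
	Name     string `json:"name"`
	Replicas int    `json:"num_replicas"`
}

func main() {
	// JSON written by a newer server, containing a field this struct lacks.
	data := []byte(`{"name":"ORDERS","num_replicas":3,"shiny_new_option":true}`)

	var cfg legacyConfig
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.DisallowUnknownFields()

	if err := dec.Decode(&cfg); err != nil {
		// Strict pass fails: this config needs a newer server.
		fmt.Println("strict decode failed:", err)

		// Lenient pass still recovers the known fields so the asset can be
		// kept around and reported as offline/unsupported instead of lost.
		cfg = legacyConfig{}
		if err2 := json.Unmarshal(data, &cfg); err2 != nil {
			fmt.Println("lenient decode failed too:", err2)
			return
		}
	}
	fmt.Printf("recovered config: %+v\n", cfg)
}
```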
2 changes: 1 addition & 1 deletion server/jetstream_cluster_1_test.go
@@ -9714,7 +9714,7 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes

// Stream should also be reported as offline.
// Specifically, as "stopped" because it's still supported, but can't run due to the unsupported consumer.
expectStreamInfo("stopped", "DowngradeConsumerTest")
expectStreamInfo("stopped - unsupported consumer \"DowngradeConsumerTest\"", "DowngradeConsumerTest")
}

// Consumer should be reported as offline, but healthz should report healthy to not block downgrades.
2 changes: 1 addition & 1 deletion server/jetstream_test.go
@@ -21951,7 +21951,7 @@ func TestJetStreamOfflineStreamAndConsumerAfterDowngrade(t *testing.T) {
mset, err = s.globalAccount().lookupStream("DowngradeConsumerTest")
require_NoError(t, err)
require_True(t, mset.closed.Load())
require_Equal(t, mset.offlineReason, "stopped")
require_Equal(t, mset.offlineReason, "stopped - unsupported consumer \"DowngradeConsumerTest\"")

obs := mset.getPublicConsumers()
require_Len(t, len(obs), 1)
6 changes: 3 additions & 3 deletions server/raft.go
@@ -3594,9 +3594,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
isNew := sub != nil && sub == n.aesub

// If we are/were catching up ignore old catchup subs, but only if catching up from an older server
// that doesn't send the leader term when catching up. We can reject old catchups from newer subs
// later, just by checking the append entry is on the correct term.
if !isNew && sub != nil && ae.lterm == 0 && (!catchingUp || sub != n.catchup.sub) {
// that doesn't send the leader term when catching up or if we would truncate as a result.
// We can reject old catchups from newer subs later, just by checking the append entry is on the correct term.
if !isNew && sub != nil && (ae.lterm == 0 || ae.pindex < n.pindex) && (!catchingUp || sub != n.catchup.sub) {
n.Unlock()
n.debug("AppendEntry ignoring old entry from previous catchup")
return
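
As a rough illustration of the widened guard above, here is a simplified sketch with hypothetical names, not the server's raft implementation: an entry arriving on a stale catchup subscription is ignored when it either carries no leader term or points behind our stored log, so a replay from a parallel catchup can no longer force a truncation of entries already written.

```go
package main

import "fmt"

// ignoreStaleCatchupEntry mirrors, in simplified form, the condition added above.
// isNew:           the entry arrived on the live append-entry subscription.
// onActiveCatchup: the entry arrived on the currently active catchup subscription.
// lterm:           leader term carried by the entry (0 means an older server sent it).
// entryPrevIndex:  the entry's previous index.
// logIndex:        our last stored index.
func ignoreStaleCatchupEntry(isNew, onActiveCatchup bool, lterm, entryPrevIndex, logIndex uint64) bool {
	return !isNew && (lterm == 0 || entryPrevIndex < logIndex) && !onActiveCatchup
}

func main() {
	// Replay of an old catchup entry after our log has already advanced to index 2:
	// it points behind our log, so it is ignored instead of triggering a truncation.
	fmt.Println(ignoreStaleCatchupEntry(false, false, 1, 0, 2)) // true -> ignored
}
```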
57 changes: 57 additions & 0 deletions server/raft_test.go
@@ -3696,6 +3696,63 @@ func TestNRGLostQuorum(t *testing.T) {
require_False(t, n.lostQuorum())
}

func TestNRGParallelCatchupRollback(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"

aeReply := "$TEST"
nc, err := nats.Connect(n.s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
require_NoError(t, err)
defer nc.Close()

sub, err := nc.SubscribeSync(aeReply)
require_NoError(t, err)
defer sub.Drain()
require_NoError(t, nc.Flush())

// Timeline
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, lterm: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, lterm: 1, commit: 0, pterm: 1, pindex: 1, entries: entries})
aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 2, entries: nil, reply: aeReply})

// Trigger a catchup.
n.processAppendEntry(aeMsg2, n.aesub)
require_Equal(t, n.pindex, 0)
require_NotNil(t, n.catchup)
require_Equal(t, n.catchup.cterm, aeMsg2.pterm)
require_Equal(t, n.catchup.cindex, aeMsg2.pindex)
csub := n.catchup.sub

// Receive the missed messages.
n.processAppendEntry(aeMsg1, csub)
require_Equal(t, n.pindex, 1)
n.processAppendEntry(aeMsg2, csub)
require_Equal(t, n.pindex, 2)
require_True(t, n.catchup == nil)

// Should respond to the heartbeat and allow the leader to commit.
n.processAppendEntry(aeHeartbeat, n.aesub)
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
ar := decodeAppendEntryResponse(msg.Data)
require_NotNil(t, ar)
require_True(t, ar.success)
require_Equal(t, ar.term, aeHeartbeat.term)
require_Equal(t, ar.index, aeHeartbeat.pindex)

// Now replay a message that was already received as a catchup entry.
// Likely due to running multiple catchups in parallel.
// Since our WAL is already ahead, we should not truncate based on this.
n.processAppendEntry(aeMsg1, csub)
require_Equal(t, n.pindex, 2)
}

// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
// proposing the next one.
// The test may fail if: