diff --git a/.goreleaser.yml b/.goreleaser.yml index 8612768d29..a7df569123 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -38,6 +38,8 @@ builds: - mips64le - s390x - ppc64le + # RISC-V currently only supported on Linux + - riscv64 goarm: - 6 - 7 @@ -52,6 +54,12 @@ builds: goarch: arm64 - goos: freebsd goarch: 386 + - goos: darwin + goarch: riscv64 + - goos: windows + goarch: riscv64 + - goos: freebsd + goarch: riscv64 mod_timestamp: "{{ .CommitTimestamp }}" nfpms: diff --git a/go.mod b/go.mod index 966ec0e54d..59a01685d3 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/nats-io/nkeys v0.4.11 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.6.0 - golang.org/x/crypto v0.42.0 - golang.org/x/sys v0.36.0 - golang.org/x/time v0.13.0 + golang.org/x/crypto v0.43.0 + golang.org/x/sys v0.37.0 + golang.org/x/time v0.14.0 ) diff --git a/go.sum b/go.sum index 3f4b9fba47..49aa238255 100644 --- a/go.sum +++ b/go.sum @@ -24,12 +24,12 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= -golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= -golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= +golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= +golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= -golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI= -golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= +golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= +golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/server/jetstream.go b/server/jetstream.go index 05dac6ef4c..0aa2e527aa 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1348,8 +1348,12 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro var offlineReason string if !supported { apiLevel := getRequiredApiLevel(cfg.Metadata) - offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) - s.Warnf(" Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", a.Name, cfg.StreamConfig.Name, apiLevel) + if strictErr != nil { + offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: ")) + } else { + offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) + } + s.Warnf(" Detected unsupported stream '%s > %s': %s", a.Name, cfg.StreamConfig.Name, offlineReason) } else { offlineReason = fmt.Sprintf("decoding error: %v", strictErr) s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, strictErr) @@ -1571,8 +1575,12 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro var offlineReason string if !supported { apiLevel := getRequiredApiLevel(cfg.Metadata) - offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) - s.Warnf(" Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", a.Name, e.mset.name(), cfg.Name, apiLevel) + if strictErr != nil { + offlineReason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(strictErr.Error(), "json: ")) + } else { + offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel) + } + s.Warnf(" Detected unsupported consumer '%s > %s > %s': %s", a.Name, e.mset.name(), cfg.Name, offlineReason) } else { offlineReason = fmt.Sprintf("decoding error: %v", strictErr) s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr) @@ -1582,7 +1590,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro if !e.mset.closed.Load() { s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, e.mset.name()) e.mset.mu.Lock() - e.mset.offlineReason = "stopped" + e.mset.offlineReason = fmt.Sprintf("stopped - unsupported consumer %q", cfg.Name) e.mset.mu.Unlock() e.mset.stop(false, false) } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 00ba6c5fc0..1cef734884 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -162,9 +162,15 @@ type unsupportedStreamAssignment struct { infoSub *subscription } -func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment) *unsupportedStreamAssignment { +func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, err error) *unsupportedStreamAssignment { reason := "stopped" - if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) { + if err != nil { + if errstr := err.Error(); strings.HasPrefix(errstr, "json:") { + reason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: ")) + } else { + reason = fmt.Sprintf("stopped - %s", errstr) + } + } else if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) { if req := getRequiredApiLevel(sa.Config.Metadata); req != _EMPTY_ { reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", req, JSApiLevel) } @@ -240,9 +246,15 @@ type unsupportedConsumerAssignment struct { infoSub *subscription } -func newUnsupportedConsumerAssignment(ca *consumerAssignment) *unsupportedConsumerAssignment { +func newUnsupportedConsumerAssignment(ca *consumerAssignment, err error) *unsupportedConsumerAssignment { reason := "stopped" - if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) { + if err != nil { + if errstr := err.Error(); strings.HasPrefix(errstr, "json:") { + reason = fmt.Sprintf("unsupported - config error: %s", strings.TrimPrefix(err.Error(), "json: ")) + } else { + reason = fmt.Sprintf("stopped - %s", errstr) + } + } else if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) { if req := getRequiredApiLevel(ca.Config.Metadata); req != _EMPTY_ { reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", getRequiredApiLevel(ca.Config.Metadata), JSApiLevel) } @@ -4003,8 +4015,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) { // If unsupported, we can't register any further. if sa.unsupported != nil { sa.unsupported.setupInfoSub(s, sa) - apiLevel := getRequiredApiLevel(sa.Config.Metadata) - s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel) + s.Warnf("Detected unsupported stream '%s > %s': %s", accName, stream, sa.unsupported.reason) js.mu.Unlock() // Need to stop the stream, we can't keep running with an old config. @@ -4133,8 +4144,7 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { // If unsupported, we can't register any further. if sa.unsupported != nil { sa.unsupported.setupInfoSub(s, sa) - apiLevel := getRequiredApiLevel(sa.Config.Metadata) - s.Warnf("Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", accName, stream, apiLevel) + s.Warnf("Detected unsupported stream '%s > %s': %s", accName, stream, sa.unsupported.reason) js.mu.Unlock() // Need to stop the stream, we can't keep running with an old config. @@ -4815,12 +4825,11 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // If unsupported, we can't register any further. if ca.unsupported != nil { ca.unsupported.setupInfoSub(s, ca) - apiLevel := getRequiredApiLevel(ca.Config.Metadata) - s.Warnf("Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", accName, stream, ca.Name, apiLevel) + s.Warnf("Detected unsupported consumer '%s > %s > %s': %s", accName, stream, ca.Name, ca.unsupported.reason) // Mark stream as unsupported as well if sa.unsupported == nil { - sa.unsupported = newUnsupportedStreamAssignment(s, sa) + sa.unsupported = newUnsupportedStreamAssignment(s, sa, fmt.Errorf("unsupported consumer %q", ca.Name)) } sa.unsupported.setupInfoSub(s, sa) js.mu.Unlock() @@ -8008,20 +8017,21 @@ func decodeStreamAssignment(s *Server, buf []byte) (*streamAssignment, error) { func decodeStreamAssignmentConfig(s *Server, sa *streamAssignment) error { var unsupported bool var cfg StreamConfig + var err error decoder := json.NewDecoder(bytes.NewReader(sa.ConfigJSON)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&cfg); err != nil { + if err = decoder.Decode(&cfg); err != nil { unsupported = true cfg = StreamConfig{} - if err = json.Unmarshal(sa.ConfigJSON, &cfg); err != nil { - return err + if err2 := json.Unmarshal(sa.ConfigJSON, &cfg); err2 != nil { + return err2 } } sa.Config = &cfg fixCfgMirrorWithDedupWindow(sa.Config) - if unsupported || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) { - sa.unsupported = newUnsupportedStreamAssignment(s, sa) + if unsupported || err != nil || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) { + sa.unsupported = newUnsupportedStreamAssignment(s, sa, err) } return nil } @@ -8472,18 +8482,19 @@ func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) { func decodeConsumerAssignmentConfig(ca *consumerAssignment) error { var unsupported bool var cfg ConsumerConfig + var err error decoder := json.NewDecoder(bytes.NewReader(ca.ConfigJSON)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&cfg); err != nil { + if err = decoder.Decode(&cfg); err != nil { unsupported = true cfg = ConsumerConfig{} - if err = json.Unmarshal(ca.ConfigJSON, &cfg); err != nil { - return err + if err2 := json.Unmarshal(ca.ConfigJSON, &cfg); err2 != nil { + return err2 } } ca.Config = &cfg - if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { - ca.unsupported = newUnsupportedConsumerAssignment(ca) + if unsupported || err != nil || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { + ca.unsupported = newUnsupportedConsumerAssignment(ca, err) } return nil } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 6912ef791d..6d1ca28e4b 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -9714,7 +9714,7 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes // Stream should also be reported as offline. // Specifically, as "stopped" because it's still supported, but can't run due to the unsupported consumer. - expectStreamInfo("stopped", "DowngradeConsumerTest") + expectStreamInfo("stopped - unsupported consumer \"DowngradeConsumerTest\"", "DowngradeConsumerTest") } // Consumer should be reported as offline, but healthz should report healthy to not block downgrades. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index cf1ba2211c..6762f4fcb8 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21951,7 +21951,7 @@ func TestJetStreamOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { mset, err = s.globalAccount().lookupStream("DowngradeConsumerTest") require_NoError(t, err) require_True(t, mset.closed.Load()) - require_Equal(t, mset.offlineReason, "stopped") + require_Equal(t, mset.offlineReason, "stopped - unsupported consumer \"DowngradeConsumerTest\"") obs := mset.getPublicConsumers() require_Len(t, len(obs), 1) diff --git a/server/raft.go b/server/raft.go index d25aba38d2..7ea1ee54c8 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3594,9 +3594,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { isNew := sub != nil && sub == n.aesub // If we are/were catching up ignore old catchup subs, but only if catching up from an older server - // that doesn't send the leader term when catching up. We can reject old catchups from newer subs - // later, just by checking the append entry is on the correct term. - if !isNew && sub != nil && ae.lterm == 0 && (!catchingUp || sub != n.catchup.sub) { + // that doesn't send the leader term when catching up or if we would truncate as a result. + // We can reject old catchups from newer subs later, just by checking the append entry is on the correct term. + if !isNew && sub != nil && (ae.lterm == 0 || ae.pindex < n.pindex) && (!catchingUp || sub != n.catchup.sub) { n.Unlock() n.debug("AppendEntry ignoring old entry from previous catchup") return diff --git a/server/raft_test.go b/server/raft_test.go index 86db4fdf7c..e202237c9b 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -3696,6 +3696,63 @@ func TestNRGLostQuorum(t *testing.T) { require_False(t, n.lostQuorum()) } +func TestNRGParallelCatchupRollback(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + aeReply := "$TEST" + nc, err := nats.Connect(n.s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer nc.Close() + + sub, err := nc.SubscribeSync(aeReply) + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, lterm: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, lterm: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 2, entries: nil, reply: aeReply}) + + // Trigger a catchup. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 0) + require_NotNil(t, n.catchup) + require_Equal(t, n.catchup.cterm, aeMsg2.pterm) + require_Equal(t, n.catchup.cindex, aeMsg2.pindex) + csub := n.catchup.sub + + // Receive the missed messages. + n.processAppendEntry(aeMsg1, csub) + require_Equal(t, n.pindex, 1) + n.processAppendEntry(aeMsg2, csub) + require_Equal(t, n.pindex, 2) + require_True(t, n.catchup == nil) + + // Should respond to the heartbeat and allow the leader to commit. + n.processAppendEntry(aeHeartbeat, n.aesub) + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + ar := decodeAppendEntryResponse(msg.Data) + require_NotNil(t, ar) + require_True(t, ar.success) + require_Equal(t, ar.term, aeHeartbeat.term) + require_Equal(t, ar.index, aeHeartbeat.pindex) + + // Now replay a message that was already received as a catchup entry. + // Likely due to running multiple catchups in parallel. + // Since our WAL is already ahead, we should not truncate based on this. + n.processAppendEntry(aeMsg1, csub) + require_Equal(t, n.pindex, 2) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. // The test may fail if: