Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ builds:
env:
# This is the toolchain version we use for releases. To override, set the env var, e.g.:
# GORELEASER_TOOLCHAIN="go1.22.8" TARGET='linux_amd64' goreleaser build --snapshot --clean --single-target
- GOTOOLCHAIN={{ envOrDefault "GORELEASER_TOOLCHAIN" "go1.25.2" }}
- GOTOOLCHAIN={{ envOrDefault "GORELEASER_TOOLCHAIN" "go1.25.3" }}
- GO111MODULE=on
- CGO_ENABLED=0
goos:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/nats-io/nats-server/v2

go 1.24.0

toolchain go1.24.8
toolchain go1.24.9

require (
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op
Expand Down
5 changes: 5 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5086,10 +5086,15 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// Grab record info, but use the pre-computed record length.
ri, _, _, err := mb.slotInfo(int(seq - mb.cache.fseq))
if err != nil {
mb.finishedWithCache()
mb.mu.Unlock()
fsUnlock()
return false, err
}
if err := mb.eraseMsg(seq, int(ri), int(msz), isLastBlock); err != nil {
mb.finishedWithCache()
mb.mu.Unlock()
fsUnlock()
return false, err
}
}
Expand Down
26 changes: 26 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10937,3 +10937,29 @@ func TestFileStoreDetectDeleteGapWithOnlySkipMsg(t *testing.T) {
require_Len(t, mb.dmap.Size(), 0)
})
}

// TestFileStoreEraseMsgErr verifies that an erase which fails mid-operation
// (here, forced by blanking the message block's file name so the underlying
// rename fails) does not deadlock or panic the filestore.
func TestFileStoreEraseMsgErr(t *testing.T) {
	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
		scfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}
		now := time.Now()
		fs, err := newFileStoreWithCreated(fcfg, scfg, now, prf(&fcfg), nil)
		require_NoError(t, err)
		defer fs.Stop()

		// Store two messages so we can target sequence 2 for erasure.
		for range 2 {
			_, _, err = fs.StoreMsg("foo", nil, nil, 0)
			require_NoError(t, err)
		}

		blk := fs.getFirstBlock()
		blk.mu.Lock()
		cached := blk.cache != nil
		if cached {
			// Set to a bogus value such that the file rename fails while
			// performing the message erase.
			blk.mfn = _EMPTY_
		}
		blk.mu.Unlock()
		if !cached {
			t.Fatal("Expected cache to be initialized")
		}
		fs.EraseMsg(2)
	})
}
52 changes: 52 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10395,6 +10395,58 @@ func TestJetStreamClusterCatchupSkipMsgDesync(t *testing.T) {
}
}

// TestJetStreamClusterJszRaftLeaderReporting checks that Jsz, when queried
// with RaftGroups enabled, reports exactly one stream leader and one consumer
// leader across the cluster, and that they match the actual raft leaders.
func TestJetStreamClusterJszRaftLeaderReporting(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:  "DURABLE",
		Replicas: 3,
	})
	require_NoError(t, err)

	// Wait until every server has both the stream and consumer assigned.
	checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
		for _, srv := range c.servers {
			_, _, jsa := srv.globalAccount().getJetStreamFromAccount()
			if !jsa.streamAssigned("TEST") {
				return fmt.Errorf("stream not assigned on %s", srv.Name())
			}
			if !jsa.consumerAssigned("TEST", "DURABLE") {
				return fmt.Errorf("consumer not assigned on %s", srv.Name())
			}
		}
		return nil
	})

	streamLeader := c.streamLeader(globalAccountName, "TEST")
	consumerLeader := c.consumerLeader(globalAccountName, "TEST", "DURABLE")

	for _, srv := range c.servers {
		info, err := srv.Jsz(&JSzOptions{RaftGroups: true})
		require_NoError(t, err)
		// Only the leader server should report a non-zero leader count.
		var wantStreams, wantConsumers int
		if srv == streamLeader {
			wantStreams = 1
		}
		if srv == consumerLeader {
			wantConsumers = 1
		}
		require_Equal(t, info.StreamsLeader, wantStreams)
		require_Equal(t, info.ConsumersLeader, wantConsumers)
	}
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down
36 changes: 24 additions & 12 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2966,18 +2966,20 @@ type MetaClusterInfo struct {
// JSInfo has detailed information on JetStream.
type JSInfo struct {
JetStreamStats
ID string `json:"server_id"`
Now time.Time `json:"now"`
Disabled bool `json:"disabled,omitempty"`
Config JetStreamConfig `json:"config,omitempty"`
Limits *JSLimitOpts `json:"limits,omitempty"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
Messages uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
Meta *MetaClusterInfo `json:"meta_cluster,omitempty"`
AccountDetails []*AccountDetail `json:"account_details,omitempty"`
Total int `json:"total"`
ID string `json:"server_id"`
Now time.Time `json:"now"`
Disabled bool `json:"disabled,omitempty"`
Config JetStreamConfig `json:"config,omitempty"`
Limits *JSLimitOpts `json:"limits,omitempty"`
Streams int `json:"streams"`
StreamsLeader int `json:"streams_leader,omitempty"`
Consumers int `json:"consumers"`
ConsumersLeader int `json:"consumers_leader,omitempty"`
Messages uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
Meta *MetaClusterInfo `json:"meta_cluster,omitempty"`
AccountDetails []*AccountDetail `json:"account_details,omitempty"`
Total int `json:"total"`
}

func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail {
Expand Down Expand Up @@ -3197,6 +3199,16 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
jsi.Messages += streamState.Msgs
jsi.Bytes += streamState.Bytes
jsi.Consumers += streamState.Consumers
if opts.RaftGroups {
if node := stream.raftNode(); node == nil || node.Leader() {
jsi.StreamsLeader++
}
for _, consumer := range stream.getPublicConsumers() {
if node := consumer.raftNode(); node == nil || node.Leader() {
jsi.ConsumersLeader++
}
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,7 @@ func (p *proxiesReload) Apply(s *Server) {
c.setAuthError(ErrAuthProxyNotTrusted)
c.authViolation()
}
s.Noticef("Reloaded: proxies trusted keys %q were removed", p.add)
s.Noticef("Reloaded: proxies trusted keys %q were removed", p.del)
}
if len(p.add) > 0 {
s.Noticef("Reloaded: proxies trusted keys %q were added", p.add)
Expand Down
35 changes: 35 additions & 0 deletions test/client_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net"
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -160,6 +161,18 @@ func TestClientConnectInfo(t *testing.T) {
}
}

// captureProxiesReloadLogger captures Noticef log lines related to proxies
// trusted keys reloads and forwards them on a channel for test inspection.
type captureProxiesReloadLogger struct {
	dummyLogger
	ch chan string
}

// Noticef forwards only notices mentioning "proxies trusted keys" to ch;
// all other notices are dropped.
func (l *captureProxiesReloadLogger) Noticef(format string, args ...any) {
	if msg := fmt.Sprintf(format, args...); strings.Contains(msg, "proxies trusted keys") {
		l.ch <- msg
	}
}

func TestProxyKeyVerification(t *testing.T) {
u1, _ := nkeys.CreateUser()
u1Pub, _ := u1.PublicKey()
Expand Down Expand Up @@ -343,10 +356,32 @@ func TestProxyKeyVerification(t *testing.T) {
cid2 := currentCID
checkLeafNodeConnected(t, s)

logger := &captureProxiesReloadLogger{ch: make(chan string, 10)}
s.SetLogger(logger, false, false)

os.WriteFile(conf, fmt.Appendf(nil, tmpl, u3Pub, u2Pub), 0660)
if err := s.Reload(); err != nil {
t.Fatalf("Reload failed: %v", err)
}
for range 2 {
select {
case str := <-logger.ch:
if strings.Contains(str, "removed") {
if !strings.Contains(str, u1Pub) {
t.Fatalf("Expected removed trace to include %q, it did not: %s", u1Pub, str)
}
} else if strings.Contains(str, "added") {
if !strings.Contains(str, u3Pub) {
t.Fatalf("Expected added trace to include %q, it did not: %s", u3Pub, str)
}
} else {
t.Fatalf("Unexpected log: %q", str)
}
default:
t.Fatal("Expected a log, did not get one")
}
}

// Connections should get disconnected.
// We need to consume what is sent by the server, but for leaf we may
// get some LS+, etc... so just consumer until we get the io.EOF
Expand Down
Loading