Skip to content

Commit bf2c60a

Browse files
Cherry-picks for v2.12.1 (#7430)
Includes the following: - #7427 - #7428 - #7429 - #7431 Signed-off-by: Neil Twigg <[email protected]>
2 parents 59361c9 + 350d9d7 commit bf2c60a

File tree

8 files changed

+145
-15
lines changed

8 files changed

+145
-15
lines changed

.goreleaser.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ builds:
2121
env:
2222
# This is the toolchain version we use for releases. To override, set the env var, e.g.:
2323
# GORELEASER_TOOLCHAIN="go1.22.8" TARGET='linux_amd64' goreleaser build --snapshot --clean --single-target
24-
- GOTOOLCHAIN={{ envOrDefault "GORELEASER_TOOLCHAIN" "go1.25.2" }}
24+
- GOTOOLCHAIN={{ envOrDefault "GORELEASER_TOOLCHAIN" "go1.25.3" }}
2525
- GO111MODULE=on
2626
- CGO_ENABLED=0
2727
goos:

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ module github.com/nats-io/nats-server/v2
22

33
go 1.24.0
44

5-
toolchain go1.24.8
5+
toolchain go1.24.9
66

77
require (
88
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op

server/filestore.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5086,10 +5086,15 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
50865086
// Grab record info, but use the pre-computed record length.
50875087
ri, _, _, err := mb.slotInfo(int(seq - mb.cache.fseq))
50885088
if err != nil {
5089+
mb.finishedWithCache()
5090+
mb.mu.Unlock()
5091+
fsUnlock()
50895092
return false, err
50905093
}
50915094
if err := mb.eraseMsg(seq, int(ri), int(msz), isLastBlock); err != nil {
50925095
mb.finishedWithCache()
5096+
mb.mu.Unlock()
5097+
fsUnlock()
50935098
return false, err
50945099
}
50955100
}

server/filestore_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10937,3 +10937,29 @@ func TestFileStoreDetectDeleteGapWithOnlySkipMsg(t *testing.T) {
1093710937
require_Len(t, mb.dmap.Size(), 0)
1093810938
})
1093910939
}
10940+
10941+
func TestFileStoreEraseMsgErr(t *testing.T) {
10942+
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
10943+
cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}
10944+
created := time.Now()
10945+
fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
10946+
require_NoError(t, err)
10947+
defer fs.Stop()
10948+
10949+
_, _, err = fs.StoreMsg("foo", nil, nil, 0)
10950+
require_NoError(t, err)
10951+
_, _, err = fs.StoreMsg("foo", nil, nil, 0)
10952+
require_NoError(t, err)
10953+
10954+
mb := fs.getFirstBlock()
10955+
mb.mu.Lock()
10956+
if mb.cache == nil {
10957+
mb.mu.Unlock()
10958+
t.Fatal("Expected cache to be initialized")
10959+
}
10960+
// Set to a bogus value such that the file rename fails while performing the message erase.
10961+
mb.mfn = _EMPTY_
10962+
mb.mu.Unlock()
10963+
fs.EraseMsg(2)
10964+
})
10965+
}

server/jetstream_cluster_1_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10395,6 +10395,58 @@ func TestJetStreamClusterCatchupSkipMsgDesync(t *testing.T) {
1039510395
}
1039610396
}
1039710397

10398+
func TestJetStreamClusterJszRaftLeaderReporting(t *testing.T) {
10399+
c := createJetStreamClusterExplicit(t, "R3S", 3)
10400+
defer c.shutdown()
10401+
10402+
nc, js := jsClientConnect(t, c.randomServer())
10403+
defer nc.Close()
10404+
10405+
_, err := js.AddStream(&nats.StreamConfig{
10406+
Name: "TEST",
10407+
Subjects: []string{"foo"},
10408+
Replicas: 3,
10409+
})
10410+
require_NoError(t, err)
10411+
10412+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
10413+
Durable: "DURABLE",
10414+
Replicas: 3,
10415+
})
10416+
require_NoError(t, err)
10417+
10418+
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
10419+
for _, s := range c.servers {
10420+
_, _, jsa := s.globalAccount().getJetStreamFromAccount()
10421+
if !jsa.streamAssigned("TEST") {
10422+
return fmt.Errorf("stream not assigned on %s", s.Name())
10423+
}
10424+
if !jsa.consumerAssigned("TEST", "DURABLE") {
10425+
return fmt.Errorf("consumer not assigned on %s", s.Name())
10426+
}
10427+
}
10428+
return nil
10429+
})
10430+
10431+
sl := c.streamLeader(globalAccountName, "TEST")
10432+
cl := c.consumerLeader(globalAccountName, "TEST", "DURABLE")
10433+
10434+
for _, s := range c.servers {
10435+
jsi, err := s.Jsz(&JSzOptions{RaftGroups: true})
10436+
require_NoError(t, err)
10437+
if s == sl {
10438+
require_Equal(t, jsi.StreamsLeader, 1)
10439+
} else {
10440+
require_Equal(t, jsi.StreamsLeader, 0)
10441+
}
10442+
if s == cl {
10443+
require_Equal(t, jsi.ConsumersLeader, 1)
10444+
} else {
10445+
require_Equal(t, jsi.ConsumersLeader, 0)
10446+
}
10447+
}
10448+
}
10449+
1039810450
//
1039910451
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
1040010452
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.

server/monitor.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2966,18 +2966,20 @@ type MetaClusterInfo struct {
29662966
// JSInfo has detailed information on JetStream.
29672967
type JSInfo struct {
29682968
JetStreamStats
2969-
ID string `json:"server_id"`
2970-
Now time.Time `json:"now"`
2971-
Disabled bool `json:"disabled,omitempty"`
2972-
Config JetStreamConfig `json:"config,omitempty"`
2973-
Limits *JSLimitOpts `json:"limits,omitempty"`
2974-
Streams int `json:"streams"`
2975-
Consumers int `json:"consumers"`
2976-
Messages uint64 `json:"messages"`
2977-
Bytes uint64 `json:"bytes"`
2978-
Meta *MetaClusterInfo `json:"meta_cluster,omitempty"`
2979-
AccountDetails []*AccountDetail `json:"account_details,omitempty"`
2980-
Total int `json:"total"`
2969+
ID string `json:"server_id"`
2970+
Now time.Time `json:"now"`
2971+
Disabled bool `json:"disabled,omitempty"`
2972+
Config JetStreamConfig `json:"config,omitempty"`
2973+
Limits *JSLimitOpts `json:"limits,omitempty"`
2974+
Streams int `json:"streams"`
2975+
StreamsLeader int `json:"streams_leader,omitempty"`
2976+
Consumers int `json:"consumers"`
2977+
ConsumersLeader int `json:"consumers_leader,omitempty"`
2978+
Messages uint64 `json:"messages"`
2979+
Bytes uint64 `json:"bytes"`
2980+
Meta *MetaClusterInfo `json:"meta_cluster,omitempty"`
2981+
AccountDetails []*AccountDetail `json:"account_details,omitempty"`
2982+
Total int `json:"total"`
29812983
}
29822984

29832985
func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail {
@@ -3197,6 +3199,16 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
31973199
jsi.Messages += streamState.Msgs
31983200
jsi.Bytes += streamState.Bytes
31993201
jsi.Consumers += streamState.Consumers
3202+
if opts.RaftGroups {
3203+
if node := stream.raftNode(); node == nil || node.Leader() {
3204+
jsi.StreamsLeader++
3205+
}
3206+
for _, consumer := range stream.getPublicConsumers() {
3207+
if node := consumer.raftNode(); node == nil || node.Leader() {
3208+
jsi.ConsumersLeader++
3209+
}
3210+
}
3211+
}
32003212
}
32013213
}
32023214

server/reload.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,7 @@ func (p *proxiesReload) Apply(s *Server) {
10771077
c.setAuthError(ErrAuthProxyNotTrusted)
10781078
c.authViolation()
10791079
}
1080-
s.Noticef("Reloaded: proxies trusted keys %q were removed", p.add)
1080+
s.Noticef("Reloaded: proxies trusted keys %q were removed", p.del)
10811081
}
10821082
if len(p.add) > 0 {
10831083
s.Noticef("Reloaded: proxies trusted keys %q were added", p.add)

test/client_auth_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"fmt"
1919
"net"
2020
"os"
21+
"strings"
2122
"testing"
2223
"time"
2324

@@ -160,6 +161,18 @@ func TestClientConnectInfo(t *testing.T) {
160161
}
161162
}
162163

164+
type captureProxiesReloadLogger struct {
165+
dummyLogger
166+
ch chan string
167+
}
168+
169+
func (l *captureProxiesReloadLogger) Noticef(format string, args ...any) {
170+
msg := fmt.Sprintf(format, args...)
171+
if strings.Contains(msg, "proxies trusted keys") {
172+
l.ch <- msg
173+
}
174+
}
175+
163176
func TestProxyKeyVerification(t *testing.T) {
164177
u1, _ := nkeys.CreateUser()
165178
u1Pub, _ := u1.PublicKey()
@@ -343,10 +356,32 @@ func TestProxyKeyVerification(t *testing.T) {
343356
cid2 := currentCID
344357
checkLeafNodeConnected(t, s)
345358

359+
logger := &captureProxiesReloadLogger{ch: make(chan string, 10)}
360+
s.SetLogger(logger, false, false)
361+
346362
os.WriteFile(conf, fmt.Appendf(nil, tmpl, u3Pub, u2Pub), 0660)
347363
if err := s.Reload(); err != nil {
348364
t.Fatalf("Reload failed: %v", err)
349365
}
366+
for range 2 {
367+
select {
368+
case str := <-logger.ch:
369+
if strings.Contains(str, "removed") {
370+
if !strings.Contains(str, u1Pub) {
371+
t.Fatalf("Expected removed trace to include %q, it did not: %s", u1Pub, str)
372+
}
373+
} else if strings.Contains(str, "added") {
374+
if !strings.Contains(str, u3Pub) {
375+
t.Fatalf("Expected added trace to include %q, it did not: %s", u3Pub, str)
376+
}
377+
} else {
378+
t.Fatalf("Unexpected log: %q", str)
379+
}
380+
default:
381+
t.Fatal("Expected a log, did not get one")
382+
}
383+
}
384+
350385
// Connections should get disconnected.
351386
// We need to consume what is sent by the server, but for leaf we may
352387
// get some LS+, etc... so just consumer until we get the io.EOF

0 commit comments

Comments
 (0)