Skip to content

Commit a7e89d1

Browse files
committed
Make stream snapshots asynchronous
The monitorStream function can block for a long time when creating and installing a snapshot of a stream's state. This can lead to increased tail latency. This commit extends the RaftNode interface with a new InstallSnapshotAsync method. This method performs snapshot writing and WAL compaction in a separate goroutine, making the process non-blocking. The existing InstallSnapshot method is now a synchronous wrapper around the new asynchronous implementation. Signed-off-by: Daniele Sciascia <[email protected]>
1 parent e2661b5 commit a7e89d1

File tree

2 files changed

+145
-33
lines changed

2 files changed

+145
-33
lines changed

server/jetstream_cluster.go

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2519,9 +2519,21 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
25192519
// fully recovered from disk.
25202520
isRecovering := true
25212521

2522-
doSnapshot := func() {
2522+
snapState := struct {
2523+
inProgress bool
2524+
curState SimpleState
2525+
ch chan InstalledSnapshot
2526+
}{
2527+
ch: make(chan InstalledSnapshot, 1),
2528+
}
2529+
2530+
wantSnapshot := func() bool {
25232531
if mset == nil || isRecovering || isRestore {
2524-
return
2532+
return false
2533+
}
2534+
2535+
if snapState.inProgress {
2536+
return false
25252537
}
25262538

25272539
// Before we actually calculate the detailed state and encode it, let's check the
@@ -2534,18 +2546,56 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
25342546
// consumers on idle streams but better to be safe than sorry!
25352547
ne, nb := n.Size()
25362548
if curState == lastState && ne < compactNumMin && nb < compactSizeMin {
2537-
return
2549+
return false
2550+
}
2551+
2552+
snapState.curState = curState
2553+
return true
2554+
}
2555+
2556+
handleSnapshotErr := func(err error) {
2557+
switch err {
2558+
case nil:
2559+
lastState = snapState.curState
2560+
case errNoSnapAvailable, errNodeClosed, errCatchupsRunning:
2561+
// ignore the error
2562+
default:
2563+
s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v",
2564+
mset.acc.Name, mset.name(), n.Group(), err)
2565+
}
2566+
2567+
}
2568+
2569+
doSnapshot := func() {
2570+
if wantSnapshot() {
2571+
// Make sure all pending data is flushed before allowing snapshots.
2572+
mset.flushAllPending()
2573+
err := n.InstallSnapshot(mset.stateSnapshot())
2574+
handleSnapshotErr(err)
25382575
}
2576+
}
25392577

2540-
// Make sure all pending data is flushed before allowing snapshots.
2541-
mset.flushAllPending()
2542-
if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
2543-
lastState = curState
2544-
} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
2545-
s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
2578+
doSnapshotAsync := func() {
2579+
if wantSnapshot() {
2580+
// Make sure all pending data is flushed before allowing snapshots.
2581+
mset.flushAllPending()
2582+
n.InstallSnapshotAsync(mset.stateSnapshot(), snapState.ch)
2583+
snapState.inProgress = true
25462584
}
25472585
}
25482586

2587+
snapshotDone := func(snap InstalledSnapshot) {
2588+
handleSnapshotErr(snap.Err)
2589+
snapState.inProgress = false
2590+
}
2591+
2592+
defer func() {
2593+
if snapState.inProgress {
2594+
s := <-snapState.ch
2595+
snapshotDone(s)
2596+
}
2597+
}()
2598+
25492599
// We will establish a restoreDoneCh no matter what. Will never be triggered unless
25502600
// we replace with the restore chan.
25512601
restoreDoneCh := make(<-chan error)
@@ -2617,6 +2667,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
26172667

26182668
for {
26192669
select {
2670+
case s := <-snapState.ch:
2671+
snapshotDone(s)
26202672
case <-s.quitCh:
26212673
// Server shutting down, but we might receive this before qch, so try to snapshot.
26222674
doSnapshot()
@@ -2726,7 +2778,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
27262778
// Check about snapshotting
27272779
// If we have at least min entries to compact, go ahead and try to snapshot/compact.
27282780
if ne >= compactNumMin || nb > compactSizeMin || mset.getCLFS() > pclfs {
2729-
doSnapshot()
2781+
doSnapshotAsync()
27302782
}
27312783

27322784
case isLeader = <-lch:
@@ -2822,7 +2874,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
28222874
stopDirectMonitoring()
28232875

28242876
case <-t.C:
2825-
doSnapshot()
2877+
doSnapshotAsync()
28262878

28272879
case <-uch:
28282880
// keep stream assignment current

server/raft.go

Lines changed: 82 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type RaftNode interface {
4242
ProposeMulti(entries []*Entry) error
4343
ForwardProposal(entry []byte) error
4444
InstallSnapshot(snap []byte) error
45+
InstallSnapshotAsync(snap []byte, ch chan<- InstalledSnapshot)
4546
SendSnapshot(snap []byte) error
4647
NeedSnapshot() bool
4748
Applied(index uint64) (entries uint64, bytes uint64)
@@ -230,6 +231,7 @@ type raft struct {
230231
observer bool // The node is observing, i.e. not able to become leader
231232
initializing bool // The node is new, and "empty log" checks can be temporarily relaxed.
232233
scaleUp bool // The node is part of a scale up, puts us in observer mode until the log contains data.
234+
snapshotting bool // Snapshot is in progress
233235
}
234236

235237
type proposedEntry struct {
@@ -307,6 +309,7 @@ var (
307309
errNodeClosed = errors.New("raft: node is closed")
308310
errBadSnapName = errors.New("raft: snapshot name could not be parsed")
309311
errNoSnapAvailable = errors.New("raft: no snapshot available")
312+
errSnapInProgress = errors.New("raft: snapshot is already in progress")
310313
errCatchupsRunning = errors.New("raft: snapshot can not be installed while catchups running")
311314
errSnapshotCorrupt = errors.New("raft: snapshot corrupt")
312315
errTooManyPrefs = errors.New("raft: stepdown requires at most one preferred new leader")
@@ -1242,49 +1245,106 @@ func (n *raft) SendSnapshot(data []byte) error {
12421245
return nil
12431246
}
12441247

1245-
// Used to install a snapshot for the given term and applied index. This will release
1246-
// all of the log entries up to and including index. This should not be called with
1247-
// entries that have been applied to the FSM but have not been applied to the raft state.
1248-
func (n *raft) InstallSnapshot(data []byte) error {
1248+
type InstalledSnapshot struct {
1249+
Term uint64
1250+
Index uint64
1251+
Path string
1252+
Err error
1253+
}
1254+
1255+
func (n *raft) installSnapshotAsync(encoded []byte, snap InstalledSnapshot,
1256+
ch chan<- InstalledSnapshot) {
1257+
go func() {
1258+
snap.Err = writeFileWithSync(snap.Path, encoded, defaultFilePerms)
1259+
n.Lock()
1260+
if n.State() == Closed {
1261+
snap.Err = errNodeClosed
1262+
}
1263+
if snap.Err == nil {
1264+
// Delete our previous snapshot file if it exists.
1265+
if n.snapfile != _EMPTY_ && n.snapfile != snap.Path {
1266+
os.Remove(n.snapfile)
1267+
}
1268+
// Remember our latest snapshot file.
1269+
n.snapfile = snap.Path
1270+
_, snap.Err = n.wal.Compact(snap.Index + 1)
1271+
if snap.Err != nil {
1272+
n.setWriteErrLocked(snap.Err)
1273+
} else {
1274+
var state StreamState
1275+
n.wal.FastState(&state)
1276+
n.papplied = snap.Index
1277+
n.bytes = state.Bytes
1278+
}
1279+
}
1280+
n.snapshotting = false
1281+
n.Unlock()
1282+
ch <- snap
1283+
}()
1284+
}
1285+
1286+
// InstallSnapshotAsync installs a snapshot asynchronously. It writes the
1287+
// snapshot to disk and compacts the WAL in a separate goroutine. The caller
1288+
// is notified of the result on the provided channel.
1289+
func (n *raft) InstallSnapshotAsync(data []byte, ch chan<- InstalledSnapshot) {
12491290
if n.State() == Closed {
1250-
return errNodeClosed
1291+
ch <- InstalledSnapshot{Err: errNodeClosed}
1292+
return
12511293
}
12521294

12531295
n.Lock()
12541296
defer n.Unlock()
12551297

1298+
if n.snapshotting {
1299+
ch <- InstalledSnapshot{Err: errSnapInProgress}
1300+
return
1301+
}
1302+
12561303
// If a write error has occurred already then stop here.
1257-
if werr := n.werr; werr != nil {
1258-
return werr
1304+
if n.werr != nil {
1305+
ch <- InstalledSnapshot{Err: n.werr}
1306+
return
12591307
}
12601308

1261-
// Check that a catchup isn't already taking place. If it is then we won't
1262-
// allow installing snapshots until it is done.
1309+
// Check that a catchup isn't already taking place. If it is then we
1310+
// won't allow installing snapshots until it is done.
12631311
if len(n.progress) > 0 || n.paused {
1264-
return errCatchupsRunning
1312+
ch <- InstalledSnapshot{Err: errCatchupsRunning}
1313+
return
12651314
}
12661315

12671316
if n.applied == 0 {
1268-
n.debug("Not snapshotting as there are no applied entries")
1269-
return errNoSnapAvailable
1317+
ch <- InstalledSnapshot{Err: errNoSnapAvailable}
1318+
return
12701319
}
12711320

1272-
var term uint64
1273-
if ae, _ := n.loadEntry(n.applied); ae != nil {
1274-
term = ae.term
1275-
} else {
1276-
n.debug("Not snapshotting as entry %d is not available", n.applied)
1277-
return errNoSnapAvailable
1321+
ae, _ := n.loadEntry(n.applied)
1322+
if ae == nil {
1323+
ch <- InstalledSnapshot{Err: errNoSnapAvailable}
1324+
return
12781325
}
12791326

1280-
n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), term, n.applied)
1281-
1282-
return n.installSnapshot(&snapshot{
1283-
lastTerm: term,
1327+
encoded := n.encodeSnapshot(&snapshot{
1328+
lastTerm: ae.term,
12841329
lastIndex: n.applied,
12851330
peerstate: encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt}),
12861331
data: data,
12871332
})
1333+
1334+
snapDir := filepath.Join(n.sd, snapshotsDir)
1335+
snapFile := filepath.Join(snapDir, fmt.Sprintf(snapFileT, ae.term, n.applied))
1336+
snap := InstalledSnapshot{Term: ae.term, Index: n.applied, Path: snapFile}
1337+
n.installSnapshotAsync(encoded, snap, ch)
1338+
}
1339+
1340+
// InstallSnapshot installs a snapshot for the current applied index. This is a
1341+
// synchronous call that will block until the snapshot is installed, and will
1342+
// release all of the log entries up to the applied index.
1343+
func (n *raft) InstallSnapshot(data []byte) error {
1344+
ch := make(chan InstalledSnapshot, 1)
1345+
n.InstallSnapshotAsync(data, ch)
1346+
snap := <-ch
1347+
return snap.Err
12881348
}
12891349

12901350
// Install the snapshot.

0 commit comments

Comments
 (0)