@@ -42,6 +42,7 @@ type RaftNode interface {
4242 ProposeMulti (entries []* Entry ) error
4343 ForwardProposal (entry []byte ) error
4444 InstallSnapshot (snap []byte ) error
45+ InstallSnapshotAsync (snap []byte , ch chan <- InstalledSnapshot )
4546 SendSnapshot (snap []byte ) error
4647 NeedSnapshot () bool
4748 Applied (index uint64 ) (entries uint64 , bytes uint64 )
@@ -230,6 +231,7 @@ type raft struct {
230231 observer bool // The node is observing, i.e. not able to become leader
231232 initializing bool // The node is new, and "empty log" checks can be temporarily relaxed.
232233 scaleUp bool // The node is part of a scale up, puts us in observer mode until the log contains data.
234+ snapshotting bool // Snapshot is in progress
233235}
234236
235237type proposedEntry struct {
@@ -307,6 +309,7 @@ var (
307309 errNodeClosed = errors .New ("raft: node is closed" )
308310 errBadSnapName = errors .New ("raft: snapshot name could not be parsed" )
309311 errNoSnapAvailable = errors .New ("raft: no snapshot available" )
312+ errSnapInProgress = errors .New ("raft: snapshot is already in progress" )
310313 errCatchupsRunning = errors .New ("raft: snapshot can not be installed while catchups running" )
311314 errSnapshotCorrupt = errors .New ("raft: snapshot corrupt" )
312315 errTooManyPrefs = errors .New ("raft: stepdown requires at most one preferred new leader" )
@@ -1242,49 +1245,103 @@ func (n *raft) SendSnapshot(data []byte) error {
12421245 return nil
12431246}
12441247
1245- // Used to install a snapshot for the given term and applied index. This will release
1246- // all of the log entries up to and including index. This should not be called with
1247- // entries that have been applied to the FSM but have not been applied to the raft state.
1248- func (n * raft ) InstallSnapshot (data []byte ) error {
1248+ type InstalledSnapshot struct {
1249+ Term uint64
1250+ Index uint64
1251+ Path string
1252+ Err error
1253+ }
1254+
1255+ func (n * raft ) installSnapshotAsync (encoded []byte , snap InstalledSnapshot ,
1256+ ch chan <- InstalledSnapshot ) {
1257+ go func () {
1258+ snap .Err = writeFileWithSync (snap .Path , encoded , defaultFilePerms )
1259+ n .Lock ()
1260+ if snap .Err == nil {
1261+ // Delete our previous snapshot file if it exists.
1262+ if n .snapfile != _EMPTY_ && n .snapfile != snap .Path {
1263+ os .Remove (n .snapfile )
1264+ }
1265+ // Remember our latest snapshot file.
1266+ n .snapfile = snap .Path
1267+ _ , snap .Err = n .wal .Compact (snap .Index + 1 )
1268+ if snap .Err != nil {
1269+ n .setWriteErrLocked (snap .Err )
1270+ } else {
1271+ var state StreamState
1272+ n .wal .FastState (& state )
1273+ n .papplied = snap .Index
1274+ n .bytes = state .Bytes
1275+ }
1276+ }
1277+ n .snapshotting = false
1278+ n .Unlock ()
1279+ ch <- snap
1280+ }()
1281+ }
1282+
1283+ // InstallSnapshotAsync installs a snapshot asynchronously. It writes the
1284+ // snapshot to disk and compacts the WAL in a separate goroutine. The caller
1285+ // is notified of the result on the provided channel.
1286+ func (n * raft ) InstallSnapshotAsync (data []byte , ch chan <- InstalledSnapshot ) {
12491287 if n .State () == Closed {
1250- return errNodeClosed
1288+ ch <- InstalledSnapshot {Err : errNodeClosed }
1289+ return
12511290 }
12521291
12531292 n .Lock ()
12541293 defer n .Unlock ()
12551294
1295+ if n .snapshotting {
1296+ ch <- InstalledSnapshot {Err : errSnapInProgress }
1297+ return
1298+ }
1299+
12561300 // If a write error has occurred already then stop here.
1257- if werr := n .werr ; werr != nil {
1258- return werr
1301+ if n .werr != nil {
1302+ ch <- InstalledSnapshot {Err : n .werr }
1303+ return
12591304 }
12601305
1261- // Check that a catchup isn't already taking place. If it is then we won't
1262- // allow installing snapshots until it is done.
1306+ // Check that a catchup isn't already taking place. If it is then we
1307+ // won't allow installing snapshots until it is done.
12631308 if len (n .progress ) > 0 || n .paused {
1264- return errCatchupsRunning
1309+ ch <- InstalledSnapshot {Err : errCatchupsRunning }
1310+ return
12651311 }
12661312
12671313 if n .applied == 0 {
1268- n . debug ( "Not snapshotting as there are no applied entries" )
1269- return errNoSnapAvailable
1314+ ch <- InstalledSnapshot { Err : errNoSnapAvailable }
1315+ return
12701316 }
12711317
1272- var term uint64
1273- if ae , _ := n .loadEntry (n .applied ); ae != nil {
1274- term = ae .term
1275- } else {
1276- n .debug ("Not snapshotting as entry %d is not available" , n .applied )
1277- return errNoSnapAvailable
1318+ ae , _ := n .loadEntry (n .applied )
1319+ if ae == nil {
1320+ ch <- InstalledSnapshot {Err : errNoSnapAvailable }
1321+ return
12781322 }
12791323
1280- n .debug ("Installing snapshot of %d bytes [%d:%d]" , len (data ), term , n .applied )
1281-
1282- return n .installSnapshot (& snapshot {
1283- lastTerm : term ,
1324+ peers := encodePeerState (& peerState {n .peerNames (), n .csz , n .extSt })
1325+ encoded := n .encodeSnapshot (& snapshot {
1326+ lastTerm : ae .term ,
12841327 lastIndex : n .applied ,
1285- peerstate : encodePeerState (& peerState {n .peerNames (), n .csz , n .extSt }),
1286- data : data ,
1287- })
1328+ peerstate : peers ,
1329+ data : data })
1330+
1331+ snapDir := filepath .Join (n .sd , snapshotsDir )
1332+ snapFile := filepath .Join (snapDir , fmt .Sprintf (snapFileT , ae .term , n .applied ))
1333+ snap := InstalledSnapshot {Term : ae .term , Index : n .applied , Path : snapFile }
1334+ n .installSnapshotAsync (encoded , snap , ch )
1335+ }
1336+
1337+ // InstallSnapshot installs a snapshot for the current applied index. This is a
1338+ // synchronous call that will block until the snapshot is installed, and will
1339+ // release all of the log entries up to the applied index.
1340+ func (n * raft ) InstallSnapshot (data []byte ) error {
1341+ ch := make (chan InstalledSnapshot , 1 )
1342+ n .InstallSnapshotAsync (data , ch )
1343+ snap := <- ch
1344+ return snap .Err
12881345}
12891346
12901347// Install the snapshot.
0 commit comments