7
7
use crate :: errors:: { CommitError , MismatchedRackIdError , ReconfigurationError } ;
8
8
use crate :: validators:: ValidatedReconfigureMsg ;
9
9
use crate :: {
10
- CoordinatorState , Envelope , Epoch , PersistentState , PlatformId , messages:: * ,
10
+ Alarm , CoordinatorState , Envelope , Epoch , PersistentState , PlatformId ,
11
+ messages:: * ,
11
12
} ;
12
13
use gfss:: shamir:: Share ;
13
14
use omicron_uuid_kinds:: RackUuid ;
@@ -110,10 +111,14 @@ impl Node {
110
111
if last_prepared_epoch != Some ( epoch) {
111
112
error ! (
112
113
self . log,
113
- "Commit message occurred out of order" ;
114
+ "Protocol invariant violation: commit message out of order" ;
114
115
"epoch" => %epoch,
115
116
"last_prepared_epoch" => ?last_prepared_epoch
116
117
) ;
118
+ self . persistent_state . add_alarm ( Alarm :: OutOfOrderCommit {
119
+ last_prepared_epoch,
120
+ commit_epoch : epoch,
121
+ } ) ;
117
122
return Err ( CommitError :: OutOfOrderCommit ) ;
118
123
}
119
124
if let Some ( prepare) = self . persistent_state . prepares . get ( & epoch) {
@@ -136,11 +141,19 @@ impl Node {
136
141
//
137
142
// Nexus should instead tell this node to retrieve a `Prepare`
138
143
// from another node that has already committed.
139
- error ! (
140
- self . log,
141
- "tried to commit a configuration, but missing prepare msg" ;
142
- "epoch" => %epoch
144
+ //
145
+ // This is a protocol bug because nexus must have seen an
146
+ // acknowledgement that this node had a `PrepareMsg` or erroneously
147
+ // sent a `Commit` anyway. This is a less serious error than other
148
+ // invariant violations since it can be recovered from. However, it
149
+ // is still worthy of an alarm, as the most likely case is a disk/
150
+ // ledger failure.
151
+ let err_msg = concat ! (
152
+ "Protocol invariant violation: triedi to " ,
153
+ "commit a configuration, but missing prepare msg"
143
154
) ;
155
+ error ! ( self . log, "{err_msg}" ; "epoch" => %epoch) ;
156
+ self . persistent_state . add_alarm ( Alarm :: MissingPrepare { epoch } ) ;
144
157
return Err ( CommitError :: MissingPrepare ) ;
145
158
}
146
159
@@ -178,6 +191,7 @@ impl Node {
178
191
msg : PeerMsg ,
179
192
) -> Option < PersistentState > {
180
193
match msg {
194
+ PeerMsg :: Prepare ( msg) => self . handle_prepare ( outbox, from, msg) ,
181
195
PeerMsg :: PrepareAck ( epoch) => {
182
196
self . handle_prepare_ack ( from, epoch) ;
183
197
None
@@ -196,6 +210,138 @@ impl Node {
196
210
self . coordinator_state . as_ref ( )
197
211
}
198
212
213
+ fn handle_prepare (
214
+ & mut self ,
215
+ outbox : & mut Vec < Envelope > ,
216
+ from : PlatformId ,
217
+ msg : PrepareMsg ,
218
+ ) -> Option < PersistentState > {
219
+ // TODO: check rack_id
220
+ if let Some ( last_prepared_epoch) =
221
+ self . persistent_state . last_prepared_epoch ( )
222
+ {
223
+ // Is this an old request?
224
+ if msg. config . epoch < last_prepared_epoch {
225
+ warn ! ( self . log, "Received stale prepare" ;
226
+ "from" => %from,
227
+ "msg_epoch" => %msg. config. epoch,
228
+ "last_prepared_epoch" => %last_prepared_epoch
229
+ ) ;
230
+ // TODO: Respond to sender?
231
+ return None ;
232
+ }
233
+
234
+ // Idempotent request
235
+ if msg. config . epoch == last_prepared_epoch {
236
+ return None ;
237
+ }
238
+
239
+ // Does the last committed epoch match what we have?
240
+ let msg_last_committed_epoch =
241
+ msg. config . previous_configuration . as_ref ( ) . map ( |p| p. epoch ) ;
242
+ let last_committed_epoch =
243
+ self . persistent_state . last_committed_epoch ( ) ;
244
+ if msg_last_committed_epoch != last_committed_epoch {
245
+ // If the msg contains an older last_committed_epoch than what
246
+ // we have, then out of order commits have occurred, as we know
247
+ // this prepare is later than what we've seen. This is a critical
248
+ // protocol invariant that has been violated.
249
+ //
250
+ // If the msg contains a newer last_committed_epoch than what
251
+ // we have, then we have likely missed a commit and are behind
252
+ // by more than one reconfiguration. The protocol currently does
253
+ // not allow this. Future protocol implementations may provide a
254
+ // capability to "jump" configurations.
255
+ //
256
+ // If there is a `None`/`Some` mismatch, then again, an invariant
257
+ // has been violated somewhere. The coordinator should know
258
+ // whether this is a "new" node or not, which would have a "None"
259
+ // configuration.
260
+ let err_msg = concat ! (
261
+ "Protocol invariant violation: " ,
262
+ "PrepareMsg last_committed_epoch does not match ours"
263
+ ) ;
264
+ error ! ( self . log, "{err_msg}" ;
265
+ "msg_last_committed_epoch" => ?msg_last_committed_epoch,
266
+ "last_committed_epoch" => ?last_committed_epoch
267
+ ) ;
268
+ self . persistent_state . add_alarm (
269
+ Alarm :: PrepareLastCommittedEpochMismatch {
270
+ prepare_last_committed_epoch : msg_last_committed_epoch,
271
+ persisted_prepare_last_committed_epoch :
272
+ last_committed_epoch,
273
+ } ,
274
+ ) ;
275
+ return Some ( self . persistent_state . clone ( ) ) ;
276
+ }
277
+
278
+ // The prepare is up-to-date with respect to our persistent state
279
+ } ;
280
+
281
+ // Are we currently trying to coordinate?
282
+ if let Some ( cs) = & self . coordinator_state {
283
+ let coordinating_epoch = cs. reconfigure_msg ( ) . epoch ( ) ;
284
+ if coordinating_epoch > msg. config . epoch {
285
+ warn ! ( self . log, "Received stale prepare while coordinating" ;
286
+ "from" => %from,
287
+ "msg_epoch" => %msg. config. epoch,
288
+ "epoch" => %cs. reconfigure_msg( ) . epoch( )
289
+ ) ;
290
+ return None ;
291
+ }
292
+ if coordinating_epoch == msg. config . epoch {
293
+ // TODO: Track these in an "invariant violation alarm" struct
294
+ // that can be queried.
295
+ let err_msg = concat ! (
296
+ "Protocol invariant violation: Nexus has told different" ,
297
+ "nodes to coordinate the same reconfiguration."
298
+ ) ;
299
+ error ! ( self . log, "{err_msg}" ;
300
+ "epoch" => %coordinating_epoch,
301
+ "from" => %from,
302
+ "id" => %self . platform_id
303
+
304
+ ) ;
305
+ self . persistent_state . add_alarm (
306
+ Alarm :: DifferentNodesCoordinatingSameEpoch {
307
+ epoch : coordinating_epoch,
308
+ them : from,
309
+ us : self . platform_id . clone ( ) ,
310
+ } ,
311
+ ) ;
312
+ return Some ( self . persistent_state . clone ( ) ) ;
313
+ }
314
+
315
+ // This prepare is for a newer configuration than the one we
316
+ // are currently coordinating. We must implicitly cancel our
317
+ // coordination as Nexus has moved on.
318
+ let cancel_msg = concat ! (
319
+ "Received a prepare for newer configuration." ,
320
+ "Cancelling our coordination."
321
+ ) ;
322
+ info ! ( self . log, "{cancel_msg}" ;
323
+ "msg_epoch" => %msg. config. epoch,
324
+ "epoch" => %coordinating_epoch,
325
+ "from" => %from
326
+ ) ;
327
+ }
328
+
329
+ // Acknowledge the PrepareMsg
330
+ outbox. push ( Envelope {
331
+ to : from,
332
+ from : self . platform_id . clone ( ) ,
333
+ msg : PeerMsg :: PrepareAck ( msg. config . epoch ) ,
334
+ } ) ;
335
+
336
+ // Add the prepare to our `PersistentState`
337
+ self . persistent_state . prepares . insert ( msg. config . epoch , msg) ;
338
+
339
+ // Stop coordinating
340
+ self . coordinator_state = None ;
341
+
342
+ Some ( self . persistent_state . clone ( ) )
343
+ }
344
+
199
345
// Handle receipt of a `PrepareAck` message
200
346
fn handle_prepare_ack ( & mut self , from : PlatformId , epoch : Epoch ) {
201
347
// Are we coordinating for this epoch?
0 commit comments