@@ -76,7 +76,7 @@ func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common
7676
7777// handleDelegateMessagePublication converts the MessagePublication into a DelegateObservation and sends it to the delegateObsvSendC channel.
7878// This should only be called by a delegated guardian for the chain.
79- func (p * Processor ) handleDelegateMessagePublication (k * node_common.MessagePublication ) error {
79+ func (p * Processor ) handleDelegateMessagePublication (k * node_common.MessagePublication ) error {
8080 p .logger .Info ("handleDelegateMessagePublication: CALLED - converting message to delegate observation" ,
8181 zap .String ("msgID" , k .MessageIDString ()),
8282 zap .Uint32 ("emitter_chain" , uint32 (k .EmitterChain )),
@@ -466,62 +466,66 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(m *gossipv1.SignedVAAWithQu
466466}
467467
468468// handleDelegateObservation processes a delegate observation
469- func (p * Processor ) handleDelegateObservation (ctx context.Context , m * gossipv1.DelegateObservation ) {
470- if p .logger .Core ().Enabled (zapcore .DebugLevel ) {
471- p .logger .Debug ("received delegate observation" ,
469+ func (p * Processor ) handleDelegateObservation (ctx context.Context , m * gossipv1.DelegateObservation ) error {
470+ if p .logger .Core ().Enabled (zapcore .DebugLevel ) {
471+ p .logger .Debug ("received delegate observation" ,
472472 // TODO(delegated-guardian-sets): Add additional relevant fields if necessary
473- zap .Uint32 ("emitter_chain" , m .EmitterChain ),
474- zap .Uint64 ("sequence" , m .Sequence ),
475- zap .String ("txhash" , hex .EncodeToString (m .TxHash )),
476- zap .String ("txhash_b58" , base58 .Encode (m .TxHash )),
473+ zap .Uint32 ("emitter_chain" , m .EmitterChain ),
474+ zap .Uint64 ("sequence" , m .Sequence ),
475+ zap .String ("txhash" , hex .EncodeToString (m .TxHash )),
476+ zap .String ("txhash_b58" , base58 .Encode (m .TxHash )),
477477 zap .String ("guardian_addr" , hex .EncodeToString (m .GuardianAddr )),
478- )
479- }
478+ )
479+ }
480+
481+ c , err := vaa .ChainIDFromNumber (m .EmitterChain )
482+ if err != nil {
483+ p .logger .Warn ("invalid delegate observation emitter chain" , zap .Error (err ))
484+ return nil
485+ }
480486
481- c := vaa .ChainID (m .EmitterChain )
482-
483487 cfg := p .dgc .GetChainConfig (c )
484488 if cfg == nil {
485489 p .logger .Debug ("ignoring delegate observation for chain without delegate chain config" ,
486490 zap .Stringer ("emitter_chain" , c ),
487491 zap .Uint64 ("sequence" , m .Sequence ),
488492 )
489- return
493+ return nil
490494 }
491495
492- _ , ok := cfg .KeyIndex (p .ourAddr )
496+ _ , ok := cfg .KeyIndex (p .ourAddr )
493497 if ok {
494- p .logger .Debug ("ignoring delegate observation since we are a delegated guardian for this chain" ,
498+ p .logger .Debug ("ignoring delegate observation since we are a delegated guardian for this chain" ,
495499 zap .Stringer ("emitter_chain" , c ),
496- zap .Uint64 ("sequence" , m .Sequence ),
500+ zap .Uint64 ("sequence" , m .Sequence ),
497501 )
498- return
502+ return nil
499503 }
500504
501505 addr := common .BytesToAddress (m .GuardianAddr )
502506 _ , ok = cfg .KeyIndex (addr )
503507 if ! ok {
504- p .logger .Debug ("ignoring delegate observation from non-delegated guardian for this chain" ,
508+ p .logger .Debug ("ignoring delegate observation from non-delegated guardian for this chain" ,
505509 zap .Stringer ("emitter_chain" , c ),
506- zap .Uint64 ("sequence" , m .Sequence ),
510+ zap .Uint64 ("sequence" , m .Sequence ),
507511 zap .String ("guardian" , addr .Hex ()),
508512 )
509- return
513+ return nil
510514 }
511515
512- p .handleCanonicalDelegateObservation (ctx , cfg , m )
516+ return p .handleCanonicalDelegateObservation (ctx , cfg , m )
513517}
514518
515519// handleCanonicalDelegateObservation processes a delegate observation as a canonical guardian
516520// This function assumes cfg corresponds to m.EmitterChain
517521// TODO(delegated-guardian-sets): Should ^ be explicitly asserted?
518- func (p * Processor ) handleCanonicalDelegateObservation (ctx context.Context , cfg * DelegateGuardianChainConfig , m * gossipv1.DelegateObservation ) {
522+ func (p * Processor ) handleCanonicalDelegateObservation (ctx context.Context , cfg * DelegateGuardianChainConfig , m * gossipv1.DelegateObservation ) error {
519523 mp , err := delegateObservationToMessagePublication (m )
520- if err != nil {
521- p .logger .Warn ("failed to convert delegate observation to message publication" , zap .Error (err ))
522- return
523- }
524-
524+ if err != nil {
525+ p .logger .Warn ("failed to convert delegate observation to message publication" , zap .Error (err ))
526+ return nil
527+ }
528+
525529 hash := mp .CreateDigest ()
526530
527531 // Get / create our state entry.
@@ -539,15 +543,16 @@ func (p *Processor) handleCanonicalDelegateObservation(ctx context.Context, cfg
539543 s .observations [addr ] = m
540544
541545 if ! s .submitted {
542- p .checkForDelegateQuorum (ctx , mp , s , cfg )
546+ return p .checkForDelegateQuorum (ctx , mp , s , cfg )
543547 }
548+ return nil
544549}
545550
546- // checkForDelegateQuorum checks for quorum after a delegate observation has been added to the state. If quorum is met, it runs the converted
551+ // checkForDelegateQuorum checks for quorum after a delegate observation has been added to the state. If quorum is met, it runs the converted
547552// MessagePublication through the normal message pipeline.
548553// This function assumes mp corresponds to s
549554// TODO(delegated-guardian-sets): Should ^ be explicitly asserted?
550- func (p * Processor ) checkForDelegateQuorum (ctx context.Context , mp * node_common.MessagePublication , s * delegateState , dgs * DelegateGuardianChainConfig ) {
555+ func (p * Processor ) checkForDelegateQuorum (ctx context.Context , mp * node_common.MessagePublication , s * delegateState , dgs * DelegateGuardianChainConfig ) error {
551556 // TODO(delegated-guardian-sets): Handle case for when delegate guardian set changes
552557 // Check if we have more delegate observations than required for quorum.
553558 if len (s .observations ) < dgs .Quorum () {
@@ -556,14 +561,14 @@ func (p *Processor) checkForDelegateQuorum(ctx context.Context, mp *node_common.
556561 if p .logger .Level ().Enabled (zapcore .DebugLevel ) {
557562 p .logger .Debug ("quorum not yet met" ,
558563 zap .Stringer ("emitter_chain" , c ),
559- zap .Uint64 ("sequence" , mp .Sequence ),
564+ zap .Uint64 ("sequence" , mp .Sequence ),
560565 )
561566 }
562- return
567+ return nil
563568 }
564569
565570 s .submitted = true
566- p .handleMessagePublication (ctx , mp )
571+ return p .handleMessagePublication (ctx , mp )
567572}
568573
569574// delegateObservationToMessagePublication converts a DelegateObservation into a MessagePublication that can be passed through the normal processor pipeline.
@@ -577,31 +582,35 @@ func delegateObservationToMessagePublication(d *gossipv1.DelegateObservation) (*
577582 return nil , fmt .Errorf ("delegate observation tx_hash too short: got %d; want at least %d" , txIDLen , node_common .TxIDLenMin )
578583 }
579584
580- addr , err := vaa .BytesToAddress (d .EmitterAddress )
581- if err != nil {
582- return nil , fmt .Errorf ("invalid delegate observation emitter address: %w" , err )
585+ if d .ConsistencyLevel > math .MaxUint8 {
586+ return nil , fmt .Errorf ("invalid delegate observation consistency : %d" , d .ConsistencyLevel )
583587 }
584588
585589 c , err := vaa .ChainIDFromNumber (d .EmitterChain )
586590 if err != nil {
587591 return nil , fmt .Errorf ("invalid delegate observation emitter chain: %w" , err )
588592 }
589593
590- mp := & node_common.MessagePublication {
591- TxID : d .TxHash ,
592- Timestamp : time .Unix (int64 (d .Timestamp ), 0 ), // Timestamp is uint32 representing seconds since UNIX epoch so is safe to convert.
593- Nonce : d .Nonce ,
594- Sequence : d .Sequence ,
595- ConsistencyLevel : uint8 (d .ConsistencyLevel ),
596- EmitterChain : c ,
597- EmitterAddress : addr ,
598- Payload : d .Payload ,
599- IsReobservation : false ,
600- Unreliable : false ,
601- // verificationState intentionally left at the default (NotVerified).
602- }
603-
604- return mp , nil
594+ addr , err := vaa .BytesToAddress (d .EmitterAddress )
595+ if err != nil {
596+ return nil , fmt .Errorf ("invalid delegate observation emitter address: %w" , err )
597+ }
598+
599+ mp := & node_common.MessagePublication {
600+ TxID : d .TxHash ,
601+ Timestamp : time .Unix (int64 (d .Timestamp ), 0 ), // Timestamp is uint32 representing seconds since UNIX epoch so is safe to convert.
602+ Nonce : d .Nonce ,
603+ Sequence : d .Sequence ,
604+ ConsistencyLevel : uint8 (d .ConsistencyLevel ),
605+ EmitterChain : c ,
606+ EmitterAddress : addr ,
607+ Payload : d .Payload ,
608+ IsReobservation : false ,
609+ Unreliable : false ,
610+ // verificationState intentionally left at the default (NotVerified).
611+ }
612+
613+ return mp , nil
605614}
606615
607616// messagePublicationToDelegateObservation converts a MessagePublication into a DelegateObservation to be sent by a delegated guardian.
0 commit comments