diff --git a/src/discof/gossip/fd_gossip_tile.c b/src/discof/gossip/fd_gossip_tile.c index 9b2cec9c1b..acb6e85168 100644 --- a/src/discof/gossip/fd_gossip_tile.c +++ b/src/discof/gossip/fd_gossip_tile.c @@ -6,6 +6,7 @@ #include "../../disco/keyguard/fd_keyswitch.h" #include "../../disco/keyguard/fd_keyload.h" #include "../../disco/keyguard/fd_keyguard_client.h" +#include "../../disco/keyguard/fd_keyguard.h" #include @@ -46,9 +47,13 @@ struct fd_gossip_tile_ctx { fd_gossip_out_ctx_t net_out[ 1 ]; fd_gossip_out_ctx_t gossip_out[ 1 ]; + fd_gossip_out_ctx_t sign_out[ 1 ]; fd_keyguard_client_t keyguard_client[ 1 ]; fd_keyswitch_t * keyswitch; + + fd_ip4_udp_hdrs_t net_out_hdr[ 1 ]; /* Used to construct outgoing network packets */ + fd_stem_context_t * stem; /* This is ugly! */ }; typedef struct fd_gossip_tile_ctx fd_gossip_tile_ctx_t; @@ -66,6 +71,54 @@ scratch_footprint( fd_topo_tile_t const * tile ) { return FD_LAYOUT_FINI( l, scratch_align() ); } +static void +gossip_send_fn( void * ctx, + uchar const * payload, + ulong payload_sz, + fd_ip4_port_t const * peer_address, + ulong tsorig ) { + fd_gossip_tile_ctx_t * gossip_ctx = (fd_gossip_tile_ctx_t *)ctx; + + ulong packet_sz = payload_sz + sizeof(fd_ip4_udp_hdrs_t); + gossip_ctx->net_out->chunk = fd_dcache_compact_next( gossip_ctx->net_out->chunk, packet_sz, gossip_ctx->net_out->chunk0, gossip_ctx->net_out->wmark ); + + uchar * packet = (uchar *)fd_chunk_to_laddr( gossip_ctx->net_out->mem, gossip_ctx->net_out->chunk ); + fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet; + *hdr = *gossip_ctx->net_out_hdr; + + fd_ip4_hdr_t * ip4 = hdr->ip4; + fd_udp_hdr_t * udp = hdr->udp; + + /* Update payload size in headers */ + ip4->net_tot_len = fd_ushort_bswap( (ushort)(packet_sz + sizeof(fd_ip4_hdr_t) + sizeof(fd_udp_hdr_t)) ); + udp->net_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_udp_hdr_t)) ); + + /* Fill in destination info */ + ip4->daddr = peer_address->addr; + udp->net_dport = peer_address->port; + + /* IP Checksum calculation */ + ip4->check = fd_ip4_hdr_check_fast( ip4 ); + /* TODO: ip4 net_id? */ + + /* Inject payload */ + fd_memcpy( packet + sizeof(fd_ip4_udp_hdrs_t), payload, payload_sz ); + + /* Publish fragment */ + ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() ); + ulong sig = fd_disco_netmux_sig( peer_address->addr, peer_address->port, peer_address->addr, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) ); + fd_stem_publish( gossip_ctx->stem, 0UL, 0UL, gossip_ctx->net_out->chunk, packet_sz, sig, tsorig, tspub ); +} + +static void +gossip_sign_fn( void * ctx, + uchar const * data, + ulong sz, + uchar * signature ) { + fd_gossip_tile_ctx_t * gossip_ctx = (fd_gossip_tile_ctx_t *)ctx; + fd_keyguard_client_sign( gossip_ctx->keyguard_client, signature, data, sz, FD_KEYGUARD_SIGN_TYPE_ED25519 ); +} + static inline void during_housekeeping( fd_gossip_tile_ctx_t * ctx ) { if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_SWITCH_PENDING ) ) { @@ -124,12 +177,13 @@ after_frag( fd_gossip_tile_ctx_t * ctx, ulong sz, ulong tsorig FD_PARAM_UNUSED, ulong tspub FD_PARAM_UNUSED, - fd_stem_context_t * stem FD_PARAM_UNUSED ) { + fd_stem_context_t * stem ) { if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) { long now = ctx->last_wallclock + (long)((double)(fd_tickcount()-ctx->last_tickcount)/ctx->ticks_per_ns); fd_gossip_advance( ctx->gossip, now ); fd_gossip_rx( ctx->gossip, ctx->buffer, sz, now ); + ctx->stem = stem; } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SHRED_VERSION ) ) { FD_MGAUGE_SET( GOSSIP, SHRED_VERSION, (ushort)sig ); fd_gossip_set_expected_shred_version( ctx->gossip, 1, (ushort)sig ); @@ -181,7 +235,7 @@ static void unprivileged_init( fd_topo_t * topo, fd_topo_tile_t * tile ) { void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id ); - + FD_SCRATCH_ALLOC_INIT( l, scratch ); fd_gossip_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_gossip_tile_ctx_t), sizeof(fd_gossip_tile_ctx_t) ); void * gossip = FD_SCRATCH_ALLOC_APPEND( l, fd_gossip_align(), fd_gossip_footprint( tile->gossip.max_entries ) ); @@ -195,7 +249,12 @@ unprivileged_init( fd_topo_t * topo, tile->gossip.expected_shred_version, tile->gossip.entrypoints_cnt, tile->gossip.entrypoints, - ctx->identity_key->uc ) ); + ctx->identity_key->uc, + + gossip_send_fn, + (void*)ctx, + gossip_sign_fn, + (void*)ctx ) ); FD_TEST( ctx->gossip ); FD_MGAUGE_SET( GOSSIP, SHRED_VERSION, tile->gossip.expected_shred_version ); @@ -211,6 +270,7 @@ unprivileged_init( fd_topo_t * topo, ctx->last_wallclock = fd_log_wallclock(); ctx->last_tickcount = fd_tickcount(); + ulong sign_in_tile_idx = ULONG_MAX; for( ulong i=0UL; iin_cnt; i++ ) { fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ]; fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ]; @@ -223,13 +283,36 @@ unprivileged_init( fd_topo_t * topo, ctx->in_kind[ i ] = IN_KIND_SHRED_VERSION; } else if( FD_UNLIKELY( !strcmp( link->name, "net_gossip" ) ) ) { ctx->in_kind[ i ] = IN_KIND_NET; + } else if( FD_UNLIKELY( !strcmp( link->name, "sign_gossip" ) ) ) { + ctx->in_kind[ i ] = IN_KIND_SIGN; + sign_in_tile_idx = i; } else { FD_LOG_ERR(( "unexpected input link name %s", link->name )); } } + if( FD_UNLIKELY( sign_in_tile_idx==ULONG_MAX ) ) + FD_LOG_ERR(( "tile %s:%lu had no input link named sign_gossip", tile->name, tile->kind_id )); + *ctx->net_out = out1( topo, tile, "gossip_net" ); *ctx->gossip_out = out1( topo, tile, "gossip_out" ); + *ctx->sign_out = out1( topo, tile, "gossip_sign" ); + + fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id [ sign_in_tile_idx ] ]; + fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ ctx->sign_out->idx ] ]; + + if( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client, + sign_out->mcache, + sign_out->dcache, + sign_in->mcache, + sign_in->dcache ) ) ) { + FD_LOG_ERR(( "failed to join keyguard client" )); +} + + fd_ip4_udp_hdr_init( ctx->net_out_hdr, + FD_GOSSIP_MTU, + tile->gossip.ip_addr, + tile->gossip.ports.gossip ); ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL ); if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) ) diff --git a/src/flamenco/gossip/fd_active_set.h b/src/flamenco/gossip/fd_active_set.h index a5f08ed89b..68627706f2 100644 --- a/src/flamenco/gossip/fd_active_set.h +++ b/src/flamenco/gossip/fd_active_set.h @@ -22,7 +22,7 @@ (2) Peers sometimes request that we don't forward messages from other originating (origin) nodes to them, because they already have a lot of paths from that node. This is called a prune. - + Complication (1) is handled by keeping a list of the top 12 peers (sorted by stake) for each of 25 buckets of stakes. These buckets are all rotated together. @@ -82,7 +82,7 @@ fd_active_set_join( void * shas ); have pruned the origin, except if ignore_prunes_if_peer_is_origin is non-zero, in which case the list will include a peer if its pubkey matches the origin pubkey. - + Up to 12 peer nodes will be returned in out_nodes. The values returned in out_nodes are an internal peer index of the active set and should not be used for anything other than calling diff --git a/src/flamenco/gossip/fd_bloom.c b/src/flamenco/gossip/fd_bloom.c index 1f05a137eb..b6700a5bfb 100644 --- a/src/flamenco/gossip/fd_bloom.c +++ b/src/flamenco/gossip/fd_bloom.c @@ -49,7 +49,7 @@ fd_bloom_new( void * shmem, if( FD_UNLIKELY( false_positive_rate>=1.0 ) ) return NULL; if( FD_UNLIKELY( max_bits<1UL || max_bits>32768UL ) ) return NULL; - + if( FD_UNLIKELY( !rng ) ) return NULL; ulong num_keys = (ulong)( (double)max_bits*FD_BLOOM_LN_2 ); diff --git a/src/flamenco/gossip/fd_crds.c b/src/flamenco/gossip/fd_crds.c index 94ad38b931..f2d8067ce5 100644 --- a/src/flamenco/gossip/fd_crds.c +++ b/src/flamenco/gossip/fd_crds.c @@ -1,20 +1,12 @@ #include "fd_crds.h" +#include "fd_crds_value.h" -struct fd_crds_key { - uchar tag; - uchar pubkey[ 32UL ]; - union { - uchar vote_index; - uchar epoch_slots_index; - ushort duplicate_shred_index; - }; -}; - -typedef struct fd_crds_key fd_crds_key_t; +#define FD_CRDS_ALIGN 8UL +#define FD_CRDS_MAGIC (0xf17eda2c37c7d50UL) /* firedancer crds version 0*/ struct fd_crds_purged { uchar hash[ 32UL ]; - ulong wallclock_nanos; + long wallclock_nanos; }; typedef struct fd_crds_purged fd_crds_purged_t; @@ -24,35 +16,20 @@ typedef struct fd_crds_purged fd_crds_purged_t; are not arbitrary, and must conform to a strictly typed schema of around 10 different messages. */ -struct fd_crds_value_private { - /* The core operation of the CRDS is to "upsert" a value. Basically, - all of the message types are keyed by the originators public key, - and we only want to store the most recent message of each type. - - So we have a ContactInfo message for example. If a validator sends - us a new ContactInfo message, we want to replace the old one. This - lookup is serviced by a hash table, keyed by the public key of the - originator, and in a few special cases an additional field. For - example, votes are (originator_key, vote_index), since we need to - know about more than one vote from a given originator. - - This key field is the key for the hash table. */ - fd_crds_key_t key; - - /* When an originator creates a CRDS message, they attach their local - wallclock time to it. This time is used to determine when a - message should be upserted. If messages have the same key, the - newer one (as created by the originator) is used. */ - long wallclock_nanos; +struct fd_crds_entry_private { + /* value data (contains key) */ + fd_crds_value_t value[1]; - /* value data ... */ + /* Pool fields. Not in use when pool element is acquired */ + ulong pool_next; + int num_duplicates; /* The CRDS needs to perform a variety of actions on the message table quickly, so there are various indexes woven through them values to support these actions. They are ... - + lookup is used to enable the core map functionality - described for upserts above. */ + described for upserts defined by value->key. */ struct { ulong next; ulong prev; @@ -69,6 +46,8 @@ struct fd_crds_value_private { ulong right; ulong prio; ulong stake; + ulong next; /* next in the treap iteration order */ + ulong prev; /* previous in the treap iteration order */ } evict; /* Values in the table expire after a pre-determined amount of time, @@ -101,38 +80,47 @@ struct fd_crds_value_private { } hash; }; -#define POOL_NAME crds_pool -#define POOL_ELE_T fd_crds_value_t +#define POOL_NAME crds_pool +#define POOL_T fd_crds_entry_t +#define POOL_NEXT pool_next #include "../../util/tmpl/fd_pool.c" #define TREAP_NAME evict_treap -#define TREAP_T fd_crds_value_t +#define TREAP_T fd_crds_entry_t #define TREAP_QUERY_T void * /* We don't use query ... */ #define TREAP_CMP(q,e) (__extension__({ (void)(q); (void)(e); -1; })) /* which means we don't need to give a real implementation to cmp either */ #define TREAP_IDX_T ulong -#define TREAP_OPTIMIZE_ITERATION 1 #define TREAP_LT(e0,e1) ((e0)->evict.stake<(e1)->evict.stake) +#define TREAP_PARENT evict.parent +#define TREAP_LEFT evict.left +#define TREAP_RIGHT evict.right +#define TREAP_PRIO evict.prio + +#define TREAP_OPTIMIZE_ITERATION 1 +#define TREAP_NEXT evict.next +#define TREAP_PREV evict.prev + #include "../../util/tmpl/fd_treap.c" #define DLIST_NAME staked_expire_dlist -#define DLIST_ELE_T fd_crds_value_t +#define DLIST_ELE_T fd_crds_entry_t #define DLIST_PREV expire.prev #define DLIST_NEXT expire.next #include "../../util/tmpl/fd_dlist.c" #define DLIST_NAME unstaked_expire_dlist -#define DLIST_ELE_T fd_crds_value_t +#define DLIST_ELE_T fd_crds_entry_t #define DLIST_PREV expire.prev #define DLIST_NEXT expire.next #include "../../util/tmpl/fd_dlist.c" #define TREAP_NAME hash_treap -#define TREAP_T fd_crds_value_t +#define TREAP_T fd_crds_entry_t #define TREAP_QUERY_T void * /* We don't use query ... */ #define TREAP_CMP(q,e) (__extension__({ (void)(q); (void)(e); -1; })) /* which means we don't need to give a real implementation to cmp either */ @@ -142,22 +130,27 @@ struct fd_crds_value_private { #define TREAP_PREV hash.prev #define TREAP_LT(e0,e1) ((e0)->hash.hash<(e1)->hash.hash) +#define TREAP_PARENT hash.parent +#define TREAP_LEFT hash.left +#define TREAP_RIGHT hash.right +#define TREAP_PRIO hash.prio + #include "../../util/tmpl/fd_treap.c" static inline ulong -lookup_hash( fd_crds_key_t * key, - ulong seed ) { - ulong hash = fd_hash( seed, key.tag, 1UL ); - hash = fd_hash( hash, key.pubkey, 32UL ); - switch( key.tag ) { +lookup_hash( fd_crds_key_t const * key, + ulong seed ) { + ulong hash = fd_hash( seed, &key->tag, 1UL ); + hash = fd_hash( hash, key->pubkey, 32UL ); + switch( key->tag ) { case FD_CRDS_TAG_VOTE: - hash = fd_hash( hash, key->vote_index, 1UL ); + hash = fd_hash( hash, &key->vote_index, 1UL ); break; case FD_CRDS_TAG_EPOCH_SLOTS: - hash = fd_hash( hash, key->epoch_slots_index, 1UL ); + hash = fd_hash( hash, &key->epoch_slots_index, 1UL ); break; case FD_CRDS_TAG_DUPLICATE_SHRED: - hash = fd_hash( hash, key->duplicate_shred_index, 2UL ); + hash = fd_hash( hash, &key->duplicate_shred_index, 2UL ); break; default: break; @@ -166,8 +159,8 @@ lookup_hash( fd_crds_key_t * key, } static inline int -lookup_eq( fd_crds_key_t * key0, - fd_crds_key_t * key1 ) { +lookup_eq( fd_crds_key_t const * key0, + fd_crds_key_t const * key1 ) { if( FD_UNLIKELY( key0->tag!=key1->tag ) ) return 0; if( FD_UNLIKELY( !memcmp( key0->pubkey, key1->pubkey, 32UL ) ) ) return 0; switch( key0->tag ) { @@ -184,9 +177,9 @@ lookup_eq( fd_crds_key_t * key0, } #define MAP_NAME lookup_map -#define MAP_ELE_T fd_crds_value_t +#define MAP_ELE_T fd_crds_entry_t #define MAP_KEY_T fd_crds_key_t -#define MAP_KEY key +#define MAP_KEY value->key #define MAP_IDX_T ulong #define MAP_NEXT lookup.next #define MAP_PREV lookup.prev @@ -197,30 +190,23 @@ lookup_eq( fd_crds_key_t * key0, #include "../../util/tmpl/fd_map_chain.c" struct fd_crds_private { - fd_crds_value_t * pool; + fd_crds_entry_t * pool; - evict_treap_t * evict_treap; - expire_dlist_t * expire_dlist; - hash_treap_t * hash_treap; - lookup_map_t * lookup_map; + evict_treap_t * evict_treap; + staked_expire_dlist_t * staked_expire_dlist; + unstaked_expire_dlist_t *unstaked_expire_dlist; + hash_treap_t * hash_treap; + lookup_map_t * lookup_map; - ulong purged_len; - ulong purged_idx; - ulong purged_cap; - fd_crds_purge_t * purged_list; + ulong purged_len; + ulong purged_idx; + ulong purged_cap; + fd_crds_purged_t * purged_list; int has_staked_node; -} + ulong magic; +}; -long -fd_crds_value_wallclock( fd_crds_value_t const * value ) { - return value->wallclock_nanos; -} - -uchar const * -fd_crds_value_pubkey( fd_crds_value_t const * value ) { - return value->key.pubkey; -} FD_FN_CONST ulong fd_crds_align( void ) { @@ -288,7 +274,7 @@ fd_crds_new( void * shmem, crds->evict_treap = evict_treap_join( evict_treap_new( _evict_treap, ele_max ) ); FD_TEST( crds->evict_treap ); - evict_treap_seed( crds->evict_treap, ele_max, fd_rng_ulong( rng ) ); + evict_treap_seed( crds->pool, ele_max, fd_rng_ulong( rng ) ); crds->staked_expire_dlist = staked_expire_dlist_join( staked_expire_dlist_new( _staked_expire_dlist ) ); FD_TEST( crds->staked_expire_dlist ); @@ -298,7 +284,7 @@ fd_crds_new( void * shmem, crds->hash_treap = hash_treap_join( hash_treap_new( _hash_treap, ele_max ) ); FD_TEST( crds->hash_treap ); - hash_treap_seed( crds->hash_treap, ele_max, fd_rng_ulong( rng ) ); + hash_treap_seed( crds->pool, ele_max, fd_rng_ulong( rng ) ); crds->lookup_map = lookup_map_join( lookup_map_new( _lookup_map, ele_max, fd_rng_ulong( rng ) ) ); FD_TEST( crds->lookup_map ); @@ -344,32 +330,32 @@ fd_crds_expire( fd_crds_t * crds, static const long STAKED_EXPIRE_DURATION_NANOS = 432000L*SLOT_DURATION_NANOS; static const long UNSTAKED_EXPIRE_DURATION_NANOS = 15L*1000L*1000L*1000L; - while( !staked_expire_dlist_is_empty( crds->expire_dlist ) ) { - fd_crds_value_t const * head = staked_expire_dlist_ele_peek_head_const( crds->expire_dlist crds->pool ); + while( !staked_expire_dlist_is_empty( crds->staked_expire_dlist, crds->pool ) ) { + fd_crds_entry_t * head = staked_expire_dlist_ele_peek_head( crds->staked_expire_dlist, crds->pool ); if( FD_LIKELY( head->expire.wallclock_nanosstaked_expire_dlist, crds->pool ); hash_treap_ele_remove( crds->hash_treap, head, crds->pool ); - lookup_map_ele_remove( crds->lookup_map, head, crds->pool ); + lookup_map_ele_remove( crds->lookup_map, head->value->key, NULL, crds->pool ); evict_treap_ele_remove( crds->evict_treap, head, crds->pool ); - crds_pool_release( crds->pool, head ); + crds_pool_ele_release( crds->pool, head ); } long unstaked_expire_duration_nanos = fd_long_if( crds->has_staked_node, UNSTAKED_EXPIRE_DURATION_NANOS, STAKED_EXPIRE_DURATION_NANOS ); - while( !unstaked_expire_dlist_is_empty( crds->expire_dlist ) ) { - fd_crds_value_t const * head = unstaked_expire_dlist_ele_peek_head_const( crds->expire_dlist, crds->pool ); + while( !unstaked_expire_dlist_is_empty( crds->unstaked_expire_dlist, crds->pool ) ) { + fd_crds_entry_t * head = unstaked_expire_dlist_ele_peek_head( crds->unstaked_expire_dlist, crds->pool ); - if( FD_LIKELY( head->expire.wallclock_nanoexpire.wallclock_nanosunstaked_expire_dlist, crds->pool ); hash_treap_ele_remove( crds->hash_treap, head, crds->pool ); - lookup_map_ele_remove( crds->lookup_map, head, crds->pool ); + lookup_map_ele_remove( crds->lookup_map, head->value->key, NULL, crds->pool ); evict_treap_ele_remove( crds->evict_treap, head, crds->pool ); - crds_pool_release( crds->pool, head ); + crds_pool_ele_release( crds->pool, head ); } while( crds->purged_len ) { @@ -381,12 +367,12 @@ fd_crds_expire( fd_crds_t * crds, } } -fd_crds_value_t * +fd_crds_entry_t * fd_crds_acquire( fd_crds_t * crds ) { - if( FD_UNLIKELY( !crds_pool_free( crds->pool )==0UL ) ) { - evict_treap_fwd_iter_t head = evict_treap_fwd_iter_init( crds->evict_treap ); + if( FD_UNLIKELY( crds_pool_free( crds->pool )==0UL ) ) { + evict_treap_fwd_iter_t head = evict_treap_fwd_iter_init( crds->evict_treap, crds->pool ); FD_TEST( !evict_treap_fwd_iter_done( head ) ); - fd_crds_value_t * evict = evict_treap_fwd_iter_ele( iter ); + fd_crds_entry_t * evict = evict_treap_fwd_iter_ele( head, crds->pool ); if( FD_LIKELY( !evict->evict.stake ) ) { unstaked_expire_dlist_ele_remove( crds->unstaked_expire_dlist, evict, crds->pool ); @@ -395,48 +381,52 @@ fd_crds_acquire( fd_crds_t * crds ) { } hash_treap_ele_remove( crds->hash_treap, evict, crds->pool ); - lookup_map_ele_remove( crds->lookup_map, evict, crds->pool ); + lookup_map_ele_remove( crds->lookup_map, evict->value->key, NULL, crds->pool ); return evict; } else { - return crds_pool_acquire( crds->pool ); + return crds_pool_ele_acquire( crds->pool ); } } void fd_crds_release( fd_crds_t * crds, - fd_crds_value_t * value ) { - crds_pool_release( crds->pool, value ); + fd_crds_entry_t * value ) { + crds_pool_ele_release( crds->pool, value ); } static inline int -overrides( fd_crds_value_t const * value, - fd_crds_value_t const * candidate ) { - switch( value->key.tag ) { +overrides( fd_crds_entry_t const * value, + fd_crds_entry_t const * candidate ) { + long val_wc = fd_crds_value_wallclock( value->value ); + long cand_wc = fd_crds_value_wallclock( candidate->value ); + switch( value->value->key->tag ) { /* FIXME: gross */ case FD_CRDS_TAG_CONTACT_INFO: - if( FD_UNLIKELY( candidate->contact_info.outset>value->contact_info.outset ) ) return 1; - else if( FD_UNLIKELY( candidate->contact_info.outsetcontact_info.outset ) ) return 0; - else if( FD_UNLIKELY( candidate->wallclock>value->wallclock ) ) return 1; - else if( FD_UNLIKELY( candidate->wallclockwallclock ) ) return 0; + if( FD_UNLIKELY( candidate->value->contact_info.instance_creation_wallclock_nanos>value->value->contact_info.instance_creation_wallclock_nanos ) ) return 1; + else if( FD_UNLIKELY( candidate->value->contact_info.instance_creation_wallclock_nanosvalue->contact_info.instance_creation_wallclock_nanos ) ) return 0; + else if( FD_UNLIKELY( cand_wc>val_wc ) ) return 1; + else if( FD_UNLIKELY( cand_wcnode_instance.token==value->node_instance.token ) ) break; - else if( FD_LIKELY( memcmp( candidate->node_instance.from, value->node_instance.from, 32UL ) ) ) break; - else if( FD_UNLIKELY( candidate->wallclock>value->wallclock ) ) return 1; - else if( FD_UNLIKELY( candidate->wallclockwallclock ) ) return 0; - else return !!candidate->node_instance.tokennode_instance.token; + + if( FD_LIKELY( candidate->value->node_instance.token==value->value->node_instance.token ) ) break; + else if( FD_LIKELY( memcmp( candidate->value->node_instance.from, value->value->node_instance.from, 32UL ) ) ) break; + else if( FD_UNLIKELY( cand_wc>val_wc ) ) return 1; + else if( FD_UNLIKELY( cand_wcvalue->node_instance.tokenvalue->node_instance.token); default: break; } - if( FD_LIKELY( candidate->wallclock>value->wallclock ) ) return 1; - else if( FD_LIKELY( candidate->wallclockwallclock ) ) return 0; + if( FD_UNLIKELY( cand_wc>val_wc ) ) return 1; + else if( FD_UNLIKELY( cand_wchash.hashhash.hash; } +int fd_crds_upserts( fd_crds_t * crds, - fd_crds_value_t * candidate ) { - fd_crds_value_t const * value = lookup_map_ele_query_const( crds->lookup_map, &value->key, NULL, crds->pool ); + fd_crds_entry_t * candidate ) { + fd_crds_entry_t const * value = lookup_map_ele_query_const( crds->lookup_map, candidate->value->key, NULL, crds->pool ); if( FD_UNLIKELY( !value ) ) return 1; return overrides( value, candidate ); @@ -454,14 +444,14 @@ insert_purged( fd_crds_t * crds, int fd_crds_insert( fd_crds_t * crds, - fd_crds_value_t * value, + fd_crds_entry_t * value, int from_push_message ) { /* TODO: Why Agave tracks route? PushRespose etc ... */ - fd_crds_value_t * replace = lookup_map_ele_query( crds->lookup_map, &value->key, NULL, crds->pool ); + fd_crds_entry_t * replace = lookup_map_ele_query( crds->lookup_map, value->value->key, NULL, crds->pool ); if( FD_LIKELY( replace ) ) { if( FD_UNLIKELY( !overrides( replace, value ) ) ) { if( FD_UNLIKELY( replace->hash.hash!=value->hash.hash ) ) { - insert_purged( crds, fd_crds_value_hash( replace ), replace->wallclock_nanos ); + insert_purged( crds, fd_crds_value_hash( replace->value ), fd_crds_value_wallclock( replace->value ) ); return -1; } @@ -473,7 +463,7 @@ fd_crds_insert( fd_crds_t * crds, return replace->num_duplicates++; } - insert_purged( crds, fd_crds_value_hash( replace ), replace->wallclock_nanos ); + insert_purged( crds, fd_crds_value_hash( replace->value ), fd_crds_value_wallclock( replace->value ) ); evict_treap_ele_remove( crds->evict_treap, replace, crds->pool ); if( FD_LIKELY( replace->evict.stake ) ) { @@ -482,8 +472,8 @@ fd_crds_insert( fd_crds_t * crds, unstaked_expire_dlist_ele_remove( crds->unstaked_expire_dlist, replace, crds->pool ); } hash_treap_ele_remove( crds->hash_treap, replace, crds->pool ); - lookup_map_ele_remove( crds->lookup_map, replace, crds->pool ); - crds_pool_release( crds->pool, replace ); + lookup_map_ele_remove( crds->lookup_map, replace->value->key, NULL, crds->pool ); + crds_pool_ele_release( crds->pool, replace ); } crds->has_staked_node |= value->evict.stake ? 1 : 0; @@ -511,8 +501,9 @@ fd_crds_mask_iter_init( fd_crds_t const * crds, fd_crds_mask_iter_t it = { .mask = mask, .mask_bits = mask_bits, - .iter = hash_treap_fwd_iter_init( crds->hash_treap ), + .iter = hash_treap_fwd_iter_init( crds->hash_treap, crds->pool ), }; + return it; } fd_crds_mask_iter_t @@ -521,5 +512,5 @@ fd_crds_mask_iter_next( fd_crds_mask_iter_t it ); int fd_crds_mask_iter_done( fd_crds_mask_iter_t it ); -fd_crds_value_t const * +fd_crds_entry_t const * fd_crds_mask_iter_value( fd_crds_mask_iter_t it ); diff --git a/src/flamenco/gossip/fd_crds.h b/src/flamenco/gossip/fd_crds.h index 232d170df8..0cfaaa9395 100644 --- a/src/flamenco/gossip/fd_crds.h +++ b/src/flamenco/gossip/fd_crds.h @@ -2,9 +2,10 @@ #define HEADER_fd_src_flamenco_gossip_fd_crds_h #include "../../util/fd_util.h" +#include "../../util/net/fd_net_headers.h" -struct fd_crds_value_private; -typedef struct fd_crds_value_private fd_crds_value_t; +struct fd_crds_entry_private; +typedef struct fd_crds_entry_private fd_crds_entry_t; struct fd_crds_private; typedef struct fd_crds_private fd_crds_t; @@ -14,25 +15,17 @@ typedef struct fd_crds_mask_iter_private fd_crds_mask_iter_t; FD_PROTOTYPES_BEGIN -long -fd_crds_value_wallclock( fd_crds_value_t const * value ); - -uchar const * -fd_crds_value_pubkey( fd_crds_value_t const * value ); - -uchar const * -fd_crds_value_hash( fd_crds_value_t const * value ); - FD_FN_CONST ulong fd_crds_align( void ); FD_FN_CONST ulong -fd_crds_footprint( ulong ele_max ); +fd_crds_footprint( ulong ele_max, ulong purged_max ); void * fd_crds_new( void * shmem, fd_rng_t * rng, - ulong ele_max ); + ulong ele_max, + ulong purged_max ); fd_crds_t * fd_crds_join( void * shcrds ); @@ -41,7 +34,7 @@ fd_crds_join( void * shcrds ); store. CRDS values from staked nodes expire roughly an epoch after they are created, and values from non-staked nodes expire after 15 seconds. - + There is one exception, when the node is first bootstrapping, and has not yet seen any staked nodes, values do not expire at all. */ @@ -60,7 +53,7 @@ fd_crds_expire( fd_crds_t * crds, are also excluded from the sampling. Peers with a different shred version than us, or with an invalid gossip socket address are also excluded from the sampling. - + If no valid peer can be found, the returned fd_ip4_port_t will be zeroed out. The caller should check for this case and handle it appropriately. On success, the returned fd_ip4_port_t is a socket @@ -85,7 +78,7 @@ fd_crds_sample_peer( fd_crds_t const * crds ); does this by evicting an existing value from the pool and structures if there is no free space. */ -fd_crds_value_t * +fd_crds_entry_t * fd_crds_acquire( fd_crds_t * crds ); /* fd_crds_release releases a CRDS value back to the storage pool. The @@ -96,14 +89,14 @@ fd_crds_acquire( fd_crds_t * crds ); void fd_crds_release( fd_crds_t * crds, - fd_crds_value_t * value ); + fd_crds_entry_t * value ); /* fd_crds_upserts checks if inserting the value into the CRDS would succeed. An insert will fail if the value is already present in the CRDS with a newer timestamp, or if the value is not present. */ - +int fd_crds_upserts( fd_crds_t * crds, - fd_crds_value_t * value ); + fd_crds_entry_t * value ); /* fd_crds_insert inserts and indexes a previously acquired CRDS value into the data store, so that it can be returned by future queries. @@ -113,9 +106,10 @@ fd_crds_upserts( fd_crds_t * crds, release the value when it expires, or when it must be evicted to make room for a new value. */ -void +int fd_crds_insert( fd_crds_t * crds, - fd_crds_value_t * value ); + fd_crds_entry_t * value, + int from_push_msg ); ulong fd_crds_purged_len( fd_crds_t * crds ); @@ -135,7 +129,7 @@ fd_crds_mask_iter_next( fd_crds_mask_iter_t it ); int fd_crds_mask_iter_done( fd_crds_mask_iter_t it ); -fd_crds_value_t const * +fd_crds_entry_t const * fd_crds_mask_iter_value( fd_crds_mask_iter_t it ); ulong diff --git a/src/flamenco/gossip/fd_crds_value.h b/src/flamenco/gossip/fd_crds_value.h index a8925aa2a2..90735ddb0a 100644 --- a/src/flamenco/gossip/fd_crds_value.h +++ b/src/flamenco/gossip/fd_crds_value.h @@ -2,5 +2,137 @@ #define HEADER_fd_src_flamenco_gossip_fd_crds_value_h #include "../../util/fd_util.h" +#include "../../util/net/fd_net_headers.h" + +#define FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ( 0) +#define FD_GOSSIP_CONTACT_INFO_SOCKET_SERVE_REPAIR_QUIC ( 1) +#define FD_GOSSIP_CONTACT_INFO_SOCKET_RPC ( 2) +#define FD_GOSSIP_CONTACT_INFO_SOCKET_RPC_PUBSUB ( 3) +#define FD_GOSSIP_CONTACT_INFO_SOCKET_SERVE_REPAIR ( 4) +#define FD_GOSSIP_CONTACT_INFO_SOCKET_TPU ( 5) +#define FD_GOSSIP_CONTACT_INFO_SOCKET_TPU_FORWARDS ( 6) +#define FD_GOSSIP_CONTACT_INFO_SOCKET_TPU_FORWARDS_QUIC ( 7) +#define FD_GOSSIP_CONTACT_INFO_SOCKET_TPU_QUIC ( 8) +#define FD_GOSSIP_CONTACT_INFO_SOCKET_TPU_VOTE ( 9) +#define FD_GOSSIP_CONTACT_INFO_SOCKET_TVU (10) +#define FD_GOSSIP_CONTACT_INFO_SOCKET_TVU_QUIC (11) +#define FD_GOSSIP_CONTACT_INFO_SOCKET_TPU_VOTE_QUIC (12) + +#define FD_GOSSIP_CLIENT_SOLANA (0) +#define FD_GOSSIP_CLIENT_JITO (1) +#define FD_GOSSIP_CLIENT_FD (2) +#define FD_GOSSIP_CLIENT_AGAVE (3) + +#define FD_CRDS_TAG_LEGACY_CONTACT_INFO ( 0) +#define FD_CRDS_TAG_VOTE ( 1) +#define FD_CRDS_TAG_LOWEST_SLOT ( 2) +#define FD_CRDS_TAG_SNAPSHOT_HASHES ( 3) +#define FD_CRDS_TAG_ACCOUNT_HASHES ( 4) +#define FD_CRDS_TAG_EPOCH_SLOTS ( 5) +#define FD_CRDS_TAG_LEGACY_VERSION_V1 ( 6) +#define FD_CRDS_TAG_LEGACY_VERSION_V2 ( 7) +#define FD_CRDS_TAG_NODE_INSTANCE ( 8) +#define FD_CRDS_TAG_DUPLICATE_SHRED ( 9) +#define FD_CRDS_TAG_INC_SNAPSHOT_HASHES (10) +#define FD_CRDS_TAG_CONTACT_INFO (11) +#define FD_CRDS_TAG_RESTART_LAST_VOTED_FORK_SLOTS (12) +#define FD_CRDS_TAG_RESTART_HEAVIEST_FORK (13) + +struct fd_crds_key { + uchar tag; + uchar pubkey[ 32UL ]; + union { + uchar vote_index; + uchar epoch_slots_index; + ushort duplicate_shred_index; + }; +}; + +typedef struct fd_crds_key fd_crds_key_t; + +struct fd_gossip_crds_contact_info { + long instance_creation_wallclock_nanos; + ushort shred_version; + + struct { + uchar client; + + ushort major; + ushort minor; + ushort patch; + + int has_commit; + uint commit; + uint feature_set; + } version; + + struct { + /* WARNING: in gossip contact info message, + ports are encoded in host form. The parser will + perform the conversion */ + fd_ip4_port_t addr; + } sockets[ 13UL ]; +}; + +typedef struct fd_gossip_crds_contact_info fd_gossip_crds_contact_info_t; + +struct fd_gossip_crds_vote { + ulong slot; + uchar * txn; /* TODO: avoid pointers here */ + ulong txn_sz; +}; + +typedef struct fd_gossip_crds_vote fd_gossip_crds_vote_t; + +struct fd_gossip_crds_node_instance { + uchar token[ 32UL ]; /* This is the node instance token */ + uchar from[ 32UL ]; /* This is the public key of the node that sent this message */ + ulong wallclock_nanos; /* Wallclock time when this message was created */ +}; +typedef struct fd_gossip_crds_node_instance fd_gossip_crds_node_instance_t; + +struct fd_crds_value { + /* The core operation of the CRDS is to "upsert" a value. Basically, + all of the message types are keyed by the originators public key, + and we only want to store the most recent message of each type. + + So we have a ContactInfo message for example. If a validator sends + us a new ContactInfo message, we want to replace the old one. This + lookup is serviced by a hash table, keyed by the public key of the + originator, and in a few special cases an additional field. For + example, votes are (originator_key, vote_index), since we need to + know about more than one vote from a given originator. + + This key field is the key for the hash table. */ + fd_crds_key_t key[1]; + + /* When an originator creates a CRDS message, they attach their local + wallclock time to it. This time is used to determine when a + message should be upserted. If messages have the same key, the + newer one (as created by the originator) is used. + + Messages encode wallclock in millis, firedancer converts + them into nanos internally. */ + long wallclock_nanos; + uchar signature[64UL]; /* signable data is always offset + sizeof(signature); signable_sz = sz - sizeof(signature) */ + union { + fd_gossip_crds_contact_info_t contact_info; + fd_gossip_crds_vote_t vote; + fd_gossip_crds_node_instance_t node_instance; + }; +}; +typedef struct fd_crds_value fd_crds_value_t; + +FD_PROTOTYPES_BEGIN +long +fd_crds_value_wallclock( fd_crds_value_t const * value ); + +uchar const * +fd_crds_value_pubkey( fd_crds_value_t const * value ); + +uchar const * +fd_crds_value_hash( fd_crds_value_t const * value ); + +FD_PROTOTYPES_END #endif /* HEADER_fd_src_flamenco_gossip_fd_crds_value_h */ diff --git a/src/flamenco/gossip/fd_gossip.c b/src/flamenco/gossip/fd_gossip.c index c71c2d06e2..04025d1b68 100644 --- a/src/flamenco/gossip/fd_gossip.c +++ b/src/flamenco/gossip/fd_gossip.c @@ -1,171 +1,338 @@ #include "fd_gossip.h" #include "fd_gossip_types.h" +#include "fd_gossip_msg.h" #include "fd_crds.h" #include "fd_active_set.h" #include "fd_prune_finder.h" #include "fd_ping_tracker.h" +#include "../../ballet/ed25519/fd_ed25519.h" +#include "../../ballet/sha256/fd_sha256.h" -static int -parse_message( uchar const * data, - ulong data_sz, - fd_gossip_message_t * message ) { - - return 0; -} +struct fd_gossip_private { + uchar identity_pubkey[ 32UL ]; -static int -rx_pull_request( fd_gossip_t * gossip, - fd_gossip_pull_request_t const * pull_request, - long now ) { - /* TODO: Implement data budget? */ + fd_gossip_metrics_t metrics[1]; - fd_gossip_crds_data_t const * data = pull_request->value->data; - if( FD_UNLIKELY( data->tag!=FD_GOSSIP_VALUE_CONTACT_INFO ) ) return FD_GOSSIP_RX_ERR_PULL_REQUEST_NOT_CONTACT_INFO; + // fd_crds_t * crds; + // fd_active_set_t * active_set; + fd_ping_tracker_t * ping_tracker; - fd_gossip_contact_info_t const * contact_info = data->contact_info; - if( FD_UNLIKELY( !memcmp( data->contact_info->pubkey, gossip->identity_pubkey, 32UL ) ) ) return FD_GOSSIP_RX_ERR_PULL_REQUEST_LOOPBACK; + fd_sha512_t sha512[1]; - if( FD_UNLIKELY( !is_valid_address( node ) ) ) return FD_GOSSIP_RX_ERR_PULL_REQUEST_INVALID_ADDRESS; + /* Callbacks */ + fd_gossip_sign_fn sign_fn; + void * sign_ctx; - fd_gossip_crds_filter_t const * filter = pull_request->filter; + fd_gossip_send_fn send_fn; + void * send_ctx; +}; - /* TODO: Jitter? */ - long clamp_wallclock_lower_nanos = now - 15L*1000L*1000L*1000L; - long clamp_wallclock_upper_nanos = now + 15L*1000L*1000L*1000L; - if( FD_UNLIKELY( contact_info->wallclockwallclock>clamp_wallclock_upper_nanos ) ) return FD_GOSSIP_RX_ERR_PULL_REQUEST_WALLCLOCK; +ulong +fd_gossip_align( void ) { + return fd_ping_tracker_align(); +} - ulong packet_sz = 0UL; - uchar packet[ 1232UL; ]; +ulong +fd_gossip_footprint( ulong max_values ) { + (void) max_values; + ulong l; + l = FD_LAYOUT_INIT; + l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t) ); + // l = FD_LAYOUT_APPEND( l, fd_crds_align(), fd_crds_footprint( max_values ) ); + // l = FD_LAYOUT_APPEND( l, fd_active_set_align(), fd_active_set_footprint() ); + l = FD_LAYOUT_APPEND( l, fd_ping_tracker_align(), fd_ping_tracker_footprint() ); + l = FD_LAYOUT_FINI( l, fd_gossip_align() ); + return l; +} - for( fd_crds_iter_t it=fd_crds_mask_iter_init( gossip->crds, mask, mask_bits ); !fd_crds_mask_iter_done( it ); it=fd_crds_mask_iter_next(it) ) { - fd_crds_value_t * candidate = fd_crds_mask_iter_value( it ); +void * +fd_gossip_new( void * shmem, + fd_rng_t * rng, + ulong max_values, + int has_expected_shred_version, + ushort expected_shred_version, + ulong entrypoints_cnt, + fd_ip4_port_t const * entrypoints, + uchar const * identity_pubkey, + fd_gossip_send_fn send_fn, + void * send_ctx, + fd_gossip_sign_fn sign_fn, + void * sign_ctx ) { + if( FD_UNLIKELY( !shmem ) ) { + FD_LOG_ERR(( "NULL shmem" )); + } + if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_gossip_align() ) ) ) { + FD_LOG_ERR(( "misaligned shmem" )); + } - /* TODO: Add jitter here? */ - if( FD_UNLIKELY( fd_crds_value_wallclock( candidate )>contact_info->wallclock ) ) continue; + FD_SCRATCH_ALLOC_INIT( l, shmem ); + fd_gossip_t * gossip = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t) ); + void * ping_tracker = FD_SCRATCH_ALLOC_APPEND( l, fd_ping_tracker_align(), fd_ping_tracker_footprint() ); - ulong serialized_sz; - error = serialize_crds_value_into_packet( candidate, packet, 1232UL-packet_sz, &serialized_sz ); - if( FD_LIKELY( !error ) ) { - packet_sz += serialized_sz; - } else { - /* CRDS value can't fit into the packet anymore, just ship what - we have now and start a new one. */ - gossip->tx_fn( gossip->tx_ctx, packet, packet_sz ); - packet_sz = 0UL; - } - } + fd_ping_tracker_new( ping_tracker, rng ); - /* TODO: Send packet if there's anything leftover */ + fd_sha512_init( gossip->sha512 ); + gossip->send_fn = send_fn; + gossip->send_ctx = send_ctx; + gossip->sign_fn = sign_fn; + gossip->sign_ctx = sign_ctx; - return 0; -} -static int -rx_pull_response( fd_gossip_t * gossip, - fd_gossip_pull_response_t const * pull_response, - long now ) { - /* TODO: use epoch_duration and make timeouts ... ? */ + fd_gossip_set_expected_shred_version( gossip, has_expected_shred_version, expected_shred_version ); + fd_gossip_set_identity( gossip, identity_pubkey ); - for( ulong i=0UL; ivalues_len; i++ ) { - int upserts = fd_crds_upserts( gossip->crds, pull_response->values[ i ] ); - if( FD_UNLIKELY( !upserts ) ) { - failed_inserts_append( gossip, pull_response->values[ i ] ); - continue; - } - /* TODO: Is this jittered in Agave? */ - long accept_after_nanos; - if( FD_UNLIKELY( !memcmp( pull_response->sender_pubkey, gossip->identity_pubkey, 32UL ) ) ) { - accept_after_nanos = 0L; - } else if( stake( pull_response->sender_pubkey ) ) { - accept_after_nanos = now-15L*1000L*1000L*1000L; - } else { - accept_after_nanos = now-432000L*1000L*1000L*1000L; - } + return gossip; +} - if( FD_LIKELY( accept_after_nanos<=fd_crds_value_wallclock( pull_response->values[ i ] ) ) ) { - fd_crds_insert( gossip->crds, pull_response->values[ i ], now ); - fd_crds_update_record_timestamp( pull_response->sender_pubkey, now ); - } else if( fd_crds_has_contact_info( pull_response->sender_pubkey ) ) { - fd_crds_insert( gossip->crds, pull_response->values[ i ], now ); - } else { - failed_inserts_append( gossip, pull_response->values[ i ] ); - } +fd_gossip_t * +fd_gossip_join( void * shgossip ) { + if( FD_UNLIKELY( !shgossip ) ) { + FD_LOG_ERR(( "NULL shgossip" )); + } + if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shgossip, fd_gossip_align() ) ) ) { + FD_LOG_ERR(( "misaligned shgossip" )); } - return 0; + FD_SCRATCH_ALLOC_INIT( l, shgossip ); + fd_gossip_t * gossip = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t) ); + void * ping_tracker = FD_SCRATCH_ALLOC_APPEND( l, fd_ping_tracker_align(), fd_ping_tracker_footprint() ); + + gossip->ping_tracker = fd_ping_tracker_join( ping_tracker ); + /* No need to join fd_sha512? */ + + return gossip; } static int -rx_push( fd_gossip_t * gossip, - fd_gossip_push_t const * push, - long now ) { - uchar const * relayer_pubkey = push->sender_pubkey; - - for( ulong i=0UL; ivalues_len; i++ ) { - fd_gossip_crds_value_t * value = push->values[ i ]; - if( FD_UNLIKELY( value->timestamptimestamp>now+30L*1000L*1000L*1000L ) ) continue; +parse_message( uchar const * data, + ulong data_sz, + fd_gossip_message_t * message ) { + ulong decoded_sz = fd_gossip_msg_parse( message, data, data_sz ); + if( FD_UNLIKELY( !decoded_sz ) ) return FD_GOSSIP_RX_PARSE_ERR; + return FD_GOSSIP_RX_OK; +} - uchar const * origin_pubkey = fd_crds_value_pubkey( value ); +static int +verify_signatures( fd_gossip_message_t const * message, + uchar const * payload, + fd_sha512_t * sha ) { + + /* Optimize for CRDS composites (push/pull) that don't have an outer signable + data */ + if( FD_UNLIKELY( message->signable_sz != 0 ) ) { + /* TODO: Special case for prune */ + int err = fd_ed25519_verify( payload+message->signable_data_offset, + message->signable_sz, + message->signature, + message->pubkey, + sha ); + if( FD_UNLIKELY( err!=FD_ED25519_SUCCESS ) ) return err; + } - int error = fd_crds_insert( gossip->crds, value, now ); - ulong num_duplicates = 0UL; - if( FD_UNLIKELY( error>0 ) ) num_duplicates = (ulong)error; - else if( FD_UNLIKELY( error<0 ) ) num_duplicates = ULONG_MAX; + /* Verify CRDS entries */ + for( ulong i=0UL; icrds_cnt; i++ ) { + int err = fd_ed25519_verify( payload + message->crds[i].offset+64UL, + message->crds[i].sz-64UL, + message->crds[i].crd_val.signature, + message->crds[i].crd_val.key->pubkey, + sha ); - fd_prune_finder_record( gossip->prune_finder, origin_pubkey, relayer_pubkey, num_duplicates ); + /* Full message must be dropped if any one value fails verify */ + if( FD_UNLIKELY( err!=FD_ED25519_SUCCESS ) ) return err; } - return 0; + return FD_GOSSIP_RX_OK; } +// static int +// rx_pull_request( fd_gossip_t * gossip, +// fd_gossip_pull_request_t const * pull_request, +// long now ) { +// /* TODO: Implement data budget? */ + +// fd_gossip_crds_data_t const * data = pull_request->value->data; +// if( FD_UNLIKELY( data->tag!=FD_GOSSIP_VALUE_CONTACT_INFO ) ) return FD_GOSSIP_RX_ERR_PULL_REQUEST_NOT_CONTACT_INFO; + +// fd_gossip_contact_info_t const * contact_info = data->contact_info; +// if( FD_UNLIKELY( !memcmp( data->contact_info->pubkey, gossip->identity_pubkey, 32UL ) ) ) return FD_GOSSIP_RX_ERR_PULL_REQUEST_LOOPBACK; + +// if( FD_UNLIKELY( !is_valid_address( node ) ) ) return FD_GOSSIP_RX_ERR_PULL_REQUEST_INVALID_ADDRESS; + +// fd_gossip_crds_filter_t const * filter = pull_request->filter; + +// /* TODO: Jitter? */ +// long clamp_wallclock_lower_nanos = now - 15L*1000L*1000L*1000L; +// long clamp_wallclock_upper_nanos = now + 15L*1000L*1000L*1000L; +// if( FD_UNLIKELY( contact_info->wallclockwallclock>clamp_wallclock_upper_nanos ) ) return FD_GOSSIP_RX_ERR_PULL_REQUEST_WALLCLOCK; + +// ulong packet_sz = 0UL; +// uchar packet[ 1232UL ]; + +// for( fd_crds_iter_t it=fd_crds_mask_iter_init( gossip->crds, mask, mask_bits ); !fd_crds_mask_iter_done( it ); it=fd_crds_mask_iter_next(it) ) { +// fd_crds_value_t * candidate = fd_crds_mask_iter_value( it ); + +// /* TODO: Add jitter here? */ +// if( FD_UNLIKELY( fd_crds_value_wallclock( candidate )>contact_info->wallclock ) ) continue; + +// ulong serialized_sz; +// error = serialize_crds_value_into_packet( candidate, packet, 1232UL-packet_sz, &serialized_sz ); +// if( FD_LIKELY( !error ) ) { +// packet_sz += serialized_sz; +// } else { +// /* CRDS value can't fit into the packet anymore, just ship what +// we have now and start a new one. */ +// gossip->tx_fn( gossip->tx_ctx, packet, packet_sz ); +// packet_sz = 0UL; +// } +// } + +// /* TODO: Send packet if there's anything leftover */ + +// return 0; +// } + +// static int +// rx_pull_response( fd_gossip_t * gossip, +// fd_gossip_pull_response_t const * pull_response, +// long now ) { +// /* TODO: use epoch_duration and make timeouts ... ? */ + +// for( ulong i=0UL; ivalues_len; i++ ) { +// int upserts = fd_crds_upserts( gossip->crds, pull_response->values[ i ] ); + +// if( FD_UNLIKELY( !upserts ) ) { +// failed_inserts_append( gossip, pull_response->values[ i ] ); +// continue; +// } + +// /* TODO: Is this jittered in Agave? */ +// long accept_after_nanos; +// if( FD_UNLIKELY( !memcmp( pull_response->sender_pubkey, gossip->identity_pubkey, 32UL ) ) ) { +// accept_after_nanos = 0L; +// } else if( stake( pull_response->sender_pubkey ) ) { +// accept_after_nanos = now-15L*1000L*1000L*1000L; +// } else { +// accept_after_nanos = now-432000L*1000L*1000L*1000L; +// } + +// if( FD_LIKELY( accept_after_nanos<=fd_crds_value_wallclock( pull_response->values[ i ] ) ) ) { +// fd_crds_insert( gossip->crds, pull_response->values[ i ], now ); +// fd_crds_update_record_timestamp( pull_response->sender_pubkey, now ); +// } else if( fd_crds_has_contact_info( pull_response->sender_pubkey ) ) { +// fd_crds_insert( gossip->crds, pull_response->values[ i ], now ); +// } else { +// failed_inserts_append( gossip, pull_response->values[ i ] ); +// } +// } + +// return 0; +// } + +// static int +// rx_push( fd_gossip_t * gossip, +// fd_gossip_push_t const * push, +// long now ) { +// uchar const * relayer_pubkey = push->sender_pubkey; + +// for( ulong i=0UL; ivalues_len; i++ ) { +// fd_gossip_crds_value_t * value = push->values[ i ]; +// if( FD_UNLIKELY( value->timestamptimestamp>now+30L*1000L*1000L*1000L ) ) continue; + +// uchar const * origin_pubkey = fd_crds_value_pubkey( value ); + +// int error = fd_crds_insert( gossip->crds, value, now ); +// ulong num_duplicates = 0UL; +// if( FD_UNLIKELY( error>0 ) ) num_duplicates = (ulong)error; +// else if( FD_UNLIKELY( error<0 ) ) num_duplicates = ULONG_MAX; + +// fd_prune_finder_record( gossip->prune_finder, origin_pubkey, relayer_pubkey, num_duplicates ); +// } + +// return 0; +// } + static int rx_prune( fd_gossip_t * gossip, fd_gossip_prune_t const * prune, long now ) { - if( FD_UNLIKELY( now-500L*1000L*1000L>prune->data->wallclock ) ) return FD_GOSSIP_RX_ERR_PRUNE_TIMEOUT; - else if( FD_UNLIKELY( !memcmp( gossip->identity_pubkey, prune->data->destination, 32UL ) ) ) return FD_GOSSIP_RX_ERR_PRUNE_DESTINATION; + if( FD_UNLIKELY( now-FD_MILLI_TO_NANOSEC(500L)>(long)prune->wallclock_nanos ) ) return FD_GOSSIP_RX_PRUNE_ERR_STALE; + else if( FD_UNLIKELY( !!memcmp( gossip->identity_pubkey, prune->destination, 32UL ) ) ) return FD_GOSSIP_RX_PRUNE_ERR_DESTINATION; - ulong identity_stake = ??; - for( ulong i=0UL; idata->prunes_len; i++ ) { - ulong origin_stake = ??; + ulong identity_stake = 0UL; /* FIXME */ + for( ulong i=0UL; iprunes_len; i++ ) { + ulong origin_stake = 0UL; /* FIXME */ fd_active_set_prune( gossip->active_set, gossip->identity_pubkey, - gossip->identity_stake, - prune->data->pubkey, - prune->data->destination, - prune->data->prunes[ i ], + identity_stake, + prune->from, + prune->destination, + prune->prunes[ i ], origin_stake ); } + return FD_GOSSIP_RX_OK; } static int -rx_ping( fd_gossip_t * gossip, - fd_gossip_ping_t * ping ) { - fd_gossip_message_t * message = new_outgoing( gossip ); - - message->tag = FD_GOSSIP_MESSAGE_PONG; - fd_memcpy( message->pong->from, gossip->identity_pubkey, 32UL ); - message->pong->hash = hash_ping_token( ping->token ); - gossip->sign_fn( gossip->sign_ctx, message->pong->hash, 32UL, message->pong->signature ); - - /* TODO: Send it */ +rx_ping( fd_gossip_t * gossip, + fd_gossip_ping_pong_t * ping, + fd_ip4_port_t * peer_address, + long now ) { + /* Construct and send the pong response */ + uchar payload[ 1232UL ]; + ulong i = fd_gossip_init_msg_payload( payload, 1232UL, FD_GOSSIP_MESSAGE_PONG ); + + fd_memcpy( payload+i, gossip->identity_pubkey, 32UL ) ; i+=32UL ; /* Pubkey */ + fd_ping_tracker_hash_ping_token( payload+i, ping->token ) ; i+=32UL ; /* Hash */ + gossip->sign_fn( gossip->sign_ctx, payload+i, 32UL, payload+i+32UL ); i+=32UL+64UL; /* Signature (performed on hash) */ + + gossip->send_fn( gossip->send_ctx, payload, i, peer_address, (ulong)now ); + return FD_GOSSIP_RX_OK; } static int -rx_pong( fd_gossip_t * gossip, - fd_gossip_pong_t * pong ) { - for( ulong i=0UL; i<2UL; i++ ) { - - if( FD_LIKELY( hash_ping_token( ) ) ) { - return FD_GOSSIP_RX_SUCCESS; - } - } +rx_pong( fd_gossip_t * gossip, + fd_gossip_ping_pong_t * pong, + fd_ip4_port_t * peer_address, + long now ) { + fd_ping_tracker_register( gossip->ping_tracker, + pong->from, + 0UL, /* FIXME: Get stake */ + peer_address, + pong->hash, + now ); + return 0; +} - return FD_GOSSIP_RX_ERR_PONG_UNMATCHED; +/* FIXME: This feels like it should be higher up the rx processing stack (i.e., tile level)*/ +static int +strip_network_hdrs( uchar const * data, + ulong data_sz, + uchar ** const payload, + ulong * payload_sz, + fd_ip4_port_t * peer_address ) { + fd_eth_hdr_t const * eth = (fd_eth_hdr_t const *)data; + fd_ip4_hdr_t const * ip4 = (fd_ip4_hdr_t const *)( (ulong)eth + sizeof(fd_eth_hdr_t) ); + fd_udp_hdr_t const * udp = (fd_udp_hdr_t const *)( (ulong)ip4 + FD_IP4_GET_LEN( *ip4 ) ); + + if( FD_UNLIKELY( (ulong)udp+sizeof(fd_udp_hdr_t) > (ulong)eth+data_sz ) ) + FD_LOG_ERR(( "Malformed UDP header" )); + ulong udp_sz = fd_ushort_bswap( udp->net_len ); + if( FD_UNLIKELY( udp_sz(ulong)eth+data_sz ) ) + FD_LOG_ERR(( "Malformed UDP payload" )); + + *payload = (uchar *)( (ulong)udp + sizeof(fd_udp_hdr_t) ); + *payload_sz = payload_sz_; + + peer_address->addr = ip4->saddr; + peer_address->port = udp->net_sport; + return FD_GOSSIP_RX_OK; } int @@ -173,43 +340,55 @@ fd_gossip_rx( fd_gossip_t * gossip, uchar const * data, ulong data_sz, long now ) { - fd_gossip_message_t message[ 1 ]; - int error = parse_message( data, data_sz, message ); - if( FD_UNLIKELY( error ) ) return error; - error = verify_signatures( gossip, message ); - if( FD_UNLIKELY( error ) ) return error; + uchar * gossip_payload; + ulong gossip_payload_sz; + fd_ip4_port_t peer_address[1]; - error = filter_shred_version( gossip, message ); + int error = strip_network_hdrs( data, + data_sz, + &gossip_payload, + &gossip_payload_sz, + peer_address ); if( FD_UNLIKELY( error ) ) return error; - error = check_duplicate_instance( gossip, message ); + fd_gossip_message_t message[ 1 ]; + ulong decode_sz = fd_gossip_msg_parse( message, gossip_payload, gossip_payload_sz ); + if( FD_UNLIKELY( !!decode_sz ) ) return FD_GOSSIP_RX_PARSE_ERR; + + error = verify_signatures( message, data, gossip->sha512 ); if( FD_UNLIKELY( error ) ) return error; + // error = filter_shred_version( gossip, message ); + // if( FD_UNLIKELY( error ) ) return error; + + // error = check_duplicate_instance( gossip, message ); + // if( FD_UNLIKELY( error ) ) return error; + /* TODO: This should verify ping tracker active for pull request */ - error = verify_gossip_address( gossip, message ); + // error = verify_gossip_address( gossip, message ); if( FD_UNLIKELY( error ) ) return error; /* TODO: Implement traffic shaper / bandwidth limiter */ switch( message->tag ) { case FD_GOSSIP_MESSAGE_PULL_REQUEST: - error = rx_pull_request( gossip, message->pull_request ); + // error = rx_pull_request( gossip, message->pull_request ); break; case FD_GOSSIP_MESSAGE_PULL_RESPONSE: - error = rx_pull_response( gossip, message->pull_response ); + // error = rx_pull_response( gossip, message->pull_response ); break; case FD_GOSSIP_MESSAGE_PUSH: - error = rx_push( gossip, message->push ); + // error = rx_push( gossip, message->push ); break; case FD_GOSSIP_MESSAGE_PRUNE: - error = rx_prune( gossip, message->prune ); + error = rx_prune( gossip, message->prune, now ); break; case FD_GOSSIP_MESSAGE_PING: - error = rx_ping( gossip, message->ping ); + error = rx_ping( gossip, message->piong, peer_address, now ); break; case FD_GOSSIP_MESSAGE_PONG: - error = rx_pong( gossip, message->pong ); + error = rx_pong( gossip, message->piong, peer_address, now ); break; default: FD_LOG_CRIT(( "Unknown gossip message type %d", message->tag )); @@ -236,8 +415,23 @@ static void tx_ping( fd_gossip_t * gossip, long now ) { uchar const * peer_pubkey; - while( fd_ping_tracker_pop_request( gossip->ping_tracker, now, &peer_pubkey ) ) { - /* TODO: Generate and send a ping message ... */ + uchar const * ping_token; + fd_ip4_port_t const * peer_address; + while( fd_ping_tracker_pop_request( gossip->ping_tracker, + now, + &peer_pubkey, + &peer_address, + &ping_token ) ) { + + /* Construct and send ping message */ + uchar payload[ 1232UL ]; + ulong i = fd_gossip_init_msg_payload( payload, 1232UL, FD_GOSSIP_MESSAGE_PING ); + + fd_memcpy( payload+i, gossip->identity_pubkey, 32UL ) ; i+=32UL ; /* Pubkey */ + fd_memcpy( payload+i, ping_token, 32UL ) ; ; /* Ping token */ + gossip->sign_fn( gossip->sign_ctx, payload+i, 32UL, payload+i+32UL ); i+=32UL+64UL; /* Signature (on token) */ + + gossip->send_fn( gossip->send_ctx, payload, i, peer_address, (ulong)now ); } } @@ -349,7 +543,7 @@ tx_pull_request( fd_gossip_t * gossip, for( ulong i=0UL; ifailed_inserts_len; i++ ) { /* TODO: Make the failed insert list also a bplus, for fast finding of matching hashes? */ - fd_gossip_crds_value_t * value = gossip->failed_inserts[ (gossip->failed_inserts_idx+i) % gossip->failed_inserts_len ]; + fd_crds_value_t * value = gossip->failed_inserts[ (gossip->failed_inserts_idx+i) % gossip->failed_inserts_len ]; uchar const * hash = fd_crds_value_hash( value ); if( FD_LIKELY( (fd_ulong_load_8( hash )>>shift)!=mask ) ) continue; fd_bloom_insert( filter, hash, 32UL ); diff --git a/src/flamenco/gossip/fd_gossip.h b/src/flamenco/gossip/fd_gossip.h index 1d847478d5..0dd085fa07 100644 --- a/src/flamenco/gossip/fd_gossip.h +++ b/src/flamenco/gossip/fd_gossip.h @@ -4,6 +4,15 @@ #include "../../util/rng/fd_rng.h" #include "../../util/net/fd_net_headers.h" +#define FD_GOSSIP_RX_OK (0) + +#define FD_GOSSIP_RX_PARSE_ERR (1) + +#define FD_GOSSIP_RX_VERIFY_NO_SIGNABLE_DATA (1) + +#define FD_GOSSIP_RX_PRUNE_ERR_STALE (1) +#define FD_GOSSIP_RX_PRUNE_ERR_DESTINATION (2) + /* TODO: When we get a pull request, respond with ContactInfos first if we have any available that are responsive. */ @@ -64,6 +73,16 @@ struct fd_gossip_metrics { typedef struct fd_gossip_metrics fd_gossip_metrics_t; +typedef void (*fd_gossip_send_fn)( void * ctx, + uchar const * data, + ulong sz, + fd_ip4_port_t const * peer_address, + ulong now ); +typedef void (*fd_gossip_sign_fn)( void * ctx, + uchar const * data, + ulong sz, + uchar * signature ); + FD_PROTOTYPES_BEGIN FD_FN_CONST ulong @@ -80,7 +99,11 @@ fd_gossip_new( void * shmem, ushort expected_shred_version, ulong entrypoints_cnt, fd_ip4_port_t const * entrypoints, - uchar const * identity_pubkey ); + uchar const * identity_pubkey, + fd_gossip_send_fn send_fn, + void * send_ctx, + fd_gossip_sign_fn sign_fn, + void * sign_ctx ); fd_gossip_t * fd_gossip_join( void * shgossip ); @@ -121,6 +144,8 @@ fd_gossip_set_identity( fd_gossip_t * gossip, periodically rotated, with one new peer entering and one old peer leaving, based on stake weights. + now is the current time in nanoseconds. + Only actions which are necessary and useful will be performed, and the function is idempotent and fast otherwise. advance should be called as often as possible. */ @@ -134,6 +159,10 @@ fd_gossip_advance( fd_gossip_t * gossip, otherwise no assumptions are made about the contents of the packet, in particular it might be malformed, corrupted, malicious, and so on. + now is the current time in nanoseconds, and is used to determine + whether the packet is stale or not, and to update the internal state + of the gossip protocol. + Receiving a packet might cause response packets to need to be sent back to the gossip network. The response packets are queued for later sending. The caller is responsible for sending the response diff --git a/src/flamenco/gossip/fd_gossip_msg.c b/src/flamenco/gossip/fd_gossip_msg.c new file mode 100644 index 0000000000..1c72a26876 --- /dev/null +++ b/src/flamenco/gossip/fd_gossip_msg.c @@ -0,0 +1,6 @@ +#include "fd_gossip_msg.h" +#include "fd_gossip_types.h" +void +fd_gossip_msg_init( fd_gossip_message_t * msg ) { + msg->tag = FD_GOSSIP_MESSAGE_LAST + 1; /* default to invalid message tag as a canary */ +} diff --git a/src/flamenco/gossip/fd_gossip_msg.h b/src/flamenco/gossip/fd_gossip_msg.h new file mode 100644 index 0000000000..45c8d0d59b --- /dev/null +++ b/src/flamenco/gossip/fd_gossip_msg.h @@ -0,0 +1,92 @@ +#ifndef HEADER_fd_src_flamenco_gossip_fd_gossip_msg_h +#define HEADER_fd_src_flamenco_gossip_fd_gossip_msg_h + +#include "fd_gossip_types.h" +#include "fd_crds_value.h" + +/* Deriving maximum number of CRDS values a message can hold: + - Maximum bytes the CRDS array can hold is + 1232(MTU)-4(msg disc)-32(pubkey)-8(crds len)=1188b + - Smallest CRDS value is 64+4+48=116b + (64b signature + 4b discriminant + 48b slot hashes) + - So, maximum number of CRDS values is 1188/(64+4+48) ~= 10 + - TODO: We might want to use a more conservative estimate that only includes + the size of the signature and discriminant. */ +#define FD_GOSSIP_MSG_MAX_CRDS (10UL) + + +/* Gossip messages encode wallclock in millis, while we + parse them into nanoseconds for internal use. */ +#define FD_NANOSEC_TO_MILLI(_ts_) ((long)(_ts_/1000000)) +#define FD_MILLI_TO_NANOSEC(_ts_) ((long)(_ts_*1000000)) + + + +struct fd_gossip_message { + uchar tag; // uint in rust bincode + union { + fd_gossip_pull_request_t pull_request[ 1 ]; + fd_gossip_pull_response_t pull_response[ 1 ]; /* CRDS Composite Type */ + fd_gossip_push_t push[ 1 ]; /* CRDS Composite Type */ + fd_gossip_prune_t prune[ 1 ]; + fd_gossip_ping_pong_t piong[ 1 ]; + }; + + /* Begin parsed gossip message metadata + + FIXME: These are strictly to operate on a parsed + gossip message that is received in encoded form. The structure + is a little awkward, especially if using this same struct to encode + a message. Crux of the problem is half the fd_gossip_message fields function as metadata + for the encoded message/payload, and the other half owns the data it parses + (namely the inner message types defined above) via memcpys. We can: + - Split this into two structs, one for metadata and one for the + inner message types. This would be a little cleaner, but also a little + more work to maintain. + - Leave it as is, and just document the structure. This isn't as clean + but is less work to maintain. */ + + /* Signature related metadata, analagous to Agave's Signable trait (at least on the rx side) + FIXME: Prune does not define signable data as a contiguous region, which is really annoying */ + struct{ + /* Should these be offsets in payload instead? */ + uchar pubkey[32UL]; + uchar signature[64UL]; + + ulong signable_data_offset; /* offset to start of signable region in payload */ + ulong signable_sz; + }; + + uchar has_shred_version; + ushort shred_version; + + /* For CRDS composites, this holds information about the CRDS values necessary + to perform an insertion into the CRDS and signature verification */ + ulong crds_cnt; /* number of CRDS values in the message, if any */ + struct { + ulong offset; /* offset to start of CRDS value in payload */ + ulong sz; /* size of CRDS value in payload */ + + fd_crds_value_t crd_val; + } crds[ FD_GOSSIP_MSG_MAX_CRDS ]; + +}; + +typedef struct fd_gossip_message fd_gossip_message_t; + +void +fd_gossip_msg_init( fd_gossip_message_t * msg ); + +ulong +fd_gossip_msg_parse( fd_gossip_message_t * msg, + uchar const * payload, + ulong payload_sz ); + +/* Initializes a payload buffer for a gossip message with tag encoded. + Returns offset into the buffer after tag, where the inner message + should begin. */ +ulong +fd_gossip_init_msg_payload( uchar * payload, + ulong payload_sz, + uchar tag ); +#endif /* HEADER_fd_src_flamenco_gossip_fd_gossip_msg_h */ diff --git a/src/flamenco/gossip/fd_gossip_msg_parse.c b/src/flamenco/gossip/fd_gossip_msg_parse.c new file mode 100644 index 0000000000..38d81750b3 --- /dev/null +++ b/src/flamenco/gossip/fd_gossip_msg_parse.c @@ -0,0 +1,143 @@ +#include "fd_gossip_msg.h" +// #include "../../ballet/txn/fd_compact_u16.h" +#include "../../disco/fd_disco_base.h" + + +/* Adapted from fd_txn_parse.c */ +#define CHECK_INIT( payload, payload_sz ) \ + uchar const * _payload = (payload); \ + ulong _payload_sz = (payload_sz); \ + ulong _bytes_consumed = 0; \ + ulong i = 0; \ + (void) _payload; \ + (void) _bytes_consumed; \ + +#define CHECK( cond ) do { \ + if( FD_UNLIKELY( !(cond) ) ) { \ + return 0; \ + } \ +} while( 0 ) + +#define CHECK_LEFT( n ) CHECK( (n)<=(_payload_sz-i) ) + +// #define READ_CHECKED_COMPACT_U16( out_sz, var_name, where ) \ +// do { \ +// ulong _where = (where); \ +// ulong _out_sz = fd_cu16_dec_sz( _payload+_where, _payload_sz-_where ); \ +// CHECK( _out_sz ); \ +// (var_name) = fd_cu16_dec_fixed( _payload+_where, _out_sz ); \ +// (out_sz) = _out_sz; \ +// } while( 0 ) + +FD_FN_UNUSED +static ulong +fd_gossip_msg_crds_arr_parse( fd_gossip_message_t * msg, + uchar const * payload, + ulong payload_sz, + ulong crds_cnt ) { + /* TODO */ + msg->crds_cnt = crds_cnt; + if( FD_UNLIKELY( crds_cnt>FD_GOSSIP_MSG_MAX_CRDS ) ) { + FD_LOG_ERR(( "Too many CRDS values in message: %lu. Possibly need to recompute FD_GOSSIP_MSG_MAX_CRDS", crds_cnt )); + } + + + (void)payload; + return payload_sz; +} + +static ulong +fd_gossip_msg_ping_pong_parse( fd_gossip_message_t * msg, + uchar const * payload, + ulong payload_sz ) { + CHECK_INIT( payload, payload_sz ); + fd_gossip_ping_pong_t * piong = msg->piong; + CHECK_LEFT( 32UL ); memcpy( piong->from, payload+i, 32UL ); i+=32UL; /* Pubkey */ + CHECK_LEFT( 32UL ); memcpy( piong->hash, payload+i, 32UL ); i+=32UL; /* Token/Hash */ + CHECK_LEFT( 64UL ); memcpy( piong->signature, payload+i, 64UL ); i+=64UL; /* Signature */ + + /* metadata */ + fd_memcpy( msg->pubkey, piong->from, 32UL ); + fd_memcpy( msg->signature, piong->signature, 64UL ); + + msg->signable_data_offset = 32UL; + msg->signable_sz = 32UL; + + return i; +} + +static ulong +fd_gossip_pull_req_parse( fd_gossip_message_t * msg, + uchar const * payload, + ulong payload_sz ) { + CHECK_INIT( payload, payload_sz ); + fd_gossip_pull_request_t * pull_request = msg->pull_request; + + /* Parse filter + FIXME: can we avoid memcpy and just pass offsets here? */ + fd_gossip_crds_filter_t * filter = pull_request->filter; + fd_gossip_bloom_t * bloom = filter->bloom; + CHECK_LEFT( 8UL ); bloom->keys_len = FD_LOAD( ulong, payload+i ); i+=8UL; + CHECK_LEFT( bloom->keys_len*8UL ); fd_memcpy( bloom->keys, payload+i, bloom->keys_len*8UL ); i+=bloom->keys_len*8UL; + + uchar has_bits = 0; + CHECK_LEFT( 1UL ); has_bits = FD_LOAD( uchar, payload+i ); i++; + if( has_bits ) { + CHECK_LEFT( 8UL ); bloom->bits_len = FD_LOAD( ulong, payload+i ); i+=8UL; + CHECK_LEFT( bloom->bits_len*8UL ); fd_memcpy( bloom->bits, payload+i, bloom->bits_len*8UL ); i+=bloom->bits_len*8UL; + CHECK_LEFT( 8UL ); /* bits_len (TODO: check this vs bitvec len above?) */; i+=8UL; + } else { + bloom->bits_len = 0UL; + } + CHECK_LEFT( 8UL ); bloom->num_bits_set = FD_LOAD( ulong, payload+i ); i+=8UL; + + CHECK_LEFT( 8UL ); filter->mask = FD_LOAD( ulong, payload+i ); i+=8UL; + CHECK_LEFT( 4UL ); filter->mask_bits = FD_LOAD( uint, payload+i ); i+=4UL; + + /* Parse contact info */ + i+=fd_gossip_msg_crds_arr_parse( msg, payload+i, payload_sz-i, 1UL ); + + /* No signable data in pull request outside of contact info CRDS, which is handled separately */ + msg->signable_sz = 0UL; + return i; +} + +ulong +fd_gossip_msg_parse( fd_gossip_message_t * msg, + uchar const * payload, + ulong payload_sz ) { + CHECK_INIT( payload, payload_sz ); + CHECK( payload_sz<=FD_GOSSIP_MTU ); + + /* Extract enum discriminant/tag (4b encoded) */ + uint tag = 0; + CHECK_LEFT( 4UL ); tag = payload[ i ]; i+=4; + CHECK( tag<=FD_GOSSIP_MESSAGE_LAST ); + msg->tag = (uchar)tag; + + ulong inner_decoded_sz = 0UL; + switch( msg->tag ){ + case FD_GOSSIP_MESSAGE_PULL_REQUEST: + case FD_GOSSIP_MESSAGE_PULL_RESPONSE: + case FD_GOSSIP_MESSAGE_PUSH: + case FD_GOSSIP_MESSAGE_PRUNE: + FD_LOG_ERR(( "Gossip message type %d parser not implemented", msg->tag )); + break; + case FD_GOSSIP_MESSAGE_PING: + inner_decoded_sz = fd_gossip_msg_ping_pong_parse( msg, payload+i, payload_sz-i ); + CHECK( inner_decoded_sz==payload_sz-i ); + break; + default: + return 0; + } + i += inner_decoded_sz; + CHECK( i<=payload_sz ); + + /* Need to increment inner offsets by 4b to account for tag + TODO: make this less error prone (at this point message is technically validated) */ + msg->signable_data_offset += 4UL; + for( ulong j=0; jcrds_cnt; j++ ) { + msg->crds[j].offset += 4UL; + } + return i; +} diff --git a/src/flamenco/gossip/fd_gossip_msg_ser.c b/src/flamenco/gossip/fd_gossip_msg_ser.c new file mode 100644 index 0000000000..ae40c8a5a7 --- /dev/null +++ b/src/flamenco/gossip/fd_gossip_msg_ser.c @@ -0,0 +1,30 @@ +#include "fd_gossip_msg.h" + +#define CHECK_INIT( payload, payload_sz ) \ + uchar const * _payload = (payload); \ + ulong _payload_sz = (payload_sz); \ + ulong _bytes_consumed = 0; \ + ulong i = 0; \ + (void) _payload; \ + (void) _bytes_consumed; \ + +#define CHECK( cond ) do { \ + if( FD_UNLIKELY( !(cond) ) ) { \ + return 0; \ + } \ +} while( 0 ) + +#define CHECK_LEFT( n ) CHECK( (n)<=(_payload_sz-i) ) + +ulong +fd_gossip_init_msg_payload( uchar * payload, + ulong payload_sz, + uchar tag ) { + CHECK_INIT( payload, payload_sz ); + CHECK_LEFT( 4UL ); /* Tag/Discriminant is actually 4b */ + if( FD_UNLIKELY( tag>FD_GOSSIP_MESSAGE_LAST ) ) { + FD_LOG_ERR(( "Invalid message tag %d", tag )); + } + payload[i] = tag; i+=4UL; + return i; /* Return size of payload so far */ +} diff --git a/src/flamenco/gossip/fd_gossip_types.h b/src/flamenco/gossip/fd_gossip_types.h index d55c0d5f94..04c6fef269 100644 --- a/src/flamenco/gossip/fd_gossip_types.h +++ b/src/flamenco/gossip/fd_gossip_types.h @@ -1,4 +1,8 @@ +#ifndef HEADER_fd_src_flamenco_gossip_fd_gossip_types_h +#define HEADER_fd_src_flamenco_gossip_fd_gossip_types_h + #include "fd_gossip.h" +#include "fd_crds_value.h" #define FD_GOSSIP_MESSAGE_PULL_REQUEST (0) #define FD_GOSSIP_MESSAGE_PULL_RESPONSE (1) @@ -7,6 +11,9 @@ #define FD_GOSSIP_MESSAGE_PING (4) #define FD_GOSSIP_MESSAGE_PONG (5) +/* Gossip message tag can never exceed this. */ +#define FD_GOSSIP_MESSAGE_LAST (FD_GOSSIP_MESSAGE_PONG) + #define FD_GOSSIP_VALUE_VOTE ( 1) #define FD_GOSSIP_VALUE_LOWEST_SLOT ( 2) #define FD_GOSSIP_VALUE_EPOCH_SLOTS ( 5) @@ -17,10 +24,10 @@ #define FD_GOSSIP_VALUE_RESTART_HEAVIEST_FORK (13) struct fd_gossip_bloom { - ulong keys_len; - ulong * keys; - ulong bits_len; - ulong * bits; + ulong keys_len; + ulong keys[ 150UL ]; /* max num keys if len(bits) == 1 */ + ulong bits_len; + ulong bits[ 150UL ]; /* max num bits if len(keys) == 1 */ ulong num_bits_set; }; @@ -34,78 +41,61 @@ struct fd_gossip_crds_filter { typedef struct fd_gossip_crds_filter fd_gossip_crds_filter_t; -struct fd_gossip_crds_data { - uchar tag; - union { - fd_gossip_vote_t vote[ 1 ]; - fd_gossip_lowest_slot_t lowest_slot[ 1 ]; - fd_gossip_epoch_slots_t epoch_slots[ 1 ]; - fd_gossip_duplicate_shred_t duplicate_shred[ 1 ]; - fd_gossip_snapshot_hashes_t snapshot_hashes[ 1 ]; - fd_gossip_contact_info_t contact_info[ 1 ]; - fd_gossip_restart_last_voted_fork_slots_t restart_last_voted_fork_slots[ 1 ]; - fd_gossip_restart_heaviest_fork_t restart_heaviest_fork[ 1 ]; - }; -}; typedef struct fd_gossip_crds_data fd_gossip_crds_data_t; -struct fd_gossip_crds_value { - uchar signature[ 64UL ]; - fd_gossip_crds_data_t data[ 1 ]; -}; -typedef struct fd_gossip_crds_value fd_gossip_crds_value_t; struct fd_gossip_pull_request { fd_gossip_crds_filter_t filter[ 1 ]; - fd_gossip_crds_value_t value[ 1 ]; + // fd_gossip_crds_value_t value[ 1 ]; }; typedef struct fd_gossip_pull_request fd_gossip_pull_request_t; struct fd_gossip_pull_response { - uchar sender_pubkey[ 32UL ]; - ulong values_len; - fd_gossip_crds_value_t * values; + uchar sender_pubkey[ 32UL ]; + ulong values_len; + // fd_gossip_crds_value_t values[ ]; }; typedef struct fd_gossip_pull_response fd_gossip_pull_response_t; struct fd_gossip_push { - uchar sender_pubkey[ 32UL ]; - ulong values_len; - fd_gossip_crds_value_t * values; + uchar sender_pubkey[ 32UL ]; + ulong values_len; + // fd_gossip_crds_value_t values[ ]; }; typedef struct fd_gossip_push fd_gossip_push_t; -struct fd_gossip_message_ping { +struct fd_gossip_message_ping_pong { uchar from[ 32UL ]; - uchar token[ 32UL ]; + union{ + uchar hash[ 32UL ]; /* Hash of the last ping */ + uchar token[ 32UL ]; /* Token to be used in the pong */ + }; uchar signature[ 64UL ]; }; -typedef struct fd_gossip_message_ping fd_gossip_ping_t; +typedef struct fd_gossip_message_ping_pong fd_gossip_ping_pong_t; -struct fd_gossip_message_pong { +struct fd_gossip_message_prune { uchar from[ 32UL ]; - uchar hash[ 32UL ]; + ulong prunes_len; + /* 33 pubkeys fit in MTU (rounded down): + 1232b (MTU) + - 4b (discriminant in gossip message) + - 32b + 8b + 64b + 32b + 8b (other fields in prune message) + = 1084b (remaining for prunes arr) + + 1084b/32 ~= 33 */ + uchar prunes[ 33UL ][ 32UL ]; uchar signature[ 64UL ]; -}; + uchar destination[ 32UL ]; + long wallclock_nanos; /* needs to be converted when parsed from a gossip message */ -typedef struct fd_gossip_message_pong fd_gossip_pong_t; - -struct fd_gossip_message { - uchar tag; - union { - fd_gossip_pull_request_t pull_request[ 1 ]; - fd_gossip_pull_response_t pull_response[ 1 ]; - fd_gossip_push_t push[ 1 ]; - fd_gossip_prune_t prune[ 1 ]; - fd_gossip_ping_t ping[ 1 ]; - fd_gossip_pong_t pong[ 1 ]; - }; }; +typedef struct fd_gossip_message_prune fd_gossip_prune_t; -typedef struct fd_gossip_message fd_gossip_message_t; +#endif /* HEADER_fd_src_flamenco_gossip_fd_gossip_types_h */ diff --git a/src/flamenco/gossip/fd_ping_tracker.c b/src/flamenco/gossip/fd_ping_tracker.c index 483965d72a..b794f9195f 100644 --- a/src/flamenco/gossip/fd_ping_tracker.c +++ b/src/flamenco/gossip/fd_ping_tracker.c @@ -201,6 +201,16 @@ fd_ping_tracker_join( void * shpt ) { return ping_tracker; } +static inline void +hash_ping_token( uchar const * ping_token, + uchar expected_pong_token[ static 32UL ], + fd_sha256_t * sha ) { + fd_sha256_init( sha ); + fd_sha256_append( sha, "SOLANA_PING_PONG", 16UL ); + fd_sha256_append( sha, ping_token, 32UL ); + fd_sha256_fini( sha, expected_pong_token ); +} + static void remove_tracking( fd_ping_tracker_t * ping_tracker, fd_ping_peer_t * peer ) { @@ -235,10 +245,7 @@ fd_ping_tracker_track( fd_ping_tracker_t * ping_tracker, peer->state = FD_PING_TRACKER_STATE_INVALID; for( ulong i=0UL; i<32UL; i++ ) peer->ping_token[ i ] = fd_rng_uchar( ping_tracker->rng ); - fd_sha256_init( ping_tracker->sha ); - fd_sha256_append( ping_tracker->sha, "SOLANA_PING_PONG", 16UL ); - fd_sha256_append( ping_tracker->sha, peer->ping_token, 32UL ); - fd_sha256_fini( ping_tracker->sha, peer->expected_pong_token ); + hash_ping_token( peer->ping_token, peer->expected_pong_token, ping_tracker->sha ); invalid_list_ele_push_head( ping_tracker->invalid, peer, ping_tracker->pool ); peer_map_ele_insert( ping_tracker->peers, peer, ping_tracker->pool ); @@ -253,7 +260,7 @@ fd_ping_tracker_track( fd_ping_tracker_t * ping_tracker, pool_ele_release( ping_tracker->pool, peer ); return; } - + if( FD_UNLIKELY( peer_address->addr!=peer->address.addr || peer_address->port!=peer->address.port ) ) { /* Node changed address, update the address. Any existing pongs are no longer valid. */ @@ -263,10 +270,7 @@ fd_ping_tracker_track( fd_ping_tracker_t * ping_tracker, peer->next_ping_nanos = now; peer->state = FD_PING_TRACKER_STATE_INVALID; for( ulong i=0UL; i<32UL; i++ ) peer->ping_token[ i ] = fd_rng_uchar( ping_tracker->rng ); - fd_sha256_init( ping_tracker->sha ); - fd_sha256_append( ping_tracker->sha, "SOLANA_PING_PONG", 16UL ); - fd_sha256_append( ping_tracker->sha, peer->ping_token, 32UL ); - fd_sha256_fini( ping_tracker->sha, peer->expected_pong_token ); + hash_ping_token( peer->ping_token, peer->expected_pong_token, ping_tracker->sha ); invalid_list_ele_push_head( ping_tracker->invalid, peer, ping_tracker->pool ); } } @@ -297,10 +301,6 @@ fd_ping_tracker_register( fd_ping_tracker_t * ping_tracker, remove_tracking( ping_tracker, peer ); peer->state = FD_PING_TRACKER_STATE_VALID; for( ulong i=0UL; i<32UL; i++ ) peer->ping_token[ i ] = fd_rng_uchar( ping_tracker->rng ); - fd_sha256_init( ping_tracker->sha ); - fd_sha256_append( ping_tracker->sha, "SOLANA_PING_PONG", 16UL ); - fd_sha256_append( ping_tracker->sha, peer->ping_token, 32UL ); - fd_sha256_fini( ping_tracker->sha, peer->expected_pong_token ); waiting_list_ele_push_tail( ping_tracker->waiting, peer, ping_tracker->pool ); } @@ -344,7 +344,7 @@ fd_ping_tracker_pop_request( fd_ping_tracker_t * ping_tracker, else if( FD_LIKELY( peer_invalid->next_ping_nanosnext_ping_nanos && peer_invalid->next_ping_nanosnext_ping_nanos ) ) next = peer_invalid; else if( FD_LIKELY( peer_refreshing->next_ping_nanosnext_ping_nanos && peer_refreshing->next_ping_nanosnext_ping_nanos ) ) next = peer_refreshing; else next = peer_waiting; - + if( FD_UNLIKELY( next->last_rx_nanosaddr==random_address1->addr ); FD_TEST( out_address->port==random_address1->port ); - + uchar valid_pong_token[ 32UL ]; fd_sha256_t sha[1]; FD_TEST( fd_sha256_join( fd_sha256_new( sha ) ) );