Skip to content

WIP: changes to merge gossip2 branch #5154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 45 commits into
base: mmcgee/gossip2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
b22d9d7
setup type defs and fn defintions for parsing
ravyu-jump May 16, 2025
9841130
parsers for ping and pong
ravyu-jump May 16, 2025
dfafa9b
progress
ravyu-jump May 16, 2025
df53f71
rx ping/pong, tx ping, ping/pong serializers, helper functions, coale…
ravyu-jump May 16, 2025
11a23ee
init keyguard_client
ravyu-jump May 16, 2025
bb827ab
address PR feedback pt. 1
ravyu-jump May 19, 2025
bfe8f29
parse pull req
ravyu-jump May 27, 2025
42a8f3a
refactor serializer logic to avoid redundant memcpys
ravyu-jump May 27, 2025
7c71c0a
refactor parsing structs
ravyu-jump May 27, 2025
cbcc85b
PR feedback
ravyu-jump May 27, 2025
b627fa6
fixes
ravyu-jump May 27, 2025
0f224af
gossip tile send and sign callbacks
ravyu-jump May 27, 2025
1cc410a
whitespace lints
ravyu-jump May 27, 2025
bec6677
new,join functions; wire up sign and send callbacks
ravyu-jump May 28, 2025
2048df6
remove listen port config
ravyu-jump May 28, 2025
783f6b9
CRDS prepwork part 1 (INCOMPLETE)
ravyu-jump May 28, 2025
861b9dc
pr comments
ravyu-jump May 28, 2025
165204c
CRDS prepwork part 2
ravyu-jump May 29, 2025
36b4508
ping tracker: add support for entrypoints
ravyu-jump May 30, 2025
063312b
types refactor
ravyu-jump May 31, 2025
e475ddc
revert LRU logic in ping_tracker_track
ravyu-jump May 31, 2025
04ad53c
more types changes
ravyu-jump Jun 2, 2025
5c4235c
refactor verify
ravyu-jump Jun 3, 2025
b63065f
revert ping_tracker based entrypoint logic
ravyu-jump Jun 3, 2025
2cf77f1
refactor verify pt 2
ravyu-jump Jun 3, 2025
f73c4e4
prune view + sigverify
ravyu-jump Jun 3, 2025
1af5025
formatting
ravyu-jump Jun 3, 2025
e5219b2
ping parse fix
ravyu-jump Jun 3, 2025
3e18377
clean up ping/pong serializers, entrypoint filter on rx_pong, formatting
ravyu-jump Jun 3, 2025
900468d
crds mask iterator
ravyu-jump Jun 4, 2025
c3392c5
More CRDS related progress:
ravyu-jump Jun 5, 2025
dd0161f
cleanup
ravyu-jump Jun 5, 2025
04713e9
CRDS contact info table
ravyu-jump Jun 6, 2025
5714092
rename failed_insert circular buffer fields to match purged_list
ravyu-jump Jun 6, 2025
5f584d2
fix incorrect indexing in bloom filter bits array
ravyu-jump Jun 9, 2025
84f8163
move encoding code to mgs_ser
ravyu-jump Jun 9, 2025
acfd489
views for primitive types
ravyu-jump Jun 10, 2025
53c1ab3
rename crds bytes field
ravyu-jump Jun 10, 2025
85abfcd
merge ping/pong ser/parse structs
ravyu-jump Jun 11, 2025
e3127d8
contact info serialization
ravyu-jump Jun 11, 2025
84540a7
push state plumbing p1
ravyu-jump Jun 11, 2025
e1c0763
rx prune
ravyu-jump Jun 11, 2025
1c83f18
set contact info
ravyu-jump Jun 12, 2025
9f3250d
PR feedback
ravyu-jump Jun 12, 2025
8db10db
active set constants
ravyu-jump Jun 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/app/firedancer-dev/commands/gossip.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ gossip_topo( config_t * config ) {
net_tile->net.gossip_listen_port = config->gossip.port;

fd_topob_wksp( topo, "gossip" );
fd_topo_tile_t * gossip_tile = fd_topob_tile( topo, "gossip", "gossip", "metric_in", 0UL, 0, 0 );
fd_topo_tile_t * gossip_tile = fd_topob_tile( topo, "gossip", "gossip", "metric_in", 0UL, 0, 1 /* uses_keyswitch */ );

strncpy( gossip_tile->gossip.identity_key_path, config->paths.identity_key, sizeof(gossip_tile->gossip.identity_key_path) );
gossip_tile->gossip.entrypoints_cnt = config->gossip.entrypoints_cnt;
Expand Down
2 changes: 1 addition & 1 deletion src/app/firedancer/topology.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ static void
resolve_gossip_entrypoints( config_t * config ) {
ulong entrypoint_cnt = config->gossip.entrypoints_cnt;
for( ulong i=0UL; i<entrypoint_cnt; i++ ) {
if( FD_UNLIKELY( resolve_gossip_entrypoint( config->gossip.entrypoints[ i ], &config->gossip.resolved_entrypoints[ i ] ) ) ) {
if( FD_UNLIKELY( 0==resolve_gossip_entrypoint( config->gossip.entrypoints[ i ], &config->gossip.resolved_entrypoints[ i ] )) ) {
FD_LOG_ERR(( "failed to resolve address of [gossip.entrypoints] entry \"%s\"", config->gossip.entrypoints[ i ] ));
}
}
Expand Down
171 changes: 137 additions & 34 deletions src/discof/gossip/fd_gossip_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,35 @@ typedef struct {
} fd_gossip_out_ctx_t;

struct fd_gossip_tile_ctx {
fd_gossip_t * gossip;
fd_gossip_t * gossip;

fd_pubkey_t identity_key[1]; /* Just the public key */
fd_contact_info_t my_contact_info[1];

uint rng_seed;
ulong rng_idx;
uint rng_seed;
ulong rng_idx;

double ticks_per_ns;
long last_wallclock;
long last_tickcount;
double ticks_per_ns;
long last_wallclock;
long last_tickcount;

uchar buffer[ FD_NET_MTU ];
uchar buffer[ FD_NET_MTU ];

fd_gossip_in_ctx_t in[ 32UL ];
int in_kind[ 32UL ];
fd_gossip_in_ctx_t in[ 32UL ];
int in_kind[ 32UL ];

fd_gossip_out_ctx_t net_out[ 1 ];
fd_gossip_out_ctx_t gossip_out[ 1 ];
fd_gossip_out_ctx_t net_out[ 1 ];
fd_frag_meta_t * net_out_mcache;
ulong net_out_seq;
ulong net_out_depth;

fd_gossip_out_ctx_t gossip_out[ 1 ];
fd_gossip_out_ctx_t sign_out[ 1 ];

fd_keyguard_client_t keyguard_client[ 1 ];
fd_keyswitch_t * keyswitch;

fd_ip4_udp_hdrs_t net_out_hdr[ 1 ]; /* Used to construct outgoing network packets */
ushort net_id; /* Network ID for outgoing packets */
};

typedef struct fd_gossip_tile_ctx fd_gossip_tile_ctx_t;
Expand All @@ -66,13 +74,59 @@ scratch_footprint( fd_topo_tile_t const * tile ) {
return FD_LAYOUT_FINI( l, scratch_align() );
}

static void
gossip_send_fn( void * ctx,
uchar const * payload,
ulong payload_sz,
fd_ip4_port_t const * peer_address,
ulong tsorig ) {
fd_gossip_tile_ctx_t * gossip_ctx = (fd_gossip_tile_ctx_t *)ctx;

ulong packet_sz = payload_sz + sizeof(fd_ip4_udp_hdrs_t);

uchar * packet = (uchar *)fd_chunk_to_laddr( gossip_ctx->net_out->mem, gossip_ctx->net_out->chunk );
fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet;
*hdr = *gossip_ctx->net_out_hdr;

fd_ip4_hdr_t * ip4 = hdr->ip4;
fd_udp_hdr_t * udp = hdr->udp;

ip4->net_tot_len = fd_ushort_bswap( (ushort)(packet_sz) );
udp->net_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_udp_hdr_t)) );
ip4->daddr = peer_address->addr;
udp->net_dport = peer_address->port;
ip4->check = fd_ip4_hdr_check_fast( ip4 );
ip4->net_id = fd_ushort_bswap( gossip_ctx->net_id++ );

/* TODO: Construct payload in place to avoid memcpy here. */
fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), payload, payload_sz );

/* Publish fragment */
ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
ulong sig = fd_disco_netmux_sig( peer_address->addr, peer_address->port, peer_address->addr, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last arg is ignored, should probably be set to 0.


fd_mcache_publish( gossip_ctx->net_out_mcache, gossip_ctx->net_out_depth, gossip_ctx->net_out_seq, sig, gossip_ctx->net_out->chunk, packet_sz, 0UL, tsorig, tspub );
gossip_ctx->net_out_seq = fd_seq_inc( gossip_ctx->net_out_seq, 1UL );
gossip_ctx->net_out->chunk = fd_dcache_compact_next( gossip_ctx->net_out->chunk, packet_sz, gossip_ctx->net_out->chunk0, gossip_ctx->net_out->wmark );
}

static void
gossip_sign_fn( void * ctx,
uchar const * data,
ulong data_sz,
int sign_type,
uchar * out_signature ) {
fd_gossip_tile_ctx_t * gossip_ctx = (fd_gossip_tile_ctx_t *)ctx;
fd_keyguard_client_sign( gossip_ctx->keyguard_client, out_signature, data, data_sz, sign_type );
}

static inline void
during_housekeeping( fd_gossip_tile_ctx_t * ctx ) {
if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_SWITCH_PENDING ) ) {
/* TODO: Need some kind of state machine here, to ensure we switch
in sync with the signing tile. Currently, we might send out a
badly signed message before the signing tile has switched. */
fd_gossip_set_identity( ctx->gossip, ctx->keyswitch->bytes );
// fd_gossip_set_identity( ctx->gossip, ctx->keyswitch->bytes );
fd_keyswitch_state( ctx->keyswitch, FD_KEYSWITCH_STATE_COMPLETED );
}

Expand All @@ -83,15 +137,16 @@ during_housekeeping( fd_gossip_tile_ctx_t * ctx ) {

static inline void
metrics_write( fd_gossip_tile_ctx_t * ctx ) {
fd_gossip_metrics_t const * metrics = fd_gossip_metrics( ctx->gossip );
(void)ctx;
// fd_gossip_metrics_t const * metrics = fd_gossip_metrics( ctx->gossip );

FD_MGAUGE_SET( GOSSIP, TABLE_SIZE, metrics->table_size );
FD_MGAUGE_SET( GOSSIP, TABLE_EXPIRED, metrics->table_expired );
FD_MGAUGE_SET( GOSSIP, TABLE_EVICTED, metrics->table_evicted );
// FD_MGAUGE_SET( GOSSIP, TABLE_SIZE, metrics->table_size );
// FD_MGAUGE_SET( GOSSIP, TABLE_EXPIRED, metrics->table_expired );
// FD_MGAUGE_SET( GOSSIP, TABLE_EVICTED, metrics->table_evicted );

FD_MGAUGE_SET( GOSSIP, PURGED_SIZE, metrics->purged_size );
// FD_MGAUGE_SET( GOSSIP, PURGED_SIZE, metrics->purged_size );

FD_MGAUGE_SET( GOSSIP, FAILED_SIZE, metrics->failed_size );
// FD_MGAUGE_SET( GOSSIP, FAILED_SIZE, metrics->failed_size );
}

static inline void
Expand Down Expand Up @@ -124,15 +179,17 @@ after_frag( fd_gossip_tile_ctx_t * ctx,
ulong sz,
ulong tsorig FD_PARAM_UNUSED,
ulong tspub FD_PARAM_UNUSED,
fd_stem_context_t * stem FD_PARAM_UNUSED ) {
fd_stem_context_t * stem FD_PARAM_UNUSED) {
long now = ctx->last_wallclock + (long)((double)(fd_tickcount()-ctx->last_tickcount)/ctx->ticks_per_ns);
if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
long now = ctx->last_wallclock + (long)((double)(fd_tickcount()-ctx->last_tickcount)/ctx->ticks_per_ns);

fd_gossip_advance( ctx->gossip, now );
fd_gossip_rx( ctx->gossip, ctx->buffer, sz, now );
} else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SHRED_VERSION ) ) {
FD_MGAUGE_SET( GOSSIP, SHRED_VERSION, (ushort)sig );
fd_gossip_set_expected_shred_version( ctx->gossip, 1, (ushort)sig );
ctx->my_contact_info->shred_version = (ushort)sig;
ctx->my_contact_info->wallclock_nanos = now;
fd_gossip_set_my_contact_info( ctx->gossip, ctx->my_contact_info );
} else {
FD_LOG_ERR(( "unexpected in_kind %d", ctx->in_kind[ in_idx ] ));
}
Expand All @@ -149,7 +206,7 @@ privileged_init( fd_topo_t * topo,
if( FD_UNLIKELY( !strcmp( tile->gossip.identity_key_path, "" ) ) )
FD_LOG_ERR(( "identity_key_path not set" ));

ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->gossip.identity_key_path, /* pubkey only: */ 1 ) );
fd_memcpy( ctx->my_contact_info->pubkey, fd_type_pun_const( fd_keyload_load( tile->gossip.identity_key_path, /* pubkey only: */ 1 ) ), 32UL );
FD_TEST( 4UL==getrandom( &ctx->rng_seed, 4UL, 0 ) );
FD_TEST( 8UL==getrandom( &ctx->rng_idx, 8UL, 0 ) );
}
Expand All @@ -168,8 +225,6 @@ out1( fd_topo_t const * topo,
}
}

if( FD_UNLIKELY( idx==ULONG_MAX ) ) FD_LOG_ERR(( "tile %s:%lu had no output link named %s", tile->name, tile->kind_id, name ));

void * mem = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ idx ] ].dcache_obj_id ].wksp_id ].wksp;
ulong chunk0 = fd_dcache_compact_chunk0( mem, topo->links[ tile->out_link_id[ idx ] ].dcache );
ulong wmark = fd_dcache_compact_wmark ( mem, topo->links[ tile->out_link_id[ idx ] ].dcache, topo->links[ tile->out_link_id[ idx ] ].mtu );
Expand All @@ -181,21 +236,37 @@ static void
unprivileged_init( fd_topo_t * topo,
fd_topo_tile_t * tile ) {
void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );

FD_SCRATCH_ALLOC_INIT( l, scratch );
fd_gossip_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_gossip_tile_ctx_t), sizeof(fd_gossip_tile_ctx_t) );
void * gossip = FD_SCRATCH_ALLOC_APPEND( l, fd_gossip_align(), fd_gossip_footprint( tile->gossip.max_entries ) );

ctx->ticks_per_ns = fd_tempo_tick_per_ns( NULL );
ctx->last_wallclock = fd_log_wallclock();
ctx->last_tickcount = fd_tickcount();

fd_rng_t rng[ 1 ];
FD_TEST( fd_rng_join( fd_rng_new( rng, ctx->rng_seed, ctx->rng_idx ) ) );

/* TODO setup my_contact_info */
if( tile->gossip.has_expected_shred_version ) {
ctx->my_contact_info->shred_version = tile->gossip.expected_shred_version;
} else {
ctx->my_contact_info->shred_version = 0;
}
ctx->my_contact_info->wallclock_nanos = ctx->last_wallclock;

ctx->gossip = fd_gossip_join( fd_gossip_new( gossip,
rng,
tile->gossip.max_entries,
tile->gossip.has_expected_shred_version,
tile->gossip.expected_shred_version,
tile->gossip.entrypoints_cnt,
tile->gossip.entrypoints,
ctx->identity_key->uc ) );
ctx->my_contact_info,

gossip_send_fn,
(void*)ctx,
gossip_sign_fn,
(void*)ctx ) );
FD_TEST( ctx->gossip );

FD_MGAUGE_SET( GOSSIP, SHRED_VERSION, tile->gossip.expected_shred_version );
Expand All @@ -207,10 +278,7 @@ unprivileged_init( fd_topo_t * topo,
ctx->keyswitch = fd_keyswitch_join( fd_topo_obj_laddr( topo, tile->keyswitch_obj_id ) );
FD_TEST( ctx->keyswitch );

ctx->ticks_per_ns = fd_tempo_tick_per_ns( NULL );
ctx->last_wallclock = fd_log_wallclock();
ctx->last_tickcount = fd_tickcount();

ulong sign_in_tile_idx = ULONG_MAX;
for( ulong i=0UL; i<tile->in_cnt; i++ ) {
fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
Expand All @@ -223,13 +291,48 @@ unprivileged_init( fd_topo_t * topo,
ctx->in_kind[ i ] = IN_KIND_SHRED_VERSION;
} else if( FD_UNLIKELY( !strcmp( link->name, "net_gossip" ) ) ) {
ctx->in_kind[ i ] = IN_KIND_NET;
} else if( FD_UNLIKELY( !strcmp( link->name, "sign_gossip" ) ) ) {
ctx->in_kind[ i ] = IN_KIND_SIGN;
sign_in_tile_idx = i;
} else {
FD_LOG_ERR(( "unexpected input link name %s", link->name ));
}
}

if( FD_UNLIKELY( sign_in_tile_idx==ULONG_MAX ) )
FD_LOG_ERR(( "tile %s:%lu had no input link named sign_gossip", tile->name, tile->kind_id ));

*ctx->net_out = out1( topo, tile, "gossip_net" );
*ctx->gossip_out = out1( topo, tile, "gossip_out" );
*ctx->sign_out = out1( topo, tile, "gossip_sign" );

fd_topo_link_t * net_out_link = &topo->links[ tile->out_link_id[ ctx->net_out->idx ] ];
ctx->net_out_mcache = net_out_link->mcache;
ctx->net_out_seq = fd_mcache_seq_query( fd_mcache_seq_laddr( ctx->net_out_mcache ) );
ctx->net_out_depth = fd_mcache_depth( ctx->net_out_mcache );

/* Optional out links (?) */
for( ulong i=0UL; i<tile->out_cnt; i++ ) {
fd_topo_link_t * link = &topo->links[ tile->out_link_id[ i ] ];
if( FD_UNLIKELY( !strcmp( link->name, "gossip_out" ) ) ) {
*ctx->gossip_out = out1( topo, tile, "gossip_out" );
}
}

fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id [ sign_in_tile_idx ] ];
fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ ctx->sign_out->idx ] ];

if( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
sign_out->mcache,
sign_out->dcache,
sign_in->mcache,
sign_in->dcache ) )==NULL ) {
FD_LOG_ERR(( "failed to join keyguard client" ));
}

fd_ip4_udp_hdr_init( ctx->net_out_hdr,
FD_GOSSIP_MTU,
tile->gossip.ip_addr,
tile->gossip.ports.gossip );

ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
Expand Down
3 changes: 3 additions & 0 deletions src/flamenco/gossip/Local.mk
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
$(call add-hdrs,fd_gossip.h fd_gossip_types.h fd_ping_tracker.h)
$(call add-objs,fd_gossip fd_ping_tracker fd_gossip_msg_ser fd_gossip_msg_parse fd_gossip_msg_ser,fd_flamenco)

$(call add-hdrs,fd_bloom.h)
$(call add-objs,fd_bloom fd_active_set fd_ping_tracker,fd_flamenco)

Expand Down
Loading
Loading