Skip to content

Commit

Permalink
WIP filter gossip by snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
clarkezone committed Jan 12, 2025
1 parent 3c6d049 commit 0fe2f2a
Showing 1 changed file with 78 additions and 0 deletions.
78 changes: 78 additions & 0 deletions src/accountsdb/download.zig
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ const PeerSearchResult = struct {
untrusted_inc_snapshot_count: usize = 0,
};

const SnapshopType = enum {
Full,
Incremental,
};

/// finds valid contact infos which we can download a snapshot from.
/// valid contact infos are:
/// - not me
Expand Down Expand Up @@ -153,6 +158,8 @@ pub fn findPeersToDownloadFromAssumeCapacity(
}
}

// if requested full, if requested inc, if requested both

valid_peers.appendAssumeCapacity(.{
.contact_info = peer_contact_info,
.full_snapshot = snapshot_hashes.full,
Expand All @@ -164,6 +171,77 @@ pub fn findPeersToDownloadFromAssumeCapacity(
return result;
}

/// downloads full and incremental snapshots from peers found in gossip.
/// note: gossip_service must be running.
pub fn reliablyDownloadSnapshotsFromGossip(
allocator: std.mem.Allocator,
logger_: Logger,
// if null, then we trust any peer for snapshot download
maybe_trusted_validators: ?[]const Pubkey,
gossip_service: *GossipService,
output_dir: std.fs.Dir,
min_mb_per_sec: usize,
snapshot_type: SnapshopType,
) !void {
const logger = logger_.withScope(LOG_SCOPE);
logger
.info()
.logf("starting snapshot download with min download speed: {d} MB/s", .{min_mb_per_sec});
//
// TODO: maybe make this bigger? or dynamic?
var contact_info_buf: [1_000]ThreadSafeContactInfo = undefined;

const my_contact_info = gossip_service.my_contact_info;

var available_snapshot_peers = std.ArrayList(PeerSnapshotHash).init(allocator);
defer available_snapshot_peers.deinit();

var slow_peer_pubkeys = std.ArrayList(Pubkey).init(allocator);
defer slow_peer_pubkeys.deinit();

while (true) {
std.time.sleep(std.time.ns_per_s * 5); // wait while gossip table updates

// only hold gossip table lock for this block
{
const gossip_table, var gossip_table_lg = gossip_service.gossip_table_rw.readWithLock();
defer gossip_table_lg.unlock();

const contacts = gossip_table.getThreadSafeContactInfos(&contact_info_buf, 0);

try available_snapshot_peers.ensureTotalCapacity(contacts.len);
const result = try findPeersToDownloadFromAssumeCapacity(
allocator,
gossip_table,
contacts,
my_contact_info.shred_version,
my_contact_info.pubkey,
slow_peer_pubkeys.items,
maybe_trusted_validators,
// this is cleared and populated
&available_snapshot_peers,
);

var write_buf: [512]u8 = undefined;
var i: usize = 0;
inline for (@typeInfo(PeerSearchResult).Struct.fields) |field| {
if (@field(result, field.name) != 0) {
const r = try std.fmt.bufPrint(write_buf[i..], "{s}: {d} ", .{ field.name, @field(result, field.name) });
i += r.len;
}
}
logger
.info()
.logf("searched for snapshot peers: {s}", .{write_buf[0..i]});
}

if (snapshot_type == SnapshopType.Full) {

} else {

}
}

/// downloads full and incremental snapshots from peers found in gossip.
/// note: gossip_service must be running.
pub fn downloadSnapshotsFromGossip(
Expand Down

0 comments on commit 0fe2f2a

Please sign in to comment.