diff --git a/src/adapter.zig b/src/adapter.zig
index 247a1e9d5..56f08ce28 100644
--- a/src/adapter.zig
+++ b/src/adapter.zig
@@ -50,12 +50,12 @@ pub const EpochContextManager = struct {
         return self.contexts.contains(@intCast(epoch));
     }
 
-    pub fn setEpoch(self: *Self, epoch: Epoch) void {
-        self.contexts.realign(@intCast(epoch));
+    pub fn setEpoch(self: *Self, epoch: Epoch) !void {
+        try self.contexts.realign(@intCast(epoch));
     }
 
-    pub fn setSlot(self: *Self, slot: Slot) void {
-        self.contexts.realign(@intCast(self.schedule.getEpoch(slot)));
+    pub fn setSlot(self: *Self, slot: Slot) !void {
+        try self.contexts.realign(@intCast(self.schedule.getEpoch(slot)));
     }
 
     pub fn release(self: *Self, context: *const sig.core.EpochContext) void {
@@ -112,18 +112,15 @@ pub const RpcEpochContextService = struct {
     fn refresh(self: *Self) !void {
         const response = try self.rpc_client.getSlot(self.allocator, .{});
         defer response.deinit();
-        const old_slot = try response.result() - self.state.schedule.slots_per_epoch;
-        const last_epoch = self.state.schedule.getEpoch(old_slot);
+        const this_slot = try response.result();
+        const this_epoch = self.state.schedule.getEpoch(this_slot);
+        const old_slot = this_slot -| self.state.schedule.slots_per_epoch;
 
-        self.state.setEpoch(last_epoch);
-
-        const ls1 = try self.getLeaderSchedule(old_slot);
-        const ctx1 = EpochContext{ .staked_nodes = .{}, .leader_schedule = ls1 };
-        try self.state.put(last_epoch, ctx1);
+        try self.state.setEpoch(this_epoch);
 
         for (0..3) |epoch_offset| {
             const selected_slot = old_slot + epoch_offset * self.state.schedule.slots_per_epoch;
-            const selected_epoch = last_epoch + epoch_offset;
+            const selected_epoch = this_epoch + epoch_offset -| 1;
             std.debug.assert(selected_epoch == self.state.schedule.getEpoch(selected_slot));
 
             if (self.state.contains(selected_epoch)) {
@@ -132,8 +129,9 @@ pub const RpcEpochContextService = struct {
 
             if (self.getLeaderSchedule(selected_slot)) |ls2| {
                 const ctx2 = EpochContext{ .staked_nodes = .{}, .leader_schedule = ls2 };
+                errdefer self.allocator.free(ls2);
                 try self.state.put(selected_epoch, ctx2);
-            } else |e| if (selected_epoch == last_epoch) {
+            } else |e| if (selected_epoch == this_epoch) {
                 return e;
             }
         }
@@ -147,3 +145,23 @@ pub const RpcEpochContextService = struct {
         return schedule.slot_leaders;
     }
 };
+
+test "epochctx" {
+    if (true) return error.SkipZigTest;
+    const allocator = std.testing.allocator;
+
+    const genesis_config = try sig.accounts_db.GenesisConfig
+        .init(allocator, "data/genesis-files/testnet_genesis.bin");
+    defer genesis_config.deinit(allocator);
+
+    var rpc_client = sig.rpc.Client.init(allocator, .Testnet, .{});
+    defer rpc_client.deinit();
+
+    var epoch_context_manager = try sig.adapter.EpochContextManager
+        .init(allocator, genesis_config.epoch_schedule);
+    defer epoch_context_manager.deinit();
+    var rpc_epoch_ctx_service = sig.adapter.RpcEpochContextService
+        .init(allocator, .noop, &epoch_context_manager, rpc_client);
+
+    try rpc_epoch_ctx_service.refresh();
+}
diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig
index b4b4ae8e9..414bcf2b1 100644
--- a/src/cmd/cmd.zig
+++ b/src/cmd/cmd.zig
@@ -878,6 +878,12 @@ fn shredCollector() !void {
     var rpc_client = sig.rpc.Client.init(allocator, genesis_config.cluster_type, .{});
     defer rpc_client.deinit();
 
+    var shred_network_conf = config.current.shred_network;
+    shred_network_conf.start_slot = shred_network_conf.start_slot orelse blk: {
+        const response = try rpc_client.getSlot(allocator, .{});
+        break :blk try response.result();
+    };
+
     const repair_port: u16 = config.current.shred_network.repair_port;
     const turbine_recv_port: u16 = config.current.shred_network.turbine_recv_port;
 
@@ -949,13 +955,8 @@ fn shredCollector() !void {
     const my_contact_info = sig.gossip.data.ThreadSafeContactInfo.fromContactInfo(gossip_service.my_contact_info);
 
     // shred networking
-    var shred_col_conf = config.current.shred_network;
-    shred_col_conf.start_slot = shred_col_conf.start_slot orelse blk: {
-        const response = try rpc_client.getSlot(allocator, .{});
-        break :blk try response.result();
-    };
     var shred_network_manager = try sig.shred_network.start(
-        shred_col_conf,
+        shred_network_conf,
         .{
             .allocator = allocator,
             .logger = app_base.logger.unscoped(),
diff --git a/src/ledger/shred_inserter/shred_inserter.zig b/src/ledger/shred_inserter/shred_inserter.zig
index f4cc130b6..9965ad589 100644
--- a/src/ledger/shred_inserter/shred_inserter.zig
+++ b/src/ledger/shred_inserter/shred_inserter.zig
@@ -947,7 +947,7 @@ pub const ShredInserter = struct {
         recovered: usize,
     ) void {
         const start, const end = erasure_meta.dataShredsIndices();
-        self.logger.debug().logf(
+        self.logger.trace().logf(
            \\datapoint: blockstore-erasure
            \\ slot: {[slot]}
            \\ start_index: {[start_index]}
diff --git a/src/shred_network/service.zig b/src/shred_network/service.zig
index cdcc896df..125f4547a 100644
--- a/src/shred_network/service.zig
+++ b/src/shred_network/service.zig
@@ -79,16 +79,20 @@ pub fn start(
         .{},
     );
     var arena = service_manager.arena.allocator();
+    const defers = &service_manager.defers; // use this instead of defer statements
 
     const repair_socket = try bindUdpReusable(conf.repair_port);
    const turbine_socket = try bindUdpReusable(conf.turbine_recv_port);
 
-    var retransmit_channel = try sig.sync.Channel(sig.net.Packet).init(deps.allocator);
-    defer retransmit_channel.deinit();
+    // channels
+    const unverified_shred_channel = try Channel(Packet).create(deps.allocator);
+    try defers.deferCall(Channel(Packet).destroy, .{unverified_shred_channel});
+    const shreds_to_insert_channel = try Channel(Packet).create(deps.allocator);
+    try defers.deferCall(Channel(Packet).destroy, .{shreds_to_insert_channel});
+    const retransmit_channel = try Channel(sig.net.Packet).create(deps.allocator);
+    try defers.deferCall(Channel(Packet).destroy, .{retransmit_channel});
 
     // receiver (threads)
-    const unverified_shred_channel = try Channel(Packet).create(deps.allocator);
-    const verified_shred_channel = try Channel(Packet).create(deps.allocator);
     const shred_receiver = try arena.create(ShredReceiver);
     shred_receiver.* = .{
         .allocator = deps.allocator,
@@ -116,8 +120,8 @@ pub fn start(
             deps.exit,
             deps.registry,
             unverified_shred_channel,
-            verified_shred_channel,
-            &retransmit_channel,
+            shreds_to_insert_channel,
+            retransmit_channel,
             deps.epoch_context_mgr.slotLeaders(),
         },
     );
@@ -139,7 +143,7 @@ pub fn start(
             deps.exit,
             deps.logger.unscoped(),
             deps.registry,
-            verified_shred_channel,
+            shreds_to_insert_channel,
             shred_tracker,
             deps.shred_inserter,
             deps.epoch_context_mgr.slotLeaders(),
@@ -155,7 +159,7 @@ pub fn start(
         .my_contact_info = deps.my_contact_info,
         .epoch_context_mgr = deps.epoch_context_mgr,
         .gossip_table_rw = deps.gossip_table_rw,
-        .receiver = &retransmit_channel,
+        .receiver = retransmit_channel,
         .maybe_num_retransmit_threads = deps.n_retransmit_threads,
         .overwrite_stake_for_testing = deps.overwrite_turbine_stake_for_testing,
         .exit = deps.exit,
@@ -183,7 +187,7 @@ pub fn start(
         deps.exit,
     );
     const repair_svc = try arena.create(RepairService);
-    try service_manager.defers.deferCall(RepairService.deinit, .{repair_svc});
+    try defers.deferCall(RepairService.deinit, .{repair_svc});
     repair_svc.* = try RepairService.init(
         deps.allocator,
         deps.logger.unscoped(),
diff --git a/src/sync/channel.zig b/src/sync/channel.zig
index 6b05e81d0..60bf88f86 100644
--- a/src/sync/channel.zig
+++ b/src/sync/channel.zig
@@ -95,12 +95,43 @@ pub fn Channel(T: type) type {
             };
         }
 
+        pub fn deinit(channel: *Self) void {
+            var head = channel.head.index.raw;
+            var tail = channel.tail.index.raw;
+            var block = channel.head.block.raw;
+
+            head &= ~((@as(usize, 1) << SHIFT) - 1);
+            tail &= ~((@as(usize, 1) << SHIFT) - 1);
+
+            while (head != tail) {
+                const offset = (head >> SHIFT) % LAP;
+
+                if (offset >= BLOCK_CAP) {
+                    const next = block.?.next.raw;
+                    channel.allocator.destroy(block.?);
+                    block = next;
+                }
+
+                head +%= (1 << SHIFT);
+            }
+
+            if (block) |b| {
+                channel.allocator.destroy(b);
+            }
+        }
+
         pub fn create(allocator: Allocator) !*Self {
             const channel = try allocator.create(Self);
             channel.* = try Self.init(allocator);
             return channel;
         }
 
+        /// to deinit channels created with `create`
+        pub fn destroy(channel: *Self) void {
+            channel.deinit();
+            channel.allocator.destroy(channel);
+        }
+
         pub fn close(channel: *Self) void {
             channel.closed.store(true, .monotonic);
             channel.condition.broadcast();
@@ -356,31 +387,6 @@ pub fn Channel(T: type) type {
 
             // The channel is empty if the indices are pointing at the same slot.
             return (head >> SHIFT) == (tail >> SHIFT);
         }
-
-        pub fn deinit(channel: *Self) void {
-            var head = channel.head.index.raw;
-            var tail = channel.tail.index.raw;
-            var block = channel.head.block.raw;
-
-            head &= ~((@as(usize, 1) << SHIFT) - 1);
-            tail &= ~((@as(usize, 1) << SHIFT) - 1);
-
-            while (head != tail) {
-                const offset = (head >> SHIFT) % LAP;
-
-                if (offset >= BLOCK_CAP) {
-                    const next = block.?.next.raw;
-                    channel.allocator.destroy(block.?);
-                    block = next;
-                }
-
-                head +%= (1 << SHIFT);
-            }
-
-            if (block) |b| {
-                channel.allocator.destroy(b);
-            }
-        }
     };
 }
diff --git a/src/sync/shared_memory.zig b/src/sync/shared_memory.zig
index 4b5ff7de2..82f3968b4 100644
--- a/src/sync/shared_memory.zig
+++ b/src/sync/shared_memory.zig
@@ -24,7 +24,7 @@ pub fn SharedPointerWindow(
         center: std.atomic.Value(usize),
         lock: std.Thread.RwLock = .{},
         deinit_context: DeinitContext,
-        discard_buf: []?Rc(T),
+        discard_buf: std.atomic.Value(?[*]?Rc(T)),
 
         const Self = @This();
 
@@ -34,13 +34,12 @@ pub fn SharedPointerWindow(
             start: usize,
             deinit_context: DeinitContext,
         ) !Self {
-            const discard_buf = try allocator.alloc(?Rc(T), len);
             return .{
                 .allocator = allocator,
                 .window = try Window(Rc(T)).init(allocator, len, start),
                 .deinit_context = deinit_context,
                 .center = std.atomic.Value(usize).init(start),
-                .discard_buf = discard_buf,
+                .discard_buf = std.atomic.Value(?[*]?Rc(T)).init(null),
             };
         }
 
@@ -48,17 +47,21 @@ pub fn SharedPointerWindow(
             for (self.window.state) |maybe_item| if (maybe_item) |item| {
                 self.releaseItem(item);
             };
-            self.window.deinit();
+            self.window.deinit(self.allocator);
+            if (self.discard_buf.load(.monotonic)) |buf| {
+                self.allocator.free(buf[0..self.window.state.len]);
+            }
         }
 
         pub fn put(self: *Self, index: usize, value: T) !void {
             const ptr = try Rc(T).create(self.allocator);
+            errdefer ptr.deinit(self.allocator);
             ptr.payload().* = value;
 
             const item_to_release = blk: {
                 self.lock.lock();
                 defer self.lock.unlock();
-                break :blk self.window.put(index, ptr) catch null;
+                break :blk try self.window.put(index, ptr);
             };
 
             if (item_to_release) |old| {
@@ -86,15 +89,17 @@ pub fn SharedPointerWindow(
             return self.window.contains(index);
         }
 
-        pub fn realign(self: *Self, new_center: usize) void {
+        pub fn realign(self: *Self, new_center: usize) !void {
             if (new_center == self.center.load(.monotonic)) return;
+            const discard_buf = try self.acquireDiscardBuf();
+            defer self.releaseDiscardBuf(discard_buf);
 
             const items_to_release = blk: {
                 self.lock.lock();
                 defer self.lock.unlock();
 
                 self.center.store(new_center, .monotonic);
-                break :blk self.window.realignGet(new_center, self.discard_buf);
+                break :blk self.window.realignGet(new_center, discard_buf);
             };
 
             for (items_to_release) |maybe_item| {
@@ -114,6 +119,19 @@ pub fn SharedPointerWindow(
                 self.allocator.free(bytes_to_free);
             }
         }
+
+        fn acquireDiscardBuf(self: *Self) ![]?Rc(T) {
+            return if (self.discard_buf.swap(null, .acquire)) |buf|
+                buf[0..self.window.state.len]
+            else
+                try self.allocator.alloc(?Rc(T), self.window.state.len);
+        }
+
+        fn releaseDiscardBuf(self: *Self, buf: []?Rc(T)) void {
+            if (self.discard_buf.swap(buf.ptr, .release)) |extra_buf| {
+                self.allocator.free(extra_buf[0..self.window.state.len]);
+            }
+        }
     };
 }
 
@@ -143,6 +161,29 @@ pub fn normalizeDeinitFunction(
             }
         }.f,
 
-        else => @compileError("unsupported deinit function type"),
+        else => if (DeinitContext == Allocator and
+            @TypeOf(deinitFn) == @TypeOf(Allocator.free) and deinitFn == Allocator.free)
+            struct {
+                fn free(v: *V, allocator: Allocator) void {
+                    allocator.free(v.*);
+                }
+            }.free
+        else
+            @compileError("unsupported deinit function type"),
     };
 }
+
+test "SharedPointerWindow frees memory" {
+    const allocator = std.testing.allocator;
+    var window = try SharedPointerWindow([]u8, Allocator.free, Allocator)
+        .init(allocator, 3, 1, allocator);
+    defer window.deinit();
+    const first = try allocator.alloc(u8, 1);
+    try window.put(0, first);
+    const second = try allocator.alloc(u8, 1);
+    try window.put(0, second);
+    const third = try allocator.alloc(u8, 1);
+    try window.put(1, third);
+    const fourth = try allocator.alloc(u8, 1);
+    try window.put(2, fourth);
+}