fix(shred-network): various bugs (Syndica#481)
This PR was originally motivated by errors occurring throughout the application due to memory corruption. This happens regardless of which mode the validator is run in, as long as the shred network is running. It has been a problem ever since the retransmit service was combined with the shred collector, which introduced the bug. While debugging this issue, I also fixed various other problems in shred-network.

### Widespread Memory Corruption

In `shred_network.service.start`, the `retransmit_channel` was being deinitialized before use because of a `defer retransmit_channel.deinit()` statement in the start function. The start function returns immediately, so the channels must not be deinitialized there. This is a use-after-free that triggers widespread memory corruption throughout the application, producing errors such as segmentation faults in gossip and any other running services.

The fix is to avoid `defer` statements within the start function and instead deinitialize the channel only when the service itself is deinitialized. This is currently accomplished by calling `service_manager.deferCall`. It may be worth considering another design to avoid the footgun of ordinary defers in the start function tripping up anyone unfamiliar with the ServiceManager pattern, as sketched below. See https://github.com/orgs/Syndica/projects/2/views/10?pane=issue&itemId=92733572
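A minimal before/after sketch, condensed from the `service.zig` diff in this commit (the `Dependencies` parameter and the `ServiceManager.init` call are simplified placeholders, not the exact signatures):

```zig
pub fn start(deps: Dependencies) !ServiceManager {
    var service_manager = ServiceManager.init(deps.allocator);
    const defers = &service_manager.defers;

    // Before (bug): this defer fires as soon as start() returns, while the
    // spawned threads still hold pointers to the channel:
    //
    //     var retransmit_channel = try Channel(Packet).init(deps.allocator);
    //     defer retransmit_channel.deinit();

    // After (fix): tie the channel's lifetime to the service manager, so it
    // is destroyed only when the service itself is deinitialized:
    const retransmit_channel = try Channel(Packet).create(deps.allocator);
    try defers.deferCall(Channel(Packet).destroy, .{retransmit_channel});

    // ...spawn the long-running threads that read from the channel...
    return service_manager;
}
```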

### RPC Client Panics

In the shred-collector command, a single RPC Client instance was used from multiple threads despite not being thread safe. This caused various panics in code called by the RPC Client. I fixed this in `cmd.zig` by moving the sole main-thread call to the client so that it occurs before the client is passed to another thread.
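The essence of the fix, excerpted from the `cmd.zig` diff below: the one main-thread RPC call now happens before the client is handed to the shred network's thread.

```zig
// Resolve start_slot with the only main-thread RPC call, *before*
// rpc_client is passed into sig.shred_network.start(...).
var shred_network_conf = config.current.shred_network;
shred_network_conf.start_slot = shred_network_conf.start_slot orelse blk: {
    const response = try rpc_client.getSlot(allocator, .{});
    break :blk try response.result();
};
```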

### Leaks in Shred Network

Channel pointers created for the shred network were never destroyed. The Channel's internal resources were cleaned up, but the Channel pointer itself was not. I added a `destroy` method to complement the Channel's `create` method and made use of it in `shred_network.start`.
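Illustrative usage of the pair (the method names match the `channel.zig` diff below; `allocator` and `Packet` stand in for whatever the caller has in scope):

```zig
// `create` heap-allocates the Channel and initializes it;
// `destroy` deinitializes the internals and frees the pointer itself.
const channel = try Channel(Packet).create(allocator);
defer channel.destroy();
```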

### Thread Safety in SharedWindowPointer

There was a flaw where the `discard_buf` could be used by multiple threads at once, leading to memory corruption. I did not observe any errors from this, but it undermined the struct's overall thread safety. Now, instead of mutating the shared buffer from any thread, an atomic pointer ensures that only a single thread at a time has access to the buffer. If another thread needs a buffer at the same time, it allocates and frees a dedicated buffer. This is unlikely to occur with the way the struct is currently used, but it ensures the struct is fully thread safe.
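A self-contained reduction of the pattern (illustrative only, not the Sig type itself): an atomic optional pointer is swapped to null to claim exclusive ownership, and a thread that finds it already taken falls back to a private allocation.

```zig
const std = @import("std");

fn SingleOwnerBuf(comptime T: type) type {
    return struct {
        ptr: std.atomic.Value(?[*]T) = std.atomic.Value(?[*]T).init(null),
        len: usize,

        const Self = @This();

        fn acquire(self: *Self, allocator: std.mem.Allocator) ![]T {
            // swap(null, .acquire) atomically claims exclusive ownership.
            return if (self.ptr.swap(null, .acquire)) |p|
                p[0..self.len]
            else
                try allocator.alloc(T, self.len); // fallback: dedicated buffer
        }

        fn release(self: *Self, allocator: std.mem.Allocator, buf: []T) void {
            // Park the buffer again; if a fallback buffer was parked by
            // another thread in the meantime, free the extra one.
            if (self.ptr.swap(buf.ptr, .release)) |extra| {
                allocator.free(extra[0..self.len]);
            }
        }
    };
}
```

This mirrors the `acquireDiscardBuf`/`releaseDiscardBuf` pair added in the `shared_memory.zig` diff below.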

### Leak in SharedWindowPointer

The `discard_buf` and its contents were not being freed by SharedPointerWindow. I added a test to reveal this and fixed the leak in the deinit function.

### RpcEpochContextService issues

RpcEpochContextService had various minor problems that I fixed.

It acquired a context outside the loop, and then the loop iterated over the same value and tried to acquire the same context again. The loop can handle this case entirely on its own, so I removed the logic outside the loop.

There was also a potential memory leak on error, which needed an `errdefer`.

The intended design of the context manager is to return an error only if the *current* epoch's context cannot be acquired; silent failure should be allowed only for other contexts. By accident, it was returning an error only when retrieval of the *last* epoch's context failed.
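Condensed from `refresh` in the `src/adapter.zig` diff below, the corrected loop covers the previous, current, and next epochs, and only a failure for the current epoch propagates (the context-caching body is elided here):

```zig
for (0..3) |epoch_offset| {
    // offsets 0, 1, 2 select this_epoch - 1, this_epoch, this_epoch + 1
    const selected_epoch = this_epoch + epoch_offset -| 1;
    const selected_slot = old_slot + epoch_offset * self.state.schedule.slots_per_epoch;

    if (self.getLeaderSchedule(selected_slot)) |leader_schedule| {
        // cache the epoch context (elided)
        _ = leader_schedule;
    } else |e| if (selected_epoch == this_epoch) {
        return e; // only the *current* epoch's failure is an error
    }
}
```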

I added a skipped test that you can unskip to manually run the RpcEpochContextService. It helped me understand some of these bugs. Ideally this test can be unskipped in the future, but that will require changes to the RPC client to support mocking the RPC endpoint.
dnut authored Jan 7, 2025
1 parent 74d3c43 commit 00ec2fe
Showing 6 changed files with 132 additions and 62 deletions.
44 changes: 31 additions & 13 deletions src/adapter.zig

```diff
@@ -50,12 +50,12 @@ pub const EpochContextManager = struct {
         return self.contexts.contains(@intCast(epoch));
     }
 
-    pub fn setEpoch(self: *Self, epoch: Epoch) void {
-        self.contexts.realign(@intCast(epoch));
+    pub fn setEpoch(self: *Self, epoch: Epoch) !void {
+        try self.contexts.realign(@intCast(epoch));
     }
 
-    pub fn setSlot(self: *Self, slot: Slot) void {
-        self.contexts.realign(@intCast(self.schedule.getEpoch(slot)));
+    pub fn setSlot(self: *Self, slot: Slot) !void {
+        try self.contexts.realign(@intCast(self.schedule.getEpoch(slot)));
     }
 
     pub fn release(self: *Self, context: *const sig.core.EpochContext) void {
@@ -112,18 +112,15 @@ pub const RpcEpochContextService = struct {
     fn refresh(self: *Self) !void {
         const response = try self.rpc_client.getSlot(self.allocator, .{});
         defer response.deinit();
-        const old_slot = try response.result() - self.state.schedule.slots_per_epoch;
-        const last_epoch = self.state.schedule.getEpoch(old_slot);
+        const this_slot = try response.result();
+        const this_epoch = self.state.schedule.getEpoch(this_slot);
+        const old_slot = this_slot -| self.state.schedule.slots_per_epoch;
 
-        self.state.setEpoch(last_epoch);
-
-        const ls1 = try self.getLeaderSchedule(old_slot);
-        const ctx1 = EpochContext{ .staked_nodes = .{}, .leader_schedule = ls1 };
-        try self.state.put(last_epoch, ctx1);
+        try self.state.setEpoch(this_epoch);
 
         for (0..3) |epoch_offset| {
             const selected_slot = old_slot + epoch_offset * self.state.schedule.slots_per_epoch;
-            const selected_epoch = last_epoch + epoch_offset;
+            const selected_epoch = this_epoch + epoch_offset -| 1;
             std.debug.assert(selected_epoch == self.state.schedule.getEpoch(selected_slot));
 
             if (self.state.contains(selected_epoch)) {
@@ -132,8 +129,9 @@ pub const RpcEpochContextService = struct {
 
             if (self.getLeaderSchedule(selected_slot)) |ls2| {
                 const ctx2 = EpochContext{ .staked_nodes = .{}, .leader_schedule = ls2 };
+                errdefer self.allocator.free(ls2);
                 try self.state.put(selected_epoch, ctx2);
-            } else |e| if (selected_epoch == last_epoch) {
+            } else |e| if (selected_epoch == this_epoch) {
                 return e;
             }
         }
@@ -147,3 +145,23 @@ pub const RpcEpochContextService = struct {
         return schedule.slot_leaders;
     }
 };
+
+test "epochctx" {
+    if (true) return error.SkipZigTest;
+    const allocator = std.testing.allocator;
+
+    const genesis_config = try sig.accounts_db.GenesisConfig
+        .init(allocator, "data/genesis-files/testnet_genesis.bin");
+    defer genesis_config.deinit(allocator);
+
+    var rpc_client = sig.rpc.Client.init(allocator, .Testnet, .{});
+    defer rpc_client.deinit();
+
+    var epoch_context_manager = try sig.adapter.EpochContextManager
+        .init(allocator, genesis_config.epoch_schedule);
+    defer epoch_context_manager.deinit();
+    var rpc_epoch_ctx_service = sig.adapter.RpcEpochContextService
+        .init(allocator, .noop, &epoch_context_manager, rpc_client);
+
+    try rpc_epoch_ctx_service.refresh();
+}
```
13 changes: 7 additions & 6 deletions src/cmd/cmd.zig

```diff
@@ -878,6 +878,12 @@ fn shredCollector() !void {
     var rpc_client = sig.rpc.Client.init(allocator, genesis_config.cluster_type, .{});
     defer rpc_client.deinit();
 
+    var shred_network_conf = config.current.shred_network;
+    shred_network_conf.start_slot = shred_network_conf.start_slot orelse blk: {
+        const response = try rpc_client.getSlot(allocator, .{});
+        break :blk try response.result();
+    };
+
     const repair_port: u16 = config.current.shred_network.repair_port;
     const turbine_recv_port: u16 = config.current.shred_network.turbine_recv_port;
 
@@ -949,13 +955,8 @@ fn shredCollector() !void {
     const my_contact_info = sig.gossip.data.ThreadSafeContactInfo.fromContactInfo(gossip_service.my_contact_info);
 
     // shred networking
-    var shred_col_conf = config.current.shred_network;
-    shred_col_conf.start_slot = shred_col_conf.start_slot orelse blk: {
-        const response = try rpc_client.getSlot(allocator, .{});
-        break :blk try response.result();
-    };
     var shred_network_manager = try sig.shred_network.start(
-        shred_col_conf,
+        shred_network_conf,
         .{
             .allocator = allocator,
             .logger = app_base.logger.unscoped(),
```
2 changes: 1 addition & 1 deletion src/ledger/shred_inserter/shred_inserter.zig

```diff
@@ -947,7 +947,7 @@ pub const ShredInserter = struct {
         recovered: usize,
     ) void {
         const start, const end = erasure_meta.dataShredsIndices();
-        self.logger.debug().logf(
+        self.logger.trace().logf(
             \\datapoint: blockstore-erasure
             \\ slot: {[slot]}
             \\ start_index: {[start_index]}
```
22 changes: 13 additions & 9 deletions src/shred_network/service.zig

```diff
@@ -79,16 +79,20 @@ pub fn start(
         .{},
     );
     var arena = service_manager.arena.allocator();
+    const defers = &service_manager.defers; // use this instead of defer statements
 
     const repair_socket = try bindUdpReusable(conf.repair_port);
     const turbine_socket = try bindUdpReusable(conf.turbine_recv_port);
 
-    var retransmit_channel = try sig.sync.Channel(sig.net.Packet).init(deps.allocator);
-    defer retransmit_channel.deinit();
+    // channels
+    const unverified_shred_channel = try Channel(Packet).create(deps.allocator);
+    try defers.deferCall(Channel(Packet).destroy, .{unverified_shred_channel});
+    const shreds_to_insert_channel = try Channel(Packet).create(deps.allocator);
+    try defers.deferCall(Channel(Packet).destroy, .{shreds_to_insert_channel});
+    const retransmit_channel = try Channel(sig.net.Packet).create(deps.allocator);
+    try defers.deferCall(Channel(Packet).destroy, .{retransmit_channel});
 
     // receiver (threads)
-    const unverified_shred_channel = try Channel(Packet).create(deps.allocator);
-    const verified_shred_channel = try Channel(Packet).create(deps.allocator);
     const shred_receiver = try arena.create(ShredReceiver);
     shred_receiver.* = .{
         .allocator = deps.allocator,
@@ -116,8 +120,8 @@
             deps.exit,
             deps.registry,
             unverified_shred_channel,
-            verified_shred_channel,
-            &retransmit_channel,
+            shreds_to_insert_channel,
+            retransmit_channel,
             deps.epoch_context_mgr.slotLeaders(),
         },
     );
@@ -139,7 +143,7 @@
             deps.exit,
             deps.logger.unscoped(),
             deps.registry,
-            verified_shred_channel,
+            shreds_to_insert_channel,
            shred_tracker,
            deps.shred_inserter,
            deps.epoch_context_mgr.slotLeaders(),
@@ -155,7 +159,7 @@
         .my_contact_info = deps.my_contact_info,
         .epoch_context_mgr = deps.epoch_context_mgr,
         .gossip_table_rw = deps.gossip_table_rw,
-        .receiver = &retransmit_channel,
+        .receiver = retransmit_channel,
         .maybe_num_retransmit_threads = deps.n_retransmit_threads,
         .overwrite_stake_for_testing = deps.overwrite_turbine_stake_for_testing,
         .exit = deps.exit,
@@ -183,7 +187,7 @@
         deps.exit,
     );
     const repair_svc = try arena.create(RepairService);
-    try service_manager.defers.deferCall(RepairService.deinit, .{repair_svc});
+    try defers.deferCall(RepairService.deinit, .{repair_svc});
     repair_svc.* = try RepairService.init(
         deps.allocator,
         deps.logger.unscoped(),
```
56 changes: 31 additions & 25 deletions src/sync/channel.zig

```diff
@@ -95,12 +95,43 @@ pub fn Channel(T: type) type {
             };
         }
 
+        pub fn deinit(channel: *Self) void {
+            var head = channel.head.index.raw;
+            var tail = channel.tail.index.raw;
+            var block = channel.head.block.raw;
+
+            head &= ~((@as(usize, 1) << SHIFT) - 1);
+            tail &= ~((@as(usize, 1) << SHIFT) - 1);
+
+            while (head != tail) {
+                const offset = (head >> SHIFT) % LAP;
+
+                if (offset >= BLOCK_CAP) {
+                    const next = block.?.next.raw;
+                    channel.allocator.destroy(block.?);
+                    block = next;
+                }
+
+                head +%= (1 << SHIFT);
+            }
+
+            if (block) |b| {
+                channel.allocator.destroy(b);
+            }
+        }
+
         pub fn create(allocator: Allocator) !*Self {
             const channel = try allocator.create(Self);
             channel.* = try Self.init(allocator);
             return channel;
         }
 
+        /// to deinit channels created with `create`
+        pub fn destroy(channel: *Self) void {
+            channel.deinit();
+            channel.allocator.destroy(channel);
+        }
+
         pub fn close(channel: *Self) void {
             channel.closed.store(true, .monotonic);
             channel.condition.broadcast();
@@ -356,31 +387,6 @@ pub fn Channel(T: type) type {
             // The channel is empty if the indices are pointing at the same slot.
             return (head >> SHIFT) == (tail >> SHIFT);
         }
-
-        pub fn deinit(channel: *Self) void {
-            var head = channel.head.index.raw;
-            var tail = channel.tail.index.raw;
-            var block = channel.head.block.raw;
-
-            head &= ~((@as(usize, 1) << SHIFT) - 1);
-            tail &= ~((@as(usize, 1) << SHIFT) - 1);
-
-            while (head != tail) {
-                const offset = (head >> SHIFT) % LAP;
-
-                if (offset >= BLOCK_CAP) {
-                    const next = block.?.next.raw;
-                    channel.allocator.destroy(block.?);
-                    block = next;
-                }
-
-                head +%= (1 << SHIFT);
-            }
-
-            if (block) |b| {
-                channel.allocator.destroy(b);
-            }
-        }
     };
 }
```
57 changes: 49 additions & 8 deletions src/sync/shared_memory.zig

```diff
@@ -24,7 +24,7 @@ pub fn SharedPointerWindow(
         center: std.atomic.Value(usize),
         lock: std.Thread.RwLock = .{},
         deinit_context: DeinitContext,
-        discard_buf: []?Rc(T),
+        discard_buf: std.atomic.Value(?[*]?Rc(T)),
 
         const Self = @This();
 
@@ -34,31 +34,34 @@ pub fn SharedPointerWindow(
            start: usize,
            deinit_context: DeinitContext,
        ) !Self {
-            const discard_buf = try allocator.alloc(?Rc(T), len);
             return .{
                 .allocator = allocator,
                 .window = try Window(Rc(T)).init(allocator, len, start),
                 .deinit_context = deinit_context,
                 .center = std.atomic.Value(usize).init(start),
-                .discard_buf = discard_buf,
+                .discard_buf = std.atomic.Value(?[*]?Rc(T)).init(null),
             };
         }
 
         pub fn deinit(self: Self) void {
             for (self.window.state) |maybe_item| if (maybe_item) |item| {
                 self.releaseItem(item);
             };
-            self.window.deinit();
+            self.window.deinit(self.allocator);
+            if (self.discard_buf.load(.monotonic)) |buf| {
+                self.allocator.free(buf[0..self.window.state.len]);
+            }
         }
 
         pub fn put(self: *Self, index: usize, value: T) !void {
             const ptr = try Rc(T).create(self.allocator);
+            errdefer ptr.deinit(self.allocator);
             ptr.payload().* = value;
 
             const item_to_release = blk: {
                 self.lock.lock();
                 defer self.lock.unlock();
-                break :blk self.window.put(index, ptr) catch null;
+                break :blk try self.window.put(index, ptr);
             };
 
             if (item_to_release) |old| {
@@ -86,15 +89,17 @@ pub fn SharedPointerWindow(
             return self.window.contains(index);
         }
 
-        pub fn realign(self: *Self, new_center: usize) void {
+        pub fn realign(self: *Self, new_center: usize) !void {
             if (new_center == self.center.load(.monotonic)) return;
+            const discard_buf = try self.acquireDiscardBuf();
+            defer self.releaseDiscardBuf(discard_buf);
 
             const items_to_release = blk: {
                 self.lock.lock();
                 defer self.lock.unlock();
 
                 self.center.store(new_center, .monotonic);
-                break :blk self.window.realignGet(new_center, self.discard_buf);
+                break :blk self.window.realignGet(new_center, discard_buf);
             };
 
             for (items_to_release) |maybe_item| {
@@ -114,6 +119,19 @@ pub fn SharedPointerWindow(
                 self.allocator.free(bytes_to_free);
             }
         }
+
+        fn acquireDiscardBuf(self: *Self) ![]?Rc(T) {
+            return if (self.discard_buf.swap(null, .acquire)) |buf|
+                buf[0..self.window.state.len]
+            else
+                try self.allocator.alloc(?Rc(T), self.window.state.len);
+        }
+
+        fn releaseDiscardBuf(self: *Self, buf: []?Rc(T)) void {
+            if (self.discard_buf.swap(buf.ptr, .release)) |extra_buf| {
+                self.allocator.free(extra_buf[0..self.window.state.len]);
+            }
+        }
     };
 }
@@ -143,6 +161,29 @@ pub fn normalizeDeinitFunction(
             }
         }.f,
 
-        else => @compileError("unsupported deinit function type"),
+        else => if (DeinitContext == Allocator and
+            @TypeOf(deinitFn) == @TypeOf(Allocator.free) and deinitFn == Allocator.free)
+            struct {
+                fn free(v: *V, allocator: Allocator) void {
+                    allocator.free(v.*);
+                }
+            }.free
+        else
+            @compileError("unsupported deinit function type"),
     };
 }
 
+test "SharedPointerWindow frees memory" {
+    const allocator = std.testing.allocator;
+    var window = try SharedPointerWindow([]u8, Allocator.free, Allocator)
+        .init(allocator, 3, 1, allocator);
+    defer window.deinit();
+    const first = try allocator.alloc(u8, 1);
+    try window.put(0, first);
+    const second = try allocator.alloc(u8, 1);
+    try window.put(0, second);
+    const third = try allocator.alloc(u8, 1);
+    try window.put(1, third);
+    const fourth = try allocator.alloc(u8, 1);
+    try window.put(2, fourth);
+}
```