Skip to content

Commit

Permalink
feat: add Window and SharedPointerWindow data structures (Syndica#461)
Browse files Browse the repository at this point in the history
These changes are needed by Syndica#459, where they are used to track epoch states.

Previously I used an Lru to track epoch states, but the Lru requires exclusive locks on read operations, which introduces too much contention in a data structure that will be read often by many threads. The data structures in this PR were implemented to handle this use case a bit better. They can be read without an exclusive lock, and are intended to handle the predictable temporal nature of epoch transitions.

`Window` is the basic data structure that supports the idea of tracking a moving window of values.

`SharedPointerWindow` is a wrapper for `Window` that adds two features:
- thread safety via RwLock.
- manages the lifetimes of the contained data with reference counting.

`SharedPointerWindow` _could_ be implemented totally generically, enabling the user to specify any arbitrary internal container type, such as an Lru or a HashMap. But this makes the type a little bit more complex/opaque so I haven't implemented it generically. This could be done in the future if the behavior is actually needed to wrap multiple different underlying data containers.

--------

commits:

* refactor(sync): adapt RcSlice for single-item pointers

* feat: add Window and SharedPointerWindow data structures

* fix(collections): window integer overflow when realigning from 0. add test to reproduce issue and fix bug

* fix(sync): shared pointer window defer lock should be unlock
  • Loading branch information
dnut authored Dec 30, 2024
1 parent bdc065b commit 882c162
Show file tree
Hide file tree
Showing 4 changed files with 369 additions and 23 deletions.
28 changes: 5 additions & 23 deletions src/common/lru.zig
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
const std = @import("std");
const sig = @import("../sig.zig");

const Allocator = std.mem.Allocator;
const TailQueue = std.TailQueue;
const testing = std.testing;
const Mutex = std.Thread.Mutex;

const normalizeDeinitFunction = sig.sync.normalizeDeinitFunction;

pub const Kind = enum {
locking,
non_locking,
Expand All @@ -30,29 +34,7 @@ pub fn LruCacheCustom(
comptime DeinitContext: type,
comptime deinitFn_: anytype,
) type {
const deinitFn = switch (@TypeOf(deinitFn_)) {
fn (*V, DeinitContext) void => deinitFn_,

fn (V, DeinitContext) void => struct {
fn f(v: *V, ctx: DeinitContext) void {
deinitFn_(v.*, ctx);
}
}.f,

fn (V) void => struct {
fn f(v: *V, _: DeinitContext) void {
V.deinit(v.*);
}
}.f,

fn (*V) void => struct {
fn f(v: *V, _: DeinitContext) void {
V.deinit(v);
}
}.f,

else => @compileError("unsupported deinit function type"),
};
const deinitFn = normalizeDeinitFunction(V, DeinitContext, deinitFn_);
return struct {
mux: if (kind == .locking) Mutex else void,
allocator: Allocator,
Expand Down
4 changes: 4 additions & 0 deletions src/sync/lib.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub const ref = @import("ref.zig");
pub const mux = @import("mux.zig");
pub const once_cell = @import("once_cell.zig");
pub const reference_counter = @import("reference_counter.zig");
pub const shared_memory = @import("shared_memory.zig");
pub const thread_pool = @import("thread_pool.zig");
pub const exit = @import("exit.zig");

Expand All @@ -14,6 +15,9 @@ pub const OnceCell = once_cell.OnceCell;
pub const ReferenceCounter = reference_counter.ReferenceCounter;
pub const Rc = reference_counter.Rc;
pub const RcSlice = reference_counter.RcSlice;
pub const SharedPointerWindow = shared_memory.SharedPointerWindow;
pub const ThreadPool = thread_pool.ThreadPool;

pub const ExitCondition = exit.ExitCondition;

pub const normalizeDeinitFunction = shared_memory.normalizeDeinitFunction;
148 changes: 148 additions & 0 deletions src/sync/shared_memory.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
const std = @import("std");
const sig = @import("../sig.zig");

const Allocator = std.mem.Allocator;

/// Thread safe Window that stores a single copy of data that is shared with
/// readers as a pointer to the underlying data inside the Window.
///
/// - this struct owns the data and is responsible for freeing it
/// - the lifetime of returned pointer exceeds every read operation of that pointer,
/// even if another thread evicts it from the Window, as long as `release` is used properly.
pub fn SharedPointerWindow(
T: type,
deinitItem_: anytype,
DeinitContext: type,
) type {
const Window = sig.utils.collections.Window;
const Rc = sig.sync.Rc;
const deinitItem = normalizeDeinitFunction(T, DeinitContext, deinitItem_);

return struct {
allocator: Allocator,
window: Window(Rc(T)),
center: std.atomic.Value(usize),
lock: std.Thread.RwLock = .{},
deinit_context: DeinitContext,
discard_buf: []?Rc(T),

const Self = @This();

pub fn init(
allocator: Allocator,
len: usize,
start: usize,
deinit_context: DeinitContext,
) !Self {
const discard_buf = try allocator.alloc(?Rc(T), len);
return .{
.allocator = allocator,
.window = try Window(Rc(T)).init(allocator, len, start),
.deinit_context = deinit_context,
.center = std.atomic.Value(usize).init(start),
.discard_buf = discard_buf,
};
}

pub fn deinit(self: Self) void {
for (self.window.state) |maybe_item| if (maybe_item) |item| {
self.releaseItem(item);
};
self.window.deinit();
}

pub fn put(self: *Self, index: usize, value: T) !void {
const ptr = try Rc(T).create(self.allocator);
ptr.payload().* = value;

const item_to_release = blk: {
self.lock.lock();
defer self.lock.unlock();
break :blk self.window.put(index, ptr) catch null;
};

if (item_to_release) |old| {
self.releaseItem(old);
}
}

/// call `release` when you're done with the pointer
pub fn get(self: *Self, index: usize) ?*const T {
self.lock.lockShared();
defer self.lock.unlockShared();

if (self.window.get(index)) |element| {
return element.acquire().payload();
} else {
return null;
}
}

/// call `release` when you're done with the pointer
pub fn contains(self: *Self, index: usize) bool {
self.lock.lockShared();
defer self.lock.unlockShared();

return self.window.contains(index);
}

pub fn realign(self: *Self, new_center: usize) void {
if (new_center == self.center.load(.monotonic)) return;

const items_to_release = blk: {
self.lock.lock();
defer self.lock.unlock();

self.center.store(new_center, .monotonic);
break :blk self.window.realignGet(new_center, self.discard_buf);
};

for (items_to_release) |maybe_item| {
if (maybe_item) |item| {
self.releaseItem(item);
}
}
}

pub fn release(self: *Self, ptr: *const T) void {
self.releaseItem(Rc(T).fromPayload(ptr));
}

fn releaseItem(self: *const Self, item: Rc(T)) void {
if (item.release()) |bytes_to_free| {
deinitItem(item.payload(), self.deinit_context);
self.allocator.free(bytes_to_free);
}
}
};
}

pub fn normalizeDeinitFunction(
V: type,
DeinitContext: type,
deinitFn: anytype,
) fn (*V, DeinitContext) void {
return switch (@TypeOf(deinitFn)) {
fn (*V, DeinitContext) void => deinitFn,

fn (V, DeinitContext) void => struct {
fn f(v: *V, ctx: DeinitContext) void {
deinitFn(v.*, ctx);
}
}.f,

fn (V) void => struct {
fn f(v: *V, _: DeinitContext) void {
V.deinit(v.*);
}
}.f,

fn (*V) void => struct {
fn f(v: *V, _: DeinitContext) void {
V.deinit(v);
}
}.f,

else => @compileError("unsupported deinit function type"),
};
}
Loading

0 comments on commit 882c162

Please sign in to comment.