Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 130 additions & 23 deletions src/providers/provider.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1161,18 +1161,6 @@ pub fn Provider(comptime cfg: ProviderConfig) type {
};
defer shared_allocator.free(sessions_dir);

var root_dir = std.Io.Dir.openDirAbsolute(io, sessions_dir, .{ .iterate = true }) catch |err| {
log.info(
"collectEvents: skipping, unable to open sessions dir '{s}' ({s})",
.{ sessions_dir, @errorName(err) },
);
return;
};
defer root_dir.close(io);

var walker = try root_dir.walk(shared_allocator);
defer walker.deinit();

var relative_paths: std.ArrayList([]u8) = .empty;
defer {
for (relative_paths.items) |path| shared_allocator.free(path);
Expand All @@ -1185,12 +1173,26 @@ pub fn Provider(comptime cfg: ProviderConfig) type {
extra_paths.deinit(shared_allocator);
}

while (try walker.next(io)) |entry| {
if (entry.kind != .file) continue;
const relative_path = std.mem.sliceTo(entry.path, 0);
if (!std.mem.endsWith(u8, relative_path, json_ext)) continue;
const copy = try shared_allocator.dupe(u8, relative_path);
try relative_paths.append(shared_allocator, copy);
scan_sessions_dir: {
var root_dir = std.Io.Dir.openDirAbsolute(io, sessions_dir, .{ .iterate = true }) catch |err| {
log.info(
"collectEvents: unable to open sessions dir '{s}' ({s}); continuing without directory scan",
.{ sessions_dir, @errorName(err) },
);
break :scan_sessions_dir;
};
defer root_dir.close(io);

var walker = try root_dir.walk(shared_allocator);
defer walker.deinit();

while (try walker.next(io)) |entry| {
if (entry.kind != .file) continue;
const relative_path = std.mem.sliceTo(entry.path, 0);
if (!std.mem.endsWith(u8, relative_path, json_ext)) continue;
const copy = try shared_allocator.dupe(u8, relative_path);
try relative_paths.append(shared_allocator, copy);
Comment on lines +1193 to +1195
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If relative_paths.append fails with error.OutOfMemory, the newly allocated copy will be leaked. Adding an errdefer right after the allocation ensures that copy is freed if the append operation fails.

                    const copy = try shared_allocator.dupe(u8, relative_path);
                    errdefer shared_allocator.free(copy);
                    try relative_paths.append(shared_allocator, copy);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 7143445 by freeing duplicated relative paths if append fails, and applying the same guard to configured extra session file paths.

}
}

const home = ctx.environ_map.get("HOME") orelse "";
Expand Down Expand Up @@ -1308,13 +1310,16 @@ pub fn Provider(comptime cfg: ProviderConfig) type {
continue;
};

if (relative.len <= json_ext.len or !std.mem.endsWith(u8, relative, json_ext)) continue;
const session_id_slice = relative[0 .. relative.len - json_ext.len];

const maybe_session_id = duplicateNonEmpty(worker_allocator, session_id_slice) catch {
const maybe_session_id = duplicateNonEmpty(worker_allocator, session_id_slice) catch |err| {
logSessionWarning(absolute_path, "failed to build session id", err);
continue;
};
const session_id = maybe_session_id orelse {
log.warn("collectEvents: skipping session file with empty session id '{s}'", .{absolute_path});
continue;
};
const session_id = maybe_session_id orelse continue;

parseSessionFile(
worker_allocator,
Expand Down Expand Up @@ -1359,10 +1364,14 @@ pub fn Provider(comptime cfg: ProviderConfig) type {

const basename = std.fs.path.basename(absolute_path);
const session_slice = std.fs.path.stem(basename);
const maybe_session_id = duplicateNonEmpty(extra_allocator, session_slice) catch {
const maybe_session_id = duplicateNonEmpty(extra_allocator, session_slice) catch |err| {
logSessionWarning(absolute_path, "failed to build extra session id", err);
continue;
};
const session_id = maybe_session_id orelse {
log.warn("collectEvents: skipping extra session file with empty session id '{s}'", .{absolute_path});
continue;
};
const session_id = maybe_session_id orelse continue;

parseSessionFile(
extra_allocator,
Expand Down Expand Up @@ -1526,3 +1535,101 @@ pub fn Provider(comptime cfg: ProviderConfig) type {
}
};
}

test "provider scans extra session files when sessions dir is missing" {
const allocator = std.testing.allocator;
const io = std.testing.io;

var tmp = std.testing.tmpDir(.{});
defer tmp.cleanup();

{
var extra_file = try tmp.dir.createFile(io, "extra.db", .{});
extra_file.close(io);
}

const extra_path = try tmp.dir.realPathFileAlloc(io, "extra.db", allocator);
defer allocator.free(extra_path);
const home = std.fs.path.dirname(extra_path) orelse return error.InvalidSessionPath;

var environ_map = std.process.Environ.Map.init(allocator);
defer environ_map.deinit();
try environ_map.put("HOME", home);

const Capture = struct {
count: usize = 0,
session_id: ?[]u8 = null,

fn deinit(self: *@This(), alloc: std.mem.Allocator) void {
if (self.session_id) |value| alloc.free(value);
}

fn ingest(
ctx_ptr: *anyopaque,
alloc: std.mem.Allocator,
event: *const model.TokenUsageEvent,
filters: model.DateFilters,
) model.IngestError!void {
_ = filters;
const self: *@This() = @ptrCast(@alignCast(ctx_ptr));
if (self.session_id) |value| alloc.free(value);
self.session_id = try alloc.dupe(u8, event.session_id);
self.count += 1;
}
};

var capture = Capture{};
defer capture.deinit(allocator);

const consumer = EventConsumer{
.context = &capture,
.ingest = Capture.ingest,
};

const parse_extra_file = struct {
fn parse(
_: std.mem.Allocator,
_: *const ParseContext,
runtime: *const ParseRuntime,
session_id: []const u8,
_: []const u8,
_: ?*MessageDeduper,
_: i32,
sink: EventSink,
) !void {
try sink.emit(runtime.io, .{
.session_id = session_id,
.timestamp = "2026-05-29T00:00:00Z",
.local_iso_date = .{ '2', '0', '2', '6', '-', '0', '5', '-', '2', '9' },
.model = "test-model",
.usage = .{
.input_tokens = 1,
.output_tokens = 2,
.total_tokens = 3,
},
.is_fallback = false,
.display_input_tokens = 1,
});
}
}.parse;

const ExtraOnlyProvider = makeProvider(.{
.scope = .provider_extra_file_test,
.sessions_dir_suffix = "/missing-session-dir",
.session_file_ext = ".json",
.extra_session_file_suffixes = &.{"/extra.db"},
.parse_session_fn = parse_extra_file,
});

const ctx = Context{
.allocator = allocator,
.temp_allocator = allocator,
.io = io,
.environ_map = &environ_map,
};

try ExtraOnlyProvider.streamEvents(ctx, .{}, consumer);

try std.testing.expectEqual(@as(usize, 1), capture.count);
try std.testing.expectEqualStrings("extra", capture.session_id orelse "");
}
Loading