diff --git a/demo/zig/LICENSE b/demo/zig/LICENSE new file mode 100644 index 0000000..e48e096 --- /dev/null +++ b/demo/zig/LICENSE @@ -0,0 +1,277 @@ +Eclipse Public License - v 2.0 + + THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE + PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION + OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT. + +1. DEFINITIONS + +"Contribution" means: + + a) in the case of the initial Contributor, the initial content + Distributed under this Agreement, and + + b) in the case of each subsequent Contributor: + i) changes to the Program, and + ii) additions to the Program; + where such changes and/or additions to the Program originate from + and are Distributed by that particular Contributor. A Contribution + "originates" from a Contributor if it was added to the Program by + such Contributor itself or anyone acting on such Contributor's behalf. + Contributions do not include changes or additions to the Program that + are not Modified Works. + +"Contributor" means any person or entity that Distributes the Program. + +"Licensed Patents" mean patent claims licensable by a Contributor which +are necessarily infringed by the use or sale of its Contribution alone +or when combined with the Program. + +"Program" means the Contributions Distributed in accordance with this +Agreement. + +"Recipient" means anyone who receives the Program under this Agreement +or any Secondary License (as applicable), including Contributors. + +"Derivative Works" shall mean any work, whether in Source Code or other +form, that is based on (or derived from) the Program and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. + +"Modified Works" shall mean any work in Source Code or other form that +results from an addition to, deletion from, or modification of the +contents of the Program, including, for purposes of clarity any new file +in Source Code form that contains any contents of the Program. Modified +Works shall not include works that contain only declarations, +interfaces, types, classes, structures, or files of the Program solely +in each case in order to link to, bind by name, or subclass the Program +or Modified Works thereof. + +"Distribute" means the acts of a) distributing or b) making available +in any manner that enables the transfer of a copy. + +"Source Code" means the form of a Program preferred for making +modifications, including but not limited to software source code, +documentation source, and configuration files. + +"Secondary License" means either the GNU General Public License, +Version 2.0, or any later versions of that license, including any +exceptions or additional permissions as identified by the initial +Contributor. + +2. GRANT OF RIGHTS + + a) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free copyright + license to reproduce, prepare Derivative Works of, publicly display, + publicly perform, Distribute and sublicense the Contribution of such + Contributor, if any, and such Derivative Works. + + b) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free patent + license under Licensed Patents to make, use, sell, offer to sell, + import and otherwise transfer the Contribution of such Contributor, + if any, in Source Code or other form. 
This patent license shall + apply to the combination of the Contribution and the Program if, at + the time the Contribution is added by the Contributor, such addition + of the Contribution causes such combination to be covered by the + Licensed Patents. The patent license shall not apply to any other + combinations which include the Contribution. No hardware per se is + licensed hereunder. + + c) Recipient understands that although each Contributor grants the + licenses to its Contributions set forth herein, no assurances are + provided by any Contributor that the Program does not infringe the + patent or other intellectual property rights of any other entity. + Each Contributor disclaims any liability to Recipient for claims + brought by any other entity based on infringement of intellectual + property rights or otherwise. As a condition to exercising the + rights and licenses granted hereunder, each Recipient hereby + assumes sole responsibility to secure any other intellectual + property rights needed, if any. For example, if a third party + patent license is required to allow Recipient to Distribute the + Program, it is Recipient's responsibility to acquire that license + before distributing the Program. + + d) Each Contributor represents that to its knowledge it has + sufficient copyright rights in its Contribution, if any, to grant + the copyright license set forth in this Agreement. + + e) Notwithstanding the terms of any Secondary License, no + Contributor makes additional grants to any Recipient (other than + those set forth in this Agreement) as a result of such Recipient's + receipt of the Program under the terms of a Secondary License + (if permitted under the terms of Section 3). + +3. REQUIREMENTS + +3.1 If a Contributor Distributes the Program in any form, then: + + a) the Program must also be made available as Source Code, in + accordance with section 3.2, and the Contributor must accompany + the Program with a statement that the Source Code for the Program + is available under this Agreement, and informs Recipients how to + obtain it in a reasonable manner on or through a medium customarily + used for software exchange; and + + b) the Contributor may Distribute the Program under a license + different than this Agreement, provided that such license: + i) effectively disclaims on behalf of all other Contributors all + warranties and conditions, express and implied, including + warranties or conditions of title and non-infringement, and + implied warranties or conditions of merchantability and fitness + for a particular purpose; + + ii) effectively excludes on behalf of all other Contributors all + liability for damages, including direct, indirect, special, + incidental and consequential damages, such as lost profits; + + iii) does not attempt to limit or alter the recipients' rights + in the Source Code under section 3.2; and + + iv) requires any subsequent distribution of the Program by any + party to be under a license that satisfies the requirements + of this section 3. 
+ +3.2 When the Program is Distributed as Source Code: + + a) it must be made available under this Agreement, or if the + Program (i) is combined with other material in a separate file or + files made available under a Secondary License, and (ii) the initial + Contributor attached to the Source Code the notice described in + Exhibit A of this Agreement, then the Program may be made available + under the terms of such Secondary Licenses, and + + b) a copy of this Agreement must be included with each copy of + the Program. + +3.3 Contributors may not remove or alter any copyright, patent, +trademark, attribution notices, disclaimers of warranty, or limitations +of liability ("notices") contained within the Program from any copy of +the Program which they Distribute, provided that Contributors may add +their own appropriate notices. + +4. COMMERCIAL DISTRIBUTION + +Commercial distributors of software may accept certain responsibilities +with respect to end users, business partners and the like. While this +license is intended to facilitate the commercial use of the Program, +the Contributor who includes the Program in a commercial product +offering should do so in a manner which does not create potential +liability for other Contributors. Therefore, if a Contributor includes +the Program in a commercial product offering, such Contributor +("Commercial Contributor") hereby agrees to defend and indemnify every +other Contributor ("Indemnified Contributor") against any losses, +damages and costs (collectively "Losses") arising from claims, lawsuits +and other legal actions brought by a third party against the Indemnified +Contributor to the extent caused by the acts or omissions of such +Commercial Contributor in connection with its distribution of the Program +in a commercial product offering. The obligations in this section do not +apply to any claims or Losses relating to any actual or alleged +intellectual property infringement. In order to qualify, an Indemnified +Contributor must: a) promptly notify the Commercial Contributor in +writing of such claim, and b) allow the Commercial Contributor to control, +and cooperate with the Commercial Contributor in, the defense and any +related settlement negotiations. The Indemnified Contributor may +participate in any such claim at its own expense. + +For example, a Contributor might include the Program in a commercial +product offering, Product X. That Contributor is then a Commercial +Contributor. If that Commercial Contributor then makes performance +claims, or offers warranties related to Product X, those performance +claims and warranties are such Commercial Contributor's responsibility +alone. Under this section, the Commercial Contributor would have to +defend claims against the other Contributors related to those performance +claims and warranties, and if a court requires any other Contributor to +pay any damages as a result, the Commercial Contributor must pay +those damages. + +5. NO WARRANTY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS" +BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR +IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF +TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR +PURPOSE. 
Each Recipient is solely responsible for determining the +appropriateness of using and distributing the Program and assumes all +risks associated with its exercise of rights under this Agreement, +including but not limited to the risks and costs of program errors, +compliance with applicable laws, damage to or loss of data, programs +or equipment, and unavailability or interruption of operations. + +6. DISCLAIMER OF LIABILITY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS +SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST +PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE +EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + +7. GENERAL + +If any provision of this Agreement is invalid or unenforceable under +applicable law, it shall not affect the validity or enforceability of +the remainder of the terms of this Agreement, and without further +action by the parties hereto, such provision shall be reformed to the +minimum extent necessary to make such provision valid and enforceable. + +If Recipient institutes patent litigation against any entity +(including a cross-claim or counterclaim in a lawsuit) alleging that the +Program itself (excluding combinations of the Program with other software +or hardware) infringes such Recipient's patent(s), then such Recipient's +rights granted under Section 2(b) shall terminate as of the date such +litigation is filed. + +All Recipient's rights under this Agreement shall terminate if it +fails to comply with any of the material terms or conditions of this +Agreement and does not cure such failure in a reasonable period of +time after becoming aware of such noncompliance. If all Recipient's +rights under this Agreement terminate, Recipient agrees to cease use +and distribution of the Program as soon as reasonably practicable. +However, Recipient's obligations under this Agreement and any licenses +granted by Recipient relating to the Program shall continue and survive. + +Everyone is permitted to copy and distribute copies of this Agreement, +but in order to avoid inconsistency the Agreement is copyrighted and +may only be modified in the following manner. The Agreement Steward +reserves the right to publish new versions (including revisions) of +this Agreement from time to time. No one other than the Agreement +Steward has the right to modify this Agreement. The Eclipse Foundation +is the initial Agreement Steward. The Eclipse Foundation may assign the +responsibility to serve as the Agreement Steward to a suitable separate +entity. Each new version of the Agreement will be given a distinguishing +version number. The Program (including Contributions) may always be +Distributed subject to the version of the Agreement under which it was +received. In addition, after a new version of the Agreement is published, +Contributor may elect to Distribute the Program (including its +Contributions) under the new version. + +Except as expressly stated in Sections 2(a) and 2(b) above, Recipient +receives no rights or licenses to the intellectual property of any +Contributor under this Agreement, whether expressly, by implication, +estoppel or otherwise. 
All rights in the Program not expressly granted
+under this Agreement are reserved. Nothing in this Agreement is intended
+to be enforceable by any entity that is not a Contributor or Recipient.
+No third-party beneficiary rights are created under this Agreement.
+
+Exhibit A - Form of Secondary Licenses Notice
+
+"This Source Code may also be made available under the following
+Secondary Licenses when the conditions for such availability set forth
+in the Eclipse Public License, v. 2.0 are satisfied: {name license(s),
+version(s), and exceptions or additional permissions here}."
+
+  Simply including a copy of this Agreement, including this Exhibit A
+  is not sufficient to license the Source Code under Secondary Licenses.
+
+  If it is not possible or desirable to put the notice in a particular
+  file, then You may include the notice in a location (such as a LICENSE
+  file in a relevant directory) where a recipient would be likely to
+  look for such a notice.
+
+  You may add additional accurate notices of copyright ownership.
diff --git a/demo/zig/README.md b/demo/zig/README.md
new file mode 100644
index 0000000..5d91a7c
--- /dev/null
+++ b/demo/zig/README.md
@@ -0,0 +1,219 @@
+# maelstrom-zig-node
+
+Zig node framework for building distributed systems, for learning with
+https://github.com/jepsen-io/maelstrom and for solving the
+https://fly.io/dist-sys/ challenges.
+
+## What is Maelstrom?
+
+Maelstrom is a platform for learning distributed systems. It is built around Jepsen and Elle to ensure no properties are
+violated. With Maelstrom you build nodes that form a distributed system that can process different workloads.
+
+## Features
+
+- zig 0.10.1 + mt (multithreaded runtime)
+- Runtime API
+- response type auto-deduction, extra data available via Value()
+- handling of unknown message types
+- sync/async RPC() support + timeout / context
+- lin/seq/lww kv storage
+
+## Examples
+
+### Echo workload
+
+```bash
+zig build && ~/maelstrom/maelstrom test -w echo --bin ./zig-out/bin/echo --node-count 1 --time-limit 10 --log-stderr
+```
+
+implementation:
+
+```zig
+const m = @import("maelstrom");
+const std = @import("std");
+
+pub const log = m.log.f;
+pub const log_level = .debug;
+
+pub fn main() !void {
+    var runtime = try m.Runtime.init();
+    try runtime.handle("echo", echo);
+    try runtime.run();
+}
+
+fn echo(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    self.send_back_ok(req);
+}
+```
+
+spec:
+
+receiving:
+
+```json
+{
+  "src": "c1",
+  "dest": "n1",
+  "body": {
+    "type": "echo",
+    "msg_id": 1,
+    "echo": "Please echo 35"
+  }
+}
+```
+
+the node sends back the same message with `body.type == "echo_ok"`:
+```json
+{
+  "src": "n1",
+  "dest": "c1",
+  "body": {
+    "type": "echo_ok",
+    "msg_id": 1,
+    "in_reply_to": 1,
+    "echo": "Please echo 35"
+  }
+}
+```
+
+### Broadcast workload
+
+```sh
+zig build && ~/maelstrom/maelstrom test -w broadcast --bin ./zig-out/bin/broadcast --node-count 2 --time-limit 20 --rate 10 --log-stderr
+```
+
+implementation:
+
+```zig
+var storage: *Storage = undefined;
+
+pub fn main() !void {
+    var runtime = try m.init();
+    storage = try Storage.init(runtime.alloc);
+    try runtime.handle("read", read);
+    try runtime.handle("broadcast", broadcast);
+    try runtime.handle("topology", topology);
+    try runtime.run();
+}
+
+fn read(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    self.reply(req, ReadOk{
+        .messages = storage.snapshot(self.alloc) catch return m.Error.Abort,
+    });
+}
+
+fn broadcast(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    const in = m.proto.json_map_obj(Broadcast, self.alloc, req.body) catch return m.Error.MalformedRequest;
+
+    if (storage.add(in.message) catch return m.Error.Abort) {
+        var ns = self.neighbours();
+        while (ns.next()) |node| {
+            self.send(node, .{
+                .typ = "broadcast",
+                .message = in.message,
+            });
+        }
+    }
+
+    if (!self.is_cluster_node(req.src)) {
+        self.reply_ok(req);
+    }
+}
+
+fn topology(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    // FIXME: oops sorry, compiler bug:
+    //     panic: Zig compiler bug: attempted to destroy declaration with an attached error
+    //     const in = try m.proto.json_map_obj(Topology, self.alloc, req.body);
+    const data = req.body.raw.Object.get("topology");
+    if (data == null) return m.Error.MalformedRequest;
+    std.log.info("got new topology: {s}", .{std.json.stringifyAlloc(self.alloc, data, .{}) catch return m.Error.Abort});
+    self.reply_ok(req);
+}
+```
+
+### lin-kv workload
+
+```sh
+zig build && ~/maelstrom/maelstrom test -w lin-kv --bin ./zig-out/bin/lin_kv --node-count 4 --concurrency 2n --time-limit 20 --rate 100 --log-stderr
+```
+
+implementation:
+
+```zig
+var kv: m.kv.Storage = m.kv.Storage.init_lin_kv();
+
+pub fn main() !void {
+    var runtime = try m.init();
+    try runtime.handle("read", read);
+    try runtime.handle("write", write);
+    try runtime.handle("cas", cas);
+    try runtime.run();
+}
+
+fn read(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    const in = m.proto.json_map_obj(Read, self.alloc, req.body) catch return m.Error.MalformedRequest;
+    const key = std.fmt.allocPrint(self.alloc, "{}", .{in.key}) catch return m.Error.Crash;
+    const val = try kv.get(self, key, 0);
+
+    self.reply(req, ReadOk{
+        .value = val.Integer,
+    });
+}
+
+fn write(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    const in = m.proto.json_map_obj(Write, self.alloc, req.body) catch return m.Error.MalformedRequest;
+    const key = std.fmt.allocPrint(self.alloc, "{}", .{in.key}) catch return m.Error.Crash;
+
+    try kv.put(self, key, in.value, 0);
+
+    self.reply_ok(req);
+}
+
+fn cas(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    const in = m.proto.json_map_obj(Cas, self.alloc, req.body) catch return m.Error.MalformedRequest;
+    const key = std.fmt.allocPrint(self.alloc, "{}", .{in.key}) catch return m.Error.Crash;
+    const put_var = req.body.raw.Object.get("put");
+    const put = if (put_var) |v| v.Bool else false;
+
+    try kv.cas(self, key, in.from, in.to, put, 0);
+
+    self.reply_ok(req);
+}
+```
+
+### g-set workload
+
+```sh
+zig build && ~/maelstrom/maelstrom test -w g-set --bin ./zig-out/bin/g_set --node-count 2 --concurrency 2n --time-limit 20 --rate 10 --log-stderr
+```
+
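+In the g-set workload clients send `add` messages carrying an `element`, and
+`read` returns the accumulated grow-only set as `value`. Purely as an
+illustration, a handler pair might look like the sketch below (assumptions:
+it reuses the `Storage` type and runtime API from the broadcast example
+above, stores integer elements, and omits the inter-node gossip a real
+solution needs):
+
+```zig
+// HYPOTHETICAL sketch, not the (elided) implementation from this repo.
+const m = @import("maelstrom");
+const std = @import("std");
+
+var storage: *Storage = undefined; // the grow-only set, as in the broadcast example
+
+pub fn main() !void {
+    var runtime = try m.init();
+    storage = try Storage.init(runtime.alloc);
+    try runtime.handle("add", add);
+    try runtime.handle("read", read);
+    try runtime.run();
+}
+
+// "add" inserts the element into the local set and acknowledges with add_ok.
+fn add(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    const in = m.proto.json_map_obj(Add, self.alloc, req.body) catch return m.Error.MalformedRequest;
+    _ = storage.add(in.element) catch return m.Error.Abort;
+    self.reply_ok(req);
+}
+
+// "read" returns everything accumulated so far.
+fn read(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    self.reply(req, ReadOk{
+        .value = storage.snapshot(self.alloc) catch return m.Error.Abort,
+    });
+}
+
+const Add = struct { element: u64 };
+const ReadOk = struct { value: []u64 };
+```
+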
+implementation (elided):
+
+```zig
+...
+```
+
+## API
+
+See the examples.
+
+## Why
+
+Now is a good time to learn Zig. Zig is a beautiful C-like language.
+The result will not be perfect, but it is ok. Thanks to TigerBeetle for the inspiration.
+
+Many thanks to Aphyr and the Jepsen folks.
+
+## Where
+
+[GitHub](https://github.com/sitano/maelstrom-zig-node) / Ivan Prisyazhnyi / @JohnKoepi
diff --git a/demo/zig/build.zig b/demo/zig/build.zig
new file mode 100644
index 0000000..2e81e2c
--- /dev/null
+++ b/demo/zig/build.zig
@@ -0,0 +1,83 @@
+const std = @import("std");
+
+const pkgs = struct {
+    const maelstrom = std.build.Pkg{
+        .name = "maelstrom",
+        .source = .{ .path = "src/main.zig" },
+        .dependencies = &[_]std.build.Pkg{},
+    };
+};
+
+pub fn build_lib(b: *std.build.Builder) void {
+    // Standard release options allow the person running `zig build` to select
+    // between Debug, ReleaseSafe, ReleaseFast, and ReleaseSmall.
+    const mode = b.standardReleaseOptions();
+
+    const lib = b.addStaticLibrary("maelstrom-zig-node", "src/main.zig");
+    lib.setBuildMode(mode);
+    lib.install();
+
+    const main_tests = b.addTest("src/main.zig");
+    main_tests.setBuildMode(mode);
+
+    const test_step = b.step("test", "Run library tests");
+    test_step.dependOn(&main_tests.step);
+}
+
+pub fn build_example(b: *std.build.Builder, target: anytype, name: []const u8) void {
+    // Standard release options allow the person running `zig build` to select
+    // between Debug, ReleaseSafe, ReleaseFast, and ReleaseSmall.
+    const mode = b.standardReleaseOptions();
+
+    const path = std.fmt.allocPrint(b.allocator, "examples/{s}.zig", .{name}) catch {
+        @panic("out of memory");
+    };
+    defer b.allocator.free(path);
+
+    const exe = b.addExecutable(name, path);
+    exe.addPackage(pkgs.maelstrom);
+    exe.setTarget(target);
+    exe.setBuildMode(mode);
+    exe.install();
+
+    const run_cmd = exe.run();
+    run_cmd.step.dependOn(b.getInstallStep());
+    if (b.args) |args| {
+        run_cmd.addArgs(args);
+    }
+
+    const run_step = b.step("run", "Run the app");
+    run_step.dependOn(&run_cmd.step);
+
+    const exe_tests = b.addTest(path);
+    exe_tests.setTarget(target);
+    exe_tests.setBuildMode(mode);
+
+    const test_step = b.step("test", "Run unit tests");
+    test_step.dependOn(&exe_tests.step);
+}
+
+pub fn build(b: *std.build.Builder) void {
+    // usr/lib/zig/std/event/loop.zig:16:39: error: async has not been
+    // implemented in the self-hosted compiler yet
+    //
+    // /usr/lib/zig/std/event/loop.zig:16:39: note: to use async enable the
+    // stage1 compiler with either '-fstage1' or by setting '.use_stage1 = true`
+    // in your 'build.zig' script
+    //
+    // not needed any more: b.use_stage1 = true;
+
+    build_lib(b);
+
+    // Standard target options allow the person running `zig build` to choose
+    // what target to build for. Here we do not override the defaults, which
+    // means any target is allowed, and the default is native. Other options
+    // for restricting the supported target set are available.
+    const target = b.standardTargetOptions(.{});
+
+    build_example(b, target, "echo");
+    build_example(b, target, "broadcast");
+    build_example(b, target, "broadcast_rpc_sync");
+    build_example(b, target, "broadcast_rpc_async");
+    build_example(b, target, "lin_kv");
+}
diff --git a/demo/zig/examples/broadcast.zig b/demo/zig/examples/broadcast.zig
new file mode 100644
index 0000000..6955442
--- /dev/null
+++ b/demo/zig/examples/broadcast.zig
@@ -0,0 +1,101 @@
+// zig build && ~/maelstrom/maelstrom test -w broadcast --bin ./zig-out/bin/broadcast --node-count 2 --time-limit 20 --rate 10 --log-stderr
+
+const m = @import("maelstrom");
+const std = @import("std");
+
+pub const log = m.log.f;
+pub const log_level = .debug;
+
+var storage: *Storage = undefined;
+
+pub fn main() !void {
+    var runtime = try m.init();
+    storage = try Storage.init(runtime.alloc);
+    try runtime.handle("read", read);
+    try runtime.handle("broadcast", broadcast);
+    try runtime.handle("topology", topology);
+    try runtime.run();
+}
+
+fn read(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    self.reply(req, ReadOk{
+        .messages = storage.snapshot(self.alloc) catch return m.Error.Abort,
+    });
+}
+
+fn broadcast(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    const in = m.proto.json_map_obj(Broadcast, self.alloc, req.body) catch return m.Error.MalformedRequest;
+
+    if (storage.add(in.message) catch return m.Error.Abort) {
+        var ns = self.neighbours();
+        while (ns.next()) |node| {
+            self.send(node, .{
+                .typ = "broadcast",
+                .message = in.message,
+            });
+        }
+    }
+
+    if (!self.is_cluster_node(req.src)) {
+        self.reply_ok(req);
+    }
+}
+
+fn topology(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    // FIXME: oops sorry, compiler bug:
+    //     panic: Zig compiler bug: attempted to destroy declaration with an attached error
+    //     const in = try m.proto.json_map_obj(Topology, self.alloc, req.body);
+    const data = req.body.raw.Object.get("topology");
+    if (data == null) return m.Error.MalformedRequest;
+    std.log.info("got new topology: {s}", .{std.json.stringifyAlloc(self.alloc, data, .{}) catch return m.Error.Abort});
+    self.reply_ok(req);
+}
+
+const ReadOk = struct {
+    messages: []u64,
+};
+
+const Broadcast = struct {
+    message: u64,
+};
+
+const Topology = struct {
+    topology: std.StringHashMap(std.ArrayList([]const u8)),
+};
+
+const Storage = struct {
+    a: std.mem.Allocator,
+    m: std.Thread.Mutex,
+    s: std.AutoArrayHashMap(u64, bool),
+
+    pub fn init(alloc: std.mem.Allocator) !*Storage {
+        var res = try alloc.create(Storage);
+        res.* = Storage{
+            .a = alloc,
+            .m = std.Thread.Mutex{},
+            .s = std.AutoArrayHashMap(u64, bool).init(alloc),
+        };
+        return res;
+    }
+
+    pub fn add(self: *Storage, val: u64) !bool {
+        self.m.lock();
+        defer self.m.unlock();
+        var res = try self.s.getOrPut(val);
+        res.value_ptr.* = true;
+        return !res.found_existing;
+    }
+
+    pub fn snapshot(self: *Storage, alloc: std.mem.Allocator) ![]u64 {
+        self.m.lock();
+        defer self.m.unlock();
+        var res = try alloc.alloc(u64, self.s.count());
+        var i: usize = 0;
+        var it = self.s.iterator();
+        while (it.next()) |item| {
+            res[i] = item.key_ptr.*;
+            i += 1;
+        }
+        return res;
+    }
+};
diff --git a/demo/zig/examples/broadcast_rpc_async.zig b/demo/zig/examples/broadcast_rpc_async.zig
new file mode 100644
index 0000000..8504300
--- /dev/null
+++ b/demo/zig/examples/broadcast_rpc_async.zig
@@ -0,0 +1,101 @@
+// zig build && ~/maelstrom/maelstrom test -w broadcast --bin ./zig-out/bin/broadcast_rpc_async --node-count 2 --time-limit 20 --rate 10 --log-stderr
+
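+// Same storage and fan-out scheme as broadcast.zig, but the neighbour fan-out
+// uses call_async() (fire-and-forget RPC) and the handler replies immediately,
+// even to cluster-internal senders.
+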
+const m = @import("maelstrom");
+const std = @import("std");
+
+pub const log = m.log.f;
+pub const log_level = .debug;
+
+var storage: *Storage = undefined;
+
+pub fn main() !void {
+    var runtime = try m.init();
+    storage = try Storage.init(runtime.alloc);
+    try runtime.handle("read", read);
+    try runtime.handle("broadcast", broadcast);
+    try runtime.handle("topology", topology);
+    try runtime.run();
+}
+
+fn read(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    self.reply(req, ReadOk{
+        .messages = storage.snapshot(self.alloc) catch return m.Error.Abort,
+    });
+}
+
+fn broadcast(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    const in = m.proto.json_map_obj(Broadcast, self.alloc, req.body) catch return m.Error.MalformedRequest;
+
+    if (storage.add(in.message) catch return m.Error.Abort) {
+        var ns = self.neighbours();
+        while (ns.next()) |node| {
+            if (!std.mem.eql(u8, node, req.src)) {
+                _ = self.call_async(node, .{
+                    .typ = "broadcast",
+                    .message = in.message,
+                });
+            }
+        }
+    }
+
+    self.reply_ok(req);
+}
+
+fn topology(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    // FIXME: oops sorry, compiler bug:
+    //     panic: Zig compiler bug: attempted to destroy declaration with an attached error
+    //     const in = try m.proto.json_map_obj(Topology, self.alloc, req.body);
+    const data = req.body.raw.Object.get("topology");
+    if (data == null) return m.Error.MalformedRequest;
+    std.log.info("got new topology: {s}", .{std.json.stringifyAlloc(self.alloc, data, .{}) catch return m.Error.Abort});
+    self.reply_ok(req);
+}
+
+const ReadOk = struct {
+    messages: []u64,
+};
+
+const Broadcast = struct {
+    message: u64,
+};
+
+const Topology = struct {
+    topology: std.StringHashMap(std.ArrayList([]const u8)),
+};
+
+const Storage = struct {
+    a: std.mem.Allocator,
+    m: std.Thread.Mutex,
+    s: std.AutoArrayHashMap(u64, bool),
+
+    pub fn init(alloc: std.mem.Allocator) !*Storage {
+        var res = try alloc.create(Storage);
+        res.* = Storage{
+            .a = alloc,
+            .m = std.Thread.Mutex{},
+            .s = std.AutoArrayHashMap(u64, bool).init(alloc),
+        };
+        return res;
+    }
+
+    pub fn add(self: *Storage, val: u64) !bool {
+        self.m.lock();
+        defer self.m.unlock();
+        var res = try self.s.getOrPut(val);
+        res.value_ptr.* = true;
+        return !res.found_existing;
+    }
+
+    pub fn snapshot(self: *Storage, alloc: std.mem.Allocator) ![]u64 {
+        self.m.lock();
+        defer self.m.unlock();
+        var res = try alloc.alloc(u64, self.s.count());
+        var i: usize = 0;
+        var it = self.s.iterator();
+        while (it.next()) |item| {
+            res[i] = item.key_ptr.*;
+            i += 1;
+        }
+        return res;
+    }
+};
diff --git a/demo/zig/examples/broadcast_rpc_sync.zig b/demo/zig/examples/broadcast_rpc_sync.zig
new file mode 100644
index 0000000..48616c0
--- /dev/null
+++ b/demo/zig/examples/broadcast_rpc_sync.zig
@@ -0,0 +1,103 @@
+// zig build && ~/maelstrom/maelstrom test -w broadcast --bin ./zig-out/bin/broadcast_rpc_sync --node-count 2 --time-limit 20 --rate 10 --log-stderr
+
+const m = @import("maelstrom");
+const std = @import("std");
+
+pub const log = m.log.f;
+pub const log_level = .debug;
+
+var storage: *Storage = undefined;
+
+pub fn main() !void {
+    var runtime = try m.init();
+    storage = try Storage.init(runtime.alloc);
+    try runtime.handle("read", read);
+    try runtime.handle("broadcast", broadcast);
+    try runtime.handle("topology", topology);
+    try runtime.run();
+}
+
+fn read(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    self.reply(req, ReadOk{
+        .messages = storage.snapshot(self.alloc) catch return m.Error.Abort,
+    });
+}
+
+fn broadcast(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    const in = m.proto.json_map_obj(Broadcast, self.alloc, req.body) catch return m.Error.MalformedRequest;
+
+    if (storage.add(in.message) catch return m.Error.Abort) {
+        var ns = self.neighbours();
+        while (ns.next()) |node| {
+            if (!std.mem.eql(u8, node, req.src)) {
+                var rpc = self.call(node, .{
+                    .typ = "broadcast",
+                    .message = in.message,
+                });
+                defer rpc.deinit();
+                _ = rpc.wait();
+            }
+        }
+    }
+
+    self.reply_ok(req);
+}
+
+fn topology(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    // FIXME: oops sorry, compiler bug:
+    //     panic: Zig compiler bug: attempted to destroy declaration with an attached error
+    //     const in = try m.proto.json_map_obj(Topology, self.alloc, req.body);
+    const data = req.body.raw.Object.get("topology");
+    if (data == null) return m.Error.MalformedRequest;
+    std.log.info("got new topology: {s}", .{std.json.stringifyAlloc(self.alloc, data, .{}) catch return m.Error.Abort});
+    self.reply_ok(req);
+}
+
+const ReadOk = struct {
+    messages: []u64,
+};
+
+const Broadcast = struct {
+    message: u64,
+};
+
+const Topology = struct {
+    topology: std.StringHashMap(std.ArrayList([]const u8)),
+};
+
+const Storage = struct {
+    a: std.mem.Allocator,
+    m: std.Thread.Mutex,
+    s: std.AutoArrayHashMap(u64, bool),
+
+    pub fn init(alloc: std.mem.Allocator) !*Storage {
+        var res = try alloc.create(Storage);
+        res.* = Storage{
+            .a = alloc,
+            .m = std.Thread.Mutex{},
+            .s = std.AutoArrayHashMap(u64, bool).init(alloc),
+        };
+        return res;
+    }
+
+    pub fn add(self: *Storage, val: u64) !bool {
+        self.m.lock();
+        defer self.m.unlock();
+        var res = try self.s.getOrPut(val);
+        res.value_ptr.* = true;
+        return !res.found_existing;
+    }
+
+    pub fn snapshot(self: *Storage, alloc: std.mem.Allocator) ![]u64 {
+        self.m.lock();
+        defer self.m.unlock();
+        var res = try alloc.alloc(u64, self.s.count());
+        var i: usize = 0;
+        var it = self.s.iterator();
+        while (it.next()) |item| {
+            res[i] = item.key_ptr.*;
+            i += 1;
+        }
+        return res;
+    }
+};
diff --git a/demo/zig/examples/echo.zig b/demo/zig/examples/echo.zig
new file mode 100644
index 0000000..9104c3e
--- /dev/null
+++ b/demo/zig/examples/echo.zig
@@ -0,0 +1,17 @@
+// zig build && ~/maelstrom/maelstrom test -w echo --bin ./zig-out/bin/echo --node-count 1 --time-limit 10 --log-stderr
+
+const m = @import("maelstrom");
+const std = @import("std");
+
+pub const log = m.log.f;
+pub const log_level = .debug;
+
+pub fn main() !void {
+    var runtime = try m.Runtime.init();
+    try runtime.handle("echo", echo);
+    try runtime.run();
+}
+
+fn echo(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    self.send_back_ok(req);
+}
diff --git a/demo/zig/examples/lin_kv.zig b/demo/zig/examples/lin_kv.zig
new file mode 100644
index 0000000..9ec53d1
--- /dev/null
+++ b/demo/zig/examples/lin_kv.zig
@@ -0,0 +1,67 @@
+// zig build && ~/maelstrom/maelstrom test -w lin-kv --bin ./zig-out/bin/lin_kv --node-count 4 --concurrency 2n --time-limit 20 --rate 100 --log-stderr
+
+const m = @import("maelstrom");
+const std = @import("std");
+
+pub const log = m.log.f;
+pub const log_level = .debug;
+
+var kv: m.kv.Storage = m.kv.Storage.init_lin_kv();
+
+pub fn main() !void {
+    var runtime = try m.init();
+    try runtime.handle("read", read);
+    try runtime.handle("write", write);
+    try runtime.handle("cas", cas);
+    try runtime.run();
+}
+
+fn read(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    const in = m.proto.json_map_obj(Read, self.alloc, req.body) catch return m.Error.MalformedRequest;
+    const key = std.fmt.allocPrint(self.alloc, "{}", .{in.key}) catch return m.Error.Crash;
+    const val = try kv.get(self, key, 0);
+
+    self.reply(req, ReadOk{
+        .value = val.Integer,
+    });
+}
+
+fn write(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    const in = m.proto.json_map_obj(Write, self.alloc, req.body) catch return m.Error.MalformedRequest;
+    const key = std.fmt.allocPrint(self.alloc, "{}", .{in.key}) catch return m.Error.Crash;
+
+    try kv.put(self, key, in.value, 0);
+
+    self.reply_ok(req);
+}
+
+fn cas(self: m.ScopedRuntime, req: *m.Message) m.Error!void {
+    const in = m.proto.json_map_obj(Cas, self.alloc, req.body) catch return m.Error.MalformedRequest;
+    const key = std.fmt.allocPrint(self.alloc, "{}", .{in.key}) catch return m.Error.Crash;
+    const put_var = req.body.raw.Object.get("put");
+    const put = if (put_var) |v| v.Bool else false;
+
+    try kv.cas(self, key, in.from, in.to, put, 0);
+
+    self.reply_ok(req);
+}
+
+const Read = struct {
+    key: u64,
+};
+
+const ReadOk = struct {
+    value: i64,
+};
+
+const Write = struct {
+    key: u64,
+    value: i64,
+};
+
+const Cas = struct {
+    key: u64,
+    from: i64,
+    to: i64,
+    // optional: put: bool,
+};
diff --git a/demo/zig/src/error.zig b/demo/zig/src/error.zig
new file mode 100644
index 0000000..d21b887
--- /dev/null
+++ b/demo/zig/src/error.zig
@@ -0,0 +1,112 @@
+const std = @import("std");
+const proto = @import("protocol.zig");
+
+/// [source](https://github.com/jepsen-io/maelstrom/blob/main/doc/protocol.md#errors).
+pub const HandlerError = error{
+    /// Indicates that the requested operation could not be completed within a timeout.
+    Timeout,
+    /// Use this error to indicate that a requested operation is not supported by
+    /// the current implementation. Helpful for stubbing out APIs during development.
+    NotSupported,
+    /// Indicates that the operation definitely cannot be performed at this time--perhaps
+    /// because the server is in a read-only state, has not yet been initialized,
+    /// believes its peers to be down, and so on. Do not use this error for indeterminate
+    /// cases, when the operation may actually have taken place.
+    TemporarilyUnavailable,
+    /// The client's request did not conform to the server's expectations,
+    /// and could not possibly have been processed.
+    MalformedRequest,
+    /// Indicates that some kind of general, indefinite error occurred.
+    /// Use this as a catch-all for errors you can't otherwise categorize,
+    /// or as a starting point for your error handler: it's safe to return
+    /// internal-error for every problem by default, then add special cases
+    /// for more specific errors later.
+    Crash,
+    /// Indicates that some kind of general, definite error occurred.
+    /// Use this as a catch-all for errors you can't otherwise categorize,
+    /// when you specifically know that the requested operation has not taken place.
+    /// For instance, you might encounter an indefinite failure during
+    /// the prepare phase of a transaction: since you haven't started the commit process yet,
+    /// the transaction can't have taken place. It's therefore safe to return
+    /// a definite abort to the client.
+    Abort,
+    /// The client requested an operation on a key which does not exist
+    /// (assuming the operation should not automatically create missing keys).
+    KeyDoesNotExist,
+    /// The client requested the creation of a key which already exists,
+    /// and the server will not overwrite it.
+    KeyAlreadyExists,
+    /// The requested operation expected some conditions to hold,
+    /// and those conditions were not met.
+    /// For instance, a compare-and-set operation
+    /// might assert that the value of a key is currently 5; if the value is 3,
+    /// the server would return precondition-failed.
+    PreconditionFailed,
+    /// The requested transaction has been aborted because of a conflict with
+    /// another transaction. Servers need not return this error on every conflict:
+    /// they may choose to retry automatically instead.
+    TxnConflict,
+    /// General error for anything you would like to add.
+    Other,
+};
+
+pub fn to_code(err: HandlerError) i64 {
+    return switch (err) {
+        error.Timeout => 0,
+        error.NotSupported => 10,
+        error.TemporarilyUnavailable => 11,
+        error.MalformedRequest => 12,
+        error.Crash => 13,
+        error.Abort => 14,
+        error.KeyDoesNotExist => 20,
+        error.KeyAlreadyExists => 21,
+        error.PreconditionFailed => 22,
+        error.TxnConflict => 30,
+        error.Other => 1000,
+    };
+}
+
+pub fn to_err(code: i64) HandlerError {
+    return switch (code) {
+        0 => error.Timeout,
+        10 => error.NotSupported,
+        11 => error.TemporarilyUnavailable,
+        12 => error.MalformedRequest,
+        13 => error.Crash,
+        14 => error.Abort,
+        20 => error.KeyDoesNotExist,
+        21 => error.KeyAlreadyExists,
+        22 => error.PreconditionFailed,
+        30 => error.TxnConflict,
+        else => error.Other,
+    };
+}
+
+pub fn to_text(err: HandlerError) []const u8 {
+    return switch (err) {
+        error.Timeout => "timeout",
+        error.NotSupported => "not supported",
+        error.TemporarilyUnavailable => "temporarily unavailable",
+        error.MalformedRequest => "malformed request",
+        error.Crash => "crash",
+        error.Abort => "abort",
+        error.KeyDoesNotExist => "key does not exist",
+        error.KeyAlreadyExists => "key already exists",
+        error.PreconditionFailed => "precondition failed",
+        error.TxnConflict => "txn conflict",
+        error.Other => "user-level error kind",
+    };
+}
+
+pub fn to_message(err: HandlerError) proto.ErrorMessageBody {
+    return proto.ErrorMessageBody{
+        .typ = "error",
+        .code = to_code(err),
+        .text = to_text(err),
+    };
+}
+
+test "error mapping works" {
+    try std.testing.expect(to_code(HandlerError.NotSupported) == 10);
+    try std.testing.expect(std.mem.eql(u8, to_text(HandlerError.NotSupported), "not supported"));
+    try std.testing.expect(to_message(HandlerError.NotSupported).code == 10);
+}
diff --git a/demo/zig/src/kv.zig b/demo/zig/src/kv.zig
new file mode 100644
index 0000000..2c92754
--- /dev/null
+++ b/demo/zig/src/kv.zig
@@ -0,0 +1,106 @@
+const std = @import("std");
+const m = @import("runtime.zig");
+const errors = @import("error.zig");
+const proto = @import("protocol.zig");
+
+const Error = errors.HandlerError;
+
+pub const LinKind = "lin-kv";
+pub const SeqKind = "seq-kv";
+pub const LWWKind = "lww-kv";
+pub const TSOKind = "lin-tso";
+
+pub const Storage = struct {
+    kind: []const u8,
+
+    pub fn init(kind: []const u8) Storage {
+        return Storage{
+            .kind = kind,
+        };
+    }
+
+    pub fn init_lin_kv() Storage {
+        return Storage.init(LinKind);
+    }
+
+    pub fn init_seq_kv() Storage {
+        return Storage.init(SeqKind);
+    }
+
+    pub fn init_lww_kv() Storage {
+        return Storage.init(LWWKind);
+    }
+
+    pub fn init_tso_kv() Storage {
+        return Storage.init(TSOKind);
+    }
+
+    pub fn get(self: *Storage, runtime: m.ScopedRuntime, key: []const u8, wait_ns: u64) Error!std.json.Value {
+        // FIXME: everything is allocated on top of runtime.alloc, so we just skip the defer rpc.deinit();
+        // to tie it to the runtime allocator's lifetime.
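+        // A blocking "read" RPC against the service node named by self.kind
+        // (e.g. "lin-kv"); wait_ns == 0 waits indefinitely, otherwise the wait
+        // is bounded and may fail with Timeout.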
+        var rpc = runtime.call(self.kind, .{ .key = key, .typ = "read" });
+        var resp = if (wait_ns == 0) rpc.wait() else try rpc.timed_wait(wait_ns);
+        const is_err = std.mem.eql(u8, resp.body.typ, "error");
+        if (is_err) {
+            var err = proto.ErrorMessageBody.init();
+            _ = err.from_json(resp.body.raw) catch |err2| {
+                std.log.err("[{d}] kv storage/{s} get({s}) from_json error: {}", .{ runtime.worker_id, self.kind, key, err2 });
+                return Error.Crash;
+            };
+            std.log.debug("[{d}] kv storage/{s} get({s}) error: {d}/{s}", .{ runtime.worker_id, self.kind, key, err.code, err.text });
+            return errors.to_err(err.code);
+        }
+
+        // FIXME: check it is _ok status
+        // FIXME: const obj = proto.json_map_obj(struct { value: value_type }, runtime.alloc, resp.body.raw) catch |err2| {
+        // FIXME:     std.log.err("[{d}] kv storage/{s} get({s}) json_map_obj error: {} / {}", .{ runtime.worker_id, self.kind, key, err2, resp.body.raw });
+        // FIXME:     return Error.Crash;
+        // FIXME: };
+        // FIXME: return obj.value;
+
+        // if protocol/parse_into_arena/parser/copy_strings == false, it will sometimes assert with garbage ;D.
+        const val = resp.body.raw.Object.get("value");
+        if (val == null) {
+            std.log.err("[{d}] kv storage/{s} get({s}) protocol error: {}", .{ runtime.worker_id, self.kind, key, resp });
+            return Error.Crash;
+        }
+
+        return val.?;
+    }
+
+    pub fn put(self: *Storage, runtime: m.ScopedRuntime, key: []const u8, value: anytype, wait_ns: u64) Error!void {
+        // FIXME: everything is allocated on top of runtime.alloc, so we just skip the defer rpc.deinit();
+        // to tie it to the runtime allocator's lifetime.
+        var rpc = runtime.call(self.kind, .{ .key = key, .value = value, .typ = "write" });
+        var resp = if (wait_ns == 0) rpc.wait() else try rpc.timed_wait(wait_ns);
+        const is_err = std.mem.eql(u8, resp.body.typ, "error");
+        if (is_err) {
+            var err = proto.ErrorMessageBody.init();
+            _ = err.from_json(resp.body.raw) catch |err2| {
+                std.log.err("[{d}] kv storage/{s} put({s}) from_json error: {}", .{ runtime.worker_id, self.kind, key, err2 });
+                return Error.Crash;
+            };
+            std.log.debug("[{d}] kv storage/{s} put({s}) error: {d}/{s}", .{ runtime.worker_id, self.kind, key, err.code, err.text });
+            return errors.to_err(err.code);
+        }
+        // FIXME: check it is _ok status
+    }
+
+    pub fn cas(self: *Storage, runtime: m.ScopedRuntime, key: []const u8, from: anytype, to: anytype, putIfAbsent: bool, wait_ns: u64) Error!void {
+        // FIXME: everything is allocated on top of runtime.alloc, so we just skip the defer rpc.deinit();
+        // to tie it to the runtime allocator's lifetime.
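+        // A blocking "cas" RPC: asks the service to swap `from` for `to`;
+        // putIfAbsent maps to the request's `put` flag, requesting creation of
+        // the key when it is absent.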
+        var rpc = runtime.call(self.kind, .{ .key = key, .from = from, .to = to, .put = putIfAbsent, .typ = "cas" });
+        var resp = if (wait_ns == 0) rpc.wait() else try rpc.timed_wait(wait_ns);
+        const is_err = std.mem.eql(u8, resp.body.typ, "error");
+        if (is_err) {
+            var err = proto.ErrorMessageBody.init();
+            _ = err.from_json(resp.body.raw) catch |err2| {
+                std.log.err("[{d}] kv storage/{s} cas({s}) from_json error: {}", .{ runtime.worker_id, self.kind, key, err2 });
+                return Error.Crash;
+            };
+            std.log.debug("[{d}] kv storage/{s} cas({s}) error: {d}/{s}", .{ runtime.worker_id, self.kind, key, err.code, err.text });
+            return errors.to_err(err.code);
+        }
+        // FIXME: check it is _ok status
+    }
+};
diff --git a/demo/zig/src/log.zig b/demo/zig/src/log.zig
new file mode 100644
index 0000000..125f113
--- /dev/null
+++ b/demo/zig/src/log.zig
@@ -0,0 +1,36 @@
+const builtin = @import("builtin");
+const root = @import("root");
+const std = @import("std");
+
+// pub const log = maelstrom.log.f;
+// pub const log_level : std.log.Level = .debug;
+
+pub const f = log;
+
+pub fn log(
+    comptime message_level: std.log.Level,
+    comptime scope: @Type(.EnumLiteral),
+    comptime format: []const u8,
+    args: anytype,
+) void {
+    if (builtin.os.tag == .freestanding)
+        @compileError(
+            \\freestanding targets do not have I/O configured;
+            \\please provide at least an empty `log` function declaration
+        );
+
+    const ts = std.time.nanoTimestamp();
+    const ns = @intCast(u64, @mod(ts, 1000000));
+    const ms = @intCast(u64, @mod(@divTrunc(ts, 1000000), 1000));
+    const ss = @divTrunc(ts, 1000000000);
+
+    const level_txt = " " ++ comptime message_level.asText();
+    const prefix2 = " " ++ if (scope == .default) "default" else @tagName(scope);
+
+    const stderr = std.io.getStdErr().writer();
+
+    std.debug.getStderrMutex().lock();
+    defer std.debug.getStderrMutex().unlock();
+
+    nosuspend stderr.print("[{} {d:0>3}.{d:0>6}" ++ level_txt ++ prefix2 ++ "] " ++ format ++ "\n", .{ ss, ms, ns } ++ args) catch return;
+}
diff --git a/demo/zig/src/main.zig b/demo/zig/src/main.zig
new file mode 100644
index 0000000..9c56247
--- /dev/null
+++ b/demo/zig/src/main.zig
@@ -0,0 +1,35 @@
+// implementation is based on zig 0.10.1.
+
+const builtin = @import("builtin");
+const std = @import("std");
+
+pub const log = @import("log.zig");
+
+const runtime = @import("runtime.zig");
+pub const Runtime = runtime.Runtime;
+pub const ScopedRuntime = runtime.ScopedRuntime;
+pub const RPCRequest = runtime.Request;
+
+pub const proto = @import("protocol.zig");
+pub const Message = proto.Message;
+pub const MessageBody = proto.MessageBody;
+
+pub const errors = @import("error.zig");
+pub const Error = errors.HandlerError;
+
+pub const kv = @import("kv.zig");
+
+// we would like to use async io, but it does not work at least until 0.12.0.
+// see protocol format() issues and mutex+async_print+darwin.
+//
+// to tell the runtime we want async io, define the following in the root ns:
+//     pub const io_mode = .evented; // auto deduced, or
+//     var global_instance_state: std.event.Loop = undefined;
+//     pub const event_loop: *std.event.Loop = &global_instance_state;
+//
+// async io also requires enabling the stage1 compiler (-fstage1).
+const _ = if (std.io.is_async) @panic("io is async, but is broken, thus unsupported at least until 0.12.0. sorry.") else void;
sorry.") else void; + +pub fn init() !*Runtime { + return Runtime.init(); +} diff --git a/demo/zig/src/pool.zig b/demo/zig/src/pool.zig new file mode 100644 index 0000000..a0d207a --- /dev/null +++ b/demo/zig/src/pool.zig @@ -0,0 +1,65 @@ +const std = @import("std"); +const queue = @import("wait_job_queue.zig"); + +// can't store anyframe cause zig does not support async without -fstage1. +pub const Job = struct { + arena: std.heap.ArenaAllocator, + req: []u8, +}; + +pub const Pool = struct { + alloc: std.mem.Allocator, + threads: []std.Thread, + queue: queue.WaitJobQueue(Job), + + pub const JobNode = queue.WaitJobQueue(Job).Node; + + pub fn init(alloc: std.mem.Allocator, size: usize) !Pool { + return Pool{ + .alloc = alloc, + .threads = try alloc.alloc(std.Thread, size), + .queue = queue.WaitJobQueue(Job).init(), + }; + } + + pub fn start(self: *Pool, worker: anytype, args: anytype) !void { + var i: usize = 0; + while (i < self.threads.len) : (i += 1) { + // FIXME: cleanup on errdefer + self.threads[i] = try std.Thread.spawn(.{}, worker, args); + } + } + + // frame allocated on alloc that we will deinit later. + pub fn enqueue(self: *Pool, alloc0: std.mem.Allocator, req: []const u8) !void { + var arena = std.heap.ArenaAllocator.init(alloc0); + var alloc = arena.allocator(); + + var node = try alloc.create(JobNode); + + node.data = Job{ + .arena = arena, + .req = try alloc.dupe(u8, req), + }; + + self.queue.put(node); + } + + pub fn deinit(self: *Pool) void { + // signal shutdown + self.queue.shutdown(); + + // wait threads to exit + for (self.threads) |t| { + t.join(); + } + + // purge queue + while (!self.queue.isEmpty()) { + self.queue.get().?.data.arena.deinit(); + } + + // cleanup + self.alloc.free(self.threads); + } +}; \ No newline at end of file diff --git a/demo/zig/src/protocol.zig b/demo/zig/src/protocol.zig new file mode 100644 index 0000000..2172cf7 --- /dev/null +++ b/demo/zig/src/protocol.zig @@ -0,0 +1,441 @@ +const std = @import("std"); +const root = @import("root"); + +pub const Message = struct { + src: []const u8, + dest: []const u8, + + body: MessageBody, + + pub usingnamespace MessageMethods(@This()); +}; + +pub const MessageBody = struct { + typ: []const u8, + msg_id: u64, + in_reply_to: u64, + + raw: std.json.Value, + + pub usingnamespace FormatAsJson(@This()); + pub usingnamespace MessageBodyMethods(@This()); +}; + +pub const InitMessageBody = struct { + node_id: []const u8, + node_ids: [][]const u8, + + pub usingnamespace FormatAsJson(@This()); +}; + +pub const ErrorMessageBody = struct { + // .typ fields are handled by to_json_value. + typ: []const u8, + code: i64, + text: []const u8, + + pub usingnamespace FormatAsJson(@This()); + pub usingnamespace ErrorMessageMethods(@This()); +}; + +// buf must be allocated on arena. we would not clean or copy it. +pub fn parse_message(alloc: std.mem.Allocator, buf: []u8) !*Message { + return Message.parse_into_arena(alloc, buf); +} + +fn MessageMethods(comptime Self: type) type { + return struct { + // buf must be allocated on arena. we would not clean or copy it. + pub fn parse_into_arena(alloc: std.mem.Allocator, buf: []u8) !*Message { + // FIXME: with copy_strings == false, some rpc resp reads fail after + // being passed over the threads boundary. wut???. 
+            //
+            // runtime.rpc:
+            //
+            //     if (proto.parse_message(item.arena.allocator(), node.data.req)) |resp|
+            //         item.set_completed(resp);
+            //
+            // test: zig build && ~/maelstrom/maelstrom test -w lin-kv --bin ./zig-out/bin/lin_kv --node-count 4 --concurrency 2n --time-limit 20 --rate 100 --log-stderr
+            var parser = std.json.Parser.init(alloc, true);
+            defer parser.deinit();
+
+            var tree = try parser.parse(buf);
+            // we hope for arena allocator instead of defer tree.deinit();
+
+            var m = try alloc.create(Message);
+            m.* = Message.init();
+
+            return try m.from_json(tree.root);
+        }
+
+        pub fn from_json(self: *Self, src0: ?std.json.Value) !*Message {
+            if (src0 == null) return self;
+            const src = src0.?;
+
+            self.src = try_json_string(src.Object.get("src"));
+            self.dest = try_json_string(src.Object.get("dest"));
+            _ = try self.body.from_json(src.Object.get("body"));
+
+            return self;
+        }
+
+        pub fn init() Message {
+            return Message{
+                .src = "",
+                .dest = "",
+                .body = MessageBody.init(),
+            };
+        }
+
+        pub fn format(value: Self, comptime fmt: []const u8, options: std.fmt.FormatOptions, writer: anytype) !void {
+            try writer.writeAll("{");
+
+            var i: u32 = 0;
+            if (value.src.len > 0) {
+                try writer.writeAll(" \"src\": \"");
+                try writer.writeAll(value.src);
+                try writer.writeAll("\"");
+                i += 1;
+            }
+
+            if (value.dest.len > 0) {
+                if (i > 0) try writer.writeAll(",");
+                try writer.writeAll(" \"dest\": \"");
+                try writer.writeAll(value.dest);
+                try writer.writeAll("\"");
+            }
+
+            if (i > 0) try writer.writeAll(",");
+            try writer.writeAll(" \"body\": ");
+            try value.body.format(fmt, options, writer);
+
+            try writer.writeAll(" }");
+        }
+
+        pub fn to_json_value(self: Self, alloc: std.mem.Allocator) !std.json.Value {
+            var v = std.json.Value{ .Object = std.json.ObjectMap.init(alloc) };
+            try v.Object.put("src", std.json.Value{ .String = self.src });
+            try v.Object.put("dest", std.json.Value{ .String = self.dest });
+            try v.Object.put("body", try self.body.to_json_value(alloc));
+            return v;
+        }
+    };
+}
+
+fn MessageBodyMethods(comptime Self: type) type {
+    return struct {
+        pub fn init() MessageBody {
+            return MessageBody{
+                .typ = "",
+                .msg_id = 0,
+                .in_reply_to = 0,
+                .raw = .Null,
+            };
+        }
+
+        pub fn from_json(self: *Self, src0: ?std.json.Value) !*MessageBody {
+            if (src0 == null) return self;
+            const src = src0.?;
+
+            self.typ = try_json_string(src.Object.get("type"));
+            self.msg_id = try_json_u64(src.Object.get("msg_id"));
+            self.in_reply_to = try_json_u64(src.Object.get("in_reply_to"));
+            self.raw = src;
+
+            _ = self.raw.Object.swapRemove("type");
+            _ = self.raw.Object.swapRemove("msg_id");
+            _ = self.raw.Object.swapRemove("in_reply_to");
+
+            return self;
+        }
+
+        pub fn to_json_value(self: Self, alloc: std.mem.Allocator) !std.json.Value {
+            var v = std.json.Value{ .Object = std.json.ObjectMap.init(alloc) };
+
+            if (self.typ.len > 0) try v.Object.put("type", std.json.Value{ .String = self.typ });
+            if (self.msg_id > 0) try v.Object.put("msg_id", std.json.Value{ .Integer = @intCast(i64, self.msg_id) });
+            if (self.in_reply_to > 0) try v.Object.put("in_reply_to", std.json.Value{ .Integer = @intCast(i64, self.in_reply_to) });
+
+            try merge_json(&v, self.raw);
+
+            return v;
+        }
+    };
+}
+
+fn ErrorMessageMethods(comptime Self: type) type {
+    return struct {
+        pub fn init() ErrorMessageBody {
+            return ErrorMessageBody{
+                .typ = "error",
+                .code = 0,
+                .text = "",
+            };
+        }
+
+        pub fn from_json(self: *Self, src0: ?std.json.Value) !*ErrorMessageBody {
+            if (src0 == null) return self;
+            const src = src0.?;
+
+            self.typ = try_json_string(src.Object.get("type"));
+            self.code = try_json_i64(src.Object.get("code"));
+            self.text = try_json_string(src.Object.get("text"));
+
+            return self;
+        }
+    };
+}
+
+fn FormatAsJson(comptime Self: type) type {
+    return struct {
+        pub fn format(value: Self, comptime fmt: []const u8, options: std.fmt.FormatOptions, writer: anytype) !void {
+            _ = fmt;
+            _ = options;
+            // FIXME: this does not work in async IO until this is fixed: https://github.com/ziglang/zig/issues/4060.
+            try nosuspend std.json.stringify(value, .{}, writer);
+        }
+    };
+}
+
+fn try_json_string(val: ?std.json.Value) []const u8 {
+    if (val == null) return "";
+    switch (val.?) {
+        .String => |s| return s,
+        else => return "",
+    }
+}
+
+fn try_json_u64(val: ?std.json.Value) u64 {
+    if (val == null) return 0;
+    switch (val.?) {
+        .Integer => |s| return @intCast(u64, s),
+        else => return 0,
+    }
+}
+
+fn try_json_i64(val: ?std.json.Value) i64 {
+    if (val == null) return 0;
+    switch (val.?) {
+        .Integer => |s| return @intCast(i64, s),
+        else => return 0,
+    }
+}
+
+const MergeJsonError = error{InvalidType};
+
+/// merges the src object into the dst object.
+pub fn merge_json(dst: *std.json.Value, src: std.json.Value) !void {
+    switch (dst.*) {
+        .Object => {},
+        else => {
+            return MergeJsonError.InvalidType;
+        },
+    }
+
+    switch (src) {
+        .Object => |inner| {
+            var it = inner.iterator();
+            while (it.next()) |entry| {
+                try dst.Object.put(entry.key_ptr.*, entry.value_ptr.*);
+            }
+        },
+        else => {},
+    }
+}
+
+/// all allocated memory belongs to the caller.
+/// FIXME: double serialization is not efficient, but we want to keep it simple
+/// until the std lib API is able to handle this directly.
+/// supports overriding via to_json_value(Allocator).
+pub fn to_json_value(alloc: std.mem.Allocator, value: anytype) !std.json.Value {
+    const T = @TypeOf(value);
+    const args_type_info = @typeInfo(T);
+
+    if (args_type_info == .Pointer) {
+        return to_json_value(alloc, value.*);
+    }
+
+    if (T == std.json.Value) {
+        return value;
+    }
+
+    if (args_type_info != .Struct) {
+        @compileError("expected tuple or struct argument, found " ++ @typeName(T));
+    }
+
+    if (comptime std.meta.trait.hasFn("to_json_value")(T)) {
+        return try value.to_json_value(alloc);
+    }
+
+    const str = try std.json.stringifyAlloc(alloc, value, .{});
+
+    var parser = std.json.Parser.init(alloc, false);
+    defer parser.deinit();
+
+    var obj = try parser.parse(str);
+
+    // FIXME: I am sorry for this hack. std.json.Parser creates a temporary arena
+    // allocator that becomes dead as soon as it goes out of parser.parse()'s scope.
+    // So any later calls on the resulting objects that need to allocate in tree.alloc
+    // cause segfaults. Also, std.json.Parser.transition is private, so we cannot
+    // work around that.
+    //
+    // Without this we are having:
+    //
+    //     /usr/lib/zig/std/heap/arena_allocator.zig:67:42: 0x2a0c5f in allocImpl (echo)
+    //         const cur_buf = cur_node.data[@sizeOf(BufNode)..];
+    //                                      ^
+    //     /usr/lib/zig/std/mem/Allocator.zig:154:34: 0x291014 in allocAdvancedWithRetAddr__anon_8869 (echo)
+    //         return self.vtable.alloc(self.ptr, len, ptr_align, len_align, ret_addr);
+    //
+    // At the moment this is a very dirty hack, but it works as long as alloc is a
+    // whole-scope arena.
+    //
+    // This is fixed in main, see
+    // https://github.com/ziglang/zig/commit/b42caff2a20eb34073f6a766f55d27288028165a.
+    //
+    // But we want to support at least 0.10.0.
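+    // Re-point the parsed tree's ObjectMap at the caller's allocator so that
+    // later mutations allocate from memory that outlives the parser's internal
+    // arena (see the FIXME above).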
+    var t = obj.root;
+    t.Object.allocator = alloc;
+
+    // Now we must transform all .typ fields into .type fields, as Zig doesn't
+    // have tags (field rename annotations).
+    try rename_typ_field(&t);
+
+    return t;
+}
+
+/// renames "typ" fields into "type" fields.
+/// FIXME: does storage modification invalidate a running iterator? probably yes.
+fn rename_typ_field(o: *std.json.Value) !void {
+    switch (o.*) {
+        .Object => |inner| {
+            var it = inner.iterator();
+            while (it.next()) |entry| {
+                if (std.mem.eql(u8, entry.key_ptr.*, "typ")) {
+                    try o.Object.put("type", entry.value_ptr.*);
+                    _ = o.Object.swapRemove("typ");
+                    continue;
+                }
+
+                try rename_typ_field(entry.value_ptr);
+            }
+        },
+        else => {},
+    }
+}
+
+/// merges a set of objects into a json Value.
+/// all allocated memory belongs to the caller.
+///
+///     merge_to_json(arena, .{body, .{ .code = 10, .text = "not supported" }});
+pub fn merge_to_json(alloc: std.mem.Allocator, args: anytype) !std.json.Value {
+    const ArgsType = @TypeOf(args);
+    const args_type_info = @typeInfo(ArgsType);
+
+    if (ArgsType == std.json.Value) {
+        return args;
+    }
+
+    if (args_type_info != .Struct) {
+        @compileError("expected tuple or struct argument, found " ++ @typeName(ArgsType));
+    }
+
+    if (!args_type_info.Struct.is_tuple) {
+        return to_json_value(alloc, args);
+    }
+
+    const fields_info = args_type_info.Struct.fields;
+    const len = comptime fields_info.len;
+
+    if (len == 0) {
+        return std.json.Value{ .Object = std.json.ObjectMap.init(alloc) };
+    }
+
+    comptime var i: usize = 0;
+    var obj = try to_json_value(alloc, @field(args, fields_info[i].name));
+
+    i += 1;
+    inline while (i < len) {
+        var src = try to_json_value(alloc, @field(args, fields_info[i].name));
+        try merge_json(&obj, src);
+        i += 1;
+    }
+
+    return obj;
+}
+
+/// maps the src object's json representation into the target type T.
+/// FIXME: not efficient, but works.
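+/// Implementation: round-trips `src` through to_json_value() and
+/// std.json.stringifyAlloc(), then std.json.parse()s the text into `T`,
+/// ignoring unknown fields (hence the FIXME above about efficiency).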
+pub fn json_map_obj(comptime T: type, alloc: std.mem.Allocator, src: anytype) !T {
+    const srcObj = try to_json_value(alloc, src);
+    const srcStr = try std.json.stringifyAlloc(alloc, srcObj, .{});
+    var stream = std.json.TokenStream.init(srcStr);
+    return try std.json.parse(T, &stream, .{
+        .allocator = alloc,
+        .ignore_unknown_fields = true,
+    });
+}
+
+test "to_json(.{})" {
+    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
+    defer arena.deinit();
+
+    {
+        var obj = try to_json_value(arena.allocator(), .{});
+        switch (obj) {
+            .Object => {},
+            else => try std.testing.expect(false),
+        }
+
+        var str = try std.json.stringifyAlloc(arena.allocator(), obj, .{});
+        try std.testing.expectEqualSlices(u8, "{}", str);
+    }
+
+    {
+        const obj = std.json.Value{
+            .Object = std.json.ObjectMap.init(arena.allocator()),
+        };
+        var res = try to_json_value(arena.allocator(), obj);
+        try std.testing.expectEqual(obj, res);
+    }
+}
+
+test "merge_to_json works" {
+    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
+    defer arena.deinit();
+
+    var alloc = arena.allocator();
+
+    var body = MessageBody.init();
+
+    {
+        var dst = try to_json_value(alloc, &body);
+        const str = try std.json.stringifyAlloc(alloc, dst, .{});
+        try std.testing.expectEqualStrings("{}", str);
+    }
+
+    {
+        // TODO: merge_json must ignore Null values when merging.
+        // var dst = try to_json_value(alloc, body);
+        // const obj = try merge_json(&dst, std.json.Value{ .Null = void{} });
+        // const str = try std.json.stringifyAlloc(alloc, obj, .{});
+        // try std.testing.expectEqualStrings("{}", str);
+    }
+
+    body.msg_id = 1;
+    body.in_reply_to = 2;
+    body.typ = "init";
+    body.raw = std.json.Value{ .Object = std.json.ObjectMap.init(alloc) };
+    try body.raw.Object.put("raw_key", std.json.Value{ .Integer = 1 });
+
+    {
+        const obj = try to_json_value(alloc, body);
+        const str = try std.json.stringifyAlloc(alloc, obj, .{});
+        try std.testing.expectEqualStrings("{\"type\":\"init\",\"msg_id\":1,\"in_reply_to\":2,\"raw_key\":1}", str);
+    }
+
+    {
+        const obj = try merge_to_json(alloc, .{ body, .{ .msg_id = 2, .text = "bbb" } });
+        const str = try std.json.stringifyAlloc(alloc, obj, .{});
+        try std.testing.expectEqualStrings("{\"type\":\"init\",\"msg_id\":2,\"in_reply_to\":2,\"raw_key\":1,\"text\":\"bbb\"}", str);
+    }
+}
diff --git a/demo/zig/src/rpc.zig b/demo/zig/src/rpc.zig
new file mode 100644
index 0000000..d7ff9d6
--- /dev/null
+++ b/demo/zig/src/rpc.zig
@@ -0,0 +1,232 @@
+const std = @import("std");
+const proto = @import("protocol.zig");
+
+// can't store an anyframe here because zig does not support async without -fstage1.
+pub const Request = struct {
+    arena: std.heap.ArenaAllocator,
+    req: proto.Message,
+    msg_id: u64,
+    // if the Request is async, the Runtime shall execute a message handler
+    // to process the response. otherwise, it notifies the waiters.
+    is_async: bool,
+
+    notify: std.Thread.Condition,
+    m: std.Thread.Mutex,
+    completed: bool,
+    // reference count of the users of this request; the arena is deallocated when
+    // it drops to zero. needed because the request is shared by both the caller
+    // and the request handler (main loop).
+    rc: usize,
+    // must be allocated on the local arena, and be available by completion.
+    resp: *proto.Message,
+
+    pub fn init(arena: std.heap.ArenaAllocator, msg_id: u64) Request {
+        return Request{
+            .arena = arena,
+            .req = proto.Message.init(),
+            .msg_id = msg_id,
+            .is_async = true,
+            .notify = std.Thread.Condition{},
+            .m = std.Thread.Mutex{},
+            .completed = false,
+            .rc = 0,
+            .resp = undefined,
+        };
+    }
+
+    pub fn deinit(self: *Request) void {
+        self.m.lock();
+        if (self.rc == std.math.maxInt(usize)) {
+            std.debug.panic("rpc request deinit() called on an already-released arena", .{});
+        }
+        if (self.rc > 0) {
+            self.rc -= 1;
+        }
+        var rel = false;
+        var arena = self.arena;
+        if (self.rc == 0) {
+            rel = true;
+            self.rc = std.math.maxInt(usize);
+        }
+        self.m.unlock();
+
+        if (rel) {
+            arena.deinit();
+            // the Request memory is released at this point.
+        }
+    }
+
+    pub fn add_ref(self: *Request) void {
+        self.m.lock();
+        defer self.m.unlock();
+        self.rc += 1;
+    }
+
+    pub fn is_completed(self: *Request) bool {
+        self.m.lock();
+        defer self.m.unlock();
+        return self.completed;
+    }
+
+    pub fn get_result(self: *Request) *proto.Message {
+        self.m.lock();
+        defer self.m.unlock();
+        return self.resp;
+    }
+
+    /// resp must be allocated on top of the local (Request) arena.
+    pub fn set_completed(self: *Request, resp: *proto.Message) void {
+        self.m.lock();
+        defer self.m.unlock();
+        self.completed = true;
+        self.resp = resp;
+        self.notify.broadcast();
+    }
+
+    pub fn wait(self: *Request) *proto.Message {
+        self.m.lock();
+        defer self.m.unlock();
+        while (!self.completed) {
+            self.notify.wait(&self.m);
+        }
+        return self.resp;
+    }
+
+    pub fn timed_wait(self: *Request, timeout_ns: u64) error{Timeout}!*proto.Message {
+        self.m.lock();
+        defer self.m.unlock();
+        while (!self.completed) {
+            try self.notify.timedWait(&self.m, timeout_ns);
+        }
+        return self.resp;
+    }
+};
+
+pub const Runtime = struct {
+    alloc: std.mem.Allocator,
+    msg_id: std.atomic.Atomic(u64),
+    m: std.Thread.Mutex,
+    reqs: std.AutoHashMap(u64, *Request),
+
+    pub fn init(alloc: std.mem.Allocator) !Runtime {
+        return Runtime{
+            .alloc = alloc,
+            .reqs = std.AutoHashMap(u64, *Request).init(alloc),
+            .m = std.Thread.Mutex{},
+            .msg_id = std.atomic.Atomic(u64).init(1),
+        };
+    }
+
+    pub fn deinit(self: *Runtime) void {
+        self.m.lock();
+        defer self.m.unlock();
+        if (self.reqs.count() > 0) {
+            std.log.warn("{d} rpc requests left in the queue", .{self.reqs.count()});
+        }
+        self.reqs.deinit();
+    }
+
+    pub fn next_msg_id(self: *Runtime) u64 {
+        return self.msg_id.fetchAdd(1, std.atomic.Ordering.AcqRel);
+    }
+
+    pub fn new_req(self: *Runtime, is_async: bool) !*Request {
+        var arena = std.heap.ArenaAllocator.init(self.alloc);
+        var req = try arena.allocator().create(Request);
+        req.* = Request.init(arena, self.next_msg_id());
+        req.is_async = is_async;
+        return req;
+    }
+
+    pub fn add(self: *Runtime, req: *Request) !bool {
+        req.add_ref();
+        errdefer req.deinit();
+
+        self.m.lock();
+
+        if (self.reqs.contains(req.msg_id)) {
+            self.m.unlock();
+            req.deinit();
+            return false;
+        }
+
+        defer self.m.unlock();
+        try self.reqs.put(req.msg_id, req);
+
+        return true;
+    }
+
+    pub fn remove(self: *Runtime, req_id: u64) void {
+        self.m.lock();
+        defer self.m.unlock();
+
+        _ = self.reqs.remove(req_id);
+    }
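+
+    // Editor's note: an illustrative sketch (hypothetical, not part of this change
+    // set) of the intended caller-side flow for a synchronous request:
+    //
+    //     var req = try rpc_runtime.new_req(false);
+    //     req.add_ref(); // extra reference held by the caller
+    //     _ = try rpc_runtime.add(req); // the map holds the other reference
+    //     // ... send a message carrying req.msg_id to the peer ...
+    //     const resp = req.wait();
+    //     // resp lives on req's arena; copy out what you need, then release:
+    //     req.deinit();
+    //
+    // the main loop completes the request via poll_request() + set_completed() and
+    // drops its own reference with deinit() afterwards.
+
+    /// when you get a request, you are responsible for cleaning up its arena
+    /// (release it with deinit()).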
+    pub fn poll_request(self: *Runtime, req_id: u64) ?*Request {
+        self.m.lock();
+        defer self.m.unlock();
+
+        if (self.reqs.get(req_id)) |req| {
+            _ = self.reqs.remove(req_id);
+            return req;
+        }
+
+        return null;
+    }
+};
+
+test "simple queue" {
+    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
+    defer arena.deinit();
+
+    var alloc = arena.allocator();
+
+    var runtime = try Runtime.init(alloc);
+    defer runtime.deinit();
+
+    var req = try runtime.new_req(false);
+    try std.testing.expectEqual(true, try runtime.add(req));
+
+    var resp = try req.arena.allocator().create(proto.Message);
+    req.set_completed(resp);
+
+    var resp2 = req.wait();
+    try std.testing.expectEqual(resp, resp2);
+
+    runtime.remove(req.msg_id);
+}
+
+test "no double free" {
+    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
+    defer arena.deinit();
+
+    var msg = Request.init(arena, 1);
+    msg.deinit();
+    // TODO: how do we assert that a second msg.deinit() panics?
+}
+
+test "sync is double ref" {
+    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
+    defer arena.deinit();
+
+    var alloc = arena.allocator();
+
+    var runtime = try Runtime.init(alloc);
+    defer runtime.deinit();
+
+    var req = try runtime.new_req(false);
+    try std.testing.expectEqual(true, try runtime.add(req));
+    defer req.deinit();
+
+    req.add_ref(); // hand the sync call back to the caller
+    req.deinit(); // the caller left
+
+    var resp = try req.arena.allocator().create(proto.Message);
+    req.set_completed(resp);
+
+    var resp2 = req.wait();
+    try std.testing.expectEqual(resp, resp2);
+
+    runtime.remove(req.msg_id);
+}
diff --git a/demo/zig/src/runtime.zig b/demo/zig/src/runtime.zig
new file mode 100644
index 0000000..f0ac770
--- /dev/null
+++ b/demo/zig/src/runtime.zig
@@ -0,0 +1,543 @@
+const builtin = @import("builtin");
+const errors = @import("error.zig");
+const pool = @import("pool.zig");
+const proto = @import("protocol.zig");
+const root = @import("root");
+const rpcpkg = @import("rpc.zig");
+const std = @import("std");
+
+const Message = proto.Message;
+const ErrorMessageBody = proto.ErrorMessageBody;
+const HandlerError = errors.HandlerError;
+const EmptyStringArray = [0][]const u8{};
+
+pub const thread_safe: bool = !builtin.single_threaded;
+pub const MutexType: type = @TypeOf(if (thread_safe) std.Thread.Mutex{} else DummyMutex{});
+
+// FIXME: what can we do about this buffer? move it to the heap with a buffered reader.
+pub const read_buf_size = if (@hasDecl(root, "read_buf_size")) root.read_buf_size else 4096;
+
+pub const Handler = fn (ScopedRuntime, *Message) HandlerError!void;
+pub const HandlerPtr = *const Handler;
+pub const HandlerMap = std.StringHashMap(HandlerPtr);
+
+pub const RPCRequest = rpcpkg.Request;
+const RPCRuntime = rpcpkg.Runtime;
+
+pub const Runtime = struct {
+    gpa: ?std.heap.GeneralPurposeAllocator(.{}),
+    // thread-safe by itself
+    alloc: std.mem.Allocator,
+
+    out: std.fs.File,
+
+    pool: pool.Pool,
+    // log: TODO: @TypeOf(Scoped)
+
+    handlers: HandlerMap,
+    rpc_runtime: RPCRuntime,
+
+    // init state
+    m: MutexType,
+    node_id: []const u8,
+    nodes: [][]const u8,
+
+    pub fn init() !*Runtime {
+        return initWithAllocator(null);
+    }
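+
+    // Editor's note: an illustrative sketch (hypothetical, not part of this change
+    // set) of how a node wires the runtime together:
+    //
+    //     pub fn main() !void {
+    //         var runtime = try Runtime.init();
+    //         try runtime.handle("echo", echo_handler);
+    //         try runtime.run();
+    //     }
+    //
+    // where echo_handler is a Handler that replies through the ScopedRuntime
+    // passed to it (see ScopedRuntime below).
+
+    // alloc is expected to be thread-safe by itself.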
+    pub fn initWithAllocator(alloc: ?std.mem.Allocator) !*Runtime {
+        var runtime: *Runtime = undefined;
+
+        if (alloc) |a| {
+            runtime = try a.create(Runtime);
+            runtime.gpa = null;
+            runtime.alloc = a;
+        } else {
+            var gpa = std.heap.GeneralPurposeAllocator(.{}){};
+
+            runtime = try gpa.allocator().create(Runtime);
+            // note: the GPA value is moved into the runtime; its allocator is
+            // re-derived from the stored copy below.
+            runtime.gpa = gpa;
+            runtime.alloc = runtime.gpa.?.allocator();
+        }
+
+        runtime.out = std.io.getStdOut();
+        runtime.pool = try pool.Pool.init(runtime.alloc, @max(2, @min(4, try std.Thread.getCpuCount())));
+        runtime.handlers = HandlerMap.init(runtime.alloc);
+        runtime.rpc_runtime = try RPCRuntime.init(runtime.alloc);
+        runtime.m = MutexType{};
+        runtime.node_id = "";
+        runtime.nodes = &EmptyStringArray;
+
+        return runtime;
+    }
+
+    pub fn deinit(self: *Runtime) void {
+        self.pool.deinit();
+        self.handlers.deinit();
+        self.rpc_runtime.deinit();
+    }
+
+    /// handle(type, f) registers a handler for a specific message type.
+    pub fn handle(self: *Runtime, msg_type: []const u8, f: HandlerPtr) !void {
+        if (self.handlers.contains(msg_type)) {
+            std.debug.panic("this message type is already registered: {s}", .{msg_type});
+        }
+
+        try self.handlers.put(msg_type, f);
+    }
+
+    pub fn send_raw_f(self: *Runtime, comptime fmt: []const u8, args: anytype) void {
+        if (comptime std.io.is_async) {
+            @panic("async IO is unsupported at least until 0.12.0; we need synchronous stdout. see the comment below.");
+        }
+
+        const out = self.out.writer();
+
+        defer std.log.debug("Sent " ++ fmt, args);
+
+        const m = std.debug.getStderrMutex();
+        m.lock();
+        defer m.unlock();
+        // stdout.writer().print suspends in async io mode.
+        // on Darwin, a suspend point while holding a mutex causes this on 0.10.1:
+        //     Illegal instruction at address 0x7ff80f6c1efc
+        //     ???:?:?: 0x7ff80f6c1efc in ??? (???)
+        //     zig/0.10.1/lib/zig/std/Thread/Mutex.zig:115:40: 0x10f60dd84 in std.Thread.Mutex.DarwinImpl.unlock (echo)
+        //         os.darwin.os_unfair_lock_unlock(&self.oul);
+        //
+        // FIXME: check whether this works with 0.12.0 + darwin when it's ready.
+        nosuspend out.print(fmt ++ "\n", args) catch return;
+    }
+
+    pub fn send_raw(self: *Runtime, msg: []const u8) void {
+        self.send_raw_f("{s}", .{msg});
+    }
+
+    // msg gets special treatment for tuples and message-body flattening;
+    // non-struct and non-tuple kinds are not supported.
+    //
+    // runtime.send("n1", msg);
+    // runtime.send("n1", .{ req.body, msg }) - merges the objects.
+    pub fn send(self: *Runtime, alloc: std.mem.Allocator, to: []const u8, msg: anytype) !void {
+        const body = try proto.merge_to_json(alloc, msg);
+
+        var packet = proto.Message{
+            .src = self.node_id,
+            .dest = to,
+            .body = proto.MessageBody.init(),
+        };
+
+        packet.body.raw = body;
+
+        var obj = try proto.to_json_value(alloc, packet);
+        const str = try std.json.stringifyAlloc(alloc, obj, .{});
+
+        self.send_raw(str);
+
+        if (self.node_id.len == 0) {
+            std.log.warn("Responding to {s} with {s} without having a src address. Missed message?", .{ to, str });
+        }
+    }
+
+    pub fn send_back(self: *Runtime, alloc0: std.mem.Allocator, req: *Message, msg: anytype) !void {
+        try self.send(alloc0, req.src, msg);
+    }
+
+    pub fn reply(self: *Runtime, alloc: std.mem.Allocator, req: *Message, msg: anytype) !void {
+        var obj = try proto.merge_to_json(alloc, msg);
+
+        try obj.Object.put("in_reply_to", std.json.Value{
+            .Integer = @intCast(i64, req.body.msg_id),
+        });
+
+        if (!obj.Object.contains("type")) {
+            try obj.Object.put("type", std.json.Value{
+                .String = try std.fmt.allocPrint(alloc, "{s}_ok", .{req.body.typ}),
+            });
+        }
+
+        try self.send(alloc, req.src, obj);
+    }
+
+    pub fn reply_err(self: *Runtime, alloc: std.mem.Allocator, req: *Message, resp: HandlerError) !void {
+        var obj = errors.to_message(resp);
+        if (resp == HandlerError.NotSupported) {
+            obj.text = try std.fmt.allocPrint(alloc, "not supported: {s}", .{req.body.typ});
+        }
+        try self.reply(alloc, req, obj);
+    }
+
+    pub fn reply_custom_err(self: *Runtime, alloc: std.mem.Allocator, req: *Message, code: i64, text: []const u8) !void {
+        var obj = errors.to_message(HandlerError.Other);
+        obj.code = code;
+        obj.text = text;
+        try self.reply(alloc, req, obj);
+    }
+
+    pub fn reply_ok(self: *Runtime, alloc: std.mem.Allocator, req: *Message) !void {
+        var resp = std.json.Value{
+            .Object = std.json.ObjectMap.init(alloc),
+        };
+        var typ = req.body.typ;
+
+        if (!std.mem.endsWith(u8, typ, "_ok")) {
+            typ = try std.fmt.allocPrint(alloc, "{s}_ok", .{typ});
+        }
+
+        // set the type explicitly so reply() does not append another "_ok".
+        try resp.Object.put("type", std.json.Value{
+            .String = typ,
+        });
+
+        try self.reply(alloc, req, resp);
+    }
+
+    pub fn send_back_ok(self: *Runtime, alloc: std.mem.Allocator, req: *Message) !void {
+        var resp = std.json.Value{
+            .Object = std.json.ObjectMap.init(alloc),
+        };
+        var typ = req.body.typ;
+
+        if (!std.mem.endsWith(u8, typ, "_ok")) {
+            typ = try std.fmt.allocPrint(alloc, "{s}_ok", .{typ});
+        }
+
+        try resp.Object.put("type", std.json.Value{
+            .String = typ,
+        });
+
+        try self.reply(alloc, req, .{ req.body, resp });
+    }
+
+    /// rpc() makes an RPC call to another node.
+    /// if the Request is async, the Runtime shall execute a message handler
+    /// to process the response. otherwise, it notifies the waiters.
+    pub fn rpc(self: *Runtime, is_async: bool, to: []const u8, msg: anytype) !*RPCRequest {
+        var req = try self.rpc_runtime.new_req(is_async);
+        var alloc = req.arena.allocator();
+        var obj = try proto.merge_to_json(alloc, msg);
+
+        if (!is_async) {
+            req.add_ref();
+        }
+        errdefer {
+            if (!is_async) {
+                req.deinit();
+            }
+        }
+
+        if (!try self.rpc_runtime.add(req)) {
+            // should not ever happen
+            return error.TryAgain;
+        }
+
+        errdefer {
+            self.rpc_runtime.remove(req.msg_id);
+            req.deinit();
+        }
+
+        try self.send(alloc, to, .{
+            proto.MessageBody{ .typ = "", .msg_id = req.msg_id, .in_reply_to = 0, .raw = obj },
+        });
+
+        return req;
+    }
+
+    /// call() makes a sync RPC call to another node. use .wait() or .timed_wait().
+    pub fn call(self: *Runtime, to: []const u8, msg: anytype) !*RPCRequest {
+        return try self.rpc(false, to, msg);
+    }
+
+    /// call_async() makes an async RPC call to another node.
+    pub fn call_async(self: *Runtime, to: []const u8, msg: anytype) !u64 {
+        // note: `try self.rpc(...).msg_id` does not compile; the error union has
+        // to be unwrapped before the field access.
+        const req = try self.rpc(true, to, msg);
+        return req.msg_id;
+    }
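+
+    // Editor's note: an illustrative sketch (hypothetical, not part of this change
+    // set) of a blocking call against another node:
+    //
+    //     var req = try runtime.call("n2", .{ .typ = "read", .key = 1 });
+    //     defer req.deinit();
+    //     const resp = req.wait(); // or req.timed_wait(timeout_ns)
+    //     // resp is valid only while req is alive (it lives on req's arena).
+
+    // in: std.io.Reader.{}
+    // the read buffer is read_buf_size bytes (4096 by default).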
+    pub fn listen(self: *Runtime, in: anytype) !void {
+        var buffer: [read_buf_size]u8 = undefined;
+
+        while (nextLine(in, &buffer)) |try_line| {
+            if (try_line == null) return;
+            const line = try_line.?;
+
+            if (line.len == 0) continue;
+            std.log.debug("Received {s}", .{line});
+
+            try self.pool.enqueue(self.alloc, line);
+        } else |err| {
+            return err;
+        }
+    }
+
+    pub fn run(self: *Runtime) !void {
+        try self.pool.start(worker, .{self});
+
+        std.log.info("node started.", .{});
+
+        self.listen(std.io.getStdIn().reader()) catch |e| {
+            std.log.err("listen loop error: {}", .{e});
+        };
+
+        // let the workers finish before logging that the node finished.
+        self.deinit();
+
+        std.log.info("node finished.", .{});
+    }
+
+    pub fn worker(self: *Runtime) void {
+        const id = std.Thread.getCurrentId();
+
+        std.log.debug("[{d}] worker started.", .{id});
+
+        while (self.pool.queue.get()) |node| {
+            self.process_request_node(node);
+        }
+
+        std.log.debug("[{d}] worker finished.", .{id});
+    }
+
+    fn process_request_node(self: *Runtime, node: *pool.Pool.JobNode) void {
+        defer node.data.arena.deinit();
+
+        const id = std.Thread.getCurrentId();
+
+        // std.log.debug("[{d}] worker: got an item: {s}", .{ id, node.data.req });
+
+        if (proto.parse_message(node.data.arena.allocator(), node.data.req)) |req| {
+            var scoped = ScopedRuntime.init(self, node, id);
+
+            const rpc_request = self.rpc_runtime.poll_request(req.body.in_reply_to);
+            // defer is block-scoped, so register the deinit here.
+            defer if (rpc_request) |item| {
+                item.deinit();
+            };
+            if (rpc_request) |item| {
+                // FIXME: any ideas how to move the whole tree to another arena?
+                // we can't pass req, whose lifetime is tied to the local node.data.arena,
+                // to a consumer with a longer lifetime, so we parse again.
+                if (proto.parse_message(item.arena.allocator(), node.data.req)) |resp| {
+                    item.set_completed(resp);
+                } else |err| {
+                    // we can't continue safely from here. probably we could, but this
+                    // is an experimental project.
+                    std.debug.panic("[{d}] rpc response parsing error {s}: {}", .{ id, node.data.req, err });
+                }
+
+                if (!item.is_async) {
+                    return;
+                }
+            }
+
+            const is_err = std.mem.eql(u8, req.body.typ, "error");
+            if (is_err) {
+                // if it is an error response to an rpc, we can't do anything about it.
+                std.log.err("[{d}] got error response {s}", .{ id, node.data.req });
+                return;
+            }
+
+            const is_init = std.mem.eql(u8, req.body.typ, "init");
+            if (is_init) {
+                process_init_message(&scoped, req) catch |err| {
+                    std.log.err("[{d}] processing init message error {s}: {}", .{ id, node.data.req, err });
+                    scoped.reply_err(req, HandlerError.MalformedRequest);
+                    return;
+                };
+            }
+
+            if (self.handlers.get(req.body.typ)) |f| {
+                f(scoped, req) catch |err| scoped.reply_err(req, err);
+            } else if (!is_init) {
+                const is_ok = std.mem.endsWith(u8, req.body.typ, "_ok");
+                // if the msg is neither an init nor an _ok response to one of our
+                // rpc calls, it is unsupported.
+                if (!is_ok) {
+                    scoped.reply_err(req, HandlerError.NotSupported);
+                }
+                return;
+            }
+
+            if (is_init) {
+                scoped.reply_ok(req);
+            }
+        } else |err| {
+            std.log.err("[{d}] incoming message parsing error {s}: {}", .{ id, node.data.req, err });
+        }
+    }
+
+    fn process_init_message(self: *ScopedRuntime, req: *Message) !void {
+        const in = try proto.json_map_obj(proto.InitMessageBody, self.alloc, req.body);
+
+        const node_id = try self.runtime.alloc.dupe(u8, in.node_id);
+        var node_ids = try self.runtime.alloc.alloc([]const u8, in.node_ids.len);
+        var i: usize = 0;
+        while (i < node_ids.len) {
+            node_ids[i] = try self.runtime.alloc.dupe(u8, in.node_ids[i]);
+            i += 1;
+        }
+
+        self.runtime.m.lock();
+        defer self.runtime.m.unlock();
+
+        self.runtime.node_id = node_id;
+        self.runtime.nodes = node_ids;
+
+        self.node_id = node_id;
+        self.nodes = node_ids;
+
+        std.log.info("new cluster state: node_id = {s}, nodes = {s}", .{ node_id, node_ids });
+    }
+
+    pub fn neighbours(self: *Runtime) NeighbourIterator {
+        return NeighbourIterator{
+            .node_id = self.node_id,
+            .nodes = self.nodes,
+            .len = self.nodes.len,
+        };
+    }
+};
+
+/// proxy that bundles the Runtime with an allocator scoped to the current request.
+pub const ScopedRuntime = struct {
+    runtime: *Runtime,
+    alloc: std.mem.Allocator,
+    worker_id: usize,
+
+    node_id: []const u8,
+    nodes: [][]const u8,
+
+    pub fn init(runtime: *Runtime, node: *pool.Pool.JobNode, worker_id: usize) ScopedRuntime {
+        return ScopedRuntime{
+            .runtime = runtime,
+            .worker_id = worker_id,
+            .alloc = node.data.arena.allocator(),
+            .node_id = runtime.node_id,
+            .nodes = runtime.nodes,
+        };
+    }
+
+    pub inline fn send_raw_f(self: ScopedRuntime, comptime fmt: []const u8, args: anytype) void {
+        self.runtime.send_raw_f(fmt, args);
+    }
+
+    pub inline fn send_raw(self: ScopedRuntime, msg: []const u8) void {
+        self.runtime.send_raw(msg);
+    }
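+
+    // Editor's note: an illustrative sketch (hypothetical, not part of this change
+    // set) of a handler written against this proxy:
+    //
+    //     fn echo_handler(runtime: ScopedRuntime, req: *Message) HandlerError!void {
+    //         // echo the request body back; reply() fills in in_reply_to and
+    //         // the "<type>_ok" response type.
+    //         runtime.reply(req, .{req.body});
+    //     }
+
+    // msg gets special treatment for tuples and message-body flattening;
+    // non-struct and non-tuple kinds are not supported.
+    //
+    // runtime.send("n1", msg);
+    // runtime.send("n1", .{ req.body, msg }) - merges the objects.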
+    pub inline fn send(self: ScopedRuntime, to: []const u8, msg: anytype) void {
+        self.runtime.send(self.alloc, to, msg) catch |err| {
+            std.log.err("[{d}] sending {} to {s} error: {}", .{ self.worker_id, msg, to, err });
+            self.runtime.send(self.alloc, to, errors.to_message(HandlerError.Crash)) catch |err2| {
+                std.debug.panic("[{d}] sending error Crash error: {}", .{ self.worker_id, err2 });
+            };
+        };
+    }
+
+    pub inline fn send_back(self: ScopedRuntime, req: *Message, msg: anytype) void {
+        self.runtime.send_back(self.alloc, req, msg) catch |err| {
+            std.log.err("[{d}] sending back {} on {s} error: {}", .{ self.worker_id, msg, req, err });
+            // best-effort error reply; reply_err panics on its own failure.
+            self.reply_err(req, HandlerError.Crash);
+        };
+    }
+
+    pub inline fn reply(self: ScopedRuntime, req: *Message, msg: anytype) void {
+        self.runtime.reply(self.alloc, req, msg) catch |err| {
+            std.log.err("[{d}] responding with {} to {s} error: {}", .{ self.worker_id, msg, req, err });
+            self.reply_err(req, HandlerError.Crash);
+        };
+    }
+
+    pub inline fn reply_err(self: ScopedRuntime, req: *Message, resp: HandlerError) void {
+        self.runtime.reply_err(self.alloc, req, resp) catch |err| {
+            std.debug.panic("[{d}] responding with error {} error {s}: {}", .{ self.worker_id, resp, req, err });
+        };
+    }
+
+    pub fn reply_custom_err(self: ScopedRuntime, req: *Message, code: i64, text: []const u8) void {
+        self.runtime.reply_custom_err(self.alloc, req, code, text) catch |err| {
+            std.debug.panic("[{d}] responding with custom error {d}:{s} error {s}: {}", .{ self.worker_id, code, text, req, err });
+        };
+    }
+
+    pub inline fn reply_ok(self: ScopedRuntime, req: *Message) void {
+        self.runtime.reply_ok(self.alloc, req) catch |err| {
+            std.debug.panic("[{d}] responding with ok error {s}: {}", .{ self.worker_id, req, err });
+        };
+    }
+
+    pub inline fn send_back_ok(self: ScopedRuntime, req: *Message) void {
+        self.runtime.send_back_ok(self.alloc, req) catch |err| {
+            std.debug.panic("[{d}] responding with ok error {s}: {}", .{ self.worker_id, req, err });
+        };
+    }
+
+    /// rpc() makes an RPC call to another node.
+    /// if the Request is async, the Runtime shall execute a message handler
+    /// to process the response. otherwise, it notifies the waiters.
+    pub fn rpc(self: ScopedRuntime, is_async: bool, to: []const u8, msg: anytype) *RPCRequest {
+        var r = self.runtime.rpc(is_async, to, msg) catch |err| {
+            std.debug.panic("[{d}] emitting an rpc call error {}: {}", .{ self.worker_id, msg, err });
+        };
+        return r;
+    }
+
+    /// call() makes a sync RPC call to another node. use .wait() or .timed_wait().
+    pub fn call(self: ScopedRuntime, to: []const u8, msg: anytype) *RPCRequest {
+        return self.rpc(false, to, msg);
+    }
+
+    /// call_async() makes an async RPC call to another node.
+    pub fn call_async(self: ScopedRuntime, to: []const u8, msg: anytype) u64 {
+        return self.rpc(true, to, msg).msg_id;
+    }
+
+    pub fn neighbours(self: ScopedRuntime) NeighbourIterator {
+        return NeighbourIterator{
+            .node_id = self.node_id,
+            .nodes = self.nodes,
+            .len = self.nodes.len,
+        };
+    }
+
+    pub fn is_cluster_node(_: ScopedRuntime, src: []const u8) bool {
+        return src.len > 0 and src[0] == 'n';
+    }
+};
+
+pub const NeighbourIterator = struct {
+    node_id: []const u8,
+    nodes: [][]const u8,
+    len: usize,
+    index: usize = 0,
+
+    pub fn next(it: *NeighbourIterator) ?[]const u8 {
+        if (it.index >= it.len) return null;
+        // skip our own node id.
+        if (std.mem.eql(u8, it.nodes[it.index], it.node_id)) {
+            it.index += 1;
+            if (it.index >= it.len) return null;
+        }
+        it.index += 1;
+        return it.nodes[it.index - 1];
+    }
+
+    /// Reset the iterator to the initial index.
+    pub fn reset(it: *NeighbourIterator) void {
+        it.index = 0;
+    }
+};
+
+const DummyMutex = struct {
+    fn lock(_: *DummyMutex) void {}
+    fn unlock(_: *DummyMutex) void {}
+};
+
+// reader: std.io.Reader.{}
+fn nextLine(reader: anytype, buffer: []u8) !?[]const u8 {
+    var line = (try reader.readUntilDelimiterOrEof(buffer, '\n')) orelse return null;
+    // trim the annoying windows-only carriage return character.
+    if (builtin.os.tag == .windows) {
+        return std.mem.trimRight(u8, line, "\r");
+    } else {
+        return line;
+    }
+}
diff --git a/demo/zig/src/wait_job_queue.zig b/demo/zig/src/wait_job_queue.zig
new file mode 100644
index 0000000..0cea4a1
--- /dev/null
+++ b/demo/zig/src/wait_job_queue.zig
@@ -0,0 +1,451 @@
+const std = @import("std");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const expect = std.testing.expect;
+
+/// Simplest thread-safe job queue that makes threads wait in get().
+/// Many-producer, many-consumer, non-allocating, thread-safe.
+/// Uses a mutex to protect access.
+/// The queue does not manage ownership and the user is responsible for
+/// managing the storage of the nodes.
+pub fn WaitJobQueue(comptime T: type) type {
+    return struct {
+        head: ?*Node,
+        tail: ?*Node,
+        mutex: std.Thread.Mutex,
+        cond: std.Thread.Condition,
+        shutdown_f: bool,
+
+        pub const Self = @This();
+        pub const Node = std.TailQueue(T).Node;
+
+        /// Initializes a new queue. The queue does not provide a `deinit()`
+        /// function, so the user must take care of cleaning up the queue elements.
+        pub fn init() Self {
+            return Self{
+                .head = null,
+                .tail = null,
+                .mutex = std.Thread.Mutex{},
+                .cond = std.Thread.Condition{},
+                .shutdown_f = false,
+            };
+        }
+
+        /// Appends `node` to the queue.
+        /// The lifetime of `node` must be longer than the lifetime of the queue.
+        pub fn put(self: *Self, node: *Node) void {
+            node.next = null;
+
+            self.mutex.lock();
+            defer self.mutex.unlock();
+            defer self.cond.signal();
+
+            node.prev = self.tail;
+            self.tail = node;
+            if (node.prev) |prev_tail| {
+                prev_tail.next = node;
+            } else {
+                assert(self.head == null);
+                self.head = node;
+            }
+        }
+
+        /// Gets a previously inserted node, blocking while the queue is empty.
+        /// Returns `null` only after `shutdown()`, once the queue has been drained.
+        /// It is safe to `get()` a node from the queue while another thread tries
+        /// to `remove()` the same node at the same time.
+        pub fn get(self: *Self) ?*Node {
+            return self.get_internal(true);
+        }
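+
+        // Editor's note: an illustrative sketch (hypothetical, not part of this
+        // change set) of the intended producer/consumer pattern; the caller owns
+        // the node storage, which must outlive its time in the queue:
+        //
+        //     var queue = WaitJobQueue(i32).init();
+        //
+        //     // producer:
+        //     var node = WaitJobQueue(i32).Node{ .data = 42, .prev = undefined, .next = undefined };
+        //     queue.put(&node);
+        //
+        //     // consumer thread: blocks in get() until work or shutdown arrives.
+        //     while (queue.get()) |n| {
+        //         process(n.data);
+        //     }
+        //
+        //     // on shutdown, wake all blocked consumers:
+        //     queue.shutdown();
+
+        /// Non-blocking variant of `get()`: returns a previously inserted node,
+        /// or `null` immediately if the queue is empty.
+        /// It is safe to `try_get()` a node from the queue while another thread
+        /// tries to `remove()` the same node at the same time.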
+        pub fn try_get(self: *Self) ?*Node {
+            return self.get_internal(false);
+        }
+
+        /// Shared implementation of `get()` and `try_get()`. If `wait` is true,
+        /// blocks until a node is available or the queue is shut down.
+        /// It is safe to `get()` a node from the queue while another thread tries
+        /// to `remove()` the same node at the same time.
+        fn get_internal(self: *Self, wait: bool) ?*Node {
+            self.mutex.lock();
+            defer self.mutex.unlock();
+
+            // let the queue drain first before shutting down.
+            if (wait) {
+                while (self.head == null) {
+                    if (self.shutdown_f) return null;
+                    self.cond.wait(&self.mutex);
+                }
+            }
+
+            const head = self.head orelse return null;
+            self.head = head.next;
+            if (head.next) |new_head| {
+                new_head.prev = null;
+            } else {
+                self.tail = null;
+            }
+            // This way, a get() and a remove() are thread-safe with each other.
+            head.prev = null;
+            head.next = null;
+            return head;
+        }
+
+        /// Prepends `node` to the front of the queue.
+        /// The lifetime of `node` must be longer than the lifetime of the queue.
+        pub fn unget(self: *Self, node: *Node) void {
+            node.prev = null;
+
+            self.mutex.lock();
+            defer self.mutex.unlock();
+            defer self.cond.signal();
+
+            const opt_head = self.head;
+            self.head = node;
+            if (opt_head) |old_head| {
+                node.next = old_head;
+                // keep the doubly-linked invariant so a later remove(old_head) works.
+                old_head.prev = node;
+            } else {
+                node.next = null;
+                assert(self.tail == null);
+                self.tail = node;
+            }
+        }
+
+        /// Removes a node from the queue; returns whether the node was actually removed.
+        /// It is safe to `remove()` a node from the queue while another thread tries
+        /// to `get()` the same node at the same time.
+        pub fn remove(self: *Self, node: *Node) bool {
+            self.mutex.lock();
+            defer self.mutex.unlock();
+
+            if (node.prev == null and node.next == null and self.head != node) {
+                return false;
+            }
+
+            if (node.prev) |prev| {
+                prev.next = node.next;
+            } else {
+                self.head = node.next;
+            }
+            if (node.next) |next| {
+                next.prev = node.prev;
+            } else {
+                self.tail = node.prev;
+            }
+            node.prev = null;
+            node.next = null;
+            return true;
+        }
+
+        /// Returns `true` if the queue is currently empty.
+        /// Note that in a multi-consumer environment a return value of `false`
+        /// does not mean that `get` will yield a non-`null` value!
+        pub fn isEmpty(self: *Self) bool {
+            self.mutex.lock();
+            defer self.mutex.unlock();
+            return self.head == null;
+        }
+
+        /// Dumps the contents of the queue to `stderr`.
+        pub fn dump(self: *Self) void {
+            self.dumpToStream(std.io.getStdErr().writer()) catch return;
+        }
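+
+        // Editor's note: for illustration, a queue holding 1 then 2 dumps in
+        // roughly this shape (addresses vary; see the dump test below):
+        //
+        //     head: 0x...=1
+        //      0x...=2
+        //       (null)
+        //     tail: 0x...=2
+        //      (null)
+
+        /// Dumps the contents of the queue to `stream`.
+        /// Up to 4 elements from the head are dumped and the tail of the queue is
+        /// dumped as well.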
+ pub fn dumpToStream(self: *Self, stream: anytype) !void { + const S = struct { + fn dumpRecursive( + s: anytype, + optional_node: ?*Node, + indent: usize, + comptime depth: comptime_int, + ) !void { + try s.writeByteNTimes(' ', indent); + if (optional_node) |node| { + try s.print("0x{x}={}\n", .{ @ptrToInt(node), node.data }); + if (depth == 0) { + try s.print("(max depth)\n", .{}); + return; + } + try dumpRecursive(s, node.next, indent + 1, depth - 1); + } else { + try s.print("(null)\n", .{}); + } + } + }; + self.mutex.lock(); + defer self.mutex.unlock(); + + try stream.print("head: ", .{}); + try S.dumpRecursive(stream, self.head, 0, 4); + try stream.print("tail: ", .{}); + try S.dumpRecursive(stream, self.tail, 0, 4); + } + + pub fn shutdown(self: *Self) void { + self.mutex.lock(); + defer self.mutex.unlock(); + + self.shutdown_f = true; + + self.cond.broadcast(); + } + }; +} + +const Context = struct { + allocator: std.mem.Allocator, + queue: *WaitJobQueue(i32), + put_sum: isize, + get_sum: isize, + get_count: usize, + puts_done: bool, +}; + +// TODO add lazy evaluated build options and then put puts_per_thread behind +// some option such as: "AggressiveMultithreadedFuzzTest". In the AppVeyor +// CI we would use a less aggressive setting since at 1 core, while we still +// want this test to pass, we need a smaller value since there is so much thrashing +// we would also use a less aggressive setting when running in valgrind +const puts_per_thread = 500; +const put_thread_count = 3; + +test "std.atomic.Queue" { + var plenty_of_memory = try std.heap.page_allocator.alloc(u8, 300 * 1024); + defer std.heap.page_allocator.free(plenty_of_memory); + + var fixed_buffer_allocator = std.heap.FixedBufferAllocator.init(plenty_of_memory); + var a = fixed_buffer_allocator.threadSafeAllocator(); + + var queue = WaitJobQueue(i32).init(); + var context = Context{ + .allocator = a, + .queue = &queue, + .put_sum = 0, + .get_sum = 0, + .puts_done = false, + .get_count = 0, + }; + + if (builtin.single_threaded) { + try expect(context.queue.isEmpty()); + { + var i: usize = 0; + while (i < put_thread_count) : (i += 1) { + try expect(startPuts(&context) == 0); + } + } + try expect(!context.queue.isEmpty()); + context.puts_done = true; + { + var i: usize = 0; + while (i < put_thread_count) : (i += 1) { + try expect(startGets(&context) == 0); + } + } + try expect(context.queue.isEmpty()); + } else { + try expect(context.queue.isEmpty()); + + var putters: [put_thread_count]std.Thread = undefined; + for (putters) |*t| { + t.* = try std.Thread.spawn(.{}, startPuts, .{&context}); + } + var getters: [put_thread_count]std.Thread = undefined; + for (getters) |*t| { + t.* = try std.Thread.spawn(.{}, startGets, .{&context}); + } + + for (putters) |t| + t.join(); + @atomicStore(bool, &context.puts_done, true, .SeqCst); + for (getters) |t| + t.join(); + + try expect(context.queue.isEmpty()); + } + + if (context.put_sum != context.get_sum) { + std.debug.panic("failure\nput_sum:{} != get_sum:{}", .{ context.put_sum, context.get_sum }); + } + + if (context.get_count != puts_per_thread * put_thread_count) { + std.debug.panic("failure\nget_count:{} != puts_per_thread:{} * put_thread_count:{}", .{ + context.get_count, + @as(u32, puts_per_thread), + @as(u32, put_thread_count), + }); + } +} + +fn startPuts(ctx: *Context) u8 { + var put_count: usize = puts_per_thread; + var prng = std.rand.DefaultPrng.init(0xdeadbeef); + const random = prng.random(); + while (put_count != 0) : (put_count -= 1) { + std.time.sleep(1); // let the 
os scheduler be our fuzz + const x = @bitCast(i32, random.int(u32)); + const node = ctx.allocator.create(WaitJobQueue(i32).Node) catch unreachable; + node.* = .{ + .prev = undefined, + .next = undefined, + .data = x, + }; + ctx.queue.put(node); + _ = @atomicRmw(isize, &ctx.put_sum, .Add, x, .SeqCst); + } + return 0; +} + +fn startGets(ctx: *Context) u8 { + while (true) { + const last = @atomicLoad(bool, &ctx.puts_done, .SeqCst); + + while (ctx.queue.try_get()) |node| { + std.time.sleep(1); // let the os scheduler be our fuzz + _ = @atomicRmw(isize, &ctx.get_sum, .Add, node.data, .SeqCst); + _ = @atomicRmw(usize, &ctx.get_count, .Add, 1, .SeqCst); + } + + if (last) return 0; + } +} + +test "std.atomic.Queue single-threaded" { + var queue = WaitJobQueue(i32).init(); + try expect(queue.isEmpty()); + + var node_0 = WaitJobQueue(i32).Node{ + .data = 0, + .next = undefined, + .prev = undefined, + }; + queue.put(&node_0); + try expect(!queue.isEmpty()); + + var node_1 = WaitJobQueue(i32).Node{ + .data = 1, + .next = undefined, + .prev = undefined, + }; + queue.put(&node_1); + try expect(!queue.isEmpty()); + + try expect(queue.get().?.data == 0); + try expect(!queue.isEmpty()); + + var node_2 = WaitJobQueue(i32).Node{ + .data = 2, + .next = undefined, + .prev = undefined, + }; + queue.put(&node_2); + try expect(!queue.isEmpty()); + + var node_3 = WaitJobQueue(i32).Node{ + .data = 3, + .next = undefined, + .prev = undefined, + }; + queue.put(&node_3); + try expect(!queue.isEmpty()); + + try expect(queue.get().?.data == 1); + try expect(!queue.isEmpty()); + + try expect(queue.get().?.data == 2); + try expect(!queue.isEmpty()); + + var node_4 = WaitJobQueue(i32).Node{ + .data = 4, + .next = undefined, + .prev = undefined, + }; + queue.put(&node_4); + try expect(!queue.isEmpty()); + + try expect(queue.get().?.data == 3); + node_3.next = null; + try expect(!queue.isEmpty()); + + queue.unget(&node_3); + try expect(queue.get().?.data == 3); + try expect(!queue.isEmpty()); + + try expect(queue.get().?.data == 4); + try expect(queue.isEmpty()); + + try expect(queue.try_get() == null); + try expect(queue.isEmpty()); + + // unget an empty queue + queue.unget(&node_4); + try expect(queue.tail == &node_4); + try expect(queue.head == &node_4); + + try expect(queue.get().?.data == 4); + + try expect(queue.try_get() == null); + try expect(queue.isEmpty()); +} + +test "std.atomic.Queue dump" { + const mem = std.mem; + var buffer: [1024]u8 = undefined; + var expected_buffer: [1024]u8 = undefined; + var fbs = std.io.fixedBufferStream(&buffer); + + var queue = WaitJobQueue(i32).init(); + + // Test empty stream + fbs.reset(); + try queue.dumpToStream(fbs.writer()); + try expect(mem.eql(u8, buffer[0..fbs.pos], + \\head: (null) + \\tail: (null) + \\ + )); + + // Test a stream with one element + var node_0 = WaitJobQueue(i32).Node{ + .data = 1, + .next = undefined, + .prev = undefined, + }; + queue.put(&node_0); + + fbs.reset(); + try queue.dumpToStream(fbs.writer()); + + var expected = try std.fmt.bufPrint(expected_buffer[0..], + \\head: 0x{x}=1 + \\ (null) + \\tail: 0x{x}=1 + \\ (null) + \\ + , .{ @ptrToInt(queue.head), @ptrToInt(queue.tail) }); + try expect(mem.eql(u8, buffer[0..fbs.pos], expected)); + + // Test a stream with two elements + var node_1 = WaitJobQueue(i32).Node{ + .data = 2, + .next = undefined, + .prev = undefined, + }; + queue.put(&node_1); + + fbs.reset(); + try queue.dumpToStream(fbs.writer()); + + expected = try std.fmt.bufPrint(expected_buffer[0..], + \\head: 0x{x}=1 + \\ 0x{x}=2 + \\ (null) + 
\\tail: 0x{x}=2 + \\ (null) + \\ + , .{ @ptrToInt(queue.head), @ptrToInt(queue.head.?.next), @ptrToInt(queue.tail) }); + try expect(mem.eql(u8, buffer[0..fbs.pos], expected)); +}
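+
+// Editor's note: an illustrative sketch (hypothetical, not part of this change
+// set) covering `remove()`, which the tests above do not exercise.
+test "WaitJobQueue remove sketch" {
+    var queue = WaitJobQueue(i32).init();
+
+    var node_0 = WaitJobQueue(i32).Node{ .data = 0, .next = undefined, .prev = undefined };
+    var node_1 = WaitJobQueue(i32).Node{ .data = 1, .next = undefined, .prev = undefined };
+    queue.put(&node_0);
+    queue.put(&node_1);
+
+    // removing a linked node detaches it and returns true.
+    try expect(queue.remove(&node_0));
+    // removing it again is a no-op and returns false.
+    try expect(!queue.remove(&node_0));
+
+    try expect(queue.get().?.data == 1);
+    try expect(queue.isEmpty());
+}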