From 8a98430ddacffd1d1be7984c20d7fab102075203 Mon Sep 17 00:00:00 2001 From: Martin Ondejka Date: Fri, 29 Nov 2024 01:20:44 +0100 Subject: [PATCH] Refactor da versioning (#232) --- src/app/zeko/da_layer/lib/client.ml | 62 ++++++++--- src/app/zeko/da_layer/lib/diff.ml | 76 +++++++------ src/app/zeko/da_layer/lib/node.ml | 30 +++-- src/app/zeko/da_layer/lib/rpc.ml | 111 +++++++++---------- src/app/zeko/sequencer/archive_relay/run.ml | 12 +- src/app/zeko/sequencer/lib/zeko_sequencer.ml | 16 ++- 6 files changed, 179 insertions(+), 128 deletions(-) diff --git a/src/app/zeko/da_layer/lib/client.ml b/src/app/zeko/da_layer/lib/client.ml index 74aeeacab4..6b60ca57b9 100644 --- a/src/app/zeko/da_layer/lib/client.ml +++ b/src/app/zeko/da_layer/lib/client.ml @@ -36,27 +36,44 @@ module Rpc = struct go max_tries [] let post_diff ~logger ~node_location ~ledger_openings ~diff = - dispatch ~logger node_location Rpc.Post_diff.v2 { ledger_openings; diff } + dispatch ~logger node_location Rpc.Post_diff.V1.t { ledger_openings; diff } let get_diff ~logger ~node_location ~ledger_hash = - dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff.v2 ledger_hash + match%bind + dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff.V2.t ledger_hash + with + | Ok diff -> + return (Ok diff) + | Error _ -> + (* TODO: do this only if the error is that the rpc method doesn't exist *) + (* Fallback to older version *) + let%bind.Deferred.Result result = + dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff.V1.t + ledger_hash + in + return (Ok (Option.map result ~f:(fun x -> Diff.Stable.V1.to_latest x))) let get_all_keys ~logger ~node_location () = - dispatch ~max_tries:1 ~logger node_location Rpc.Get_all_keys.v1 () + dispatch ~max_tries:1 ~logger node_location Rpc.Get_all_keys.V1.t () let get_diff_source ~logger ~node_location ~ledger_hash = - dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff_source.v1 + dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff_source.V1.t ledger_hash let get_node_public_key ~logger ~node_location () = - dispatch ~max_tries:1 ~logger node_location Rpc.Get_signer_public_key.v1 () + dispatch ~max_tries:1 ~logger node_location Rpc.Get_signer_public_key.V1.t + () let get_signature ~logger ~node_location ~ledger_hash = - dispatch ~max_tries:1 ~logger node_location Rpc.Get_signature.v1 ledger_hash + dispatch ~max_tries:1 ~logger node_location Rpc.Get_signature.V1.t + ledger_hash end module Config = struct - type t = { nodes : Host_and_port.t Cli_lib.Flag.Types.with_name list } + type t = + { mutable nodes : Host_and_port.t Cli_lib.Flag.Types.with_name list + (** Mutable in case we want to throw out some node *) + } [@@deriving fields] let of_string_list uris = @@ -67,6 +84,11 @@ module Config = struct ; name = sprintf "da-node-%d" i } ) } + + let throw_out_node t ~(node : Host_and_port.t Cli_lib.Flag.Types.with_name) = + let open Cli_lib.Flag.Types in + t.nodes <- + List.filter t.nodes ~f:(fun n -> not (String.equal n.name node.name)) end (** Send the diff to all the nodes in the [~config] *) @@ -198,7 +220,7 @@ let distribute_genesis_diff ~logger ~config ~ledger = ~f:(fun acc (index, _) -> Sparse_ledger.set_exn acc index Account.empty) in let diff = - Diff.Without_timestamp.create + Diff.create ~source_ledger_hash:(Diff.empty_ledger_hash ~depth:(Ledger.depth ledger)) ~changed_accounts ~command_with_action_step_flags:None in @@ -233,7 +255,7 @@ let sync_nodes ~logger ~config ~depth ~target_ledger_hash = return (Ok (attach_openings - ~diffs:(List.map diffs_with_timestamps ~f:fst) + ~diffs:(List.map diffs_with_timestamps ~f:Diff.drop_time) ~depth ) ) ) in Deferred.List.map config.nodes ~f:(fun node -> @@ -244,14 +266,24 @@ let sync_nodes ~logger ~config ~depth ~target_ledger_hash = printf "Node %s is already synced\n%!" (Host_and_port.to_string node.value) ; return (Ok ()) - | Ok None | Error _ -> + | Ok None | Error _ -> ( printf "Syncing node %s\n%!" (Host_and_port.to_string node.value) ; let%bind.Deferred.Result diffs_with_openings = Lazy.force diffs_with_openings in - Deferred.List.map diffs_with_openings ~f:(fun (diff, openings) -> - Rpc.post_diff ~logger ~node_location:node - ~ledger_openings:openings ~diff - >>| Result.ignore_m ) - >>| Result.all_unit ) + match%bind + Deferred.List.map diffs_with_openings ~f:(fun (diff, openings) -> + Rpc.post_diff ~logger ~node_location:node + ~ledger_openings:openings ~diff + >>| Result.ignore_m ) + >>| Result.all_unit + with + | Ok () -> + return (Ok ()) + | Error e -> + printf "Error syncing node %s: %s\n%!" + (Host_and_port.to_string node.value) + (Error.to_string_hum e) ; + Config.throw_out_node config ~node ; + return (Ok ()) ) ) >>| Result.all_unit diff --git a/src/app/zeko/da_layer/lib/diff.ml b/src/app/zeko/da_layer/lib/diff.ml index 10d1c89399..8ef199f811 100644 --- a/src/app/zeko/da_layer/lib/diff.ml +++ b/src/app/zeko/da_layer/lib/diff.ml @@ -2,35 +2,20 @@ open Core_kernel open Mina_base open Mina_ledger -module Without_timestamp = struct - [%%versioned - module Stable = struct - module V1 = struct - type t = - { source_ledger_hash : Ledger_hash.Stable.V1.t - (** Source ledger hash of the diff *) - ; changed_accounts : (int * Account.Stable.V2.t) list - (** List of changed accounts with corresponding index in the ledger *) - ; command_with_action_step_flags : - (User_command.Stable.V2.t * bool list) option - (** Optionally add command with corresponding action steps to store the history *) - } - [@@deriving yojson, fields, sexp_of] - - let to_latest = Fn.id - end - end] - - let create ~source_ledger_hash ~changed_accounts - ~command_with_action_step_flags = - { source_ledger_hash; changed_accounts; command_with_action_step_flags } -end - [%%versioned module Stable = struct module V2 = struct - type t = Without_timestamp.Stable.V1.t * Block_time.Stable.V1.t - [@@deriving yojson, sexp_of] + type t = + { source_ledger_hash : Ledger_hash.Stable.V1.t + (** Source ledger hash of the diff *) + ; changed_accounts : (int * Account.Stable.V2.t) list + (** List of changed accounts with corresponding index in the ledger *) + ; command_with_action_step_flags : + (User_command.Stable.V2.t * bool list) option + (** Optionally add command with corresponding action steps to store the history *) + ; timestamp : Block_time.Stable.V1.t (** Timestamp of the diff *) + } + [@@deriving yojson, fields, sexp_of] let to_latest = Fn.id end @@ -47,22 +32,45 @@ module Stable = struct } [@@deriving yojson, fields, sexp_of] - let to_latest t = (Block_time.zero, t) + let to_latest ?(timestamp = Block_time.zero) (t : t) = + { V2.source_ledger_hash = t.source_ledger_hash + ; changed_accounts = t.changed_accounts + ; command_with_action_step_flags = t.command_with_action_step_flags + ; timestamp + } end end] -let changed_accounts { Without_timestamp.Stable.V1.changed_accounts; _ } = - changed_accounts +let create ~source_ledger_hash ~changed_accounts ~command_with_action_step_flags + = + { Stable.V1.source_ledger_hash + ; changed_accounts + ; command_with_action_step_flags + } -let source_ledger_hash { Without_timestamp.Stable.V1.source_ledger_hash; _ } = - source_ledger_hash +let changed_accounts { Stable.V1.changed_accounts; _ } = changed_accounts + +let source_ledger_hash { Stable.V1.source_ledger_hash; _ } = source_ledger_hash let command_with_action_step_flags - { Without_timestamp.Stable.V1.command_with_action_step_flags; _ } = + { Stable.V1.command_with_action_step_flags; _ } = command_with_action_step_flags -let add_time ~logger (t : Without_timestamp.t) : t = - (t, Block_time.now (Block_time.Controller.basic ~logger)) +let add_time ~logger t = + Stable.V1.to_latest + ~timestamp:(Block_time.now (Block_time.Controller.basic ~logger)) + t + +let drop_time + { Stable.V2.source_ledger_hash + ; changed_accounts + ; command_with_action_step_flags + ; _ + } = + { Stable.V1.source_ledger_hash + ; changed_accounts + ; command_with_action_step_flags + } let to_bigstring = Binable.to_bigstring (module Stable.Latest) diff --git a/src/app/zeko/da_layer/lib/node.ml b/src/app/zeko/da_layer/lib/node.ml index 5b99ef86f8..f403d112f7 100644 --- a/src/app/zeko/da_layer/lib/node.ml +++ b/src/app/zeko/da_layer/lib/node.ml @@ -125,8 +125,7 @@ let post_diff t ~ledger_openings ~diff = Ok () | true -> Error - (Error.create "Duplicate indices" diff - [%sexp_of: Diff.Without_timestamp.t] ) + (Error.create "Duplicate indices" diff [%sexp_of: Diff.Stable.V1.t]) in (* 4 *) @@ -251,7 +250,8 @@ let post_diff t ~ledger_openings ~diff = in (* 7 *) - let diff = Diff.add_time ~logger diff in + (* V2 was added time *) + let diff : Diff.Stable.V2.t = Diff.add_time ~logger diff in (* 8 *) (* We don't care if the diff already existed *) @@ -327,7 +327,8 @@ let get_signature t ~ledger_hash = let implementations t = Async.Rpc.Implementations.create_exn ~on_unknown_rpc:`Raise ~implementations: - [ Async.Rpc.Rpc.implement Rpc.Post_diff.v2 + (* Post_diff *) + [ Async.Rpc.Rpc.implement Rpc.Post_diff.V1.t (fun () { ledger_openings; diff } -> match post_diff t ~ledger_openings ~diff with | Ok signature -> @@ -337,12 +338,19 @@ let implementations t = [%log warn] "Error posting diff: $error" ~metadata:[ ("error", `String (Error.to_string_hum e)) ] ; failwith (Error.to_string_hum e) ) - ; Async.Rpc.Rpc.implement Rpc.Get_diff.v2 (fun () query -> + (* Get_diff *) + ; Async.Rpc.Rpc.implement Rpc.Get_diff.V1.t (fun () query -> + let v2_diff = Db.get_diff t.db ~ledger_hash:query in + let v1_diff = Option.map v2_diff ~f:Diff.drop_time in + Async.return @@ v1_diff ) + ; Async.Rpc.Rpc.implement Rpc.Get_diff.V2.t (fun () query -> Async.return @@ Db.get_diff t.db ~ledger_hash:query ) - ; Async.Rpc.Rpc.implement Rpc.Get_all_keys.v1 (fun () () -> + (* Get_all_keys *) + ; Async.Rpc.Rpc.implement Rpc.Get_all_keys.V1.t (fun () () -> Async.return @@ Db.get_index t.db ) - ; Async.Rpc.Rpc.implement Rpc.Get_diff_source.v1 (fun () query -> - Async.return @@ Diff.source_ledger_hash @@ fst + (* Get_diff_source *) + ; Async.Rpc.Rpc.implement Rpc.Get_diff_source.V1.t (fun () query -> + Async.return @@ Diff.Stable.Latest.source_ledger_hash @@ Option.value_exn ~error: ( Error.of_string @@ -351,9 +359,11 @@ let implementations t = hash %s" (Ledger_hash.to_decimal_string query) ) @@ Db.get_diff t.db ~ledger_hash:query ) - ; Async.Rpc.Rpc.implement Rpc.Get_signer_public_key.v1 (fun () () -> + (* Get_signed_public_key *) + ; Async.Rpc.Rpc.implement Rpc.Get_signer_public_key.V1.t (fun () () -> Async.return @@ Public_key.compress @@ t.signer.public_key ) - ; Async.Rpc.Rpc.implement Rpc.Get_signature.v1 (fun () query -> + (* Get_signature *) + ; Async.Rpc.Rpc.implement Rpc.Get_signature.V1.t (fun () query -> Async.return @@ get_signature t ~ledger_hash:query ) ] diff --git a/src/app/zeko/da_layer/lib/rpc.ml b/src/app/zeko/da_layer/lib/rpc.ml index 22cb99e159..9023fb32d1 100644 --- a/src/app/zeko/da_layer/lib/rpc.ml +++ b/src/app/zeko/da_layer/lib/rpc.ml @@ -6,92 +6,85 @@ open Signature_lib (* val post_diff : ledger_openings:Sparse_ledger.t -> diff:Diff.t -> Signature.t *) module Post_diff = struct - module Query = struct - [%%versioned - module Stable = struct - module V2 = struct - type t = - { ledger_openings : Sparse_ledger.Stable.V2.t - ; diff : Diff.Without_timestamp.Stable.V1.t - } + module V1 = struct + module Query = struct + (* Use Diff.V1 without timestamp, the node determines the timestamp itself *) + type t = + { ledger_openings : Sparse_ledger.Stable.V2.t; diff : Diff.Stable.V1.t } + [@@deriving bin_io_unversioned] + end - let to_latest = Fn.id - end - end] + let t : (Query.t, Signature.t) Rpc.Rpc.t = + Rpc.Rpc.create ~name:"Post_diff" ~version:1 ~bin_query:Query.bin_t + ~bin_response:Signature.Stable.V1.bin_t end - - let v2 : (Query.t, Signature.t) Rpc.Rpc.t = - Rpc.Rpc.create ~name:"Post_diff" ~version:2 ~bin_query:Query.Stable.V2.bin_t - ~bin_response:Signature.Stable.V1.bin_t end (* val get_diff : Ledger_hash.t -> Diff.t option *) module Get_diff = struct - module Response = struct - [%%versioned - module Stable = struct - module V2 = struct - type t = Diff.Stable.V2.t option + module V1 = struct + module Response = struct + type t = Diff.Stable.V1.t option [@@deriving bin_io_unversioned] + end - let to_latest = Fn.id - end - end] + let t : (Ledger_hash.t, Response.t) Rpc.Rpc.t = + Rpc.Rpc.create ~name:"Get_diff" ~version:1 + ~bin_query:Ledger_hash.Stable.V1.bin_t ~bin_response:Response.bin_t end - let v2 : (Ledger_hash.t, Response.t) Rpc.Rpc.t = - Rpc.Rpc.create ~name:"Get_diff" ~version:2 - ~bin_query:Ledger_hash.Stable.V1.bin_t - ~bin_response:Response.Stable.V2.bin_t + module V2 = struct + module Response = struct + type t = Diff.Stable.V2.t option [@@deriving bin_io_unversioned] + end + + let t : (Ledger_hash.t, Response.t) Rpc.Rpc.t = + Rpc.Rpc.create ~name:"Get_diff" ~version:2 + ~bin_query:Ledger_hash.Stable.V1.bin_t ~bin_response:Response.bin_t + end end (* val get_all_keys : unit -> Ledger_hash.t list *) module Get_all_keys = struct - module Response = struct - [%%versioned - module Stable = struct - module V1 = struct - type t = Ledger_hash.Stable.V1.t list + module V1 = struct + module Response = struct + type t = Ledger_hash.Stable.V1.t list [@@deriving bin_io_unversioned] + end - let to_latest = Fn.id - end - end] + let t : (unit, Response.t) Rpc.Rpc.t = + Rpc.Rpc.create ~name:"Get_all_keys" ~version:1 ~bin_query:Unit.bin_t + ~bin_response:Response.bin_t end - - let v1 : (unit, Response.t) Rpc.Rpc.t = - Rpc.Rpc.create ~name:"Get_all_keys" ~version:1 ~bin_query:Unit.bin_t - ~bin_response:Response.Stable.V1.bin_t end (* val get_diff_source : Ledger_hash.t -> Ledger_hash.t *) module Get_diff_source = struct - let v1 : (Ledger_hash.t, Ledger_hash.t) Rpc.Rpc.t = - Rpc.Rpc.create ~name:"Get_diff_source" ~version:1 - ~bin_query:Ledger_hash.Stable.V1.bin_t - ~bin_response:Ledger_hash.Stable.V1.bin_t + module V1 = struct + let t : (Ledger_hash.t, Ledger_hash.t) Rpc.Rpc.t = + Rpc.Rpc.create ~name:"Get_diff_source" ~version:1 + ~bin_query:Ledger_hash.Stable.V1.bin_t + ~bin_response:Ledger_hash.Stable.V1.bin_t + end end (* val get_signer_public_key : unit -> Public_key.Compressed.t *) module Get_signer_public_key = struct - let v1 : (unit, Public_key.Compressed.t) Rpc.Rpc.t = - Rpc.Rpc.create ~name:"Get_signer_public_key" ~version:1 - ~bin_query:Unit.bin_t ~bin_response:Public_key.Compressed.Stable.V1.bin_t + module V1 = struct + let t : (unit, Public_key.Compressed.t) Rpc.Rpc.t = + Rpc.Rpc.create ~name:"Get_signer_public_key" ~version:1 + ~bin_query:Unit.bin_t + ~bin_response:Public_key.Compressed.Stable.V1.bin_t + end end (* val get_signature : Ledger_hash.t -> Signature.t option *) module Get_signature = struct - module Response = struct - [%%versioned - module Stable = struct - module V1 = struct - type t = Signature.Stable.V1.t option + module V1 = struct + module Response = struct + type t = Signature.Stable.V1.t option [@@deriving bin_io_unversioned] + end - let to_latest = Fn.id - end - end] + let t : (Ledger_hash.t, Signature.t option) Rpc.Rpc.t = + Rpc.Rpc.create ~name:"Get_signature" ~version:1 + ~bin_query:Ledger_hash.Stable.V1.bin_t ~bin_response:Response.bin_t end - - let v1 : (Ledger_hash.t, Signature.t option) Rpc.Rpc.t = - Rpc.Rpc.create ~name:"Get_signature" ~version:1 - ~bin_query:Ledger_hash.Stable.V1.bin_t - ~bin_response:Response.Stable.V1.bin_t end diff --git a/src/app/zeko/sequencer/archive_relay/run.ml b/src/app/zeko/sequencer/archive_relay/run.ml index 9e1f54953e..962be3db66 100644 --- a/src/app/zeko/sequencer/archive_relay/run.ml +++ b/src/app/zeko/sequencer/archive_relay/run.ml @@ -134,11 +134,15 @@ let sync_archive ~(state : State.t) ~hash = let%bind diffs = Deferred.List.map chain ~f:(fetch_diff ~state) in Ledger.with_ledger ~depth:constraint_constants.ledger_depth ~f:(fun ledger -> let protocol_state = ref compile_time_genesis_state in - Deferred.List.iter diffs ~f:(fun (diff, diff_timestamp) -> - match Da_layer.Diff.command_with_action_step_flags diff with + Deferred.List.iter diffs ~f:(fun diff -> + match + Da_layer.Diff.Stable.Latest.command_with_action_step_flags diff + with | None -> (* Apply accounts diff *) - let changed_accounts = Da_layer.Diff.changed_accounts diff in + let changed_accounts = + Da_layer.Diff.Stable.Latest.changed_accounts diff + in List.iter changed_accounts ~f:(fun (index, account) -> Ledger.set_at_index_exn ledger index account ) ; Ledger.commit ledger ; @@ -165,7 +169,7 @@ let sync_archive ~(state : State.t) ~hash = ~protocol_state:!protocol_state ~ledger ~txn:(Ledger.Transaction_applied.transaction txn_applied) ~dummy_fee_payer:Zkapps_rollup.inner_public_key - ~timestamp:diff_timestamp + ~timestamp:(Da_layer.Diff.Stable.Latest.timestamp diff) in protocol_state := new_protocol_state ; if State.has_been_relayed state (Ledger.merkle_root ledger) then diff --git a/src/app/zeko/sequencer/lib/zeko_sequencer.ml b/src/app/zeko/sequencer/lib/zeko_sequencer.ml index a90619f48d..abc9d36a94 100644 --- a/src/app/zeko/sequencer/lib/zeko_sequencer.ml +++ b/src/app/zeko/sequencer/lib/zeko_sequencer.ml @@ -594,7 +594,7 @@ module Sequencer = struct (index, L.get_at_index_exn l index) ) in let diff = - Da_layer.Diff.Without_timestamp.create + Da_layer.Diff.create ~source_ledger_hash:(Sparse_ledger.merkle_root first_pass_ledger) ~changed_accounts ~command_with_action_step_flags: @@ -796,19 +796,23 @@ module Sequencer = struct let%bind () = Deferred.List.iter ~how:`Sequential ledger_hashes_chain ~f:(fun ledger_hash -> - let%bind diff : Da_layer.Diff.Without_timestamp.t Deferred.t = + let%bind diff : Da_layer.Diff.t Deferred.t = Da_layer.Client.get_diff ~logger ~config:da_config ~ledger_hash - >>| Or_error.ok_exn >>| fst + >>| Or_error.ok_exn in assert ( Ledger_hash.equal - (Da_layer.Diff.source_ledger_hash diff) + (Da_layer.Diff.Stable.Latest.source_ledger_hash diff) (get_root t) ) ; - match Da_layer.Diff.command_with_action_step_flags diff with + match + Da_layer.Diff.Stable.Latest.command_with_action_step_flags diff + with | None -> (* Apply accounts diff *) let mask = L.of_database t.db in - let changed_accounts = Da_layer.Diff.changed_accounts diff in + let changed_accounts = + Da_layer.Diff.Stable.Latest.changed_accounts diff + in printf "Setting %d accounts\n%!" (List.length changed_accounts) ; List.iter changed_accounts ~f:(fun (index, account) -> L.set_at_index_exn mask index account ) ;