diff --git a/src/app/zeko/da_layer/cli.ml b/src/app/zeko/da_layer/cli.ml index 64418dd6a0..d5dd2c6032 100644 --- a/src/app/zeko/da_layer/cli.ml +++ b/src/app/zeko/da_layer/cli.ml @@ -13,12 +13,17 @@ let run_node = flag "--port" (optional_with_default 8080 int) ~doc:"int Port to listen on" - and nodes_to_sync = - flag "--da-node-to-sync" (listed string) - ~doc:"string list Nodes to sync with" + and node_to_sync = + flag "--da-node-to-sync" (optional string) + ~doc:"string Nodes to sync with" + and hash_to_sync = + flag "--hash-to-sync" (optional string) + ~doc:"string Hash to sync with in decimal string form" and testing_mode = flag "--random-sk" no_arg ~doc:"Run in testing mode, the signer key will be generated randomly" + and no_migrations = + flag "--no-migrations" no_arg ~doc:"Do not run migrations" in fun () -> let signer = @@ -26,17 +31,24 @@ let run_node = else Sys.getenv_exn "MINA_PRIVATE_KEY" in let logger = Logger.create () in - let nodes_to_sync = - List.map nodes_to_sync ~f:(fun node_to_sync -> - Cli_lib.Flag.Types. - { value = Core_kernel.Host_and_port.of_string node_to_sync - ; name = "node-to-sync" - } ) + let sync_arg = + match (node_to_sync, hash_to_sync) with + | Some node_to_sync, Some hash_to_sync -> + Some + ( Cli_lib.Flag.Types. + { value = Core_kernel.Host_and_port.of_string node_to_sync + ; name = "node-to-sync" + } + , Mina_base.Ledger_hash.of_decimal_string hash_to_sync ) + | None, None -> + None + | _ -> + failwith "Both node-to-sync and hash-to-sync must be provided" in let%bind () = Deferred.ignore_m - @@ Da_layer.Node.create_server ~nodes_to_sync ~logger ~port ~db_dir - ~signer_sk:signer () + @@ Da_layer.Node.create_server ~sync_arg ~logger ~port ~db_dir + ~signer_sk:signer ~no_migrations () in [%log info] "Server started on port $port" ~metadata:[ ("port", `Int port) ] ; diff --git a/src/app/zeko/da_layer/lib/client.ml b/src/app/zeko/da_layer/lib/client.ml index 6b60ca57b9..2ca2afd8e5 100644 --- a/src/app/zeko/da_layer/lib/client.ml +++ b/src/app/zeko/da_layer/lib/client.ml @@ -44,14 +44,21 @@ module Rpc = struct with | Ok diff -> return (Ok diff) - | Error _ -> - (* TODO: do this only if the error is that the rpc method doesn't exist *) - (* Fallback to older version *) - let%bind.Deferred.Result result = - dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff.V1.t - ledger_hash + | Error e -> + let v2_unimplemented = + Error.to_string_mach e + |> String.is_substring + ~substring:"Unimplemented_rpc Get_diff (Version 2)" in - return (Ok (Option.map result ~f:(fun x -> Diff.Stable.V1.to_latest x))) + if v2_unimplemented then + (* Fallback to older version *) + let%bind.Deferred.Result result = + dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff.V1.t + ledger_hash + in + return + (Ok (Option.map result ~f:(fun x -> Diff.Stable.V1.to_latest x))) + else return (Error e) let get_all_keys ~logger ~node_location () = dispatch ~max_tries:1 ~logger node_location Rpc.Get_all_keys.V1.t () @@ -67,6 +74,14 @@ module Rpc = struct let get_signature ~logger ~node_location ~ledger_hash = dispatch ~max_tries:1 ~logger node_location Rpc.Get_signature.V1.t ledger_hash + + let get_ledger_hashes_chain ~logger ~node_location ~source ~target = + dispatch ~max_tries:1 ~logger node_location Rpc.Get_ledger_hashes_chain.V1.t + { source; target } + + let get_diffs_chain ~logger ~node_location ~source ~target = + dispatch ~max_tries:1 ~logger node_location Rpc.Get_diffs_chain.V1.t + { source; target } end module Config = struct @@ -175,21 +190,18 @@ let try_all_nodes ~config ~f = in try_first ~accum_errors:[] (Config.nodes config) -(** Get the chain of ledger hashes from empty ledger hash to [target_ledger_hash] *) -let get_ledger_hashes_chain ~logger ~config ~depth ~target_ledger_hash = - let rec go current = - if Ledger_hash.equal current (Diff.empty_ledger_hash ~depth) then - return (Ok []) - else - let%bind.Deferred.Result source = - try_all_nodes ~config ~f:(fun ~node_location () -> - Rpc.get_diff_source ~logger ~node_location ~ledger_hash:current ) - in - let%bind.Deferred.Result next = go source in - return (Ok (current :: next)) - in - let%bind.Deferred.Result from_target_to_genesis = go target_ledger_hash in - return (Ok (List.rev from_target_to_genesis)) +(** Get the chain of ledger hashes from [source_ledger_hash] hash to [target_ledger_hash] *) +let get_ledger_hashes_chain ~logger ~config ~source_ledger_hash + ~target_ledger_hash = + try_all_nodes ~config ~f:(fun ~node_location () -> + Rpc.get_ledger_hashes_chain ~logger ~node_location + ~source:source_ledger_hash ~target:target_ledger_hash ) + +(** Get the chain of diffs from [source_ledger_hash] hash to [target_ledger_hash] *) +let get_diffs_chain ~logger ~config ~source_ledger_hash ~target_ledger_hash = + try_all_nodes ~config ~f:(fun ~node_location () -> + Rpc.get_diffs_chain ~logger ~node_location ~source:source_ledger_hash + ~target:target_ledger_hash ) (** Try to get the diff from the first node in the list, if it fails, try the next one *) let get_diff ~logger ~config ~ledger_hash = @@ -242,21 +254,16 @@ let attach_openings ~diffs ~depth = (diff, openings) ) let sync_nodes ~logger ~config ~depth ~target_ledger_hash = - let%bind.Deferred.Result ledger_hashes_chain = - get_ledger_hashes_chain ~logger ~config ~depth ~target_ledger_hash - in let diffs_with_openings = lazy - (let%bind.Deferred.Result diffs_with_timestamps = - Deferred.List.map ledger_hashes_chain ~how:(`Max_concurrent_jobs 5) - ~f:(fun ledger_hash -> get_diff ~logger ~config ~ledger_hash) - >>| Result.all + (* TODO: don't fetch all the diffs from genesis, using binary search determine which diffs is the node missing *) + (let%bind.Deferred.Result diffs = + get_diffs_chain ~logger ~config ~source_ledger_hash:`Genesis + ~target_ledger_hash in return - (Ok - (attach_openings - ~diffs:(List.map diffs_with_timestamps ~f:Diff.drop_time) - ~depth ) ) ) + (Ok (attach_openings ~diffs:(List.map diffs ~f:Diff.drop_time) ~depth)) + ) in Deferred.List.map config.nodes ~f:(fun node -> match%bind diff --git a/src/app/zeko/da_layer/lib/db.ml b/src/app/zeko/da_layer/lib/db.ml new file mode 100644 index 0000000000..fa19ef395f --- /dev/null +++ b/src/app/zeko/da_layer/lib/db.ml @@ -0,0 +1,78 @@ +open Core_kernel +open Mina_base + +(** Holds keys to all the diffes *) +module Index = struct + [%%versioned + module Stable = struct + module V1 = struct + type t = Ledger_hash.Stable.V1.t list + + let to_latest = Fn.id + end + end] + + let to_bigstring = Binable.to_bigstring (module Stable.Latest) + + let of_bigstring = Binable.of_bigstring (module Stable.Latest) +end + +module Key_value = struct + type _ t = + | Diff : (Ledger_hash.t * Diff.t) t + | Diff_index : (unit * Index.t) t + | Migration : (unit * int) t + + let serialize_key : type k v. (k * v) t -> k -> Bigstring.t = + fun pair_type key -> + match pair_type with + | Diff -> + Bigstring.concat + [ Bigstring.of_string "diff" + ; Bigstring.of_string @@ Ledger_hash.to_decimal_string key + ] + | Diff_index -> + Bigstring.of_string "diff_index" + | Migration -> + Bigstring.of_string "migration" + + let serialize_value : type k v. (k * v) t -> v -> Bigstring.t = + fun pair_type value -> + match pair_type with + | Diff -> + Diff.to_bigstring value + | Diff_index -> + Index.to_bigstring value + | Migration -> + Bigstring.of_string @@ Int.to_string value + + let deserialize_value : type k v. (k * v) t -> Bigstring.t -> v = + fun pair_type data -> + match pair_type with + | Diff -> + Diff.of_bigstring data |> Or_error.ok_exn + | Diff_index -> + Index.of_bigstring data + | Migration -> + Int.of_string @@ Bigstring.to_string data +end + +include Kvdb_base.Make (Key_value) + +let set_index t ~index = set t Diff_index ~key:() ~data:index + +let get_index t = get t Diff_index ~key:() |> Option.value ~default:[] + +let add_diff t ~ledger_hash ~diff = + let index = get_index t in + if List.mem index ledger_hash ~equal:Ledger_hash.equal then `Already_existed + else ( + set t Diff ~key:ledger_hash ~data:diff ; + set_index t ~index:(ledger_hash :: index) ; + `Added ) + +let get_diff t ~ledger_hash = get t Diff ~key:ledger_hash + +let get_migration t = get t Migration ~key:() |> Option.value ~default:0 + +let set_migration t ~migration = set t Migration ~key:() ~data:migration diff --git a/src/app/zeko/da_layer/lib/diff.ml b/src/app/zeko/da_layer/lib/diff.ml index 8ef199f811..0ab2c689dc 100644 --- a/src/app/zeko/da_layer/lib/diff.ml +++ b/src/app/zeko/da_layer/lib/diff.ml @@ -4,6 +4,8 @@ open Mina_ledger [%%versioned module Stable = struct + [@@@with_top_version_tag] + module V2 = struct type t = { source_ledger_hash : Ledger_hash.Stable.V1.t @@ -15,7 +17,7 @@ module Stable = struct (** Optionally add command with corresponding action steps to store the history *) ; timestamp : Block_time.Stable.V1.t (** Timestamp of the diff *) } - [@@deriving yojson, fields, sexp_of] + [@@deriving yojson, fields, sexp_of, compare] let to_latest = Fn.id end @@ -72,10 +74,28 @@ let drop_time ; command_with_action_step_flags } -let to_bigstring = Binable.to_bigstring (module Stable.Latest) +let to_bigstring = + Binable.to_bigstring (module Stable.Latest.With_top_version_tag) -let of_bigstring = Binable.of_bigstring (module Stable.Latest) +let of_bigstring bigstring = + let pos_ref = ref 0 in + Stable.bin_read_top_tagged_to_latest ~pos_ref bigstring (** [Ledger_hash.empty_hash] is [zero], so we need this for the genesis state of the rollup *) let empty_ledger_hash ~depth = Ledger.merkle_root @@ Ledger.create_ephemeral ~depth () + +let%test_unit "diff versioning" = + let v1 = + Stable.V1. + { source_ledger_hash = Ledger_hash.empty_hash + ; changed_accounts = [] + ; command_with_action_step_flags = None + } + in + let v1_serialized = + Binable.to_bigstring (module Stable.V1.With_top_version_tag) v1 + in + let v2 = of_bigstring v1_serialized |> Or_error.ok_exn in + + [%test_eq: Stable.V2.t] (Stable.V1.to_latest v1) (Stable.V2.to_latest v2) diff --git a/src/app/zeko/da_layer/lib/dune b/src/app/zeko/da_layer/lib/dune index e3a5c222a3..340324e590 100644 --- a/src/app/zeko/da_layer/lib/dune +++ b/src/app/zeko/da_layer/lib/dune @@ -1,5 +1,6 @@ (library (name da_layer) + (inline_tests) (libraries kvdb_base zkapps_rollup diff --git a/src/app/zeko/da_layer/lib/migrations.ml b/src/app/zeko/da_layer/lib/migrations.ml new file mode 100644 index 0000000000..e1f676aa55 --- /dev/null +++ b/src/app/zeko/da_layer/lib/migrations.ml @@ -0,0 +1,41 @@ +open Core_kernel + +type migration = Db.t -> unit + +let progress_bar ?(width = 30) progress = + let filled_length = int_of_float (float_of_int width *. progress) in + let bar = + String.make filled_length '#' ^ String.make (width - filled_length) '-' + in + Printf.printf "\r[%s] %.0f%%%!" bar (progress *. 100.0) ; + if Float.(progress >= 1.0) then Printf.printf "\n%!" + +let add_version_tag db = + let try_read buff = + try + let pos_ref = ref 0 in + Ok (Diff.Stable.V1.bin_read_t ~pos_ref buff) + with _ -> Error "Failed to convert diff" + in + let all = Db.to_alist db in + let l = List.length all in + List.iteri all ~f:(fun i (key, value) -> + progress_bar (Float.of_int i /. Float.of_int l) ; + match try_read value with + | Error _ -> + ( (* The pair is something different from diff *) ) + | Ok diff -> + let v2 = Diff.to_bigstring @@ Diff.Stable.V1.to_latest diff in + Db.set_raw db ~key ~data:v2 ) + +let migrations : migration list = [ add_version_tag ] + +let latest_migration = List.length migrations + +let run_migrations ~logger db = + let old_migration = Db.get_migration db in + List.drop migrations old_migration + |> List.iteri ~f:(fun i migration -> + [%log info] "Running migration %d" (old_migration + i + 1) ; + migration db ; + Db.set_migration db ~migration:(old_migration + i + 1) ) diff --git a/src/app/zeko/da_layer/lib/node.ml b/src/app/zeko/da_layer/lib/node.ml index f403d112f7..8c56ada969 100644 --- a/src/app/zeko/da_layer/lib/node.ml +++ b/src/app/zeko/da_layer/lib/node.ml @@ -3,72 +3,7 @@ open Mina_base open Mina_ledger open Signature_lib -module Db = struct - (** Holds keys to all the diffes *) - module Index = struct - [%%versioned - module Stable = struct - module V1 = struct - type t = Ledger_hash.Stable.V1.t list - - let to_latest = Fn.id - end - end] - - let to_bigstring = Binable.to_bigstring (module Stable.Latest) - - let of_bigstring = Binable.of_bigstring (module Stable.Latest) - end - - module Key_value = struct - type _ t = - | Diff : (Ledger_hash.t * Diff.t) t - | Diff_index : (unit * Index.t) t - - let serialize_key : type k v. (k * v) t -> k -> Bigstring.t = - fun pair_type key -> - match pair_type with - | Diff -> - Bigstring.concat - [ Bigstring.of_string "diff" - ; Bigstring.of_string @@ Ledger_hash.to_decimal_string key - ] - | Diff_index -> - Bigstring.of_string "diff_index" - - let serialize_value : type k v. (k * v) t -> v -> Bigstring.t = - fun pair_type value -> - match pair_type with - | Diff -> - Diff.to_bigstring value - | Diff_index -> - Index.to_bigstring value - - let deserialize_value : type k v. (k * v) t -> Bigstring.t -> v = - fun pair_type data -> - match pair_type with - | Diff -> - Diff.of_bigstring data - | Diff_index -> - Index.of_bigstring data - end - - include Kvdb_base.Make (Key_value) - - let set_index t ~index = set t Diff_index ~key:() ~data:index - - let get_index t = get t Diff_index ~key:() |> Option.value ~default:[] - - let add_diff t ~ledger_hash ~diff = - let index = get_index t in - if List.mem index ledger_hash ~equal:Ledger_hash.equal then `Already_existed - else ( - set t Diff ~key:ledger_hash ~data:diff ; - set_index t ~index:(ledger_hash :: index) ; - `Added ) - - let get_diff t ~ledger_hash = get t Diff ~key:ledger_hash -end +let constraint_constants = Genesis_constants.Constraint_constants.compiled type t = { db : Db.t; signer : Keypair.t; logger : Logger.t } @@ -272,63 +207,77 @@ let post_diff t ~ledger_openings ~diff = in Ok signature -(** Find missing keys and fetch corresponding diffes *) -let sync ~logger ~node_location t = +let sync t ~node_location ~ledger_hash = + let logger = t.logger in let open Async in - let%bind.Deferred.Result remote_keys = - Client.Rpc.get_all_keys ~logger ~node_location () - |> Deferred.map - ~f:(Result.map ~f:(List.dedup_and_sort ~compare:Ledger_hash.compare)) - in - let my_keys = Db.get_index t.db in - let missing_keys = - let set1 = Set.of_list (module Ledger_hash) remote_keys in - let set2 = Set.of_list (module Ledger_hash) my_keys in - Set.diff set1 set2 |> Set.to_list + let%bind.Deferred.Result diffs = + Client.Rpc.get_diffs_chain ~logger ~node_location ~source:`Genesis + ~target:ledger_hash in - let%bind () = - Deferred.List.iter ~how:`Parallel missing_keys ~f:(fun ledger_hash -> - let%bind diff = - match%bind - Client.Rpc.get_diff ~logger ~node_location ~ledger_hash - with - | Ok (Some diff) -> - return diff - | Ok None -> - failwithf "Syncing node claimed to have diff %s but it doesn't" - (Ledger_hash.to_decimal_string ledger_hash) - () - | Error err -> - failwithf "Failed syncing the diff %s, error: %s" - (Ledger_hash.to_decimal_string ledger_hash) - (Error.to_string_hum err) () - in - match Db.add_diff t.db ~ledger_hash ~diff with - | `Added -> - return - @@ [%log info] "Diff with target ledger hash $hash added" - ~metadata: - [ ( "hash" - , `String (Ledger_hash.to_decimal_string ledger_hash) ) - ] - | `Already_existed -> - failwithf - "Diff with target ledger hash %s already existed during syncing" - (Ledger_hash.to_decimal_string ledger_hash) - () ) + let diffs = + Client.attach_openings + ~diffs:(List.map diffs ~f:Diff.drop_time) + ~depth:constraint_constants.ledger_depth in - return (Ok ()) + List.map diffs ~f:(fun (diff, ledger_openings) -> + match post_diff t ~diff ~ledger_openings with + | Ok _signature -> + Ok () + | Error e -> + let logger = t.logger in + [%log warn] "Error posting diff: $error" + ~metadata:[ ("error", `String (Error.to_string_hum e)) ] ; + Error e ) + |> Result.all_unit |> return let get_signature t ~ledger_hash = let%bind.Option _diff = Db.get_diff t.db ~ledger_hash in let message = Random_oracle.Input.Chunked.field_elements [| ledger_hash |] in Some (Schnorr.Chunked.sign t.signer.private_key message) +let get_ledger_hashes_chain t + ({ source = source_opt; target } : Rpc.Get_ledger_hashes_chain.V1.Query.t) = + let source = + match source_opt with + | `Genesis -> + Diff.empty_ledger_hash ~depth:constraint_constants.ledger_depth + | `Specific source -> + source + in + let rec go current = + if Ledger_hash.equal current source then [] + else + let source = + Db.get_diff ~ledger_hash:current t.db + |> Option.value_exn ~here:[%here] + ~message:"Get_ledger_hashes_chain: diff not found" + |> Diff.Stable.V2.source_ledger_hash + in + current :: go source + in + let chain = List.rev (go target) in + match (source_opt, chain) with + | _, [] -> + failwithf + "Get_ledger_hashes_chain: no diffs found for source ledger hash %s and \ + target ledger hash %s" + (Ledger_hash.to_decimal_string source) + (Ledger_hash.to_decimal_string target) + () + | `Specific wanted_source, first_source :: _ + when not (Ledger_hash.equal wanted_source first_source) -> + failwithf + "Get_ledger_hashes_chain: source ledger hash %s is not in the chain" + (Ledger_hash.to_decimal_string source) + () + | _ -> + chain + let implementations t = Async.Rpc.Implementations.create_exn ~on_unknown_rpc:`Raise ~implementations: - (* Post_diff *) - [ Async.Rpc.Rpc.implement Rpc.Post_diff.V1.t + [ (* Post_diff *) + Async.Rpc.Rpc.implement Rpc.Post_diff.V1.t (fun () { ledger_openings; diff } -> match post_diff t ~ledger_openings ~diff with | Ok signature -> @@ -338,18 +287,18 @@ let implementations t = [%log warn] "Error posting diff: $error" ~metadata:[ ("error", `String (Error.to_string_hum e)) ] ; failwith (Error.to_string_hum e) ) - (* Get_diff *) - ; Async.Rpc.Rpc.implement Rpc.Get_diff.V1.t (fun () query -> + ; (* Get_diff *) + Async.Rpc.Rpc.implement Rpc.Get_diff.V1.t (fun () query -> let v2_diff = Db.get_diff t.db ~ledger_hash:query in let v1_diff = Option.map v2_diff ~f:Diff.drop_time in Async.return @@ v1_diff ) ; Async.Rpc.Rpc.implement Rpc.Get_diff.V2.t (fun () query -> Async.return @@ Db.get_diff t.db ~ledger_hash:query ) - (* Get_all_keys *) - ; Async.Rpc.Rpc.implement Rpc.Get_all_keys.V1.t (fun () () -> + ; (* Get_all_keys *) + Async.Rpc.Rpc.implement Rpc.Get_all_keys.V1.t (fun () () -> Async.return @@ Db.get_index t.db ) - (* Get_diff_source *) - ; Async.Rpc.Rpc.implement Rpc.Get_diff_source.V1.t (fun () query -> + ; (* Get_diff_source *) + Async.Rpc.Rpc.implement Rpc.Get_diff_source.V1.t (fun () query -> Async.return @@ Diff.Stable.Latest.source_ledger_hash @@ Option.value_exn ~error: @@ -359,19 +308,31 @@ let implementations t = hash %s" (Ledger_hash.to_decimal_string query) ) @@ Db.get_diff t.db ~ledger_hash:query ) - (* Get_signed_public_key *) - ; Async.Rpc.Rpc.implement Rpc.Get_signer_public_key.V1.t (fun () () -> + ; (* Get_signed_public_key *) + Async.Rpc.Rpc.implement Rpc.Get_signer_public_key.V1.t (fun () () -> Async.return @@ Public_key.compress @@ t.signer.public_key ) - (* Get_signature *) - ; Async.Rpc.Rpc.implement Rpc.Get_signature.V1.t (fun () query -> + ; (* Get_signature *) + Async.Rpc.Rpc.implement Rpc.Get_signature.V1.t (fun () query -> Async.return @@ get_signature t ~ledger_hash:query ) + ; (* Get_ledger_hashes_chain *) + Async.Rpc.Rpc.implement Rpc.Get_ledger_hashes_chain.V1.t + (fun () query -> Async.return @@ get_ledger_hashes_chain t query) + ; (* Get_diffs_chain *) + Async.Rpc.Rpc.implement Rpc.Get_diffs_chain.V1.t + (fun () { source; target } -> + let chain = get_ledger_hashes_chain t { source; target } in + Async.return + @@ List.map chain ~f:(fun ledger_hash -> + Db.get_diff ~ledger_hash t.db + |> Option.value_exn ~here:[%here] ~message:"Diff not found" ) ) ] -let create_server ~nodes_to_sync ~port ~logger ~db_dir ~signer_sk () = +let create_server ~sync_arg ~port ~logger ~db_dir ~signer_sk ~no_migrations () = let open Async in let where_to_listen = Tcp.Where_to_listen.bind_to All_addresses (On_port port) in + let%bind db_existed = Sys.file_exists_exn db_dir in let t = { db = Db.create db_dir ; signer = @@ -380,17 +341,21 @@ let create_server ~nodes_to_sync ~port ~logger ~db_dir ~signer_sk () = } in + (* Set the migration to the latest migration if the database didn't exist *) + if not db_existed then + Db.set_migration t.db ~migration:Migrations.latest_migration ; + + if not no_migrations then Migrations.run_migrations ~logger t.db ; + let%bind () = - Deferred.List.iter ~how:`Sequential nodes_to_sync ~f:(fun n -> - match%bind sync ~logger ~node_location:n t with + match sync_arg with + | None -> + return () + | Some (node_location, ledger_hash) -> ( + match%bind sync t ~node_location ~ledger_hash with | Ok () -> return () | Error e -> - [%log error] "Exception while syncing the node $node: $error" - ~metadata: - [ ("error", `String (Error.to_string_hum e)) - ; ("node", `String n.name) - ] ; failwith (Error.to_string_hum e) ) in diff --git a/src/app/zeko/da_layer/lib/rpc.ml b/src/app/zeko/da_layer/lib/rpc.ml index 9023fb32d1..1922ce8890 100644 --- a/src/app/zeko/da_layer/lib/rpc.ml +++ b/src/app/zeko/da_layer/lib/rpc.ml @@ -66,6 +66,8 @@ module Get_diff_source = struct end end +(* val get_staged_ledger_aux_and_pending_coinbases_at_hash : Ledger_hash.t -> Staged_ledger.Staged_ledger_aux_and_pending_coinbases.t option *) + (* val get_signer_public_key : unit -> Public_key.Compressed.t *) module Get_signer_public_key = struct module V1 = struct @@ -88,3 +90,45 @@ module Get_signature = struct ~bin_query:Ledger_hash.Stable.V1.bin_t ~bin_response:Response.bin_t end end + +(* val get_ledger_hashes_chain : source:Ledger_hash.t option -> target:Ledger_hash.t -> Ledger_hash.t list *) +module Get_ledger_hashes_chain = struct + module V1 = struct + module Query = struct + type t = + { source : [ `Genesis | `Specific of Ledger_hash.Stable.V1.t ] + ; target : Ledger_hash.Stable.V1.t + } + [@@deriving bin_io_unversioned] + end + + module Response = struct + type t = Ledger_hash.Stable.V1.t list [@@deriving bin_io_unversioned] + end + + let t : (Query.t, Response.t) Rpc.Rpc.t = + Rpc.Rpc.create ~name:"Get_ledger_hashes_chain" ~version:1 + ~bin_query:Query.bin_t ~bin_response:Response.bin_t + end +end + +(* val get_diffs_chain : source:Ledger_hash.t option -> target:Ledger_hash.t -> Diff.t *) +module Get_diffs_chain = struct + module V1 = struct + module Query = struct + type t = + { source : [ `Genesis | `Specific of Ledger_hash.Stable.V1.t ] + ; target : Ledger_hash.Stable.V1.t + } + [@@deriving bin_io_unversioned] + end + + module Response = struct + type t = Diff.Stable.V2.t list [@@deriving bin_io_unversioned] + end + + let t : (Query.t, Response.t) Rpc.Rpc.t = + Rpc.Rpc.create ~name:"Get_diffs_chain" ~version:1 ~bin_query:Query.bin_t + ~bin_response:Response.bin_t + end +end diff --git a/src/app/zeko/kvdb_base/kvdb_base.ml b/src/app/zeko/kvdb_base/kvdb_base.ml index 19e9e875db..8a90acc5c2 100644 --- a/src/app/zeko/kvdb_base/kvdb_base.ml +++ b/src/app/zeko/kvdb_base/kvdb_base.ml @@ -14,12 +14,16 @@ end) = struct include Mina_ledger.Ledger.Kvdb + let set_raw = set + let set t pair_type ~key ~data = - set t + set_raw t ~key:(Key_value.serialize_key pair_type key) ~data:(Key_value.serialize_value pair_type data) + let get_raw = get + let get t pair_type ~key = - get t ~key:(Key_value.serialize_key pair_type key) + get_raw t ~key:(Key_value.serialize_key pair_type key) |> Option.map ~f:(Key_value.deserialize_value pair_type) end diff --git a/src/app/zeko/sequencer/archive_relay/dune b/src/app/zeko/sequencer/archive_relay/dune index ba9ae85357..2bd53d550c 100644 --- a/src/app/zeko/sequencer/archive_relay/dune +++ b/src/app/zeko/sequencer/archive_relay/dune @@ -12,5 +12,5 @@ websocket-async async_kernel) (preprocess - (pps ppx_jane)) + (pps ppx_jane ppx_mina)) (modules run)) diff --git a/src/app/zeko/sequencer/archive_relay/run.ml b/src/app/zeko/sequencer/archive_relay/run.ml index 962be3db66..1cff078f94 100644 --- a/src/app/zeko/sequencer/archive_relay/run.ml +++ b/src/app/zeko/sequencer/archive_relay/run.ml @@ -3,10 +3,18 @@ open Core_kernel open Mina_base open Mina_lib open Mina_ledger -module Ws = Websocket.Make (Cohttp_async.Io) let constraint_constants = Genesis_constants.Constraint_constants.compiled +let rec rmrf path = + match Sys.is_directory path with + | true -> + Sys.readdir path + |> Array.iter ~f:(fun name -> rmrf (Filename.concat path name)) ; + Sys.rmdir path + | false -> + Sys.remove path + let compile_time_genesis_state = let genesis_constants = Genesis_constants.compiled in let consensus_constants = @@ -22,39 +30,32 @@ let compile_time_genesis_state = in compile_time_genesis.data -let time label (d : 'a Deferred.t) = +let time ~logger label (d : 'a Deferred.t) = let start = Time.now () in let%bind x = d in let stop = Time.now () in - printf "%s: %s\n%!" label (Time.Span.to_string_hum @@ Time.diff stop start) ; + [%log info] "%s: %s\n%!" label + (Time.Span.to_string_hum @@ Time.diff stop start) ; return x -module Cache = struct - type t = Da_layer.Diff.t Ledger_hash.Map.t - - let empty = Ledger_hash.Map.empty - - let get (t : t) hash = Ledger_hash.Map.find t hash - - let add (t : t) hash diff = Ledger_hash.Map.set t ~key:hash ~data:diff -end - module State = struct type t = { logger : Logger.t ; archive_uri : Host_and_port.t Cli_lib.Flag.Types.with_name ; zeko_uri : Uri.t ; da_config : Da_layer.Client.Config.t - ; mutable cache : Cache.t + ; mutable ledger_cache : Ledger.Db.t ; mutable already_relayed_hashes : Ledger_hash.Set.t } - let create ~logger ~archive_uri ~zeko_uri ~da_nodes = + let create ~logger ~archive_uri ~zeko_uri ~da_nodes ~ledger_cache = { logger ; archive_uri ; zeko_uri ; da_config = Da_layer.Client.Config.of_string_list da_nodes - ; cache = Cache.empty + ; ledger_cache = + Ledger.Db.create ~directory_name:ledger_cache + ~depth:constraint_constants.ledger_depth () ; already_relayed_hashes = Ledger_hash.Set.empty } @@ -62,76 +63,26 @@ module State = struct t.already_relayed_hashes <- Set.add t.already_relayed_hashes hash let has_been_relayed t hash = Set.mem t.already_relayed_hashes hash -end - -let fetch_diff ~(state : State.t) hash = - match Cache.get state.cache hash with - | Some diff -> - return diff - | None -> ( - match%bind - Da_layer.Client.get_diff ~logger:state.logger ~config:state.da_config - ~ledger_hash:hash - with - | Ok diff -> - state.cache <- Cache.add state.cache hash diff ; - return diff - | Error e -> - raise (Error.to_exn e) ) - -module Graphql_ws = struct - type client_message = - | Gql_connection_init - | Gql_start of - { id : int - ; query : string - ; variables : Yojson.Safe.t - ; operation_name : string option - } - type server_message = - | Gql_connection_ack - | Gql_data of { data : Yojson.Safe.t } - | Gql_unknown of string - - let client_message_to_string = - Fn.compose Yojson.Safe.to_string (function - | Gql_connection_init -> - `Assoc [ ("type", `String "connection_init"); ("payload", `Assoc []) ] - | Gql_start { id; query; variables; operation_name } -> - `Assoc - [ ("type", `String "start") - ; ("id", `String (Int.to_string id)) - ; ( "payload" - , `Assoc - [ ("query", `String query) - ; ("variables", variables) - ; ( "operationName" - , Option.value ~default:`Null - @@ Option.map ~f:(fun s -> `String s) operation_name ) - ] ) - ] ) - - let server_message_of_string s = - let open Yojson.Safe.Util in - let json = Yojson.Safe.from_string s in - let type_ = json |> member "type" |> to_string in - match type_ with - | "connection_ack" -> - Gql_connection_ack - | "data" -> - Gql_data { data = json |> member "payload" |> member "data" } - | _ -> - Gql_unknown s + let reset_ledger_cache t () = + let directory_name = + Option.value_exn ~message:"No ledger_cache directory" + @@ Ledger.Db.get_directory t.ledger_cache + in + Ledger.Db.close t.ledger_cache ; + rmrf directory_name ; + t.ledger_cache <- + Ledger.Db.create ~directory_name ~depth:constraint_constants.ledger_depth + () end let sync_archive ~(state : State.t) ~hash = let logger = state.logger in - let%bind.Deferred.Result chain = - Da_layer.Client.get_ledger_hashes_chain ~logger ~config:state.da_config - ~depth:constraint_constants.ledger_depth ~target_ledger_hash:hash + let%bind.Deferred.Result diffs = + Da_layer.Client.get_diffs_chain ~logger ~config:state.da_config + ~source_ledger_hash:(`Specific (Ledger.Db.merkle_root state.ledger_cache)) + ~target_ledger_hash:hash in - let%bind diffs = Deferred.List.map chain ~f:(fetch_diff ~state) in Ledger.with_ledger ~depth:constraint_constants.ledger_depth ~f:(fun ledger -> let protocol_state = ref compile_time_genesis_state in Deferred.List.iter diffs ~f:(fun diff -> @@ -182,7 +133,7 @@ let sync_archive ~(state : State.t) ~hash = | Ok () -> State.add_hash state (Ledger.merkle_root ledger) ; return - @@ printf "Synced diff to archive with hash: %s\n%!" + @@ [%log info] "Synced diff to archive with hash: %s\n%!" ( Ledger_hash.to_decimal_string @@ Ledger.merkle_root ledger ) | Error e -> @@ -192,12 +143,12 @@ let sync_archive ~(state : State.t) ~hash = let fetch_current_ledger_hash ~zeko_uri () = let query = {| - query { - stateHashes { - unprovedLedgerHash - } - } - |} + query { + stateHashes { + unprovedLedgerHash + } + } + |} in let body = Yojson.Safe.to_string @@ -241,28 +192,34 @@ let fetch_current_ledger_hash ~zeko_uri () = return (Ok unproved_ledger_hash) let sync ~(state : State.t) () = + let logger = state.logger in Thread_safe.block_on_async_exn (fun () -> let%bind ledger_hash = match%bind fetch_current_ledger_hash ~zeko_uri:state.zeko_uri () with | Ok hash -> - printf "Fetched ledger hash: %s\n%!" + [%log info] "Fetched ledger hash: %s\n%!" (Ledger_hash.to_decimal_string hash) ; return hash | Error e -> failwith e in - time "Synced" (sync_archive ~state ~hash:ledger_hash) ) + time ~logger "Synced" (sync_archive ~state ~hash:ledger_hash) ) let rec run ~(state : State.t) ~sync_period () = + let logger = state.logger in let () = match sync ~state () with | Ok () -> - () + (* wait *) + Thread_safe.block_on_async_exn (fun () -> + after (Time.Span.of_sec sync_period) ) | Error e -> - print_endline (Error.to_string_hum e) + (* ledger_hash_invalidated *) + [%log error] "Error syncing: %s\n%!" (Error.to_string_hum e) ; + [%log warn] "Invalidating ledger cache" ; + State.reset_ledger_cache state () in - Thread_safe.block_on_async_exn (fun () -> - after (Time.Span.of_sec sync_period) ) ; + (* go again *) run ~state ~sync_period () let () = @@ -279,6 +236,10 @@ let () = flag "--sync-period" (optional_with_default 60. float) ~doc:"Sync period" + and ledger_cache = + flag "--ledger-cache" + (optional_with_default "ledger_cache" string) + ~doc:"Ledger cache" in let logger = Logger.create () in let zeko_uri = Uri.of_string zeko_uri in @@ -289,5 +250,7 @@ let () = } in - let state = State.create ~logger ~archive_uri ~zeko_uri ~da_nodes in + let state = + State.create ~logger ~archive_uri ~zeko_uri ~da_nodes ~ledger_cache + in run ~state ~sync_period ) diff --git a/src/app/zeko/sequencer/lib/zeko_sequencer.ml b/src/app/zeko/sequencer/lib/zeko_sequencer.ml index abc9d36a94..56554eb65a 100644 --- a/src/app/zeko/sequencer/lib/zeko_sequencer.ml +++ b/src/app/zeko/sequencer/lib/zeko_sequencer.ml @@ -787,19 +787,13 @@ module Sequencer = struct printf "Init root: %s\n%!" Ledger_hash.(to_decimal_string (get_root t)) ; (* apply diffs from DA layer *) - let%bind ledger_hashes_chain = - Da_layer.Client.get_ledger_hashes_chain ~logger ~config:da_config - ~depth:constraint_constants.ledger_depth - ~target_ledger_hash:committed_ledger_hash + let%bind diffs = + Da_layer.Client.get_diffs_chain ~logger ~config:da_config + ~source_ledger_hash:`Genesis ~target_ledger_hash:committed_ledger_hash |> Deferred.map ~f:Or_error.ok_exn in let%bind () = - Deferred.List.iter ~how:`Sequential ledger_hashes_chain - ~f:(fun ledger_hash -> - let%bind diff : Da_layer.Diff.t Deferred.t = - Da_layer.Client.get_diff ~logger ~config:da_config ~ledger_hash - >>| Or_error.ok_exn - in + Deferred.List.iter ~how:`Sequential diffs ~f:(fun diff -> assert ( Ledger_hash.equal (Da_layer.Diff.Stable.Latest.source_ledger_hash diff) @@ -1122,6 +1116,7 @@ let%test_module "Sequencer tests" = } let%test_unit "apply commands and commit" = + print_endline "Started test 'apply commands and commit'" ; Quickcheck.test ~trials:1 (Sequencer_test_spec.gen ()) ~f:(fun { zkapp_keypair; signer; ephemeral_ledger; specs; sequencer } -> let batch1, batch2 = List.split_n specs 3 in @@ -1345,6 +1340,7 @@ let%test_module "Sequencer tests" = final_ledger_hash ) ) let%test_unit "dummy signature should fail" = + print_endline "Started test 'dummy signature should fail'" ; Quickcheck.test ~trials:1 (Sequencer_test_spec.gen ()) ~f:(fun { zkapp_keypair; signer; ephemeral_ledger; specs; sequencer } -> let dummy_signature_command : Zkapp_command.t = @@ -1381,6 +1377,7 @@ let%test_module "Sequencer tests" = Error.raise unexpected_error ) let%test_unit "deposits" = + print_endline "Started test 'deposits'" ; Quickcheck.test ~trials:1 (Sequencer_test_spec.gen ~delay_deposit:2 ()) ~f:(fun { zkapp_keypair; signer; ephemeral_ledger; specs; sequencer } -> (* Create l1 accounts *)