From 6f8f4f111a42125d6ea7a5e52d8d0347974e61eb Mon Sep 17 00:00:00 2001
From: Martin Ondejka
Date: Thu, 28 Nov 2024 17:39:06 +0100
Subject: [PATCH] Add ledger caching to the archive relay syncing

---
Review notes (text in this section is not part of the commit):
 * get_ledger_hashes_chain: removed the `Specific guard that compared the
   head of the hash chain with the requested source; the head is the hash
   right after the source, so the guard rejected every `Specific query.
 * run: the relay now sleeps for sync_period after a failed sync as well.
 * NOTE(review): Get_ledger_hashes_chain.V1.Query.t changes its bin_io shape
   while staying V1 - confirm servers and clients are upgraded together, or
   bump the RPC version.

 src/app/zeko/da_layer/lib/node.ml           | 100 +++++++++-----------
 src/app/zeko/da_layer/lib/rpc.ml            |   2 +-
 src/app/zeko/sequencer/archive_relay/dune   |   2 +-
 src/app/zeko/sequencer/archive_relay/run.ml |  85 +++++++++--------
 4 files changed, 101 insertions(+), 88 deletions(-)

diff --git a/src/app/zeko/da_layer/lib/node.ml b/src/app/zeko/da_layer/lib/node.ml
index a7684c8266..f0ba60ef28 100644
--- a/src/app/zeko/da_layer/lib/node.ml
+++ b/src/app/zeko/da_layer/lib/node.ml
@@ -326,6 +326,49 @@ let get_signature t ~ledger_hash =
   let message = Random_oracle.Input.Chunked.field_elements [| ledger_hash |] in
   Some (Schnorr.Chunked.sign t.signer.private_key message)
 
+(** [get_ledger_hashes_chain t query] walks the diffs stored in [t.db] backwards
+    from [target] until it reaches the requested source (the empty ledger hash
+    for [`Genesis]) and returns the visited ledger hashes, oldest first. The
+    source hash itself is not part of the result; [target] is its last element.
+    Fails if a diff lookup misses before the source is reached, or if the chain
+    is empty (source equal to target). *)
+let get_ledger_hashes_chain t
+    ({ source = source_opt; target } : Rpc.Get_ledger_hashes_chain.V1.Query.t) =
+  let source =
+    match source_opt with
+    | `Genesis ->
+        Diff.empty_ledger_hash ~depth:constraint_constants.ledger_depth
+    | `Specific source ->
+        source
+  in
+  (* Tail-recursive: prepending while walking target -> source leaves the
+     accumulator ordered source -> target, so no final [List.rev] is needed. *)
+  let rec go acc current =
+    if Ledger_hash.equal current source then acc
+    else
+      let previous =
+        Db.get_diff ~ledger_hash:current t.db
+        |> Option.value_exn ~here:[%here]
+             ~message:"Get_ledger_hashes_chain: diff not found"
+        |> Diff.Stable.V2.source_ledger_hash
+      in
+      go (current :: acc) previous
+  in
+  (* [go] only stops once it has reached [source], so a non-empty chain always
+     starts right after it. Comparing the head of the chain (a diff's target
+     hash) against [source] can never succeed and must not be used as a
+     validity check. *)
+  match go [] target with
+  | [] ->
+      failwithf
+        "Get_ledger_hashes_chain: no diffs found for source ledger hash %s and \
+         target ledger hash %s"
+        (Ledger_hash.to_decimal_string source)
+        (Ledger_hash.to_decimal_string target)
+        ()
+  | chain ->
+      chain
+
 let implementations t =
   Async.Rpc.Implementations.create_exn ~on_unknown_rpc:`Raise
     ~implementations:
@@ -369,58 +412,15 @@ let implementations t =
             Async.return @@ get_signature t ~ledger_hash:query )
       ; (* Get_ledger_hashes_chain *)
         Async.Rpc.Rpc.implement Rpc.Get_ledger_hashes_chain.V1.t
-          (fun () { source; target } ->
-            let source =
-              Option.value
-                ~default:
-                  (Diff.empty_ledger_hash
-                     ~depth:constraint_constants.ledger_depth )
-                source
-            in
-            let rec go current =
-              if Ledger_hash.equal current source then []
-              else
-                let source =
-                  Db.get_diff ~ledger_hash:current t.db
-                  |> Option.value_exn ~here:[%here] ~message:"Diff not found"
-                  |> Diff.Stable.V2.source_ledger_hash
-                in
-                current :: go source
-            in
-            Async.return @@ List.rev (go target) )
+          (fun () query -> Async.return @@ get_ledger_hashes_chain t query)
       ; (* Get_diffs_chain *)
         Async.Rpc.Rpc.implement Rpc.Get_diffs_chain.V1.t
-          (fun () { source = source_opt; target } ->
-            let source =
-              match source_opt with
-              | `Genesis ->
-                  Diff.empty_ledger_hash
-                    ~depth:constraint_constants.ledger_depth
-              | `Specific source ->
-                  source
-            in
-            let rec go current_ledger_hash =
-              if Ledger_hash.equal current_ledger_hash source then []
-              else
-                let current =
-                  Db.get_diff ~ledger_hash:current_ledger_hash t.db
-                  |> Option.value_exn ~here:[%here] ~message:"Diff not found"
-                in
-                current :: go (Diff.Stable.V2.source_ledger_hash current)
-            in
-            let chain = List.rev (go target) in
-            match (source_opt, chain) with
-            | _, [] ->
-                failwith "No diffs found"
-            | ( `Specific wanted_source
-              , { source_ledger_hash = first_source; _ } :: _ )
-              when not (Ledger_hash.equal wanted_source first_source) ->
-                failwithf
-                  "Get_diffs_chain: source ledger hash %s is not in the chain"
-                  (Ledger_hash.to_decimal_string source)
-                  ()
-            | _ ->
-                Async.return chain )
+          (fun () { source; target } ->
+            let chain = get_ledger_hashes_chain t { source; target } in
+            Async.return
+            @@ List.map chain ~f:(fun ledger_hash ->
+                   Db.get_diff ~ledger_hash t.db
+                   |> Option.value_exn ~here:[%here] ~message:"Diff not found" ) )
       ]
 
 let create_server ~nodes_to_sync ~port ~logger ~db_dir ~signer_sk () =
diff --git a/src/app/zeko/da_layer/lib/rpc.ml b/src/app/zeko/da_layer/lib/rpc.ml
index b7304ef1b6..1922ce8890 100644
--- a/src/app/zeko/da_layer/lib/rpc.ml
+++ b/src/app/zeko/da_layer/lib/rpc.ml
@@ -96,7 +96,7 @@ module Get_ledger_hashes_chain = struct
   module V1 = struct
     module Query = struct
       type t =
-        { source : Ledger_hash.Stable.V1.t option
+        { source : [ `Genesis | `Specific of Ledger_hash.Stable.V1.t ]
         ; target : Ledger_hash.Stable.V1.t
         }
       [@@deriving bin_io_unversioned]
diff --git a/src/app/zeko/sequencer/archive_relay/dune b/src/app/zeko/sequencer/archive_relay/dune
index ba9ae85357..2bd53d550c 100644
--- a/src/app/zeko/sequencer/archive_relay/dune
+++ b/src/app/zeko/sequencer/archive_relay/dune
@@ -12,5 +12,5 @@
   websocket-async
   async_kernel)
  (preprocess
-  (pps ppx_jane))
+  (pps ppx_jane ppx_mina))
  (modules run))
diff --git a/src/app/zeko/sequencer/archive_relay/run.ml b/src/app/zeko/sequencer/archive_relay/run.ml
index 300b708ba9..98b1160d51 100644
--- a/src/app/zeko/sequencer/archive_relay/run.ml
+++ b/src/app/zeko/sequencer/archive_relay/run.ml
@@ -7,6 +7,15 @@ module Ws = Websocket.Make (Cohttp_async.Io)
 
 let constraint_constants = Genesis_constants.Constraint_constants.compiled
 
+let rec rmrf path =
+  match Sys.is_directory path with
+  | true ->
+      Sys.readdir path
+      |> Array.iter ~f:(fun name -> rmrf (Filename.concat path name)) ;
+      Sys.rmdir path
+  | false ->
+      Sys.remove path
+
 let compile_time_genesis_state =
   let genesis_constants = Genesis_constants.compiled in
   let consensus_constants =
@@ -22,39 +31,32 @@ let compile_time_genesis_state =
   in
   compile_time_genesis.data
 
-let time label (d : 'a Deferred.t) =
+let time ~logger label (d : 'a Deferred.t) =
   let start = Time.now () in
   let%bind x = d in
   let stop = Time.now () in
-  printf "%s: %s\n%!" label (Time.Span.to_string_hum @@ Time.diff stop start) ;
+  [%log info] "%s: %s\n%!" label
+    (Time.Span.to_string_hum @@ Time.diff stop start) ;
   return x
 
-module Cache = struct
-  type t = Da_layer.Diff.t Ledger_hash.Map.t
-
-  let empty = Ledger_hash.Map.empty
-
-  let get (t : t) hash = Ledger_hash.Map.find t hash
-
-  let add (t : t) hash diff = Ledger_hash.Map.set t ~key:hash ~data:diff
-end
-
 module State = struct
   type t =
     { logger : Logger.t
     ; archive_uri : Host_and_port.t Cli_lib.Flag.Types.with_name
     ; zeko_uri : Uri.t
     ; da_config : Da_layer.Client.Config.t
-    ; mutable cache : Cache.t
+    ; mutable ledger_cache : Ledger.Db.t
     ; mutable already_relayed_hashes : Ledger_hash.Set.t
     }
 
-  let create ~logger ~archive_uri ~zeko_uri ~da_nodes =
+  let create ~logger ~archive_uri ~zeko_uri ~da_nodes ~ledger_cache =
     { logger
     ; archive_uri
     ; zeko_uri
     ; da_config = Da_layer.Client.Config.of_string_list da_nodes
-    ; cache = Cache.empty
+    ; ledger_cache =
+        Ledger.Db.create ~directory_name:ledger_cache
+          ~depth:constraint_constants.ledger_depth ()
     ; already_relayed_hashes = Ledger_hash.Set.empty
     }
 
@@ -62,22 +64,18 @@ module State = struct
     t.already_relayed_hashes <- Set.add t.already_relayed_hashes hash
 
   let has_been_relayed t hash = Set.mem t.already_relayed_hashes hash
-end
 
-let fetch_diff ~(state : State.t) hash =
-  match Cache.get state.cache hash with
-  | Some diff ->
-      return diff
-  | None -> (
-      match%bind
-        Da_layer.Client.get_diff ~logger:state.logger ~config:state.da_config
-          ~ledger_hash:hash
-      with
-      | Ok diff ->
-          state.cache <- Cache.add state.cache hash diff ;
-          return diff
-      | Error e ->
-          raise (Error.to_exn e) )
+  let reset_ledger_cache t () =
+    let directory_name =
+      Option.value_exn ~message:"No ledger_cache directory"
+      @@ Ledger.Db.get_directory t.ledger_cache
+    in
+    Ledger.Db.close t.ledger_cache ;
+    rmrf directory_name ;
+    t.ledger_cache <-
+      Ledger.Db.create ~directory_name ~depth:constraint_constants.ledger_depth
+        ()
+end
 
 module Graphql_ws = struct
   type client_message =
@@ -129,7 +127,8 @@ let sync_archive ~(state : State.t) ~hash =
   let logger = state.logger in
   let%bind.Deferred.Result diffs =
     Da_layer.Client.get_diffs_chain ~logger ~config:state.da_config
-      ~source_ledger_hash:`Genesis ~target_ledger_hash:hash
+      ~source_ledger_hash:(`Specific (Ledger.Db.merkle_root state.ledger_cache))
+      ~target_ledger_hash:hash
   in
   Ledger.with_ledger ~depth:constraint_constants.ledger_depth ~f:(fun ledger ->
       let protocol_state = ref compile_time_genesis_state in
@@ -181,7 +180,7 @@ let sync_archive ~(state : State.t) ~hash =
             | Ok () ->
                 State.add_hash state (Ledger.merkle_root ledger) ;
                 return
-                @@ printf "Synced diff to archive with hash: %s\n%!"
+                @@ [%log info] "Synced diff to archive with hash: %s\n%!"
                      ( Ledger_hash.to_decimal_string
                      @@ Ledger.merkle_root ledger )
             | Error e ->
@@ -240,28 +239,36 @@ let fetch_current_ledger_hash ~zeko_uri () =
   return (Ok unproved_ledger_hash)
 
 let sync ~(state : State.t) () =
+  let logger = state.logger in
   Thread_safe.block_on_async_exn (fun () ->
       let%bind ledger_hash =
         match%bind fetch_current_ledger_hash ~zeko_uri:state.zeko_uri () with
         | Ok hash ->
-            printf "Fetched ledger hash: %s\n%!"
+            [%log info] "Fetched ledger hash: %s\n%!"
               (Ledger_hash.to_decimal_string hash) ;
             return hash
         | Error e ->
             failwith e
       in
-      time "Synced" (sync_archive ~state ~hash:ledger_hash) )
+      time ~logger "Synced" (sync_archive ~state ~hash:ledger_hash) )
 
 let rec run ~(state : State.t) ~sync_period () =
+  let logger = state.logger in
   let () =
     match sync ~state () with
     | Ok () ->
         ()
     | Error e ->
-        print_endline (Error.to_string_hum e)
+        (* The cached ledger may no longer match the DA layer: drop it so the
+           next round rebuilds it. *)
+        [%log error] "Error syncing: %s\n%!" (Error.to_string_hum e) ;
+        [%log warn] "Invalidating ledger cache" ;
+        State.reset_ledger_cache state ()
   in
+  (* Wait after failures as well, otherwise a persistent error turns into a
+     busy loop that keeps deleting and recreating the cache directory. *)
   Thread_safe.block_on_async_exn (fun () ->
       after (Time.Span.of_sec sync_period) ) ;
   run ~state ~sync_period ()
 
 let () =
@@ -278,6 +285,10 @@ let () =
          flag "--sync-period"
            (optional_with_default 60. float)
            ~doc:"Sync period"
+       and ledger_cache =
+         flag "--ledger-cache"
+           (optional_with_default "ledger_cache" string)
+           ~doc:"Ledger cache"
        in
        let logger = Logger.create () in
        let zeko_uri = Uri.of_string zeko_uri in
@@ -288,5 +299,7 @@ let () =
          }
        in
 
-       let state = State.create ~logger ~archive_uri ~zeko_uri ~da_nodes in
+       let state =
+         State.create ~logger ~archive_uri ~zeko_uri ~da_nodes ~ledger_cache
+       in
        run ~state ~sync_period )