Skip to content

Commit

Permalink
Add ledger caching to the archive relay syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinOndejka committed Dec 2, 2024
1 parent 4953faf commit 6f8f4f1
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 91 deletions.
95 changes: 45 additions & 50 deletions src/app/zeko/da_layer/lib/node.ml
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,44 @@ let get_signature t ~ledger_hash =
let message = Random_oracle.Input.Chunked.field_elements [| ledger_hash |] in
Some (Schnorr.Chunked.sign t.signer.private_key message)

let get_ledger_hashes_chain t
({ source = source_opt; target } : Rpc.Get_ledger_hashes_chain.V1.Query.t) =
let source =
match source_opt with
| `Genesis ->
Diff.empty_ledger_hash ~depth:constraint_constants.ledger_depth
| `Specific source ->
source
in
let rec go current =
if Ledger_hash.equal current source then []
else
let source =
Db.get_diff ~ledger_hash:current t.db
|> Option.value_exn ~here:[%here]
~message:"Get_ledger_hashes_chain: diff not found"
|> Diff.Stable.V2.source_ledger_hash
in
current :: go source
in
let chain = List.rev (go target) in
match (source_opt, chain) with
| _, [] ->
failwithf
"Get_ledger_hashes_chain: no diffs found for source ledger hash %s and \
target ledger hash %s"
(Ledger_hash.to_decimal_string source)
(Ledger_hash.to_decimal_string target)
()
| `Specific wanted_source, first_source :: _
when not (Ledger_hash.equal wanted_source first_source) ->
failwithf
"Get_ledger_hashes_chain: source ledger hash %s is not in the chain"
(Ledger_hash.to_decimal_string source)
()
| _ ->
chain

let implementations t =
Async.Rpc.Implementations.create_exn ~on_unknown_rpc:`Raise
~implementations:
Expand Down Expand Up @@ -369,58 +407,15 @@ let implementations t =
Async.return @@ get_signature t ~ledger_hash:query )
; (* Get_ledger_hashes_chain *)
Async.Rpc.Rpc.implement Rpc.Get_ledger_hashes_chain.V1.t
(fun () { source; target } ->
let source =
Option.value
~default:
(Diff.empty_ledger_hash
~depth:constraint_constants.ledger_depth )
source
in
let rec go current =
if Ledger_hash.equal current source then []
else
let source =
Db.get_diff ~ledger_hash:current t.db
|> Option.value_exn ~here:[%here] ~message:"Diff not found"
|> Diff.Stable.V2.source_ledger_hash
in
current :: go source
in
Async.return @@ List.rev (go target) )
(fun () query -> Async.return @@ get_ledger_hashes_chain t query)
; (* Get_diffs_chain *)
Async.Rpc.Rpc.implement Rpc.Get_diffs_chain.V1.t
(fun () { source = source_opt; target } ->
let source =
match source_opt with
| `Genesis ->
Diff.empty_ledger_hash
~depth:constraint_constants.ledger_depth
| `Specific source ->
source
in
let rec go current_ledger_hash =
if Ledger_hash.equal current_ledger_hash source then []
else
let current =
Db.get_diff ~ledger_hash:current_ledger_hash t.db
|> Option.value_exn ~here:[%here] ~message:"Diff not found"
in
current :: go (Diff.Stable.V2.source_ledger_hash current)
in
let chain = List.rev (go target) in
match (source_opt, chain) with
| _, [] ->
failwith "No diffs found"
| ( `Specific wanted_source
, { source_ledger_hash = first_source; _ } :: _ )
when not (Ledger_hash.equal wanted_source first_source) ->
failwithf
"Get_diffs_chain: source ledger hash %s is not in the chain"
(Ledger_hash.to_decimal_string source)
()
| _ ->
Async.return chain )
(fun () { source; target } ->
let chain = get_ledger_hashes_chain t { source; target } in
Async.return
@@ List.map chain ~f:(fun ledger_hash ->
Db.get_diff ~ledger_hash t.db
|> Option.value_exn ~here:[%here] ~message:"Diff not found" ) )
]

let create_server ~nodes_to_sync ~port ~logger ~db_dir ~signer_sk () =
Expand Down
2 changes: 1 addition & 1 deletion src/app/zeko/da_layer/lib/rpc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ module Get_ledger_hashes_chain = struct
module V1 = struct
module Query = struct
type t =
{ source : Ledger_hash.Stable.V1.t option
{ source : [ `Genesis | `Specific of Ledger_hash.Stable.V1.t ]
; target : Ledger_hash.Stable.V1.t
}
[@@deriving bin_io_unversioned]
Expand Down
2 changes: 1 addition & 1 deletion src/app/zeko/sequencer/archive_relay/dune
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
websocket-async
async_kernel)
(preprocess
(pps ppx_jane))
(pps ppx_jane ppx_mina))
(modules run))
89 changes: 50 additions & 39 deletions src/app/zeko/sequencer/archive_relay/run.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ module Ws = Websocket.Make (Cohttp_async.Io)

let constraint_constants = Genesis_constants.Constraint_constants.compiled

let rec rmrf path =
match Sys.is_directory path with
| true ->
Sys.readdir path
|> Array.iter ~f:(fun name -> rmrf (Filename.concat path name)) ;
Sys.rmdir path
| false ->
Sys.remove path

let compile_time_genesis_state =
let genesis_constants = Genesis_constants.compiled in
let consensus_constants =
Expand All @@ -22,62 +31,51 @@ let compile_time_genesis_state =
in
compile_time_genesis.data

let time label (d : 'a Deferred.t) =
let time ~logger label (d : 'a Deferred.t) =
let start = Time.now () in
let%bind x = d in
let stop = Time.now () in
printf "%s: %s\n%!" label (Time.Span.to_string_hum @@ Time.diff stop start) ;
[%log info] "%s: %s\n%!" label
(Time.Span.to_string_hum @@ Time.diff stop start) ;
return x

module Cache = struct
type t = Da_layer.Diff.t Ledger_hash.Map.t

let empty = Ledger_hash.Map.empty

let get (t : t) hash = Ledger_hash.Map.find t hash

let add (t : t) hash diff = Ledger_hash.Map.set t ~key:hash ~data:diff
end

module State = struct
type t =
{ logger : Logger.t
; archive_uri : Host_and_port.t Cli_lib.Flag.Types.with_name
; zeko_uri : Uri.t
; da_config : Da_layer.Client.Config.t
; mutable cache : Cache.t
; mutable ledger_cache : Ledger.Db.t
; mutable already_relayed_hashes : Ledger_hash.Set.t
}

let create ~logger ~archive_uri ~zeko_uri ~da_nodes =
let create ~logger ~archive_uri ~zeko_uri ~da_nodes ~ledger_cache =
{ logger
; archive_uri
; zeko_uri
; da_config = Da_layer.Client.Config.of_string_list da_nodes
; cache = Cache.empty
; ledger_cache =
Ledger.Db.create ~directory_name:ledger_cache
~depth:constraint_constants.ledger_depth ()
; already_relayed_hashes = Ledger_hash.Set.empty
}

let add_hash t hash =
t.already_relayed_hashes <- Set.add t.already_relayed_hashes hash

let has_been_relayed t hash = Set.mem t.already_relayed_hashes hash
end

let fetch_diff ~(state : State.t) hash =
match Cache.get state.cache hash with
| Some diff ->
return diff
| None -> (
match%bind
Da_layer.Client.get_diff ~logger:state.logger ~config:state.da_config
~ledger_hash:hash
with
| Ok diff ->
state.cache <- Cache.add state.cache hash diff ;
return diff
| Error e ->
raise (Error.to_exn e) )
let reset_ledger_cache t () =
let directory_name =
Option.value_exn ~message:"No ledger_cache directory"
@@ Ledger.Db.get_directory t.ledger_cache
in
Ledger.Db.close t.ledger_cache ;
rmrf directory_name ;
t.ledger_cache <-
Ledger.Db.create ~directory_name ~depth:constraint_constants.ledger_depth
()
end

module Graphql_ws = struct
type client_message =
Expand Down Expand Up @@ -129,7 +127,8 @@ let sync_archive ~(state : State.t) ~hash =
let logger = state.logger in
let%bind.Deferred.Result diffs =
Da_layer.Client.get_diffs_chain ~logger ~config:state.da_config
~source_ledger_hash:`Genesis ~target_ledger_hash:hash
~source_ledger_hash:(`Specific (Ledger.Db.merkle_root state.ledger_cache))
~target_ledger_hash:hash
in
Ledger.with_ledger ~depth:constraint_constants.ledger_depth ~f:(fun ledger ->
let protocol_state = ref compile_time_genesis_state in
Expand Down Expand Up @@ -181,7 +180,7 @@ let sync_archive ~(state : State.t) ~hash =
| Ok () ->
State.add_hash state (Ledger.merkle_root ledger) ;
return
@@ printf "Synced diff to archive with hash: %s\n%!"
@@ [%log info] "Synced diff to archive with hash: %s\n%!"
( Ledger_hash.to_decimal_string
@@ Ledger.merkle_root ledger )
| Error e ->
Expand Down Expand Up @@ -240,28 +239,34 @@ let fetch_current_ledger_hash ~zeko_uri () =
return (Ok unproved_ledger_hash)

let sync ~(state : State.t) () =
let logger = state.logger in
Thread_safe.block_on_async_exn (fun () ->
let%bind ledger_hash =
match%bind fetch_current_ledger_hash ~zeko_uri:state.zeko_uri () with
| Ok hash ->
printf "Fetched ledger hash: %s\n%!"
[%log info] "Fetched ledger hash: %s\n%!"
(Ledger_hash.to_decimal_string hash) ;
return hash
| Error e ->
failwith e
in
time "Synced" (sync_archive ~state ~hash:ledger_hash) )
time ~logger "Synced" (sync_archive ~state ~hash:ledger_hash) )

let rec run ~(state : State.t) ~sync_period () =
let logger = state.logger in
let () =
match sync ~state () with
| Ok () ->
()
(* wait *)
Thread_safe.block_on_async_exn (fun () ->
after (Time.Span.of_sec sync_period) )
| Error e ->
print_endline (Error.to_string_hum e)
(* ledger_hash_invalidated *)
[%log error] "Error syncing: %s\n%!" (Error.to_string_hum e) ;
[%log warn] "Invalidating ledger cache" ;
State.reset_ledger_cache state ()
in
Thread_safe.block_on_async_exn (fun () ->
after (Time.Span.of_sec sync_period) ) ;
(* go again *)
run ~state ~sync_period ()

let () =
Expand All @@ -278,6 +283,10 @@ let () =
flag "--sync-period"
(optional_with_default 60. float)
~doc:"Sync period"
and ledger_cache =
flag "--ledger-cache"
(optional_with_default "ledger_cache" string)
~doc:"Ledger cache"
in
let logger = Logger.create () in
let zeko_uri = Uri.of_string zeko_uri in
Expand All @@ -288,5 +297,7 @@ let () =
}
in

let state = State.create ~logger ~archive_uri ~zeko_uri ~da_nodes in
let state =
State.create ~logger ~archive_uri ~zeko_uri ~da_nodes ~ledger_cache
in
run ~state ~sync_period )

0 comments on commit 6f8f4f1

Please sign in to comment.