Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add chunking to da sync #239

Open
wants to merge 10 commits into
base: compatible
Choose a base branch
from
2 changes: 2 additions & 0 deletions nix/ocaml.nix
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ let
src/app/zeko/sequencer/cli.exe \
src/app/zeko/sequencer/archive_relay/run.exe \
src/app/zeko/sequencer/tests/testing_ledger/run.exe \
src/app/zeko/sequencer/prover/cli.exe \
src/app/zeko/da_layer/cli.exe \
src/app/logproc/logproc.exe \
src/app/cli/src/mina.exe \
Expand Down Expand Up @@ -268,6 +269,7 @@ let
cp src/app/zeko/sequencer/deploy.exe $zeko/bin/zeko-deploy
cp src/app/zeko/sequencer/cli.exe $zeko/bin/zeko-cli
cp src/app/zeko/sequencer/tests/testing_ledger/run.exe $localnet/bin/mina-localnet
cp src/app/zeko/sequencer/prover/cli.exe $zeko/bin/zeko-prover
cp src/app/zeko/da_layer/cli.exe $zeko_da/bin/zeko-da
cp src/app/zeko/sequencer/archive_relay/run.exe $zeko_archive_relay/bin/zeko-archive-relay
cp -R _doc/_html $out/share/doc/html
Expand Down
59 changes: 58 additions & 1 deletion src/app/zeko/da_layer/cli.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
open Core
open Async
open Signature_lib
open Mina_base
open Mina_ledger

let run_node =
( "run-node"
Expand Down Expand Up @@ -54,4 +56,59 @@ let run_node =
~metadata:[ ("port", `Int port) ] ;
Async.never () ) )

let () = Command.group ~summary:"DA layer CLI" [ run_node ] |> Command_unix.run
let sync_node =
( "sync-node"
, Command.async ~summary:"Sync node"
(let%map_open.Command synced_node =
flag "--synced-node" (required string)
~doc:"string Node that is providing data"
and unsynced_node =
flag "--unsynced-node" (required string)
~doc:"string Node that needs to be synced"
and hash_to_sync =
flag "--hash-to-sync" (required string)
~doc:"string Hash to sync with in decimal string form"
in
fun () ->
let logger = Logger.create () in
[%log info] "Fetching intervals" ;
let ledger =
Ledger.create_ephemeral
~depth:Da_layer.Node.constraint_constants.ledger_depth ()
in
let synced_node =
Cli_lib.Flag.Types.
{ value = Host_and_port.of_string synced_node
; name = sprintf "synced_node"
}
in
let unsynced_node =
Cli_lib.Flag.Types.
{ value = Host_and_port.of_string unsynced_node
; name = sprintf "unsynced_node"
}
in
Da_layer.Client.map_diffs ~logger
~depth:Da_layer.Node.constraint_constants.ledger_depth
~config:(Da_layer.Client.Config.of_node_locations [ synced_node ])
~source_ledger_hash:`Genesis
~target_ledger_hash:(Ledger_hash.of_decimal_string hash_to_sync)
~print_progress:true
~f:(fun diff ->
let diff = Da_layer.Diff.drop_time diff in
let ledger_openings = Da_layer.Client.get_openings ~diff ~ledger in
match%bind
Da_layer.Client.Rpc.post_diff ~logger
~node_location:unsynced_node ~diff ~ledger_openings
with
| Ok _signature ->
return ()
| Error e ->
[%log warn] "Error posting diff: $error"
~metadata:[ ("error", `String (Error.to_string_hum e)) ] ;
Error.raise e )
>>| Or_error.ok_exn >>| ignore ) )

let () =
Command.group ~summary:"DA layer CLI" [ run_node; sync_node ]
|> Command_unix.run
155 changes: 94 additions & 61 deletions src/app/zeko/da_layer/lib/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,14 @@ module Rpc = struct
dispatch ~max_tries:1 ~logger node_location Rpc.Get_signature.V1.t
ledger_hash

let get_ledger_hashes_chain ~logger ~node_location ~source ~target =
let get_ledger_hashes_chain ~logger ~node_location ?max_length ~source ~target
() =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_ledger_hashes_chain.V1.t
{ source; target }
{ source; target; max_length }

let get_diffs_chain ~logger ~node_location ~source ~target =
let get_diffs_chain ~logger ~node_location ?max_length ~source ~target () =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_diffs_chain.V1.t
{ source; target }
{ source; target; max_length }
end

module Config = struct
Expand All @@ -100,6 +101,8 @@ module Config = struct
} )
}

let of_node_locations nodes = { nodes }

let throw_out_node t ~(node : Host_and_port.t Cli_lib.Flag.Types.with_name) =
let open Cli_lib.Flag.Types in
t.nodes <-
Expand Down Expand Up @@ -191,17 +194,75 @@ let try_all_nodes ~config ~f =
try_first ~accum_errors:[] (Config.nodes config)

(** Get the chain of ledger hashes from [source_ledger_hash] hash to [target_ledger_hash] *)
let get_ledger_hashes_chain ~logger ~config ~source_ledger_hash
~target_ledger_hash =
let get_ledger_hashes_chain ~logger ~config ?max_length ~source_ledger_hash
~target_ledger_hash () =
try_all_nodes ~config ~f:(fun ~node_location () ->
Rpc.get_ledger_hashes_chain ~logger ~node_location
~source:source_ledger_hash ~target:target_ledger_hash )
Rpc.get_ledger_hashes_chain ~logger ~node_location ?max_length
~source:source_ledger_hash ~target:target_ledger_hash () )

(** Get the chain of diffs from [source_ledger_hash] hash to [target_ledger_hash] *)
let get_diffs_chain ~logger ~config ~source_ledger_hash ~target_ledger_hash =
let get_diffs_chain ~logger ~config ?max_length ~source_ledger_hash
~target_ledger_hash () =
try_all_nodes ~config ~f:(fun ~node_location () ->
Rpc.get_diffs_chain ~logger ~node_location ~source:source_ledger_hash
~target:target_ledger_hash )
Rpc.get_diffs_chain ~logger ~node_location ?max_length
~source:source_ledger_hash ~target:target_ledger_hash () )

(** Lazily fetch chunks of diffs, used to minimize memory usage *)
let get_lazy_diffs_chunks ~logger ~depth ~config ?(n = 100) ~source_ledger_hash
~target_ledger_hash () =
let source_ledger_hash =
match source_ledger_hash with
| `Genesis ->
Diff.empty_ledger_hash ~depth
| `Specific h ->
h
in
(* Get ledger hashes intervals of size [n] *)
let rec get_intervals ~target_ledger_hash =
let%bind.Deferred.Result chain =
get_ledger_hashes_chain ~logger ~config ~max_length:n
~source_ledger_hash:(`Specific source_ledger_hash) ~target_ledger_hash
()
in
match chain with
| [] ->
return (Ok [])
| [ last ] ->
let interval = (source_ledger_hash, last) in
return (Ok [ interval ])
| chain ->
let hd = List.hd_exn chain in
let tl = List.last_exn chain in
let interval = (hd, tl) in
let%bind.Deferred.Result next_intervals =
get_intervals ~target_ledger_hash:hd
in
return (Ok (interval :: next_intervals))
in
let%bind.Deferred.Result intervals =
get_intervals ~target_ledger_hash >>| Result.map ~f:List.rev
in
return
@@ Ok
(List.map intervals ~f:(fun (source, target) ->
lazy
(get_diffs_chain ~logger ~config
~source_ledger_hash:(`Specific source)
~target_ledger_hash:target () ) ) )

let map_diffs ~logger ~depth ~config ~source_ledger_hash ~target_ledger_hash
~print_progress ~f =
let%bind.Deferred.Result lazy_chunks =
get_lazy_diffs_chunks ~logger ~depth ~config ~source_ledger_hash
~target_ledger_hash ()
in
let l = List.length lazy_chunks in
Deferred.List.mapi ~how:`Sequential lazy_chunks ~f:(fun i lazy_chunk ->
let progress = Float.of_int i /. Float.of_int l in
if print_progress then Zeko_util.progress_bar progress ;
let%bind.Deferred.Result diffs = Lazy.force lazy_chunk in
Deferred.List.map ~how:`Sequential diffs ~f >>| Result.return )
>>| Result.all >>| Result.map ~f:List.join

(** Try to get the diff from the first node in the list, if it fails, try the next one *)
let get_diff ~logger ~config ~ledger_hash =
Expand Down Expand Up @@ -238,59 +299,31 @@ let distribute_genesis_diff ~logger ~config ~ledger =
in
distribute_diff ~logger ~config ~ledger_openings ~diff ~quorum:0

let attach_openings ~diffs ~depth =
let l = Ledger.create_ephemeral ~depth () in
List.map diffs ~f:(fun diff ->
let changed_accounts =
Diff.changed_accounts diff
|> List.sort ~compare:(fun (a, _) (b, _) -> Int.compare a b)
in
let account_ids =
List.map changed_accounts ~f:snd |> List.map ~f:Account.identifier
in
let openings = Sparse_ledger.of_ledger_subset_exn l account_ids in
List.iter changed_accounts ~f:(fun (index, account) ->
Ledger.set_at_index_exn l index account ) ;
(diff, openings) )

let sync_nodes ~logger ~config ~depth ~target_ledger_hash =
let diffs_with_openings =
lazy
(* TODO: don't fetch all the diffs from genesis, using binary search determine which diffs is the node missing *)
(let%bind.Deferred.Result diffs =
get_diffs_chain ~logger ~config ~source_ledger_hash:`Genesis
~target_ledger_hash
in
return
(Ok (attach_openings ~diffs:(List.map diffs ~f:Diff.drop_time) ~depth))
)
let get_openings ~diff ~ledger =
let changed_accounts =
Diff.changed_accounts diff
|> List.sort ~compare:(fun (a, _) (b, _) -> Int.compare a b)
in
let account_ids =
List.map changed_accounts ~f:snd |> List.map ~f:Account.identifier
in
Deferred.List.map config.nodes ~f:(fun node ->
let openings = Sparse_ledger.of_ledger_subset_exn ledger account_ids in
List.iter changed_accounts ~f:(fun (index, account) ->
Ledger.set_at_index_exn ledger index account ) ;
openings

let attach_openings ~diffs ~ledger =
List.map diffs ~f:(fun diff -> (diff, get_openings ~diff ~ledger))

let check_synced_nodes ~logger ~(config : Config.t) ~target_ledger_hash =
Deferred.List.iter config.nodes ~f:(fun node ->
match%bind
Rpc.get_diff ~logger ~node_location:node ~ledger_hash:target_ledger_hash
with
| Ok (Some _) ->
printf "Node %s is already synced\n%!"
return ( (* synced node *) )
| Ok None | Error _ ->
printf
!"Node %s is not synced\n%!"
(Host_and_port.to_string node.value) ;
return (Ok ())
| Ok None | Error _ -> (
printf "Syncing node %s\n%!" (Host_and_port.to_string node.value) ;
let%bind.Deferred.Result diffs_with_openings =
Lazy.force diffs_with_openings
in
match%bind
Deferred.List.map diffs_with_openings ~f:(fun (diff, openings) ->
Rpc.post_diff ~logger ~node_location:node
~ledger_openings:openings ~diff
>>| Result.ignore_m )
>>| Result.all_unit
with
| Ok () ->
return (Ok ())
| Error e ->
printf "Error syncing node %s: %s\n%!"
(Host_and_port.to_string node.value)
(Error.to_string_hum e) ;
Config.throw_out_node config ~node ;
return (Ok ()) ) )
>>| Result.all_unit
return (Config.throw_out_node config ~node) )
10 changes: 1 addition & 9 deletions src/app/zeko/da_layer/lib/migrations.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,6 @@ open Core_kernel

type migration = Db.t -> unit

let progress_bar ?(width = 30) progress =
let filled_length = int_of_float (float_of_int width *. progress) in
let bar =
String.make filled_length '#' ^ String.make (width - filled_length) '-'
in
Printf.printf "\r[%s] %.0f%%%!" bar (progress *. 100.0) ;
if Float.(progress >= 1.0) then Printf.printf "\n%!"

let add_version_tag db =
let try_read buff =
try
Expand All @@ -20,7 +12,7 @@ let add_version_tag db =
let all = Db.to_alist db in
let l = List.length all in
List.iteri all ~f:(fun i (key, value) ->
progress_bar (Float.of_int i /. Float.of_int l) ;
Zeko_util.progress_bar (Float.of_int i /. Float.of_int l) ;
match try_read value with
| Error _ ->
( (* The pair is something different from diff *) )
Expand Down
Loading