Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DA improve syncing #234

Merged
merged 9 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions src/app/zeko/da_layer/cli.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,42 @@ let run_node =
flag "--port"
(optional_with_default 8080 int)
~doc:"int Port to listen on"
and nodes_to_sync =
flag "--da-node-to-sync" (listed string)
~doc:"string list Nodes to sync with"
and node_to_sync =
flag "--da-node-to-sync" (optional string)
~doc:"string Nodes to sync with"
and hash_to_sync =
flag "--hash-to-sync" (optional string)
~doc:"string Hash to sync with in decimal string form"
and testing_mode =
flag "--random-sk" no_arg
~doc:"Run in testing mode, the signer key will be generated randomly"
and no_migrations =
flag "--no-migrations" no_arg ~doc:"Do not run migrations"
in
fun () ->
let signer =
if testing_mode then Private_key.(create () |> to_base58_check)
else Sys.getenv_exn "MINA_PRIVATE_KEY"
in
let logger = Logger.create () in
let nodes_to_sync =
List.map nodes_to_sync ~f:(fun node_to_sync ->
Cli_lib.Flag.Types.
{ value = Core_kernel.Host_and_port.of_string node_to_sync
; name = "node-to-sync"
} )
let sync_arg =
match (node_to_sync, hash_to_sync) with
| Some node_to_sync, Some hash_to_sync ->
Some
( Cli_lib.Flag.Types.
{ value = Core_kernel.Host_and_port.of_string node_to_sync
; name = "node-to-sync"
}
, Mina_base.Ledger_hash.of_decimal_string hash_to_sync )
| None, None ->
None
| _ ->
failwith "Both node-to-sync and hash-to-sync must be provided"
in
let%bind () =
Deferred.ignore_m
@@ Da_layer.Node.create_server ~nodes_to_sync ~logger ~port ~db_dir
~signer_sk:signer ()
@@ Da_layer.Node.create_server ~sync_arg ~logger ~port ~db_dir
~signer_sk:signer ~no_migrations ()
in
[%log info] "Server started on port $port"
~metadata:[ ("port", `Int port) ] ;
Expand Down
73 changes: 40 additions & 33 deletions src/app/zeko/da_layer/lib/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,21 @@ module Rpc = struct
with
| Ok diff ->
return (Ok diff)
| Error _ ->
(* TODO: do this only if the error is that the rpc method doesn't exist *)
(* Fallback to older version *)
let%bind.Deferred.Result result =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff.V1.t
ledger_hash
| Error e ->
let v2_unimplemented =
Error.to_string_mach e
|> String.is_substring
~substring:"Unimplemented_rpc Get_diff (Version 2)"
in
return (Ok (Option.map result ~f:(fun x -> Diff.Stable.V1.to_latest x)))
if v2_unimplemented then
(* Fallback to older version *)
let%bind.Deferred.Result result =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff.V1.t
ledger_hash
in
return
(Ok (Option.map result ~f:(fun x -> Diff.Stable.V1.to_latest x)))
else return (Error e)

let get_all_keys ~logger ~node_location () =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_all_keys.V1.t ()
Expand All @@ -67,6 +74,14 @@ module Rpc = struct
let get_signature ~logger ~node_location ~ledger_hash =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_signature.V1.t
ledger_hash

let get_ledger_hashes_chain ~logger ~node_location ~source ~target =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_ledger_hashes_chain.V1.t
{ source; target }

let get_diffs_chain ~logger ~node_location ~source ~target =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_diffs_chain.V1.t
{ source; target }
end

module Config = struct
Expand Down Expand Up @@ -175,21 +190,18 @@ let try_all_nodes ~config ~f =
in
try_first ~accum_errors:[] (Config.nodes config)

(** Get the chain of ledger hashes from empty ledger hash to [target_ledger_hash] *)
let get_ledger_hashes_chain ~logger ~config ~depth ~target_ledger_hash =
let rec go current =
if Ledger_hash.equal current (Diff.empty_ledger_hash ~depth) then
return (Ok [])
else
let%bind.Deferred.Result source =
try_all_nodes ~config ~f:(fun ~node_location () ->
Rpc.get_diff_source ~logger ~node_location ~ledger_hash:current )
in
let%bind.Deferred.Result next = go source in
return (Ok (current :: next))
in
let%bind.Deferred.Result from_target_to_genesis = go target_ledger_hash in
return (Ok (List.rev from_target_to_genesis))
(** Get the chain of ledger hashes from [source_ledger_hash] hash to [target_ledger_hash] *)
let get_ledger_hashes_chain ~logger ~config ~source_ledger_hash
~target_ledger_hash =
try_all_nodes ~config ~f:(fun ~node_location () ->
Rpc.get_ledger_hashes_chain ~logger ~node_location
~source:source_ledger_hash ~target:target_ledger_hash )

(** Get the chain of diffs from [source_ledger_hash] hash to [target_ledger_hash] *)
let get_diffs_chain ~logger ~config ~source_ledger_hash ~target_ledger_hash =
try_all_nodes ~config ~f:(fun ~node_location () ->
Rpc.get_diffs_chain ~logger ~node_location ~source:source_ledger_hash
~target:target_ledger_hash )

(** Try to get the diff from the first node in the list, if it fails, try the next one *)
let get_diff ~logger ~config ~ledger_hash =
Expand Down Expand Up @@ -242,21 +254,16 @@ let attach_openings ~diffs ~depth =
(diff, openings) )

let sync_nodes ~logger ~config ~depth ~target_ledger_hash =
let%bind.Deferred.Result ledger_hashes_chain =
get_ledger_hashes_chain ~logger ~config ~depth ~target_ledger_hash
in
let diffs_with_openings =
lazy
(let%bind.Deferred.Result diffs_with_timestamps =
Deferred.List.map ledger_hashes_chain ~how:(`Max_concurrent_jobs 5)
~f:(fun ledger_hash -> get_diff ~logger ~config ~ledger_hash)
>>| Result.all
(* TODO: don't fetch all the diffs from genesis, using binary search determine which diffs is the node missing *)
(let%bind.Deferred.Result diffs =
get_diffs_chain ~logger ~config ~source_ledger_hash:`Genesis
~target_ledger_hash
in
return
(Ok
(attach_openings
~diffs:(List.map diffs_with_timestamps ~f:Diff.drop_time)
~depth ) ) )
(Ok (attach_openings ~diffs:(List.map diffs ~f:Diff.drop_time) ~depth))
)
in
Deferred.List.map config.nodes ~f:(fun node ->
match%bind
Expand Down
78 changes: 78 additions & 0 deletions src/app/zeko/da_layer/lib/db.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
open Core_kernel
open Mina_base

(** Holds keys to all the diffes *)
module Index = struct
[%%versioned
module Stable = struct
module V1 = struct
type t = Ledger_hash.Stable.V1.t list

let to_latest = Fn.id
end
end]

let to_bigstring = Binable.to_bigstring (module Stable.Latest)

let of_bigstring = Binable.of_bigstring (module Stable.Latest)
end

module Key_value = struct
type _ t =
| Diff : (Ledger_hash.t * Diff.t) t
| Diff_index : (unit * Index.t) t
| Migration : (unit * int) t

let serialize_key : type k v. (k * v) t -> k -> Bigstring.t =
fun pair_type key ->
match pair_type with
| Diff ->
Bigstring.concat
[ Bigstring.of_string "diff"
; Bigstring.of_string @@ Ledger_hash.to_decimal_string key
]
| Diff_index ->
Bigstring.of_string "diff_index"
| Migration ->
Bigstring.of_string "migration"

let serialize_value : type k v. (k * v) t -> v -> Bigstring.t =
fun pair_type value ->
match pair_type with
| Diff ->
Diff.to_bigstring value
| Diff_index ->
Index.to_bigstring value
| Migration ->
Bigstring.of_string @@ Int.to_string value

let deserialize_value : type k v. (k * v) t -> Bigstring.t -> v =
fun pair_type data ->
match pair_type with
| Diff ->
Diff.of_bigstring data |> Or_error.ok_exn
| Diff_index ->
Index.of_bigstring data
| Migration ->
Int.of_string @@ Bigstring.to_string data
end

include Kvdb_base.Make (Key_value)

let set_index t ~index = set t Diff_index ~key:() ~data:index

let get_index t = get t Diff_index ~key:() |> Option.value ~default:[]

let add_diff t ~ledger_hash ~diff =
let index = get_index t in
if List.mem index ledger_hash ~equal:Ledger_hash.equal then `Already_existed
else (
set t Diff ~key:ledger_hash ~data:diff ;
set_index t ~index:(ledger_hash :: index) ;
`Added )

let get_diff t ~ledger_hash = get t Diff ~key:ledger_hash

let get_migration t = get t Migration ~key:() |> Option.value ~default:0

let set_migration t ~migration = set t Migration ~key:() ~data:migration
26 changes: 23 additions & 3 deletions src/app/zeko/da_layer/lib/diff.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ open Mina_ledger

[%%versioned
module Stable = struct
[@@@with_top_version_tag]

module V2 = struct
type t =
{ source_ledger_hash : Ledger_hash.Stable.V1.t
Expand All @@ -15,7 +17,7 @@ module Stable = struct
(** Optionally add command with corresponding action steps to store the history *)
; timestamp : Block_time.Stable.V1.t (** Timestamp of the diff *)
}
[@@deriving yojson, fields, sexp_of]
[@@deriving yojson, fields, sexp_of, compare]

let to_latest = Fn.id
end
Expand Down Expand Up @@ -72,10 +74,28 @@ let drop_time
; command_with_action_step_flags
}

let to_bigstring = Binable.to_bigstring (module Stable.Latest)
let to_bigstring =
Binable.to_bigstring (module Stable.Latest.With_top_version_tag)

let of_bigstring = Binable.of_bigstring (module Stable.Latest)
let of_bigstring bigstring =
let pos_ref = ref 0 in
Stable.bin_read_top_tagged_to_latest ~pos_ref bigstring

(** [Ledger_hash.empty_hash] is [zero], so we need this for the genesis state of the rollup *)
let empty_ledger_hash ~depth =
Ledger.merkle_root @@ Ledger.create_ephemeral ~depth ()

let%test_unit "diff versioning" =
let v1 =
Stable.V1.
{ source_ledger_hash = Ledger_hash.empty_hash
; changed_accounts = []
; command_with_action_step_flags = None
}
in
let v1_serialized =
Binable.to_bigstring (module Stable.V1.With_top_version_tag) v1
in
let v2 = of_bigstring v1_serialized |> Or_error.ok_exn in

[%test_eq: Stable.V2.t] (Stable.V1.to_latest v1) (Stable.V2.to_latest v2)
1 change: 1 addition & 0 deletions src/app/zeko/da_layer/lib/dune
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(library
(name da_layer)
(inline_tests)
(libraries
kvdb_base
zkapps_rollup
Expand Down
41 changes: 41 additions & 0 deletions src/app/zeko/da_layer/lib/migrations.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
open Core_kernel

type migration = Db.t -> unit

let progress_bar ?(width = 30) progress =
let filled_length = int_of_float (float_of_int width *. progress) in
let bar =
String.make filled_length '#' ^ String.make (width - filled_length) '-'
in
Printf.printf "\r[%s] %.0f%%%!" bar (progress *. 100.0) ;
if Float.(progress >= 1.0) then Printf.printf "\n%!"

let add_version_tag db =
let try_read buff =
try
let pos_ref = ref 0 in
Ok (Diff.Stable.V1.bin_read_t ~pos_ref buff)
with _ -> Error "Failed to convert diff"
in
let all = Db.to_alist db in
let l = List.length all in
List.iteri all ~f:(fun i (key, value) ->
progress_bar (Float.of_int i /. Float.of_int l) ;
match try_read value with
| Error _ ->
( (* The pair is something different from diff *) )
| Ok diff ->
let v2 = Diff.to_bigstring @@ Diff.Stable.V1.to_latest diff in
Db.set_raw db ~key ~data:v2 )

let migrations : migration list = [ add_version_tag ]

let latest_migration = List.length migrations

let run_migrations ~logger db =
let old_migration = Db.get_migration db in
List.drop migrations old_migration
|> List.iteri ~f:(fun i migration ->
[%log info] "Running migration %d" (old_migration + i + 1) ;
migration db ;
Db.set_migration db ~migration:(old_migration + i + 1) )
Loading