Skip to content

Commit

Permalink
DA improve syncing (#234)
Browse files Browse the repository at this point in the history
* Add methods to sync whole da

* Fix version fallback in da client

* Replace fetching of hash chain with diff chain

* Add ledger caching to the archive relay syncing

* Remove dead code

* Move db from da node to separate file

* Add posibility of migrations in da node

* Add version tag to diff

* Add migration that adds version tag
  • Loading branch information
MartinOndejka authored Dec 4, 2024
1 parent 8a98430 commit f106367
Show file tree
Hide file tree
Showing 12 changed files with 415 additions and 283 deletions.
34 changes: 23 additions & 11 deletions src/app/zeko/da_layer/cli.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,42 @@ let run_node =
flag "--port"
(optional_with_default 8080 int)
~doc:"int Port to listen on"
and nodes_to_sync =
flag "--da-node-to-sync" (listed string)
~doc:"string list Nodes to sync with"
and node_to_sync =
flag "--da-node-to-sync" (optional string)
~doc:"string Nodes to sync with"
and hash_to_sync =
flag "--hash-to-sync" (optional string)
~doc:"string Hash to sync with in decimal string form"
and testing_mode =
flag "--random-sk" no_arg
~doc:"Run in testing mode, the signer key will be generated randomly"
and no_migrations =
flag "--no-migrations" no_arg ~doc:"Do not run migrations"
in
fun () ->
let signer =
if testing_mode then Private_key.(create () |> to_base58_check)
else Sys.getenv_exn "MINA_PRIVATE_KEY"
in
let logger = Logger.create () in
let nodes_to_sync =
List.map nodes_to_sync ~f:(fun node_to_sync ->
Cli_lib.Flag.Types.
{ value = Core_kernel.Host_and_port.of_string node_to_sync
; name = "node-to-sync"
} )
let sync_arg =
match (node_to_sync, hash_to_sync) with
| Some node_to_sync, Some hash_to_sync ->
Some
( Cli_lib.Flag.Types.
{ value = Core_kernel.Host_and_port.of_string node_to_sync
; name = "node-to-sync"
}
, Mina_base.Ledger_hash.of_decimal_string hash_to_sync )
| None, None ->
None
| _ ->
failwith "Both node-to-sync and hash-to-sync must be provided"
in
let%bind () =
Deferred.ignore_m
@@ Da_layer.Node.create_server ~nodes_to_sync ~logger ~port ~db_dir
~signer_sk:signer ()
@@ Da_layer.Node.create_server ~sync_arg ~logger ~port ~db_dir
~signer_sk:signer ~no_migrations ()
in
[%log info] "Server started on port $port"
~metadata:[ ("port", `Int port) ] ;
Expand Down
73 changes: 40 additions & 33 deletions src/app/zeko/da_layer/lib/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,21 @@ module Rpc = struct
with
| Ok diff ->
return (Ok diff)
| Error _ ->
(* TODO: do this only if the error is that the rpc method doesn't exist *)
(* Fallback to older version *)
let%bind.Deferred.Result result =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff.V1.t
ledger_hash
| Error e ->
let v2_unimplemented =
Error.to_string_mach e
|> String.is_substring
~substring:"Unimplemented_rpc Get_diff (Version 2)"
in
return (Ok (Option.map result ~f:(fun x -> Diff.Stable.V1.to_latest x)))
if v2_unimplemented then
(* Fallback to older version *)
let%bind.Deferred.Result result =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff.V1.t
ledger_hash
in
return
(Ok (Option.map result ~f:(fun x -> Diff.Stable.V1.to_latest x)))
else return (Error e)

let get_all_keys ~logger ~node_location () =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_all_keys.V1.t ()
Expand All @@ -67,6 +74,14 @@ module Rpc = struct
let get_signature ~logger ~node_location ~ledger_hash =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_signature.V1.t
ledger_hash

let get_ledger_hashes_chain ~logger ~node_location ~source ~target =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_ledger_hashes_chain.V1.t
{ source; target }

let get_diffs_chain ~logger ~node_location ~source ~target =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_diffs_chain.V1.t
{ source; target }
end

module Config = struct
Expand Down Expand Up @@ -175,21 +190,18 @@ let try_all_nodes ~config ~f =
in
try_first ~accum_errors:[] (Config.nodes config)

(** Get the chain of ledger hashes from empty ledger hash to [target_ledger_hash] *)
let get_ledger_hashes_chain ~logger ~config ~depth ~target_ledger_hash =
let rec go current =
if Ledger_hash.equal current (Diff.empty_ledger_hash ~depth) then
return (Ok [])
else
let%bind.Deferred.Result source =
try_all_nodes ~config ~f:(fun ~node_location () ->
Rpc.get_diff_source ~logger ~node_location ~ledger_hash:current )
in
let%bind.Deferred.Result next = go source in
return (Ok (current :: next))
in
let%bind.Deferred.Result from_target_to_genesis = go target_ledger_hash in
return (Ok (List.rev from_target_to_genesis))
(** Get the chain of ledger hashes from [source_ledger_hash] hash to [target_ledger_hash] *)
let get_ledger_hashes_chain ~logger ~config ~source_ledger_hash
~target_ledger_hash =
try_all_nodes ~config ~f:(fun ~node_location () ->
Rpc.get_ledger_hashes_chain ~logger ~node_location
~source:source_ledger_hash ~target:target_ledger_hash )

(** Get the chain of diffs from [source_ledger_hash] hash to [target_ledger_hash] *)
let get_diffs_chain ~logger ~config ~source_ledger_hash ~target_ledger_hash =
try_all_nodes ~config ~f:(fun ~node_location () ->
Rpc.get_diffs_chain ~logger ~node_location ~source:source_ledger_hash
~target:target_ledger_hash )

(** Try to get the diff from the first node in the list, if it fails, try the next one *)
let get_diff ~logger ~config ~ledger_hash =
Expand Down Expand Up @@ -242,21 +254,16 @@ let attach_openings ~diffs ~depth =
(diff, openings) )

let sync_nodes ~logger ~config ~depth ~target_ledger_hash =
let%bind.Deferred.Result ledger_hashes_chain =
get_ledger_hashes_chain ~logger ~config ~depth ~target_ledger_hash
in
let diffs_with_openings =
lazy
(let%bind.Deferred.Result diffs_with_timestamps =
Deferred.List.map ledger_hashes_chain ~how:(`Max_concurrent_jobs 5)
~f:(fun ledger_hash -> get_diff ~logger ~config ~ledger_hash)
>>| Result.all
(* TODO: don't fetch all the diffs from genesis, using binary search determine which diffs is the node missing *)
(let%bind.Deferred.Result diffs =
get_diffs_chain ~logger ~config ~source_ledger_hash:`Genesis
~target_ledger_hash
in
return
(Ok
(attach_openings
~diffs:(List.map diffs_with_timestamps ~f:Diff.drop_time)
~depth ) ) )
(Ok (attach_openings ~diffs:(List.map diffs ~f:Diff.drop_time) ~depth))
)
in
Deferred.List.map config.nodes ~f:(fun node ->
match%bind
Expand Down
78 changes: 78 additions & 0 deletions src/app/zeko/da_layer/lib/db.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
open Core_kernel
open Mina_base

(** Holds keys to all the diffes *)
module Index = struct
[%%versioned
module Stable = struct
module V1 = struct
type t = Ledger_hash.Stable.V1.t list

let to_latest = Fn.id
end
end]

let to_bigstring = Binable.to_bigstring (module Stable.Latest)

let of_bigstring = Binable.of_bigstring (module Stable.Latest)
end

module Key_value = struct
type _ t =
| Diff : (Ledger_hash.t * Diff.t) t
| Diff_index : (unit * Index.t) t
| Migration : (unit * int) t

let serialize_key : type k v. (k * v) t -> k -> Bigstring.t =
fun pair_type key ->
match pair_type with
| Diff ->
Bigstring.concat
[ Bigstring.of_string "diff"
; Bigstring.of_string @@ Ledger_hash.to_decimal_string key
]
| Diff_index ->
Bigstring.of_string "diff_index"
| Migration ->
Bigstring.of_string "migration"

let serialize_value : type k v. (k * v) t -> v -> Bigstring.t =
fun pair_type value ->
match pair_type with
| Diff ->
Diff.to_bigstring value
| Diff_index ->
Index.to_bigstring value
| Migration ->
Bigstring.of_string @@ Int.to_string value

let deserialize_value : type k v. (k * v) t -> Bigstring.t -> v =
fun pair_type data ->
match pair_type with
| Diff ->
Diff.of_bigstring data |> Or_error.ok_exn
| Diff_index ->
Index.of_bigstring data
| Migration ->
Int.of_string @@ Bigstring.to_string data
end

include Kvdb_base.Make (Key_value)

let set_index t ~index = set t Diff_index ~key:() ~data:index

let get_index t = get t Diff_index ~key:() |> Option.value ~default:[]

let add_diff t ~ledger_hash ~diff =
let index = get_index t in
if List.mem index ledger_hash ~equal:Ledger_hash.equal then `Already_existed
else (
set t Diff ~key:ledger_hash ~data:diff ;
set_index t ~index:(ledger_hash :: index) ;
`Added )

let get_diff t ~ledger_hash = get t Diff ~key:ledger_hash

let get_migration t = get t Migration ~key:() |> Option.value ~default:0

let set_migration t ~migration = set t Migration ~key:() ~data:migration
26 changes: 23 additions & 3 deletions src/app/zeko/da_layer/lib/diff.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ open Mina_ledger

[%%versioned
module Stable = struct
[@@@with_top_version_tag]

module V2 = struct
type t =
{ source_ledger_hash : Ledger_hash.Stable.V1.t
Expand All @@ -15,7 +17,7 @@ module Stable = struct
(** Optionally add command with corresponding action steps to store the history *)
; timestamp : Block_time.Stable.V1.t (** Timestamp of the diff *)
}
[@@deriving yojson, fields, sexp_of]
[@@deriving yojson, fields, sexp_of, compare]

let to_latest = Fn.id
end
Expand Down Expand Up @@ -72,10 +74,28 @@ let drop_time
; command_with_action_step_flags
}

let to_bigstring = Binable.to_bigstring (module Stable.Latest)
let to_bigstring =
Binable.to_bigstring (module Stable.Latest.With_top_version_tag)

let of_bigstring = Binable.of_bigstring (module Stable.Latest)
let of_bigstring bigstring =
let pos_ref = ref 0 in
Stable.bin_read_top_tagged_to_latest ~pos_ref bigstring

(** [Ledger_hash.empty_hash] is [zero], so we need this for the genesis state of the rollup *)
let empty_ledger_hash ~depth =
Ledger.merkle_root @@ Ledger.create_ephemeral ~depth ()

let%test_unit "diff versioning" =
let v1 =
Stable.V1.
{ source_ledger_hash = Ledger_hash.empty_hash
; changed_accounts = []
; command_with_action_step_flags = None
}
in
let v1_serialized =
Binable.to_bigstring (module Stable.V1.With_top_version_tag) v1
in
let v2 = of_bigstring v1_serialized |> Or_error.ok_exn in

[%test_eq: Stable.V2.t] (Stable.V1.to_latest v1) (Stable.V2.to_latest v2)
1 change: 1 addition & 0 deletions src/app/zeko/da_layer/lib/dune
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(library
(name da_layer)
(inline_tests)
(libraries
kvdb_base
zkapps_rollup
Expand Down
41 changes: 41 additions & 0 deletions src/app/zeko/da_layer/lib/migrations.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
open Core_kernel

type migration = Db.t -> unit

let progress_bar ?(width = 30) progress =
let filled_length = int_of_float (float_of_int width *. progress) in
let bar =
String.make filled_length '#' ^ String.make (width - filled_length) '-'
in
Printf.printf "\r[%s] %.0f%%%!" bar (progress *. 100.0) ;
if Float.(progress >= 1.0) then Printf.printf "\n%!"

let add_version_tag db =
let try_read buff =
try
let pos_ref = ref 0 in
Ok (Diff.Stable.V1.bin_read_t ~pos_ref buff)
with _ -> Error "Failed to convert diff"
in
let all = Db.to_alist db in
let l = List.length all in
List.iteri all ~f:(fun i (key, value) ->
progress_bar (Float.of_int i /. Float.of_int l) ;
match try_read value with
| Error _ ->
( (* The pair is something different from diff *) )
| Ok diff ->
let v2 = Diff.to_bigstring @@ Diff.Stable.V1.to_latest diff in
Db.set_raw db ~key ~data:v2 )

let migrations : migration list = [ add_version_tag ]

let latest_migration = List.length migrations

let run_migrations ~logger db =
let old_migration = Db.get_migration db in
List.drop migrations old_migration
|> List.iteri ~f:(fun i migration ->
[%log info] "Running migration %d" (old_migration + i + 1) ;
migration db ;
Db.set_migration db ~migration:(old_migration + i + 1) )
Loading

0 comments on commit f106367

Please sign in to comment.