Skip to content

Commit

Permalink
Refactor da versioning (#232)
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinOndejka authored Nov 29, 2024
1 parent c3f4ef2 commit 8a98430
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 128 deletions.
62 changes: 47 additions & 15 deletions src/app/zeko/da_layer/lib/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,44 @@ module Rpc = struct
go max_tries []

let post_diff ~logger ~node_location ~ledger_openings ~diff =
dispatch ~logger node_location Rpc.Post_diff.v2 { ledger_openings; diff }
dispatch ~logger node_location Rpc.Post_diff.V1.t { ledger_openings; diff }

let get_diff ~logger ~node_location ~ledger_hash =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff.v2 ledger_hash
match%bind
dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff.V2.t ledger_hash
with
| Ok diff ->
return (Ok diff)
| Error _ ->
(* TODO: do this only if the error is that the rpc method doesn't exist *)
(* Fallback to older version *)
let%bind.Deferred.Result result =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff.V1.t
ledger_hash
in
return (Ok (Option.map result ~f:(fun x -> Diff.Stable.V1.to_latest x)))

let get_all_keys ~logger ~node_location () =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_all_keys.v1 ()
dispatch ~max_tries:1 ~logger node_location Rpc.Get_all_keys.V1.t ()

let get_diff_source ~logger ~node_location ~ledger_hash =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff_source.v1
dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff_source.V1.t
ledger_hash

let get_node_public_key ~logger ~node_location () =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_signer_public_key.v1 ()
dispatch ~max_tries:1 ~logger node_location Rpc.Get_signer_public_key.V1.t
()

let get_signature ~logger ~node_location ~ledger_hash =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_signature.v1 ledger_hash
dispatch ~max_tries:1 ~logger node_location Rpc.Get_signature.V1.t
ledger_hash
end

module Config = struct
type t = { nodes : Host_and_port.t Cli_lib.Flag.Types.with_name list }
type t =
{ mutable nodes : Host_and_port.t Cli_lib.Flag.Types.with_name list
(** Mutable in case we want to throw out some node *)
}
[@@deriving fields]

let of_string_list uris =
Expand All @@ -67,6 +84,11 @@ module Config = struct
; name = sprintf "da-node-%d" i
} )
}

let throw_out_node t ~(node : Host_and_port.t Cli_lib.Flag.Types.with_name) =
let open Cli_lib.Flag.Types in
t.nodes <-
List.filter t.nodes ~f:(fun n -> not (String.equal n.name node.name))
end

(** Send the diff to all the nodes in the [~config] *)
Expand Down Expand Up @@ -198,7 +220,7 @@ let distribute_genesis_diff ~logger ~config ~ledger =
~f:(fun acc (index, _) -> Sparse_ledger.set_exn acc index Account.empty)
in
let diff =
Diff.Without_timestamp.create
Diff.create
~source_ledger_hash:(Diff.empty_ledger_hash ~depth:(Ledger.depth ledger))
~changed_accounts ~command_with_action_step_flags:None
in
Expand Down Expand Up @@ -233,7 +255,7 @@ let sync_nodes ~logger ~config ~depth ~target_ledger_hash =
return
(Ok
(attach_openings
~diffs:(List.map diffs_with_timestamps ~f:fst)
~diffs:(List.map diffs_with_timestamps ~f:Diff.drop_time)
~depth ) ) )
in
Deferred.List.map config.nodes ~f:(fun node ->
Expand All @@ -244,14 +266,24 @@ let sync_nodes ~logger ~config ~depth ~target_ledger_hash =
printf "Node %s is already synced\n%!"
(Host_and_port.to_string node.value) ;
return (Ok ())
| Ok None | Error _ ->
| Ok None | Error _ -> (
printf "Syncing node %s\n%!" (Host_and_port.to_string node.value) ;
let%bind.Deferred.Result diffs_with_openings =
Lazy.force diffs_with_openings
in
Deferred.List.map diffs_with_openings ~f:(fun (diff, openings) ->
Rpc.post_diff ~logger ~node_location:node
~ledger_openings:openings ~diff
>>| Result.ignore_m )
>>| Result.all_unit )
match%bind
Deferred.List.map diffs_with_openings ~f:(fun (diff, openings) ->
Rpc.post_diff ~logger ~node_location:node
~ledger_openings:openings ~diff
>>| Result.ignore_m )
>>| Result.all_unit
with
| Ok () ->
return (Ok ())
| Error e ->
printf "Error syncing node %s: %s\n%!"
(Host_and_port.to_string node.value)
(Error.to_string_hum e) ;
Config.throw_out_node config ~node ;
return (Ok ()) ) )
>>| Result.all_unit
76 changes: 42 additions & 34 deletions src/app/zeko/da_layer/lib/diff.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,20 @@ open Core_kernel
open Mina_base
open Mina_ledger

module Without_timestamp = struct
[%%versioned
module Stable = struct
module V1 = struct
type t =
{ source_ledger_hash : Ledger_hash.Stable.V1.t
(** Source ledger hash of the diff *)
; changed_accounts : (int * Account.Stable.V2.t) list
(** List of changed accounts with corresponding index in the ledger *)
; command_with_action_step_flags :
(User_command.Stable.V2.t * bool list) option
(** Optionally add command with corresponding action steps to store the history *)
}
[@@deriving yojson, fields, sexp_of]

let to_latest = Fn.id
end
end]

let create ~source_ledger_hash ~changed_accounts
~command_with_action_step_flags =
{ source_ledger_hash; changed_accounts; command_with_action_step_flags }
end

[%%versioned
module Stable = struct
module V2 = struct
type t = Without_timestamp.Stable.V1.t * Block_time.Stable.V1.t
[@@deriving yojson, sexp_of]
type t =
{ source_ledger_hash : Ledger_hash.Stable.V1.t
(** Source ledger hash of the diff *)
; changed_accounts : (int * Account.Stable.V2.t) list
(** List of changed accounts with corresponding index in the ledger *)
; command_with_action_step_flags :
(User_command.Stable.V2.t * bool list) option
(** Optionally add command with corresponding action steps to store the history *)
; timestamp : Block_time.Stable.V1.t (** Timestamp of the diff *)
}
[@@deriving yojson, fields, sexp_of]

let to_latest = Fn.id
end
Expand All @@ -47,22 +32,45 @@ module Stable = struct
}
[@@deriving yojson, fields, sexp_of]

let to_latest t = (Block_time.zero, t)
let to_latest ?(timestamp = Block_time.zero) (t : t) =
{ V2.source_ledger_hash = t.source_ledger_hash
; changed_accounts = t.changed_accounts
; command_with_action_step_flags = t.command_with_action_step_flags
; timestamp
}
end
end]

let changed_accounts { Without_timestamp.Stable.V1.changed_accounts; _ } =
changed_accounts
let create ~source_ledger_hash ~changed_accounts ~command_with_action_step_flags
=
{ Stable.V1.source_ledger_hash
; changed_accounts
; command_with_action_step_flags
}

let source_ledger_hash { Without_timestamp.Stable.V1.source_ledger_hash; _ } =
source_ledger_hash
let changed_accounts { Stable.V1.changed_accounts; _ } = changed_accounts

let source_ledger_hash { Stable.V1.source_ledger_hash; _ } = source_ledger_hash

let command_with_action_step_flags
{ Without_timestamp.Stable.V1.command_with_action_step_flags; _ } =
{ Stable.V1.command_with_action_step_flags; _ } =
command_with_action_step_flags

let add_time ~logger (t : Without_timestamp.t) : t =
(t, Block_time.now (Block_time.Controller.basic ~logger))
let add_time ~logger t =
Stable.V1.to_latest
~timestamp:(Block_time.now (Block_time.Controller.basic ~logger))
t

let drop_time
{ Stable.V2.source_ledger_hash
; changed_accounts
; command_with_action_step_flags
; _
} =
{ Stable.V1.source_ledger_hash
; changed_accounts
; command_with_action_step_flags
}

let to_bigstring = Binable.to_bigstring (module Stable.Latest)

Expand Down
30 changes: 20 additions & 10 deletions src/app/zeko/da_layer/lib/node.ml
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ let post_diff t ~ledger_openings ~diff =
Ok ()
| true ->
Error
(Error.create "Duplicate indices" diff
[%sexp_of: Diff.Without_timestamp.t] )
(Error.create "Duplicate indices" diff [%sexp_of: Diff.Stable.V1.t])
in

(* 4 *)
Expand Down Expand Up @@ -251,7 +250,8 @@ let post_diff t ~ledger_openings ~diff =
in

(* 7 *)
let diff = Diff.add_time ~logger diff in
(* V2 was added time *)
let diff : Diff.Stable.V2.t = Diff.add_time ~logger diff in

(* 8 *)
(* We don't care if the diff already existed *)
Expand Down Expand Up @@ -327,7 +327,8 @@ let get_signature t ~ledger_hash =
let implementations t =
Async.Rpc.Implementations.create_exn ~on_unknown_rpc:`Raise
~implementations:
[ Async.Rpc.Rpc.implement Rpc.Post_diff.v2
(* Post_diff *)
[ Async.Rpc.Rpc.implement Rpc.Post_diff.V1.t
(fun () { ledger_openings; diff } ->
match post_diff t ~ledger_openings ~diff with
| Ok signature ->
Expand All @@ -337,12 +338,19 @@ let implementations t =
[%log warn] "Error posting diff: $error"
~metadata:[ ("error", `String (Error.to_string_hum e)) ] ;
failwith (Error.to_string_hum e) )
; Async.Rpc.Rpc.implement Rpc.Get_diff.v2 (fun () query ->
(* Get_diff *)
; Async.Rpc.Rpc.implement Rpc.Get_diff.V1.t (fun () query ->
let v2_diff = Db.get_diff t.db ~ledger_hash:query in
let v1_diff = Option.map v2_diff ~f:Diff.drop_time in
Async.return @@ v1_diff )
; Async.Rpc.Rpc.implement Rpc.Get_diff.V2.t (fun () query ->
Async.return @@ Db.get_diff t.db ~ledger_hash:query )
; Async.Rpc.Rpc.implement Rpc.Get_all_keys.v1 (fun () () ->
(* Get_all_keys *)
; Async.Rpc.Rpc.implement Rpc.Get_all_keys.V1.t (fun () () ->
Async.return @@ Db.get_index t.db )
; Async.Rpc.Rpc.implement Rpc.Get_diff_source.v1 (fun () query ->
Async.return @@ Diff.source_ledger_hash @@ fst
(* Get_diff_source *)
; Async.Rpc.Rpc.implement Rpc.Get_diff_source.V1.t (fun () query ->
Async.return @@ Diff.Stable.Latest.source_ledger_hash
@@ Option.value_exn
~error:
( Error.of_string
Expand All @@ -351,9 +359,11 @@ let implementations t =
hash %s"
(Ledger_hash.to_decimal_string query) )
@@ Db.get_diff t.db ~ledger_hash:query )
; Async.Rpc.Rpc.implement Rpc.Get_signer_public_key.v1 (fun () () ->
(* Get_signed_public_key *)
; Async.Rpc.Rpc.implement Rpc.Get_signer_public_key.V1.t (fun () () ->
Async.return @@ Public_key.compress @@ t.signer.public_key )
; Async.Rpc.Rpc.implement Rpc.Get_signature.v1 (fun () query ->
(* Get_signature *)
; Async.Rpc.Rpc.implement Rpc.Get_signature.V1.t (fun () query ->
Async.return @@ get_signature t ~ledger_hash:query )
]

Expand Down
Loading

0 comments on commit 8a98430

Please sign in to comment.