Skip to content

Commit

Permalink
Replace fetching of hash chain with diff chain
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinOndejka committed Dec 2, 2024
1 parent 11e9f86 commit 4953faf
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 62 deletions.
24 changes: 12 additions & 12 deletions src/app/zeko/da_layer/lib/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ let get_ledger_hashes_chain ~logger ~config ~source_ledger_hash
Rpc.get_ledger_hashes_chain ~logger ~node_location
~source:source_ledger_hash ~target:target_ledger_hash )

(** Get the chain of diffs from [source_ledger_hash] hash to [target_ledger_hash] *)
let get_diffs_chain ~logger ~config ~source_ledger_hash ~target_ledger_hash =
try_all_nodes ~config ~f:(fun ~node_location () ->
Rpc.get_diffs_chain ~logger ~node_location ~source:source_ledger_hash
~target:target_ledger_hash )

(** Try to get the diff from the first node in the list, if it fails, try the next one *)
let get_diff ~logger ~config ~ledger_hash =
try_all_nodes ~config ~f:(fun ~node_location () ->
Expand Down Expand Up @@ -248,22 +254,16 @@ let attach_openings ~diffs ~depth =
(diff, openings) )

let sync_nodes ~logger ~config ~depth ~target_ledger_hash =
let%bind.Deferred.Result ledger_hashes_chain =
get_ledger_hashes_chain ~logger ~config ~source_ledger_hash:None
~target_ledger_hash
in
let diffs_with_openings =
lazy
(let%bind.Deferred.Result diffs_with_timestamps =
Deferred.List.map ledger_hashes_chain ~how:(`Max_concurrent_jobs 5)
~f:(fun ledger_hash -> get_diff ~logger ~config ~ledger_hash)
>>| Result.all
(* TODO: don't fetch all the diffs from genesis, using binary search determine which diffs is the node missing *)
(let%bind.Deferred.Result diffs =
get_diffs_chain ~logger ~config ~source_ledger_hash:`Genesis
~target_ledger_hash
in
return
(Ok
(attach_openings
~diffs:(List.map diffs_with_timestamps ~f:Diff.drop_time)
~depth ) ) )
(Ok (attach_openings ~diffs:(List.map diffs ~f:Diff.drop_time) ~depth))
)
in
Deferred.List.map config.nodes ~f:(fun node ->
match%bind
Expand Down
79 changes: 43 additions & 36 deletions src/app/zeko/da_layer/lib/node.ml
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ let get_signature t ~ledger_hash =
let implementations t =
Async.Rpc.Implementations.create_exn ~on_unknown_rpc:`Raise
~implementations:
(* Post_diff *)
[ Async.Rpc.Rpc.implement Rpc.Post_diff.V1.t
[ (* Post_diff *)
Async.Rpc.Rpc.implement Rpc.Post_diff.V1.t
(fun () { ledger_openings; diff } ->
match post_diff t ~ledger_openings ~diff with
| Ok signature ->
Expand All @@ -340,18 +340,18 @@ let implementations t =
[%log warn] "Error posting diff: $error"
~metadata:[ ("error", `String (Error.to_string_hum e)) ] ;
failwith (Error.to_string_hum e) )
(* Get_diff *)
; Async.Rpc.Rpc.implement Rpc.Get_diff.V1.t (fun () query ->
; (* Get_diff *)
Async.Rpc.Rpc.implement Rpc.Get_diff.V1.t (fun () query ->
let v2_diff = Db.get_diff t.db ~ledger_hash:query in
let v1_diff = Option.map v2_diff ~f:Diff.drop_time in
Async.return @@ v1_diff )
; Async.Rpc.Rpc.implement Rpc.Get_diff.V2.t (fun () query ->
Async.return @@ Db.get_diff t.db ~ledger_hash:query )
(* Get_all_keys *)
; Async.Rpc.Rpc.implement Rpc.Get_all_keys.V1.t (fun () () ->
; (* Get_all_keys *)
Async.Rpc.Rpc.implement Rpc.Get_all_keys.V1.t (fun () () ->
Async.return @@ Db.get_index t.db )
(* Get_diff_source *)
; Async.Rpc.Rpc.implement Rpc.Get_diff_source.V1.t (fun () query ->
; (* Get_diff_source *)
Async.Rpc.Rpc.implement Rpc.Get_diff_source.V1.t (fun () query ->
Async.return @@ Diff.Stable.Latest.source_ledger_hash
@@ Option.value_exn
~error:
Expand All @@ -361,14 +361,14 @@ let implementations t =
hash %s"
(Ledger_hash.to_decimal_string query) )
@@ Db.get_diff t.db ~ledger_hash:query )
(* Get_signed_public_key *)
; Async.Rpc.Rpc.implement Rpc.Get_signer_public_key.V1.t (fun () () ->
; (* Get_signed_public_key *)
Async.Rpc.Rpc.implement Rpc.Get_signer_public_key.V1.t (fun () () ->
Async.return @@ Public_key.compress @@ t.signer.public_key )
(* Get_signature *)
; Async.Rpc.Rpc.implement Rpc.Get_signature.V1.t (fun () query ->
; (* Get_signature *)
Async.Rpc.Rpc.implement Rpc.Get_signature.V1.t (fun () query ->
Async.return @@ get_signature t ~ledger_hash:query )
(* Get_ledger_hashes_chain *)
; Async.Rpc.Rpc.implement Rpc.Get_ledger_hashes_chain.V1.t
; (* Get_ledger_hashes_chain *)
Async.Rpc.Rpc.implement Rpc.Get_ledger_hashes_chain.V1.t
(fun () { source; target } ->
let source =
Option.value
Expand All @@ -387,33 +387,40 @@ let implementations t =
in
current :: go source
in
let from_target_to_genesis = go target in
Async.return (List.rev from_target_to_genesis) )
(* Get_diffs_chain *)
; Async.Rpc.Rpc.implement Rpc.Get_diffs_chain.V1.t
(fun () { source; target } ->
Async.return @@ List.rev (go target) )
; (* Get_diffs_chain *)
Async.Rpc.Rpc.implement Rpc.Get_diffs_chain.V1.t
(fun () { source = source_opt; target } ->
let source =
Option.value
~default:
(Diff.empty_ledger_hash
~depth:constraint_constants.ledger_depth )
source
match source_opt with
| `Genesis ->
Diff.empty_ledger_hash
~depth:constraint_constants.ledger_depth
| `Specific source ->
source
in
let get_diff ledger_hash =
Db.get_diff ~ledger_hash t.db
|> Option.value_exn ~here:[%here] ~message:"Diff not found"
in
let rec go current =
let current_ledger_hash =
Diff.Stable.V2.source_ledger_hash current
in
let rec go current_ledger_hash =
if Ledger_hash.equal current_ledger_hash source then []
else
let source = get_diff current_ledger_hash in
current :: go source
let current =
Db.get_diff ~ledger_hash:current_ledger_hash t.db
|> Option.value_exn ~here:[%here] ~message:"Diff not found"
in
current :: go (Diff.Stable.V2.source_ledger_hash current)
in
let from_target_to_genesis = go (get_diff target) in
Async.return (List.rev from_target_to_genesis) )
let chain = List.rev (go target) in
match (source_opt, chain) with
| _, [] ->
failwith "No diffs found"
| ( `Specific wanted_source
, { source_ledger_hash = first_source; _ } :: _ )
when not (Ledger_hash.equal wanted_source first_source) ->
failwithf
"Get_diffs_chain: source ledger hash %s is not in the chain"
(Ledger_hash.to_decimal_string source)
()
| _ ->
Async.return chain )
]

let create_server ~nodes_to_sync ~port ~logger ~db_dir ~signer_sk () =
Expand Down
2 changes: 1 addition & 1 deletion src/app/zeko/da_layer/lib/rpc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ module Get_diffs_chain = struct
module V1 = struct
module Query = struct
type t =
{ source : Ledger_hash.Stable.V1.t option
{ source : [ `Genesis | `Specific of Ledger_hash.Stable.V1.t ]
; target : Ledger_hash.Stable.V1.t
}
[@@deriving bin_io_unversioned]
Expand Down
7 changes: 3 additions & 4 deletions src/app/zeko/sequencer/archive_relay/run.ml
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,10 @@ end

let sync_archive ~(state : State.t) ~hash =
let logger = state.logger in
let%bind.Deferred.Result chain =
Da_layer.Client.get_ledger_hashes_chain ~logger ~config:state.da_config
~source_ledger_hash:None ~target_ledger_hash:hash
let%bind.Deferred.Result diffs =
Da_layer.Client.get_diffs_chain ~logger ~config:state.da_config
~source_ledger_hash:`Genesis ~target_ledger_hash:hash
in
let%bind diffs = Deferred.List.map chain ~f:(fetch_diff ~state) in
Ledger.with_ledger ~depth:constraint_constants.ledger_depth ~f:(fun ledger ->
let protocol_state = ref compile_time_genesis_state in
Deferred.List.iter diffs ~f:(fun diff ->
Expand Down
16 changes: 7 additions & 9 deletions src/app/zeko/sequencer/lib/zeko_sequencer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -787,18 +787,13 @@ module Sequencer = struct
printf "Init root: %s\n%!" Ledger_hash.(to_decimal_string (get_root t)) ;

(* apply diffs from DA layer *)
let%bind ledger_hashes_chain =
Da_layer.Client.get_ledger_hashes_chain ~logger ~config:da_config
~source_ledger_hash:None ~target_ledger_hash:committed_ledger_hash
let%bind diffs =
Da_layer.Client.get_diffs_chain ~logger ~config:da_config
~source_ledger_hash:`Genesis ~target_ledger_hash:committed_ledger_hash
|> Deferred.map ~f:Or_error.ok_exn
in
let%bind () =
Deferred.List.iter ~how:`Sequential ledger_hashes_chain
~f:(fun ledger_hash ->
let%bind diff : Da_layer.Diff.t Deferred.t =
Da_layer.Client.get_diff ~logger ~config:da_config ~ledger_hash
>>| Or_error.ok_exn
in
Deferred.List.iter ~how:`Sequential diffs ~f:(fun diff ->
assert (
Ledger_hash.equal
(Da_layer.Diff.Stable.Latest.source_ledger_hash diff)
Expand Down Expand Up @@ -1121,6 +1116,7 @@ let%test_module "Sequencer tests" =
}

let%test_unit "apply commands and commit" =
print_endline "Started test 'apply commands and commit'" ;
Quickcheck.test ~trials:1 (Sequencer_test_spec.gen ())
~f:(fun { zkapp_keypair; signer; ephemeral_ledger; specs; sequencer } ->
let batch1, batch2 = List.split_n specs 3 in
Expand Down Expand Up @@ -1344,6 +1340,7 @@ let%test_module "Sequencer tests" =
final_ledger_hash ) )

let%test_unit "dummy signature should fail" =
print_endline "Started test 'dummy signature should fail'" ;
Quickcheck.test ~trials:1 (Sequencer_test_spec.gen ())
~f:(fun { zkapp_keypair; signer; ephemeral_ledger; specs; sequencer } ->
let dummy_signature_command : Zkapp_command.t =
Expand Down Expand Up @@ -1380,6 +1377,7 @@ let%test_module "Sequencer tests" =
Error.raise unexpected_error )

let%test_unit "deposits" =
print_endline "Started test 'deposits'" ;
Quickcheck.test ~trials:1 (Sequencer_test_spec.gen ~delay_deposit:2 ())
~f:(fun { zkapp_keypair; signer; ephemeral_ledger; specs; sequencer } ->
(* Create l1 accounts *)
Expand Down

0 comments on commit 4953faf

Please sign in to comment.