Skip to content

Commit

Permalink
Parallel proving of transactions (#227)
Browse files Browse the repository at this point in the history
* Add analytics to the sequencer

* Add analytics gql api

* Add average fee to analytics

* Add prover for txn snark

* Make prover general

* Add prover client

* Fix da diff timestamp backwards compatibility

* Add round robin to prover client

* Implement timeout and retry into prover client

* Add provers to the sequencer cli

* Add connection error handling

* Add remaining proofs to the prover

* Remove dependency on circuits in sequencer

* Remove sequencer module functor

* Add parallel prover library

* Ensure correct ordering of commits

* Add context to the parallel prover

* Renam prover to merger

* Integrate into the sequencer

* Clean up old snark queue

* Fix pool size check

* Add round robin again

* Add requeuing

* Cleanup parallel merger

* Add spec for parallel merger

* Add prover spec

* Typos

* More typos
  • Loading branch information
MartinOndejka authored Nov 25, 2024
1 parent c27a7a3 commit c3f4ef2
Show file tree
Hide file tree
Showing 26 changed files with 3,463 additions and 2,370 deletions.
4 changes: 2 additions & 2 deletions src/app/zeko/da_layer/lib/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ module Rpc = struct
go max_tries []

let post_diff ~logger ~node_location ~ledger_openings ~diff =
dispatch ~logger node_location Rpc.Post_diff.v1 { ledger_openings; diff }
dispatch ~logger node_location Rpc.Post_diff.v2 { ledger_openings; diff }

let get_diff ~logger ~node_location ~ledger_hash =
dispatch ~max_tries:1 ~logger node_location Rpc.Get_diff.v2 ledger_hash
Expand Down Expand Up @@ -198,7 +198,7 @@ let distribute_genesis_diff ~logger ~config ~ledger =
~f:(fun acc (index, _) -> Sparse_ledger.set_exn acc index Account.empty)
in
let diff =
Diff.create
Diff.Without_timestamp.create
~source_ledger_hash:(Diff.empty_ledger_hash ~depth:(Ledger.depth ledger))
~changed_accounts ~command_with_action_step_flags:None
in
Expand Down
63 changes: 44 additions & 19 deletions src/app/zeko/da_layer/lib/diff.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,39 @@ open Core_kernel
open Mina_base
open Mina_ledger

module Without_timestamp = struct
[%%versioned
module Stable = struct
module V1 = struct
type t =
{ source_ledger_hash : Ledger_hash.Stable.V1.t
(** Source ledger hash of the diff *)
; changed_accounts : (int * Account.Stable.V2.t) list
(** List of changed accounts with corresponding index in the ledger *)
; command_with_action_step_flags :
(User_command.Stable.V2.t * bool list) option
(** Optionally add command with corresponding action steps to store the history *)
}
[@@deriving yojson, fields, sexp_of]

let to_latest = Fn.id
end
end]

let create ~source_ledger_hash ~changed_accounts
~command_with_action_step_flags =
{ source_ledger_hash; changed_accounts; command_with_action_step_flags }
end

[%%versioned
module Stable = struct
module V2 = struct
type t = Without_timestamp.Stable.V1.t * Block_time.Stable.V1.t
[@@deriving yojson, sexp_of]

let to_latest = Fn.id
end

module V1 = struct
type t =
{ source_ledger_hash : Ledger_hash.Stable.V1.t
Expand All @@ -16,13 +47,22 @@ module Stable = struct
}
[@@deriving yojson, fields, sexp_of]

let to_latest = Fn.id
let to_latest t = (Block_time.zero, t)
end
end]

let create ~source_ledger_hash ~changed_accounts ~command_with_action_step_flags
=
{ source_ledger_hash; changed_accounts; command_with_action_step_flags }
let changed_accounts { Without_timestamp.Stable.V1.changed_accounts; _ } =
changed_accounts

let source_ledger_hash { Without_timestamp.Stable.V1.source_ledger_hash; _ } =
source_ledger_hash

let command_with_action_step_flags
{ Without_timestamp.Stable.V1.command_with_action_step_flags; _ } =
command_with_action_step_flags

let add_time ~logger (t : Without_timestamp.t) : t =
(t, Block_time.now (Block_time.Controller.basic ~logger))

let to_bigstring = Binable.to_bigstring (module Stable.Latest)

Expand All @@ -31,18 +71,3 @@ let of_bigstring = Binable.of_bigstring (module Stable.Latest)
(** [Ledger_hash.empty_hash] is [zero], so we need this for the genesis state of the rollup *)
let empty_ledger_hash ~depth =
Ledger.merkle_root @@ Ledger.create_ephemeral ~depth ()

module With_timestamp = struct
[%%versioned
module Stable = struct
module V1 = struct
type t = Stable.V1.t * Block_time.Stable.V1.t [@@deriving yojson, sexp_of]

let to_latest = Fn.id
end
end]

let to_bigstring = Binable.to_bigstring (module Stable.Latest)

let of_bigstring = Binable.of_bigstring (module Stable.Latest)
end
16 changes: 8 additions & 8 deletions src/app/zeko/da_layer/lib/node.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module Db = struct

module Key_value = struct
type _ t =
| Diff : (Ledger_hash.t * Diff.With_timestamp.t) t
| Diff : (Ledger_hash.t * Diff.t) t
| Diff_index : (unit * Index.t) t

let serialize_key : type k v. (k * v) t -> k -> Bigstring.t =
Expand All @@ -40,15 +40,15 @@ module Db = struct
fun pair_type value ->
match pair_type with
| Diff ->
Diff.With_timestamp.to_bigstring value
Diff.to_bigstring value
| Diff_index ->
Index.to_bigstring value

let deserialize_value : type k v. (k * v) t -> Bigstring.t -> v =
fun pair_type data ->
match pair_type with
| Diff ->
Diff.With_timestamp.of_bigstring data
Diff.of_bigstring data
| Diff_index ->
Index.of_bigstring data
end
Expand Down Expand Up @@ -124,7 +124,9 @@ let post_diff t ~ledger_openings ~diff =
| false ->
Ok ()
| true ->
Error (Error.create "Duplicate indices" diff [%sexp_of: Diff.t])
Error
(Error.create "Duplicate indices" diff
[%sexp_of: Diff.Without_timestamp.t] )
in

(* 4 *)
Expand Down Expand Up @@ -249,9 +251,7 @@ let post_diff t ~ledger_openings ~diff =
in

(* 7 *)
let diff =
(diff, Block_time.now (Block_time.Controller.basic ~logger:t.logger))
in
let diff = Diff.add_time ~logger diff in

(* 8 *)
(* We don't care if the diff already existed *)
Expand Down Expand Up @@ -327,7 +327,7 @@ let get_signature t ~ledger_hash =
let implementations t =
Async.Rpc.Implementations.create_exn ~on_unknown_rpc:`Raise
~implementations:
[ Async.Rpc.Rpc.implement Rpc.Post_diff.v1
[ Async.Rpc.Rpc.implement Rpc.Post_diff.v2
(fun () { ledger_openings; diff } ->
match post_diff t ~ledger_openings ~diff with
| Ok signature ->
Expand Down
10 changes: 5 additions & 5 deletions src/app/zeko/da_layer/lib/rpc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ module Post_diff = struct
module Query = struct
[%%versioned
module Stable = struct
module V1 = struct
module V2 = struct
type t =
{ ledger_openings : Sparse_ledger.Stable.V2.t
; diff : Diff.Stable.V1.t
; diff : Diff.Without_timestamp.Stable.V1.t
}

let to_latest = Fn.id
end
end]
end

let v1 : (Query.t, Signature.t) Rpc.Rpc.t =
Rpc.Rpc.create ~name:"Post_diff" ~version:1 ~bin_query:Query.Stable.V1.bin_t
let v2 : (Query.t, Signature.t) Rpc.Rpc.t =
Rpc.Rpc.create ~name:"Post_diff" ~version:2 ~bin_query:Query.Stable.V2.bin_t
~bin_response:Signature.Stable.V1.bin_t
end

Expand All @@ -31,7 +31,7 @@ module Get_diff = struct
[%%versioned
module Stable = struct
module V2 = struct
type t = Diff.With_timestamp.Stable.V1.t option
type t = Diff.Stable.V2.t option

let to_latest = Fn.id
end
Expand Down
114 changes: 114 additions & 0 deletions src/app/zeko/parallel_merger/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Parallel merger

## Abstraction

This is a library used by sequencer to parallelize the merging of transaction snarks.
It tries to abstract away from the actual logic of sequencer, so it accepts 3 modules as arguments into the functor.
All of them have `type t` that represents a witness and `process` function that takes a witness and returns a deferred result.

Context is a module that is used by sequencer to provide handles to the e.g. `da_client` or `prover_client` and also takes care of persisting and restarting the proving process in case of a sequencer restart.

```ocaml
Context : sig
type t
end
```

```ocaml
Merge : sig
type t
val process : Context.t -> t -> t -> t Deferred.t
end
```

```ocaml
Base : sig
type t
val process : Context.t -> t -> Merge.t Deferred.t
end
```

```ocaml
Commit : sig
type t
val process : Context.t -> t -> Merge.t -> unit Deferred.t
end
```

## Inner workings

```ocaml
module Tree = struct
type t =
{ mutable jobs : Job_status.t With_id.t list
; mutable closed : bool (** No new jobs can be added *)
; finished : Finished_job.t Ivar.t (** All jobs are done *)
; ready_to_commit : unit Ivar.t (** All jobs are done and ready to commit *)
}
end
module Forest = struct
type t = { mutable trees : Tree.t list }
end
```

Proving of transactions is done in a forest of trees, where each tree consists of one batch of transactions to commit.
The tree is represented by a list of jobs. It's list because we care only about actionable jobs and not the nodes that have already been proven or don't have a witness yet.

### Job

```ocaml
module Available_job = struct
type t = Base of Base.t | Merge of Merge.t * Merge.t
end
module Finished_job = struct
type t = Merge.t
end
module Job_status = struct
type t = Todo of Available_job.t | Done of Finished_job.t
end
```

The whole logic implemented revolves around 2 rules:

1. If there is a `Todo` job, process it.
2. If there are 2 `Done` jobs next to each other in one tree, merge them.

## Adding a job

```ocaml
let add_job t ctx ~(data : Base.t)
```

Adding a job creates a `Todo (Base witness)` job at the end of a last tree and calls a process function.
After completion it is transformed into a `Done (Merge witness)` job by `finish_job_exn` function, and checks if it created an opportunity to merge.
Opportunity to merge are two consecutive `Done (Merge witness)` jobs in one tree.
Finished job can create only one opportunity to merge. If there is one, create new `Todo (Merge (fst, snd))` job and process it.

Additionally all the jobs have unique id, so we can track them.

```ocaml
module With_id = struct
type 'd t = { id : string; value : 'd } [@@deriving sexp, yojson]
end
```

## Committing

```ocaml
let commit t ctx ~commit_witness
```

Sequencer can commit a tree at any time. Committing in proving context means that current tree should be closed (no new jobs added) and once it's all proven it should be committed.
Closing a tree means that there needs to be a new tree created where all the new jobs will go to.
Commit function will:

1. Close the last tree and create a new one.
2. Wait for the previous tree to finish (because of the order of commits).
3. Wait until there is only one `Done (Merge w)` job left in the list.
4. Call `Commit.process`
11 changes: 11 additions & 0 deletions src/app/zeko/parallel_merger/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
(env
(_
(flags
(:standard -w -4))))

(library
(name parallel_merger)
(inline_tests)
(libraries async core_kernel yojson sexplib sexplib0)
(preprocess
(pps ppx_jane ppx_mina)))
Loading

0 comments on commit c3f4ef2

Please sign in to comment.