From df79f9c161ce675ddfb6434fea686ce3c16fc678 Mon Sep 17 00:00:00 2001 From: Paul-Elliot Date: Thu, 4 May 2023 17:43:32 +0200 Subject: [PATCH 01/14] WIP: add upload pack Signed-off-by: Paul-Elliot Co-authored-by: Jules Aguillon --- src/not-so-smart/dune | 2 +- src/not-so-smart/nss.ml | 1 + src/not-so-smart/smart_git.ml | 19 +++++++ src/not-so-smart/smart_git_intf.ml | 72 ++++++++++++++++++++++++- src/not-so-smart/upload_pack.ml | 86 ++++++++++++++++++++++++++++++ test/smart/dune | 1 + 6 files changed, 179 insertions(+), 2 deletions(-) create mode 100644 src/not-so-smart/upload_pack.ml diff --git a/src/not-so-smart/dune b/src/not-so-smart/dune index d6374a147..af82497d3 100644 --- a/src/not-so-smart/dune +++ b/src/not-so-smart/dune @@ -43,7 +43,7 @@ (library (name nss) (public_name git.nss) - (modules nss fetch push) + (modules nss fetch push upload_pack) (libraries fmt result diff --git a/src/not-so-smart/nss.ml b/src/not-so-smart/nss.ml index ec5235b3a..31109b73a 100644 --- a/src/not-so-smart/nss.ml +++ b/src/not-so-smart/nss.ml @@ -1,2 +1,3 @@ module Fetch = Fetch module Push = Push +module Upload_pack = Upload_pack diff --git a/src/not-so-smart/smart_git.ml b/src/not-so-smart/smart_git.ml index 058d21502..fe9a6b514 100644 --- a/src/not-so-smart/smart_git.ml +++ b/src/not-so-smart/smart_git.ml @@ -357,6 +357,7 @@ struct module Flow = Unixiz.Make (Mimic) module Fetch = Nss.Fetch.Make (Scheduler) (Lwt) (Flow) (Uid) (Ref) module Push = Nss.Push.Make (Scheduler) (Lwt) (Flow) (Uid) (Ref) + module Upload_pack = Nss.Upload_pack.Make (Scheduler) (Lwt) (Flow) (Uid) (Ref) let fetch_v1 ?(uses_git_transport = false) ~push_stdout ~push_stderr ~capabilities path flow ?deepen ?want hostname store access fetch_cfg pack @@ -666,3 +667,21 @@ struct | Failure err -> Lwt.return_error (R.msg err) | exn -> Lwt.return_error (`Exn exn) end + +module Make_server + (Scheduler : Sigs.SCHED with type +'a s = 'a Lwt.t) + (Pack : APPEND with type +'a fiber = 'a Lwt.t) + (Index : APPEND with type +'a fiber = 'a Lwt.t) + (Uid : UID) + (Ref : Sigs.REF) = +struct + let upload_pack : + (Uid.t, _, _, 'g, Scheduler.t) Sigs.access + * Uid.t Carton_lwt.Thin.light_load + * Uid.t Carton_lwt.Thin.heavy_load -> + (Uid.t, _, 'g) Sigs.store -> + flow:Mimic.flow -> + (unit, ([> `Exn of exn ] as 'err)) result Lwt.t = + fun (access, light_load, heavy_load) store ~flow -> + Mimic.close flow >>= fun () -> Lwt.return_ok () +end diff --git a/src/not-so-smart/smart_git_intf.ml b/src/not-so-smart/smart_git_intf.ml index a67bed37c..021a5a8d3 100644 --- a/src/not-so-smart/smart_git_intf.ml +++ b/src/not-so-smart/smart_git_intf.ml @@ -119,7 +119,7 @@ module type SMART_GIT = sig val git_uri : Uri.t Mimic.value - module Make + module Make_client (Scheduler : Sigs.SCHED with type +'a s = 'a Lwt.t) (Pack : APPEND with type +'a fiber = 'a Lwt.t) (Index : APPEND with type +'a fiber = 'a Lwt.t) @@ -162,4 +162,74 @@ module type SMART_GIT = sig [ `Create of Ref.t | `Delete of Ref.t | `Update of Ref.t * Ref.t ] list -> (unit, ([> `Exn of exn | Mimic.error ] as 'err)) result Lwt.t end + + module Make_server + (Scheduler : Sigs.SCHED with type +'a s = 'a Lwt.t) + (Pack : APPEND with type +'a fiber = 'a Lwt.t) + (Index : APPEND with type +'a fiber = 'a Lwt.t) + (Uid : UID) + (Ref : Sigs.REF) : sig + val upload_pack : + (Uid.t, _, _, 'g, Scheduler.t) Sigs.access + * Uid.t Carton_lwt.Thin.light_load + * Uid.t Carton_lwt.Thin.heavy_load -> + (Uid.t, _, 'g) Sigs.store -> + flow:Mimic.flow -> + (* -> capabilities:Smart.Capability.t list empty list for now *) + (unit, ([> `Exn of exn ] as 'err)) result Lwt.t + (** Answers a [git fetch] *) + + val receive_pack : + (Uid.t, _, Uid.t, 'g, Scheduler.t) Sigs.access + * Uid.t Carton_lwt.Thin.light_load + * Uid.t Carton_lwt.Thin.heavy_load -> + (Uid.t, _, 'g) Sigs.store -> + flow:Mimic.flow -> + ?version:[> `V1 ] -> + ?capabilities:Smart.Capability.t list -> + Pack.t -> + Index.t -> + src:Pack.uid -> + dst:Pack.uid -> + idx:Index.uid -> + (unit, ([> `Exn of exn ] as 'err)) result Lwt.t + (** Answers a [git push] *) + + val fetch : + ?push_stdout:(string -> unit) -> + ?push_stderr:(string -> unit) -> + ?bounds:int -> + ?threads:int -> + ctx:Mimic.ctx -> + (Uid.t, _, Uid.t * int ref * int64, 'g, Scheduler.t) Sigs.access + * Uid.t Carton_lwt.Thin.light_load + * Uid.t Carton_lwt.Thin.heavy_load -> + (Uid.t, Uid.t * int ref * int64, 'g) Sigs.store -> + Endpoint.t -> + ?version:[> `V1 ] -> + ?capabilities:Smart.Capability.t list -> + ?deepen:[ `Depth of int | `Timestamp of int64 ] -> + [ `All | `Some of Ref.t list | `None ] -> + Pack.t -> + Index.t -> + src:Pack.uid -> + dst:Pack.uid -> + idx:Index.uid -> + ( [ `Pack of Uid.t * (Ref.t * Uid.t) list | `Empty ], + ([> `Exn of exn | Mimic.error ] as 'err) ) + result + Lwt.t + + val push : + ctx:Mimic.ctx -> + (Uid.t, Ref.t, Uid.t Pck.t, 'g, Scheduler.t) Sigs.access + * Uid.t Carton_lwt.Thin.light_load + * Uid.t Carton_lwt.Thin.heavy_load -> + (Uid.t, Uid.t Pck.t, 'g) Sigs.store -> + Endpoint.t -> + ?version:[> `V1 ] -> + ?capabilities:Smart.Capability.t list -> + [ `Create of Ref.t | `Delete of Ref.t | `Update of Ref.t * Ref.t ] list -> + (unit, ([> `Exn of exn | Mimic.error ] as 'err)) result Lwt.t + end end diff --git a/src/not-so-smart/upload_pack.ml b/src/not-so-smart/upload_pack.ml new file mode 100644 index 000000000..9869041c7 --- /dev/null +++ b/src/not-so-smart/upload_pack.ml @@ -0,0 +1,86 @@ +open Rresult + +type configuration = { stateless : bool } + +let configuration ?(stateless = false) () = { stateless } + +module S = Sigs + +module Make + (Scheduler : S.SCHED) + (IO : S.IO with type 'a t = 'a Scheduler.s) + (Flow : S.FLOW with type 'a fiber = 'a Scheduler.s) + (Uid : S.UID) + (Ref : S.REF) = +struct + let src = Logs.Src.create "upload-pack" + + module Log = (val Logs.src_log src : Logs.LOG) + open Scheduler + + let ( >>= ) x f = IO.bind x f + let return x = IO.return x + let ( >>| ) x f = x >>= fun x -> return (f x) + + let sched = + S. + { + bind = (fun x f -> inj (prj x >>= fun x -> prj (f x))); + return = (fun x -> inj (return x)); + } + + let fail exn = inj (IO.fail exn) + + let io = + S. + { + recv = (fun flow raw -> inj (Flow.recv flow raw)); + send = (fun flow raw -> inj (Flow.send flow raw)); + pp_error = Flow.pp_error; + } + + let upload_pack flow store access pack = + let fiber ctx = + let open Smart in + let adv_ref = Advertised_refs.v1 ~capabilities:[] [] (* TODO *) in + let* () = send ctx send_advertised_refs adv_ref in + recv ctx recv_want + in + let ctx = Smart.Context.make ~client_caps in + Smart_flow.run sched fail io flow (fiber ctx) |> prj >>= fun wants -> + let rec go = + let fiber ctx = + let open Smart in + let* h = recv ctx recv_have in + let haves = Smart.a in + return (w, h) Smart_flow.run sched fail io flow haves |> prj + >>= fun haves -> _ + in + go + (* + Not implemented: send shallow information + Go: + - recv have: string list * [`Flush | `Done] + - send acks for each commit acknowledged *) + >>= + fun haves -> + Pck.get_uncommon_objects sched ~compare:Uid.compare access store + ~exclude:haves ~sources:wants + |> prj + >>= fun uids -> + Log.debug (fun m -> m "Prepare a pack of %d object(s)." (List.length uids)); + let stream = pack uids in + let side_band = + Smart.Context.is_cap_shared ctx `Side_band + || Smart.Context.is_cap_shared ctx `Side_band_64k + in + let pack = Smart.send_pack side_band in + let rec go () = + stream () >>= function + | None -> return () + | Some payload -> + Smart_flow.run sched fail io flow Smart.(send ctx pack payload) |> prj + >>= fun () -> go () + in + go () +end diff --git a/test/smart/dune b/test/smart/dune index 42605220d..254f99dd3 100644 --- a/test/smart/dune +++ b/test/smart/dune @@ -8,6 +8,7 @@ lwt_backend ref store_backend + server test uid unix_backend From 1970851233011946ccd343122364d0c89adce5f3 Mon Sep 17 00:00:00 2001 From: Paul-Elliot Date: Thu, 4 May 2023 17:44:05 +0200 Subject: [PATCH 02/14] Add test for server side Signed-off-by: Paul-Elliot Co-authored-by: Jules Aguillon --- test/smart/server.ml | 169 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 test/smart/server.ml diff --git a/test/smart/server.ml b/test/smart/server.ml new file mode 100644 index 000000000..1fe383221 --- /dev/null +++ b/test/smart/server.ml @@ -0,0 +1,169 @@ +open Bos +open Rresult +open Lwt_backend +open Store_backend + +(** to keep track of directories created by unit tests and clean them up afterwards *) +module Tmp_dirs = struct + let rm_r dir = OS.Dir.delete ~recurse:true dir |> ignore + let t = ref Fpath.Set.empty + let add file = t := Fpath.Set.add file !t + + let remove_all () = + Fpath.Set.iter rm_r !t; + t := Fpath.Set.empty + + let are_valid = ref true +end + +let ( >>? ) x f = + let open Lwt.Infix in + x >>= function Ok x -> f x | Error err -> Lwt.return_error err + +module Option = struct + include Option + + let value_else o ~else_ = match o with Some v -> v | None -> else_ () +end + +let create_tmp_dir ?(mode = 0o700) ?prefix_path pat = + let dir = Option.value_else prefix_path ~else_:OS.Dir.default_tmp in + let failed_too_many_times () = + R.error_msgf + "create temporary directory %s in %a: too many failing attempts" + (Fmt.str pat "XXXXXX") Fpath.pp dir + in + let rec loop count = + if count < 0 then failed_too_many_times () + else + let dir = + let rand = Random.bits () land 0xffffff in + Fpath.(dir / Fmt.str pat (Fmt.str "%06x" rand)) + in + try + Ok + (Unix.mkdir (Fpath.to_string dir) mode; + dir) + with + | Unix.Unix_error (Unix.EEXIST, _, _) -> loop (count - 1) + | Unix.Unix_error (Unix.EINTR, _, _) -> loop count + | Unix.Unix_error (e, _, _) -> + R.error_msgf "create temporary directory %s in %a: %s" + (Fmt.str pat "XXXXXX") Fpath.pp dir (Unix.error_message e) + in + match loop 10000 with + | Ok dir as r -> + Tmp_dirs.add dir; + r + | Error _ as e -> e + +let git_version = + match + Bos.( + OS.Cmd.run_out Cmd.(v "git" % "--version") |> OS.Cmd.out_string ~trim:true) + with + | Error (`Msg err) -> failwith err + | Ok (str, _) -> ( + match Git_version.parse str with + | Some version -> version + | None -> Fmt.failwith "Impossible to parse the Git version: %s" str) + +let v2_28_0 = + { + Git_version.major = 2; + minor = 28; + patch = Some "0"; + revision = None; + release_candidate = None; + } + +let git_init_with_branch branch = + let open Bos in + let open Rresult in + if Git_version.compare git_version v2_28_0 < 0 then + OS.Cmd.run Cmd.(v "git" % "init") >>= fun () -> + OS.Cmd.run Cmd.(v "git" % "config" % "init.defaultBranch" % branch) + else OS.Cmd.run Cmd.(v "git" % "init" % "-b" % branch) + +let create_new_git_store _sw = + let create () = + (* XXX(dinosaure): a hook is already added by [Bos] to delete the + directory. *) + create_tmp_dir "git-%s" >>= fun root -> + OS.Dir.with_current root git_init_with_branch "master" |> R.join + >>= fun () -> + let access = access lwt in + let light_load uid = lightly_load lwt root uid |> Scheduler.prj in + let heavy_load uid = heavily_load lwt root uid |> Scheduler.prj in + let store = store_inj { path = root; tbl = Hashtbl.create 0x100 } in + R.ok ((access, light_load, heavy_load), store) + in + match create () with + | Ok res -> Lwt.return res + | Error err -> Fmt.failwith "%a" R.pp_msg err + +module Git_sync = + Smart_git.Make_server (Scheduler) (Append) (Append) (Uid) (Ref) + +let loopback_endpoint, loopback = + Mimic.register ~name:"loopback" (module Loopback) + +let ctx_with_payloads ?(transmission = `Git) payloads = + Mimic.empty + |> Mimic.add loopback_endpoint payloads + |> Mimic.add Smart_git.git_transmission transmission + +let flow_with_payloads ?(transmission = `Git) payloads = + let ctx = ctx_with_payloads ~transmission payloads in + Mimic.resolve ctx + +let handle_error = function + | Ok x -> Lwt.return x + | Error (`Exn exn) -> Alcotest.failf "%s" (Printexc.to_string exn) + | Error (#Mimic.error as err) -> Alcotest.failf "%a" Mimic.pp_error err + | Error `Invalid_flow -> Alcotest.fail "Invalid flow" + +let test_cancelled_fetch () = + Alcotest_lwt.test_case "cancelled fetch" `Quick @@ fun sw () -> + let open Lwt.Infix in + let run () = + (* let capabilities = [] in *) + let payloads = [ "\x30\x30\x30\x30" (* 0000 *) ] in + create_new_git_store sw >>= fun (access, store) -> + let { path; _ } = store_prj store in + let pack, index = + ( Fpath.(path / ".git" / "objects" / "pack"), + Fpath.(path / ".git" / "objects" / "pack") ) + in + flow_with_payloads (payloads, ignore) >>? fun flow -> + Git_sync.upload_pack ~flow access store pack index + in + (* TODO: Test that the flow receive the expected response: + - List of references (head, master) *) + run () >>= handle_error + +let test_fetch_all () = + Alcotest_lwt.test_case "fetch all" `Quick @@ fun sw () -> + let open Lwt.Infix in + let run () = + Lwt.return @@ Ok () + (* Client send 0009done *) + (* Server should send: + - List of refs + - pack containing all commits *) + in + run () >>= handle_error + +let test = + Alcotest_lwt.run "smart" [ "regression", [ test_cancelled_fetch () ] ] + +let tmp = "tmp" + +let () = + let fiber = + OS.Dir.current () >>= fun current -> + OS.Dir.create Fpath.(current / tmp) >>= fun _ -> R.ok Fpath.(current / tmp) + in + let tmp = R.failwith_error_msg fiber in + OS.Dir.set_default_tmp tmp; + Lwt_main.run test From bbd53b0f0359dd8d0a34c9f1b6ba9c454ab9902b Mon Sep 17 00:00:00 2001 From: Paul-Elliot Date: Thu, 4 May 2023 19:32:15 +0200 Subject: [PATCH 03/14] WIP upload-pack Signed-off-by: Paul-Elliot --- src/not-so-smart/upload_pack.ml | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/not-so-smart/upload_pack.ml b/src/not-so-smart/upload_pack.ml index 9869041c7..ffbf38f21 100644 --- a/src/not-so-smart/upload_pack.ml +++ b/src/not-so-smart/upload_pack.ml @@ -48,11 +48,20 @@ struct in let ctx = Smart.Context.make ~client_caps in Smart_flow.run sched fail io flow (fiber ctx) |> prj >>= fun wants -> - let rec go = + let rec go haves = let fiber ctx = let open Smart in - let* h = recv ctx recv_have in - let haves = Smart.a in + let* h, cmd = recv ctx recv_have in + let haves = h @ haves in + match cmd with + | `Done -> + (* let common_base = compute_common_base store access wants in *) + return (new_have @ haves) + | `Flush -> + let acks = _ store access haves in + let* () = send ctx send_ack acks in + go haves + in return (w, h) Smart_flow.run sched fail io flow haves |> prj >>= fun haves -> _ in From 5c991f70aaf84124c14d1dcfbdf273bc629c82d7 Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Fri, 5 May 2023 14:08:55 +0200 Subject: [PATCH 04/14] Remove constructor Negotiation.NAK This messages need not to be passed around like ACKs and is moved to a super type. Co-authored-by: Paul-Elliot --- src/not-so-smart/find_common.ml | 119 ++++++++++++++++---------------- src/not-so-smart/protocol.ml | 19 ++--- src/not-so-smart/protocol.mli | 6 +- src/not-so-smart/smart.ml | 2 +- src/not-so-smart/smart.mli | 4 +- 5 files changed, 73 insertions(+), 77 deletions(-) diff --git a/src/not-so-smart/find_common.ml b/src/not-so-smart/find_common.ml index 8518186a2..0c99622ec 100644 --- a/src/not-so-smart/find_common.ml +++ b/src/not-so-smart/find_common.ml @@ -174,56 +174,58 @@ let find_common (type t) scheduler io flow cfg let rec loop () = Smart_flow.run scheduler raise io flow Smart.(recv ctx recv_ack) - >>| Smart.Negotiation.map ~f:of_hex - >>= fun ack -> - match ack with - | Smart.Negotiation.NAK -> + >>= function + | `NAK -> Log.debug (fun m -> m "Receive NAK."); return `Continue - | ACK _ -> - flushes := 0; - cfg.multi_ack <- `None; - (* XXX(dinosaure): [multi_ack] supported by the client but it - is not supported by the server. TODO: use [Context.shared]. *) - retval := 0; - return `Done - | ACK_common uid | ACK_ready uid | ACK_continue uid -> ( - access.get uid store >>= function - | None -> assert false - | Some obj -> - Default.ack scheduler ~parents:access.parents store - negotiator obj - >>= fun was_common -> - if - stateless - && Smart.Negotiation.is_common ack - && not was_common - then ( - (* we need to replay the have for this object on the next RPC request so - the peer kows it is in common with us. *) - Log.debug (fun m -> m "[+] have %s." (to_hex uid)); - unsafe_write_have ctx (to_hex uid); - (* reset [in_vain] because an ack for this commit has not been seen. *) - in_vain := 0; - retval := 0; - got_continue := true; - loop ()) - else if - (not stateless) - || not (Smart.Negotiation.is_common ack) - then ( - in_vain := 0; - retval := 0; - got_continue := true; - if Smart.Negotiation.is_ready ack then - got_ready := true; - loop ()) - else ( - retval := 0; - got_continue := true; - if Smart.Negotiation.is_ready ack then - got_ready := true; - loop ())) + | `ACK ack -> ( + let ack = Smart.Negotiation.map ~f:of_hex ack in + match ack with + | ACK _ -> + flushes := 0; + cfg.multi_ack <- `None; + (* XXX(dinosaure): [multi_ack] supported by the client but it + is not supported by the server. TODO: use [Context.shared]. *) + retval := 0; + return `Done + | ACK_common uid | ACK_ready uid | ACK_continue uid -> ( + access.get uid store >>= function + | None -> assert false + | Some obj -> + Default.ack scheduler ~parents:access.parents + store negotiator obj + >>= fun was_common -> + if + stateless + && Smart.Negotiation.is_common ack + && not was_common + then ( + (* we need to replay the have for this object on the next RPC request so + the peer kows it is in common with us. *) + Log.debug (fun m -> + m "[+] have %s." (to_hex uid)); + unsafe_write_have ctx (to_hex uid); + (* reset [in_vain] because an ack for this commit has not been seen. *) + in_vain := 0; + retval := 0; + got_continue := true; + loop ()) + else if + (not stateless) + || not (Smart.Negotiation.is_common ack) + then ( + in_vain := 0; + retval := 0; + got_continue := true; + if Smart.Negotiation.is_ready ack then + got_ready := true; + loop ()) + else ( + retval := 0; + got_continue := true; + if Smart.Negotiation.is_ready ack then + got_ready := true; + loop ()))) in loop () >>= function | `Done -> return () @@ -253,18 +255,19 @@ let find_common (type t) scheduler io flow cfg >>= fun () -> let rec go () = if !flushes > 0 || cfg.multi_ack = `Some || cfg.multi_ack = `Detailed - then ( + then Smart_flow.run scheduler raise io flow Smart.(recv ctx recv_ack) - >>| Smart.Negotiation.map ~f:of_hex - >>= fun ack -> - match ack with - | Smart.Negotiation.ACK _ -> return (`Continue 0) - | ACK_common _ | ACK_continue _ | ACK_ready _ -> - cfg.multi_ack <- `Some; - go () - | NAK -> + >>= function + | `NAK -> decr flushes; - go ()) + go () + | `ACK ack -> ( + let ack = Smart.Negotiation.map ~f:of_hex ack in + match ack with + | Smart.Negotiation.ACK _ -> return (`Continue 0) + | ACK_common _ | ACK_continue _ | ACK_ready _ -> + cfg.multi_ack <- `Some; + go ()) else if !count > 0 then return (`Continue !retval) else return (`Continue 0) in diff --git a/src/not-so-smart/protocol.ml b/src/not-so-smart/protocol.ml index 1c69a8660..efac7a0ca 100644 --- a/src/not-so-smart/protocol.ml +++ b/src/not-so-smart/protocol.ml @@ -204,25 +204,21 @@ module Negotiation = struct | ACK_continue of 'uid | ACK_ready of 'uid | ACK_common of 'uid - | NAK let is_common = function ACK_common _ -> true | _ -> false let is_ready = function ACK_ready _ -> true | _ -> false - let is_nak = function NAK -> true | _ -> false let pp ppf = function | ACK uid -> Fmt.pf ppf "ACK %s" uid | ACK_continue uid -> Fmt.pf ppf "ACK %s continue" uid | ACK_ready uid -> Fmt.pf ppf "ACK %s ready" uid | ACK_common uid -> Fmt.pf ppf "ACK %s common" uid - | NAK -> Fmt.pf ppf "NAK" let map ~f = function | ACK uid -> ACK (f uid) | ACK_continue uid -> ACK_continue (f uid) | ACK_ready uid -> ACK_ready (f uid) | ACK_common uid -> ACK_common (f uid) - | NAK -> NAK end module Commands = struct @@ -766,13 +762,13 @@ module Decoder = struct let pkt = peek_pkt decoder in if String.Sub.equal_bytes pkt v_nak then ( junk_pkt decoder; - return Negotiation.NAK decoder) + return `NAK decoder) else if String.Sub.is_prefix ~affix:v_ack pkt then match String.Sub.cuts ~sep:v_space pkt with | [ _; uid ] -> let uid = String.Sub.to_string uid in junk_pkt decoder; - return (Negotiation.ACK uid) decoder + return (`ACK (Negotiation.ACK uid)) decoder | [ _; uid; v ] -> ( let uid = String.Sub.to_string uid in match @@ -780,9 +776,9 @@ module Decoder = struct junk_pkt decoder; v with - | "continue" -> return (Negotiation.ACK_continue uid) decoder - | "ready" -> return (Negotiation.ACK_ready uid) decoder - | "common" -> return (Negotiation.ACK_common uid) decoder + | "continue" -> return (`ACK (Negotiation.ACK_continue uid)) decoder + | "ready" -> return (`ACK (Negotiation.ACK_ready uid)) decoder + | "common" -> return (`ACK (Negotiation.ACK_common uid)) decoder | _ -> fail decoder (`Invalid_ack (String.Sub.to_string pkt))) | _ -> fail decoder (`Invalid_ack (String.Sub.to_string pkt)) else ( @@ -1236,6 +1232,7 @@ module Encoder = struct let encode_acks encoder acks = (* TODO: Remove NACK from [Negotiation.t]. *) + let write_nak encoder = write encoder "NAK" in let write_ack ack encoder = let write_ack uid suffix = write encoder "ACK"; @@ -1253,12 +1250,10 @@ module Encoder = struct | ACK_continue uid -> write_ack uid (Some "continue") | ACK_ready uid -> write_ack uid (Some "ready") | ACK_common uid -> write_ack uid (Some "common") - | NAK -> write encoder "NAK" in let rec go acks encoder = match acks with - | [] -> - delayed_write_pkt (write_ack Negotiation.NAK) (flush kdone) encoder + | [] -> delayed_write_pkt write_nak (flush kdone) encoder | hd :: tl -> delayed_write_pkt (write_ack hd) (go tl) encoder in go acks encoder diff --git a/src/not-so-smart/protocol.mli b/src/not-so-smart/protocol.mli index 35cefa818..3171543eb 100644 --- a/src/not-so-smart/protocol.mli +++ b/src/not-so-smart/protocol.mli @@ -96,11 +96,9 @@ module Negotiation : sig | ACK_continue of 'uid | ACK_ready of 'uid | ACK_common of 'uid - | NAK val is_common : 'uid t -> bool val is_ready : 'uid t -> bool - val is_nak : 'uid t -> bool val pp : string t Fmt.t val map : f:('a -> 'b) -> 'a t -> 'b t end @@ -197,7 +195,9 @@ module Decoder : sig [> error ] ) state - val decode_negotiation : decoder -> (string Negotiation.t, [> error ]) state + val decode_negotiation : + decoder -> ([ `ACK of string Negotiation.t | `NAK ], [> error ]) state + val decode_shallows : decoder -> (string Shallow.t list, [> error ]) state val decode_flush : decoder -> (unit, [> error ]) state diff --git a/src/not-so-smart/smart.ml b/src/not-so-smart/smart.ml index 9183adb7b..6a2030b22 100644 --- a/src/not-so-smart/smart.ml +++ b/src/not-so-smart/smart.ml @@ -42,7 +42,7 @@ module Witness = struct | `Stdout | `Stderr ] recv - | Ack : string Negotiation.t recv + | Ack : [ `ACK of string Negotiation.t | `NAK ] recv | Flush : unit recv | Shallows : string Shallow.t list recv | Want : (string, string) Want.t option recv diff --git a/src/not-so-smart/smart.mli b/src/not-so-smart/smart.mli index fc92e865a..425940e6e 100644 --- a/src/not-so-smart/smart.mli +++ b/src/not-so-smart/smart.mli @@ -104,11 +104,9 @@ module Negotiation : sig | ACK_continue of 'uid | ACK_ready of 'uid | ACK_common of 'uid - | NAK val is_common : 'uid t -> bool val is_ready : 'uid t -> bool - val is_nak : 'uid t -> bool val pp : string t Fmt.t val map : f:('a -> 'b) -> 'a t -> 'b t end @@ -245,7 +243,7 @@ val recv_pack : val recv_flush : unit recv val recv_commands : (string, string) Commands.t option recv val send_acks : string Negotiation.t list send -val recv_ack : string Negotiation.t recv +val recv_ack : [ `ACK of string Negotiation.t | `NAK ] recv val shallows : string Shallow.t list recv val status : bool -> string Status.t recv val packet : trim:bool -> string recv From fba1bf7d95c9f416f5fdcedbaf1c38df81867372 Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Fri, 5 May 2023 16:54:01 +0200 Subject: [PATCH 05/14] Expose Smart_git.Make_server Co-authored-by: Paul-Elliot --- src/git/sync.ml | 2 +- src/not-so-smart/smart_git.ml | 15 ++---- src/not-so-smart/smart_git_intf.ml | 78 ++++++++---------------------- src/not-so-smart/upload_pack.ml | 5 +- test/smart/test.ml | 3 +- 5 files changed, 31 insertions(+), 72 deletions(-) diff --git a/src/git/sync.ml b/src/git/sync.ml index 0dbc27511..971a6ae29 100644 --- a/src/git/sync.ml +++ b/src/git/sync.ml @@ -181,7 +181,7 @@ struct Lwt.return (Carton.Dec.v ~kind raw) | None -> Lwt.fail Not_found - include Smart_git.Make (Scheduler) (Pack) (Index) (Hash) (Reference) + include Smart_git.Make_client (Scheduler) (Pack) (Index) (Hash) (Reference) let ( >>? ) x f = x >>= function Ok x -> f x | Error err -> Lwt.return_error err diff --git a/src/not-so-smart/smart_git.ml b/src/not-so-smart/smart_git.ml index fe9a6b514..59a6fd9ad 100644 --- a/src/not-so-smart/smart_git.ml +++ b/src/not-so-smart/smart_git.ml @@ -179,7 +179,7 @@ module Endpoint = struct headers end -module Make +module Make_client (Scheduler : Sigs.SCHED with type +'a s = 'a Lwt.t) (Pack : APPEND with type +'a fiber = 'a Lwt.t) (Index : APPEND with type +'a fiber = 'a Lwt.t) @@ -670,18 +670,13 @@ end module Make_server (Scheduler : Sigs.SCHED with type +'a s = 'a Lwt.t) + (Flow : Sigs.FLOW with type +'a fiber = 'a Lwt.t) (Pack : APPEND with type +'a fiber = 'a Lwt.t) (Index : APPEND with type +'a fiber = 'a Lwt.t) (Uid : UID) (Ref : Sigs.REF) = struct - let upload_pack : - (Uid.t, _, _, 'g, Scheduler.t) Sigs.access - * Uid.t Carton_lwt.Thin.light_load - * Uid.t Carton_lwt.Thin.heavy_load -> - (Uid.t, _, 'g) Sigs.store -> - flow:Mimic.flow -> - (unit, ([> `Exn of exn ] as 'err)) result Lwt.t = - fun (access, light_load, heavy_load) store ~flow -> - Mimic.close flow >>= fun () -> Lwt.return_ok () + module Upload_pack = Nss.Upload_pack.Make (Scheduler) (Lwt) (Flow) (Uid) (Ref) + + let upload_pack = Upload_pack.upload_pack end diff --git a/src/not-so-smart/smart_git_intf.ml b/src/not-so-smart/smart_git_intf.ml index 021a5a8d3..9370ab4d2 100644 --- a/src/not-so-smart/smart_git_intf.ml +++ b/src/not-so-smart/smart_git_intf.ml @@ -165,71 +165,35 @@ module type SMART_GIT = sig module Make_server (Scheduler : Sigs.SCHED with type +'a s = 'a Lwt.t) + (Flow : Sigs.FLOW with type +'a fiber = 'a Lwt.t) (Pack : APPEND with type +'a fiber = 'a Lwt.t) (Index : APPEND with type +'a fiber = 'a Lwt.t) (Uid : UID) (Ref : Sigs.REF) : sig val upload_pack : - (Uid.t, _, _, 'g, Scheduler.t) Sigs.access + Flow.t -> + (Uid.t, 'b, Uid.t Pck.t, 'a, Scheduler.t) Sigs.access * Uid.t Carton_lwt.Thin.light_load * Uid.t Carton_lwt.Thin.heavy_load -> - (Uid.t, _, 'g) Sigs.store -> - flow:Mimic.flow -> - (* -> capabilities:Smart.Capability.t list empty list for now *) - (unit, ([> `Exn of exn ] as 'err)) result Lwt.t + (Uid.t, Uid.t Pck.t, 'a) Sigs.store -> + (Uid.t list -> unit -> string option Flow.fiber) -> + unit Flow.fiber (** Answers a [git fetch] *) - val receive_pack : - (Uid.t, _, Uid.t, 'g, Scheduler.t) Sigs.access - * Uid.t Carton_lwt.Thin.light_load - * Uid.t Carton_lwt.Thin.heavy_load -> - (Uid.t, _, 'g) Sigs.store -> - flow:Mimic.flow -> - ?version:[> `V1 ] -> - ?capabilities:Smart.Capability.t list -> - Pack.t -> - Index.t -> - src:Pack.uid -> - dst:Pack.uid -> - idx:Index.uid -> - (unit, ([> `Exn of exn ] as 'err)) result Lwt.t - (** Answers a [git push] *) - - val fetch : - ?push_stdout:(string -> unit) -> - ?push_stderr:(string -> unit) -> - ?bounds:int -> - ?threads:int -> - ctx:Mimic.ctx -> - (Uid.t, _, Uid.t * int ref * int64, 'g, Scheduler.t) Sigs.access - * Uid.t Carton_lwt.Thin.light_load - * Uid.t Carton_lwt.Thin.heavy_load -> - (Uid.t, Uid.t * int ref * int64, 'g) Sigs.store -> - Endpoint.t -> - ?version:[> `V1 ] -> - ?capabilities:Smart.Capability.t list -> - ?deepen:[ `Depth of int | `Timestamp of int64 ] -> - [ `All | `Some of Ref.t list | `None ] -> - Pack.t -> - Index.t -> - src:Pack.uid -> - dst:Pack.uid -> - idx:Index.uid -> - ( [ `Pack of Uid.t * (Ref.t * Uid.t) list | `Empty ], - ([> `Exn of exn | Mimic.error ] as 'err) ) - result - Lwt.t - - val push : - ctx:Mimic.ctx -> - (Uid.t, Ref.t, Uid.t Pck.t, 'g, Scheduler.t) Sigs.access - * Uid.t Carton_lwt.Thin.light_load - * Uid.t Carton_lwt.Thin.heavy_load -> - (Uid.t, Uid.t Pck.t, 'g) Sigs.store -> - Endpoint.t -> - ?version:[> `V1 ] -> - ?capabilities:Smart.Capability.t list -> - [ `Create of Ref.t | `Delete of Ref.t | `Update of Ref.t * Ref.t ] list -> - (unit, ([> `Exn of exn | Mimic.error ] as 'err)) result Lwt.t + (*val receive_pack : *) + (* (Uid.t, _, Uid.t, 'g, Scheduler.t) Sigs.access *) + (* * Uid.t Carton_lwt.Thin.light_load *) + (* * Uid.t Carton_lwt.Thin.heavy_load -> *) + (* (Uid.t, _, 'g) Sigs.store -> *) + (* flow:Mimic.flow -> *) + (* ?version:[> `V1 ] -> *) + (* ?capabilities:Smart.Capability.t list -> *) + (* Pack.t -> *) + (* Index.t -> *) + (* src:Pack.uid -> *) + (* dst:Pack.uid -> *) + (* idx:Index.uid -> *) + (* (unit, ([> `Exn of exn ] as 'err)) result Lwt.t *) + (*(1** Answers a [git push] *1) *) end end diff --git a/src/not-so-smart/upload_pack.ml b/src/not-so-smart/upload_pack.ml index ffbf38f21..4aee36eb3 100644 --- a/src/not-so-smart/upload_pack.ml +++ b/src/not-so-smart/upload_pack.ml @@ -1,5 +1,3 @@ -open Rresult - type configuration = { stateless : bool } let configuration ?(stateless = false) () = { stateless } @@ -39,7 +37,8 @@ struct pp_error = Flow.pp_error; } - let upload_pack flow store access pack = + let upload_pack flow (access, _light_load, _heavy_load) store pack = + let my_caps = [ `Multi_ack; `Side_band_64k; `Ofs_delta; `Thin_pack ] in let fiber ctx = let open Smart in let adv_ref = Advertised_refs.v1 ~capabilities:[] [] (* TODO *) in diff --git a/test/smart/test.ml b/test/smart/test.ml index 7c4d8bc61..2d9fe9b8c 100644 --- a/test/smart/test.ml +++ b/test/smart/test.ml @@ -377,7 +377,8 @@ let test_sync_fetch () = (* XXX(dinosaure): [tmp] without systemic deletion of directories. *) -module Git_sync = Smart_git.Make (Scheduler) (Append) (Append) (Uid) (Ref) +module Git_sync = + Smart_git.Make_client (Scheduler) (Append) (Append) (Uid) (Ref) (* TODO(dinosaure): we don't check what we sent, we should check that. *) From a8fe840c8130dd8d93c6a043e2e46cacdf4c9bae Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Fri, 5 May 2023 16:55:42 +0200 Subject: [PATCH 06/14] Upload_pack: Negotiation phase Co-authored-by: Paul-Elliot --- src/not-so-smart/protocol.ml | 3 + src/not-so-smart/protocol.mli | 3 + src/not-so-smart/smart.mli | 3 + src/not-so-smart/upload_pack.ml | 104 ++++++++++++++++++++++++-------- 4 files changed, 88 insertions(+), 25 deletions(-) diff --git a/src/not-so-smart/protocol.ml b/src/not-so-smart/protocol.ml index efac7a0ca..8451740ee 100644 --- a/src/not-so-smart/protocol.ml +++ b/src/not-so-smart/protocol.ml @@ -188,6 +188,7 @@ module Have = struct type 'uid t = 'uid list * [ `Done | `Flush ] let have ~cmd haves = haves, cmd + let map ~f (uids, cmd) = List.map f uids, cmd end module Result = struct @@ -205,6 +206,8 @@ module Negotiation = struct | ACK_ready of 'uid | ACK_common of 'uid + let mk_ack uid = ACK uid + let mk_continue uid = ACK_continue uid let is_common = function ACK_common _ -> true | _ -> false let is_ready = function ACK_ready _ -> true | _ -> false diff --git a/src/not-so-smart/protocol.mli b/src/not-so-smart/protocol.mli index 3171543eb..043f2c837 100644 --- a/src/not-so-smart/protocol.mli +++ b/src/not-so-smart/protocol.mli @@ -82,6 +82,7 @@ module Have : sig type 'uid t = private 'uid list * [ `Done | `Flush ] val have : cmd:[ `Done | `Flush ] -> 'uid list -> 'uid t + val map : f:('a -> 'b) -> 'a t -> 'b t end module Result : sig @@ -97,6 +98,8 @@ module Negotiation : sig | ACK_ready of 'uid | ACK_common of 'uid + val mk_ack : 'uid -> 'uid t + val mk_continue : 'uid -> 'uid t val is_common : 'uid t -> bool val is_ready : 'uid t -> bool val pp : string t Fmt.t diff --git a/src/not-so-smart/smart.mli b/src/not-so-smart/smart.mli index 425940e6e..6256bff10 100644 --- a/src/not-so-smart/smart.mli +++ b/src/not-so-smart/smart.mli @@ -90,6 +90,7 @@ module Have : sig type 'uid t = private 'uid list * [ `Done | `Flush ] val have : cmd:[ `Done | `Flush ] -> 'uid list -> 'uid t + val map : f:('a -> 'b) -> 'a t -> 'b t end module Result : sig @@ -105,6 +106,8 @@ module Negotiation : sig | ACK_ready of 'uid | ACK_common of 'uid + val mk_ack : 'uid -> 'uid t + val mk_continue : 'uid -> 'uid t val is_common : 'uid t -> bool val is_ready : 'uid t -> bool val pp : string t Fmt.t diff --git a/src/not-so-smart/upload_pack.ml b/src/not-so-smart/upload_pack.ml index 4aee36eb3..a2b5d7605 100644 --- a/src/not-so-smart/upload_pack.ml +++ b/src/not-so-smart/upload_pack.ml @@ -14,6 +14,7 @@ struct let src = Logs.Src.create "upload-pack" module Log = (val Logs.src_log src : Logs.LOG) + open Uid open Scheduler let ( >>= ) x f = IO.bind x f @@ -37,6 +38,51 @@ struct pp_error = Flow.pp_error; } + module Server_neg = struct + (** Server-side common base negotiation. *) + + type 'uid t = { + haves : 'uid list; + last_common : 'uid option; + has_common_base : bool; + } + + let empty = { haves = []; last_common = None; has_common_base = false } + + let compute_has_common_base _store (_access : _ S.access) ~wants:_ _t = + (* TODO: Compute whether all [wants] each have an ancestor in [t.haves]. *) + false + + let mk_continue uid = Smart.Negotiation.mk_continue uid + + (** Returns the commits that should be [ACK]ed and update the state. *) + let ack store (access : _ S.access) ~wants t new_haves = + let rec loop t acc = function + | [] -> return (t, List.rev acc) + | hd :: tl -> ( + access.get hd store |> prj >>= function + | Some _ -> + let has_common_base = + t.has_common_base + || compute_has_common_base store access ~wants t + in + loop + { t with has_common_base; haves = hd :: t.haves } + (mk_continue hd :: acc) tl + | None -> + loop t + (if t.has_common_base then mk_continue hd :: acc else acc) + tl) + in + loop t [] new_haves + + (** Return the final [ACK] or [None] if the negotiation failed. *) + let last_common t = + match t.last_common with + | Some uid -> Some (Smart.Negotiation.mk_ack uid) + | None -> None + end + let upload_pack flow (access, _light_load, _heavy_load) store pack = let my_caps = [ `Multi_ack; `Side_band_64k; `Ofs_delta; `Thin_pack ] in let fiber ctx = @@ -45,35 +91,43 @@ struct let* () = send ctx send_advertised_refs adv_ref in recv ctx recv_want in - let ctx = Smart.Context.make ~client_caps in + let ctx = Smart.Context.make ~my_caps in Smart_flow.run sched fail io flow (fiber ctx) |> prj >>= fun wants -> - let rec go haves = - let fiber ctx = - let open Smart in - let* h, cmd = recv ctx recv_have in - let haves = h @ haves in - match cmd with - | `Done -> - (* let common_base = compute_common_base store access wants in *) - return (new_have @ haves) - | `Flush -> - let acks = _ store access haves in - let* () = send ctx send_ack acks in - go haves + (* TODO: Check that all the [wants] are in the store and each are the tip of a ref. *) + Smart.Context.replace_their_caps ctx wants.Smart.Want.capabilities; + + let rec negotiate neg = + Smart_flow.run sched fail io flow Smart.(recv ctx recv_have) |> prj + >>= fun have -> + let h, cmd = + (Smart.Have.map ~f:of_hex have :> Uid.t list * [ `Done | `Flush ]) in - return (w, h) Smart_flow.run sched fail io flow haves |> prj - >>= fun haves -> _ + Server_neg.ack store access ~wants neg h >>= fun (neg, acks) -> + let acks = List.map (Smart.Negotiation.map ~f:to_hex) acks in + Smart_flow.run sched fail io flow Smart.(send ctx send_acks acks) |> prj + >>= fun () -> + match cmd with + | `Done -> ( + match Server_neg.last_common neg with + | Some ack -> + let ack = Smart.Negotiation.map ~f:to_hex ack in + Smart_flow.run sched fail io flow + Smart.(send ctx send_acks [ ack ]) + |> prj + >>= fun () -> return neg + | None -> + Smart_flow.run sched fail io flow Smart.(send ctx send_acks []) + |> prj + >>= fun () -> return neg) + | `Flush -> negotiate neg + in + negotiate Server_neg.empty >>= fun neg -> + let sources = + let a, b = wants.wants in + List.map of_hex (a :: b) in - go - (* - Not implemented: send shallow information - Go: - - recv have: string list * [`Flush | `Done] - - send acks for each commit acknowledged *) - >>= - fun haves -> Pck.get_uncommon_objects sched ~compare:Uid.compare access store - ~exclude:haves ~sources:wants + ~exclude:neg.haves ~sources |> prj >>= fun uids -> Log.debug (fun m -> m "Prepare a pack of %d object(s)." (List.length uids)); From cf89e437b6af3f2b69c26d855eb8353496f7661a Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Fri, 5 May 2023 17:32:54 +0200 Subject: [PATCH 07/14] Export Smart_git.Make_stream_pack This functor defines a function for constructing and streaming a pack file from a list of revs. Co-authored-by: Paul-Elliot --- src/not-so-smart/smart_git.ml | 191 +++++++++++++++-------------- src/not-so-smart/smart_git_intf.ml | 11 ++ test/smart/dune | 4 +- test/smart/server.ml | 60 +++++---- 4 files changed, 146 insertions(+), 120 deletions(-) diff --git a/src/not-so-smart/smart_git.ml b/src/not-so-smart/smart_git.ml index 59a6fd9ad..06da00514 100644 --- a/src/not-so-smart/smart_git.ml +++ b/src/not-so-smart/smart_git.ml @@ -179,6 +179,101 @@ module Endpoint = struct headers end +module Make_stream_pack (Uid : UID) = struct + type 'a stream = unit -> 'a option Lwt.t + + module Delta = Carton_lwt.Enc.Delta (Uid) (Verbose) + + let deltify ~light_load ~heavy_load ?(threads = 4) (uids : Uid.t list) = + let open Lwt.Infix in + let fold (uid : Uid.t) = + light_load uid >|= fun (kind, length) -> + Carton_lwt.Enc.make_entry ~kind ~length uid + in + Lwt_list.map_p fold uids >|= Array.of_list >>= fun entries -> + Delta.delta + ~threads:(List.init threads (fun _thread -> heavy_load)) + ~weight:10 ~uid_ln:Uid.length entries + >>= fun targets -> Lwt.return (entries, targets) + + let header = Bigstringaf.create 12 + + let pack ~(heavy_load : Uid.t Carton_lwt.Enc.load) stream targets = + let open Lwt.Infix in + let offsets = Hashtbl.create (Array.length targets) in + let find uid = + match Hashtbl.find offsets uid with + | v -> Lwt.return_some v + | exception Not_found -> Lwt.return_none + in + let uid = + { Carton.Enc.uid_ln = Uid.length; Carton.Enc.uid_rw = Uid.to_raw_string } + in + let b = + { + Carton.Enc.o = Bigstringaf.create De.io_buffer_size; + Carton.Enc.i = Bigstringaf.create De.io_buffer_size; + Carton.Enc.q = De.Queue.create 0x10000; + Carton.Enc.w = De.Lz77.make_window ~bits:15; + } + in + let ctx = ref Uid.empty in + let cursor = ref 0 in + Carton.Enc.header_of_pack ~length:(Array.length targets) header 0 12; + stream (Some (Bigstringaf.to_string header)); + ctx := Uid.feed !ctx header ~off:0 ~len:12; + cursor := !cursor + 12; + let encode_targets targets = + let encode_target idx = + Hashtbl.add offsets (Carton.Enc.target_uid targets.(idx)) !cursor; + Carton_lwt.Enc.encode_target ~b ~find ~load:heavy_load ~uid + targets.(idx) ~cursor:!cursor + >>= fun (len, encoder) -> + let rec go encoder = + match Carton.Enc.N.encode ~o:b.o encoder with + | `Flush (encoder, len) -> + let payload = Bigstringaf.substring b.o ~off:0 ~len in + stream (Some payload); + ctx := Uid.feed !ctx b.o ~off:0 ~len; + cursor := !cursor + len; + let encoder = + Carton.Enc.N.dst encoder b.o 0 (Bigstringaf.length b.o) + in + go encoder + | `End -> Lwt.return () + in + let payload = Bigstringaf.substring b.o ~off:0 ~len in + stream (Some payload); + ctx := Uid.feed !ctx b.o ~off:0 ~len; + cursor := !cursor + len; + let encoder = Carton.Enc.N.dst encoder b.o 0 (Bigstringaf.length b.o) in + go encoder + in + let rec go idx = + if idx < Array.length targets then + encode_target idx >>= fun () -> go (succ idx) + else Lwt.return () + in + go 0 + in + encode_targets targets >>= fun () -> + let uid = Uid.get !ctx |> Uid.to_raw_string in + stream (Some uid); + stream None; + Lwt.return_unit + + let pack ~light_load ~heavy_load uids = + let open Lwt.Infix in + let stream, pusher = Lwt_stream.create () in + let fiber () = + deltify ~light_load ~heavy_load uids >>= fun (_, targets) -> + pack ~heavy_load pusher targets + in + let stream () = Lwt_stream.get stream in + Lwt.async fiber; + stream +end + module Make_client (Scheduler : Sigs.SCHED with type +'a s = 'a Lwt.t) (Pack : APPEND with type +'a fiber = 'a Lwt.t) @@ -190,6 +285,7 @@ struct module Log = (val Logs.src_log src : Logs.LOG) module Thin = Carton_lwt.Thin.Make (Uid) + module Stream_pack = Make_stream_pack (Uid) let fs = let open Rresult in @@ -504,97 +600,6 @@ struct | Failure err -> Lwt.return_error (R.msg err) | exn -> Lwt.return_error (`Exn exn) - module Delta = Carton_lwt.Enc.Delta (Uid) (Verbose) - - let deltify ~light_load ~heavy_load ?(threads = 4) (uids : Uid.t list) = - let open Lwt.Infix in - let fold (uid : Uid.t) = - light_load uid >|= fun (kind, length) -> - Carton_lwt.Enc.make_entry ~kind ~length uid - in - Lwt_list.map_p fold uids >|= Array.of_list >>= fun entries -> - Delta.delta - ~threads:(List.init threads (fun _thread -> heavy_load)) - ~weight:10 ~uid_ln:Uid.length entries - >>= fun targets -> Lwt.return (entries, targets) - - let header = Bigstringaf.create 12 - - let pack ~(heavy_load : Uid.t Carton_lwt.Enc.load) stream targets = - let open Lwt.Infix in - let offsets = Hashtbl.create (Array.length targets) in - let find uid = - match Hashtbl.find offsets uid with - | v -> Lwt.return_some v - | exception Not_found -> Lwt.return_none - in - let uid = - { Carton.Enc.uid_ln = Uid.length; Carton.Enc.uid_rw = Uid.to_raw_string } - in - let b = - { - Carton.Enc.o = Bigstringaf.create De.io_buffer_size; - Carton.Enc.i = Bigstringaf.create De.io_buffer_size; - Carton.Enc.q = De.Queue.create 0x10000; - Carton.Enc.w = De.Lz77.make_window ~bits:15; - } - in - let ctx = ref Uid.empty in - let cursor = ref 0 in - Carton.Enc.header_of_pack ~length:(Array.length targets) header 0 12; - stream (Some (Bigstringaf.to_string header)); - ctx := Uid.feed !ctx header ~off:0 ~len:12; - cursor := !cursor + 12; - let encode_targets targets = - let encode_target idx = - Hashtbl.add offsets (Carton.Enc.target_uid targets.(idx)) !cursor; - Carton_lwt.Enc.encode_target ~b ~find ~load:heavy_load ~uid - targets.(idx) ~cursor:!cursor - >>= fun (len, encoder) -> - let rec go encoder = - match Carton.Enc.N.encode ~o:b.o encoder with - | `Flush (encoder, len) -> - let payload = Bigstringaf.substring b.o ~off:0 ~len in - stream (Some payload); - ctx := Uid.feed !ctx b.o ~off:0 ~len; - cursor := !cursor + len; - let encoder = - Carton.Enc.N.dst encoder b.o 0 (Bigstringaf.length b.o) - in - go encoder - | `End -> Lwt.return () - in - let payload = Bigstringaf.substring b.o ~off:0 ~len in - stream (Some payload); - ctx := Uid.feed !ctx b.o ~off:0 ~len; - cursor := !cursor + len; - let encoder = Carton.Enc.N.dst encoder b.o 0 (Bigstringaf.length b.o) in - go encoder - in - let rec go idx = - if idx < Array.length targets then - encode_target idx >>= fun () -> go (succ idx) - else Lwt.return () - in - go 0 - in - encode_targets targets >>= fun () -> - let uid = Uid.get !ctx |> Uid.to_raw_string in - stream (Some uid); - stream None; - Lwt.return_unit - - let pack ~light_load ~heavy_load uids = - let open Lwt.Infix in - let stream, pusher = Lwt_stream.create () in - let fiber () = - deltify ~light_load ~heavy_load uids >>= fun (_, targets) -> - pack ~heavy_load pusher targets - in - let stream () = Lwt_stream.get stream in - Lwt.async fiber; - stream - let push_v1 ?uses_git_transport flow ~capabilities path cmds hostname store access push_cfg pack = let open Lwt.Infix in @@ -636,7 +641,7 @@ struct in push_v1 ~uses_git_transport flow ~capabilities path cmds hostname store access push_cfg - (pack ~light_load ~heavy_load) + (Stream_pack.pack ~light_load ~heavy_load) | Ok flow, Some (`HTTP (uri, handshake)), `V1 -> let push_cfg = Nss.Push.configuration ~stateless:true () in let uri0 = @@ -648,7 +653,7 @@ struct in handshake ~uri0 ~uri1 flow >>= fun () -> push_v1 flow ~capabilities path cmds hostname store access push_cfg - (pack ~light_load ~heavy_load) + (Stream_pack.pack ~light_load ~heavy_load) | Ok flow, Some _, _ -> Log.err (fun m -> m "The protocol version is uninmplemented."); Mimic.close flow >>= fun () -> diff --git a/src/not-so-smart/smart_git_intf.ml b/src/not-so-smart/smart_git_intf.ml index 9370ab4d2..711e63391 100644 --- a/src/not-so-smart/smart_git_intf.ml +++ b/src/not-so-smart/smart_git_intf.ml @@ -119,6 +119,17 @@ module type SMART_GIT = sig val git_uri : Uri.t Mimic.value + module Make_stream_pack (Uid : UID) : sig + type 'a stream = unit -> 'a option Lwt.t + + val pack : + light_load:Uid.t Carton_lwt.Thin.light_load -> + heavy_load:Uid.t Carton_lwt.Thin.heavy_load -> + Uid.t list -> + string stream + (** For a list of uids, construct and stream a pack. *) + end + module Make_client (Scheduler : Sigs.SCHED with type +'a s = 'a Lwt.t) (Pack : APPEND with type +'a fiber = 'a Lwt.t) diff --git a/test/smart/dune b/test/smart/dune index 254f99dd3..8dfe6a510 100644 --- a/test/smart/dune +++ b/test/smart/dune @@ -1,5 +1,5 @@ -(executable - (name test) +(tests + (names test server) (modules append fifo diff --git a/test/smart/server.ml b/test/smart/server.ml index 1fe383221..7a16eacb5 100644 --- a/test/smart/server.ml +++ b/test/smart/server.ml @@ -2,6 +2,7 @@ open Bos open Rresult open Lwt_backend open Store_backend +module Flow = Unixiz.Make (Mimic) (** to keep track of directories created by unit tests and clean them up afterwards *) module Tmp_dirs = struct @@ -85,14 +86,23 @@ let git_init_with_branch branch = OS.Cmd.run Cmd.(v "git" % "config" % "init.defaultBranch" % branch) else OS.Cmd.run Cmd.(v "git" % "init" % "-b" % branch) -let create_new_git_store _sw = +let create_new_git_push_store _sw = let create () = - (* XXX(dinosaure): a hook is already added by [Bos] to delete the - directory. *) create_tmp_dir "git-%s" >>= fun root -> OS.Dir.with_current root git_init_with_branch "master" |> R.join >>= fun () -> - let access = access lwt in + let access = + Sigs. + { + get = get_object_for_packer lwt; + parents = (fun _uid _store -> assert false); + deref = deref lwt; + locals = (fun _store -> assert false); + shallowed = (fun _store -> assert false); + shallow = (fun _store _uid -> assert false); + unshallow = (fun _store _uid -> assert false); + } + in let light_load uid = lightly_load lwt root uid |> Scheduler.prj in let heavy_load uid = heavily_load lwt root uid |> Scheduler.prj in let store = store_inj { path = root; tbl = Hashtbl.create 0x100 } in @@ -103,7 +113,9 @@ let create_new_git_store _sw = | Error err -> Fmt.failwith "%a" R.pp_msg err module Git_sync = - Smart_git.Make_server (Scheduler) (Append) (Append) (Uid) (Ref) + Smart_git.Make_server (Scheduler) (Flow) (Append) (Append) (Uid) (Ref) + +module Stream_pack = Smart_git.Make_stream_pack (Uid) let loopback_endpoint, loopback = Mimic.register ~name:"loopback" (module Loopback) @@ -129,35 +141,33 @@ let test_cancelled_fetch () = let run () = (* let capabilities = [] in *) let payloads = [ "\x30\x30\x30\x30" (* 0000 *) ] in - create_new_git_store sw >>= fun (access, store) -> - let { path; _ } = store_prj store in - let pack, index = - ( Fpath.(path / ".git" / "objects" / "pack"), - Fpath.(path / ".git" / "objects" / "pack") ) - in + create_new_git_push_store sw + >>= fun (((_, light_load, heavy_load) as access'), store) -> flow_with_payloads (payloads, ignore) >>? fun flow -> - Git_sync.upload_pack ~flow access store pack index + let pack = Stream_pack.pack ~light_load ~heavy_load in + Git_sync.upload_pack (Flow.make flow) access' store pack >>= fun () -> + Lwt.return (Ok ()) in (* TODO: Test that the flow receive the expected response: - List of references (head, master) *) run () >>= handle_error -let test_fetch_all () = - Alcotest_lwt.test_case "fetch all" `Quick @@ fun sw () -> - let open Lwt.Infix in - let run () = - Lwt.return @@ Ok () - (* Client send 0009done *) - (* Server should send: - - List of refs - - pack containing all commits *) - in - run () >>= handle_error +(* let test_fetch_all () = *) +(* Alcotest_lwt.test_case "fetch all" `Quick @@ fun sw () -> *) +(* let open Lwt.Infix in *) +(* let run () = *) +(* Lwt.return @@ Ok () *) +(* (1* Client send 0009done *1) *) +(* (1* Server should send: *) +(* - List of refs *) +(* - pack containing all commits *1) *) +(* in *) +(* run () >>= handle_error *) let test = - Alcotest_lwt.run "smart" [ "regression", [ test_cancelled_fetch () ] ] + Alcotest_lwt.run "server" [ "upload-pack", [ test_cancelled_fetch () ] ] -let tmp = "tmp" +let tmp = "tmp/server" let () = let fiber = From 008528a6ae336b68f173159ec7d27b8b5178b6cd Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Sat, 6 May 2023 16:38:52 +0200 Subject: [PATCH 08/14] Git_unix: Add std_in_out_ctx Allows to create a mimic context for reading and writing to stdin and stdout. --- src/git-unix/git_unix.ml | 2 ++ src/git-unix/git_unix.mli | 2 ++ src/git-unix/git_unix_mimic.ml | 20 ++++++++++++++++++-- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/git-unix/git_unix.ml b/src/git-unix/git_unix.ml index fa9575370..d2bd0d1a4 100644 --- a/src/git-unix/git_unix.ml +++ b/src/git-unix/git_unix.ml @@ -747,3 +747,5 @@ module Sync (Git_store : Git.S) = struct let push ~ctx edn store ?version ?capabilities cmds = push ~ctx edn store ?version ?capabilities cmds end + +let std_in_out_ctx = Git_unix_mimic.std_in_out_ctx diff --git a/src/git-unix/git_unix.mli b/src/git-unix/git_unix.mli index 153fb6474..24080b8bc 100644 --- a/src/git-unix/git_unix.mli +++ b/src/git-unix/git_unix.mli @@ -39,3 +39,5 @@ module Store : sig function should be registered with [at_exit] to clean pending file-descriptors. *) end + +val std_in_out_ctx : unit -> Mimic.ctx Lwt.t diff --git a/src/git-unix/git_unix_mimic.ml b/src/git-unix/git_unix_mimic.ml index b503b00c4..b6da6f8e1 100644 --- a/src/git-unix/git_unix_mimic.ml +++ b/src/git-unix/git_unix_mimic.ml @@ -88,13 +88,12 @@ module TCP = struct let unlisten _ = assert false end -module FIFO = struct +module Lwt_unix_file_descr_flow = struct open Lwt.Infix let ( >>? ) = Lwt_result.bind type flow = Lwt_unix.file_descr * Lwt_unix.file_descr - type endpoint = Fpath.t type error = [ `Error of Unix.error * string * string ] type write_error = [ `Closed | `Error of Unix.error * string * string ] @@ -132,6 +131,12 @@ module FIFO = struct | x :: r -> write fd x >>? fun () -> writev fd r let close (ic, oc) = Lwt_unix.close ic >>= fun () -> Lwt_unix.close oc +end + +module FIFO = struct + include Lwt_unix_file_descr_flow + + type endpoint = Fpath.t let connect fpath = let process () = @@ -184,3 +189,14 @@ let ctx happy_eyeballs = ~k:k2 ctx in C.with_optional_tls_config_and_headers ctx + +module Std_in_out = struct + include Lwt_unix_file_descr_flow + + type endpoint = unit + + let connect () = Lwt.return_ok Lwt_unix.(stdin, stdout) +end + +let std_endpoint, _ = Mimic.register ~name:"std" (module Std_in_out) +let std_in_out_ctx () = Lwt.return (Mimic.add std_endpoint () Mimic.empty) From f63b3bb3078ad480b27c66ba8b368ddf78e8b3c5 Mon Sep 17 00:00:00 2001 From: Jules Aguillon Date: Sat, 6 May 2023 17:22:02 +0200 Subject: [PATCH 09/14] Smart_git: Expose upload_pack Co-authored-by: Paul-Elliot --- src/git/dune | 2 ++ src/git/sync.ml | 8 ++++++++ src/git/sync.mli | 4 ++++ src/not-so-smart/smart_git.ml | 5 ++++- src/not-so-smart/smart_git_intf.ml | 1 - test/smart/server.ml | 7 ++----- 6 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/git/dune b/src/git/dune index 86bbf1ce0..23b45cf5e 100644 --- a/src/git/dune +++ b/src/git/dune @@ -5,6 +5,8 @@ hxd.core hxd.string mimic + mirage-flow + git.nss.unixiz rresult git.nss.sigs git.nss.pck diff --git a/src/git/sync.ml b/src/git/sync.ml index 971a6ae29..58696c60a 100644 --- a/src/git/sync.ml +++ b/src/git/sync.ml @@ -293,4 +293,12 @@ struct push ~ctx (access, lightly_load t, heavily_load t) ministore endpoint ?version ?capabilities cmds + + module Flow = Unixiz.Make(Mimic) + + include Smart_git.Make_server (Scheduler) (Flow) (Pack) (Index) (Hash) (Reference) + + let upload_pack ~flow t = + let ministore = Ministore.inj (t, Hashtbl.create 0x100) in + upload_pack (Flow.make flow) (access, lightly_load t, heavily_load t) ministore end diff --git a/src/git/sync.mli b/src/git/sync.mli index 35cb107dd..1abbef81a 100644 --- a/src/git/sync.mli +++ b/src/git/sync.mli @@ -106,4 +106,8 @@ module Make | `Update of Reference.t * Reference.t ] list -> (unit, ([> error ] as 'err)) result Lwt.t + + val upload_pack : + flow:Mimic.flow -> store -> unit Lwt.t + (** Answers a [git fetch] *) end diff --git a/src/not-so-smart/smart_git.ml b/src/not-so-smart/smart_git.ml index 06da00514..47be80037 100644 --- a/src/not-so-smart/smart_git.ml +++ b/src/not-so-smart/smart_git.ml @@ -681,7 +681,10 @@ module Make_server (Uid : UID) (Ref : Sigs.REF) = struct + module Stream_pack = Make_stream_pack (Uid) module Upload_pack = Nss.Upload_pack.Make (Scheduler) (Lwt) (Flow) (Uid) (Ref) - let upload_pack = Upload_pack.upload_pack + let upload_pack flow (_, light_load, heavy_load as access') store = + let pack = Stream_pack.pack ~light_load ~heavy_load in + Upload_pack.upload_pack flow access' store pack end diff --git a/src/not-so-smart/smart_git_intf.ml b/src/not-so-smart/smart_git_intf.ml index 711e63391..83e04d2ca 100644 --- a/src/not-so-smart/smart_git_intf.ml +++ b/src/not-so-smart/smart_git_intf.ml @@ -187,7 +187,6 @@ module type SMART_GIT = sig * Uid.t Carton_lwt.Thin.light_load * Uid.t Carton_lwt.Thin.heavy_load -> (Uid.t, Uid.t Pck.t, 'a) Sigs.store -> - (Uid.t list -> unit -> string option Flow.fiber) -> unit Flow.fiber (** Answers a [git fetch] *) diff --git a/test/smart/server.ml b/test/smart/server.ml index 7a16eacb5..8db94a273 100644 --- a/test/smart/server.ml +++ b/test/smart/server.ml @@ -115,8 +115,6 @@ let create_new_git_push_store _sw = module Git_sync = Smart_git.Make_server (Scheduler) (Flow) (Append) (Append) (Uid) (Ref) -module Stream_pack = Smart_git.Make_stream_pack (Uid) - let loopback_endpoint, loopback = Mimic.register ~name:"loopback" (module Loopback) @@ -142,10 +140,9 @@ let test_cancelled_fetch () = (* let capabilities = [] in *) let payloads = [ "\x30\x30\x30\x30" (* 0000 *) ] in create_new_git_push_store sw - >>= fun (((_, light_load, heavy_load) as access'), store) -> + >>= fun (access, store) -> flow_with_payloads (payloads, ignore) >>? fun flow -> - let pack = Stream_pack.pack ~light_load ~heavy_load in - Git_sync.upload_pack (Flow.make flow) access' store pack >>= fun () -> + Git_sync.upload_pack (Flow.make flow) access store >>= fun () -> Lwt.return (Ok ()) in (* TODO: Test that the flow receive the expected response: From cd8a3a6f4abbc6b31e65db6f775830bd08a9340b Mon Sep 17 00:00:00 2001 From: Paul-Elliot Date: Sat, 6 May 2023 17:57:32 +0200 Subject: [PATCH 10/14] Add a guit-upload-pack binary Signed-off-by: Paul-Elliot Co-authored-by: Jules Aguillon --- bin/guit/dune | 29 ++++++++ bin/guit/upload_pack.ml | 142 ++++++++++++++++++++++++++++++++++++++++ src/git/sync.ml | 11 +++- src/git/sync.mli | 5 +- 4 files changed, 182 insertions(+), 5 deletions(-) create mode 100644 bin/guit/upload_pack.ml diff --git a/bin/guit/dune b/bin/guit/dune index 47e166e2b..b9cc7b33f 100644 --- a/bin/guit/dune +++ b/bin/guit/dune @@ -27,6 +27,35 @@ mimic git-unix)) +(executable + (name upload_pack) + (modules upload_pack) + (package git-unix) + (public_name guit.upload_pack) + (libraries + happy-eyeballs-lwt + git + git.nss.git + logs + logs.fmt + fmt + mtime + mtime.clock.os + lwt + lwt.unix + fmt.cli + logs.cli + cstruct + domain-name + mirage-flow + fmt.tty + fpath + result + cmdliner + rresult + mimic + git-unix)) + (executable (name v) (modules v) diff --git a/bin/guit/upload_pack.ml b/bin/guit/upload_pack.ml new file mode 100644 index 000000000..9a963aca3 --- /dev/null +++ b/bin/guit/upload_pack.ml @@ -0,0 +1,142 @@ +let () = Random.self_init () + +open Git_unix +module Sync = Sync (Store) + +let src = Logs.Src.create "guit-upload-pack" ~doc:"logs binary event" + +module Log = (val Logs.src_log src : Logs.LOG) + +let pad n x = + if String.length x > n then x else x ^ String.make (n - String.length x) ' ' + +let pp_header ppf (level, header) = + let level_style = + match level with + | Logs.App -> Logs_fmt.app_style + | Logs.Debug -> Logs_fmt.debug_style + | Logs.Warning -> Logs_fmt.warn_style + | Logs.Error -> Logs_fmt.err_style + | Logs.Info -> Logs_fmt.info_style + in + let level = Logs.level_to_string (Some level) in + Fmt.pf ppf "[%a][%a]" + (Fmt.styled level_style Fmt.string) + level (Fmt.option Fmt.string) + (Option.map (pad 10) header) + +let reporter ppf = + let report src level ~over k msgf = + let k _ = + over (); + k () + in + let with_src_and_stamp h _ k fmt = + let dt_us = 1e-3 *. Int64.to_float (Mtime_clock.elapsed_ns ()) in + Fmt.kpf k ppf + ("%s %a %a: @[" ^^ fmt ^^ "@]@.") + (pad 10 (Fmt.str "%+04.0fus" dt_us)) + pp_header (level, h) + Fmt.(styled `Magenta string) + (pad 10 @@ Logs.Src.name src) + in + msgf @@ fun ?header ?tags fmt -> with_src_and_stamp header tags k fmt + in + { Logs.report } + +let setup_logs style_renderer level ppf = + Fmt_tty.setup_std_outputs ?style_renderer (); + Logs.set_level level; + Logs.set_reporter (reporter ppf); + let quiet = match level with Some _ -> false | None -> true in + quiet, ppf + +type error = [ `Store of Store.error | `Sync of Sync.error ] + +let store_err err = `Store err +let sync_err err = `Sync err + +let pp_error ppf = function + | `Store err -> Fmt.pf ppf "(`Store %a)" Store.pp_error err + | `Sync err -> Fmt.pf ppf "(`Sync %a)" Sync.pp_error err + +let main quiet (directory : string) : (unit, 'error) Lwt_result.t = + let root = + (match directory with "" -> Sys.getcwd () | _ -> directory) |> Fpath.v + in + let ( >>? ) = Lwt_result.bind in + let ( >>?? ) = Lwt.bind in + let ( >>! ) v f = Lwt_result.map_error f v in + Store.v root >>! store_err >>? fun store -> + let _push_stdout, _push_stderr = + match quiet with + | true -> ignore, ignore + | false -> print_string, prerr_string + in + Git_unix.std_in_out_ctx () >>?? fun ctx -> + Mimic.resolve ctx >>? fun flow -> + Sync.upload_pack ~flow store >>?? Lwt.return_ok +(* >>! sync_err *) +(* >>? fun _ -> Lwt.return (Ok ()) *) + +open Cmdliner + +(* XXX(ulugbekna): We want ogit-fetch to have the following interface: + * ogit-fetch [-r | --root ] [--output ] + * [--progress] ... *) + +let output = + let converter = + let parse str = + match str with + | "stdout" -> Ok Fmt.stdout + | "stderr" -> Ok Fmt.stderr + | s -> Error (`Msg (Fmt.str "%s is not an output." s)) + in + let print ppf v = + Fmt.pf ppf "%s" (if v == Fmt.stdout then "stdout" else "stderr") + in + Arg.conv ~docv:"" (parse, print) + in + let doc = + "Output of the progress status. Can take values 'stdout' (default) or \ + 'stderr'." + in + Arg.( + value & opt converter Fmt.stdout & info [ "output" ] ~doc ~docv:"") + +let directory = + let doc = "Indicate path to repository root containing '.git' folder" in + Arg.(value & opt string "" & info [ "r"; "root" ] ~doc ~docv:"") + +(* XXX(ulugbekna): passed argument needs to be a URI of the repository *) +let repository = + let endpoint = + let parse = Smart_git.Endpoint.of_string in + let print = Smart_git.Endpoint.pp in + Arg.conv ~docv:"" (parse, print) + in + let doc = "URI leading to repository" in + Arg.( + required & pos 0 (some endpoint) None & info [] ~docv:"" ~doc) + +let setup_logs = + let docs = Manpage.s_common_options in + Term.( + const setup_logs + $ Fmt_cli.style_renderer ~docs () + $ Logs_cli.level ~docs () + $ output) + +let main (quiet, _) directory = + match Lwt_main.run (main quiet directory) with + | Ok () -> Ok () + | Error (#error as err) -> Error (Fmt.str "%a." pp_error err) + | Error _ -> Error "other" + +let command = + let doc = "Answer to a fetch." in + let info = Cmd.info "upload-pack" ~doc in + Cmd.v info Term.(const main $ setup_logs $ directory) + +let () = exit @@ Cmd.eval_result command diff --git a/src/git/sync.ml b/src/git/sync.ml index 58696c60a..8c9cc50e9 100644 --- a/src/git/sync.ml +++ b/src/git/sync.ml @@ -51,6 +51,8 @@ module type S = sig | `Update of Reference.t * Reference.t ] list -> (unit, error) result Lwt.t + + val upload_pack : flow:Mimic.flow -> store -> unit Lwt.t end module Make @@ -294,11 +296,14 @@ struct (access, lightly_load t, heavily_load t) ministore endpoint ?version ?capabilities cmds - module Flow = Unixiz.Make(Mimic) + module Flow = Unixiz.Make (Mimic) - include Smart_git.Make_server (Scheduler) (Flow) (Pack) (Index) (Hash) (Reference) + include + Smart_git.Make_server (Scheduler) (Flow) (Pack) (Index) (Hash) (Reference) let upload_pack ~flow t = let ministore = Ministore.inj (t, Hashtbl.create 0x100) in - upload_pack (Flow.make flow) (access, lightly_load t, heavily_load t) ministore + upload_pack (Flow.make flow) + (access, lightly_load t, heavily_load t) + ministore end diff --git a/src/git/sync.mli b/src/git/sync.mli index 1abbef81a..bfa93e6d0 100644 --- a/src/git/sync.mli +++ b/src/git/sync.mli @@ -50,6 +50,8 @@ module type S = sig | `Update of Reference.t * Reference.t ] list -> (unit, error) result Lwt.t + + val upload_pack : flow:Mimic.flow -> store -> unit Lwt.t end (** Creates a lower-level [Sync] functions [fetch] and [push] that are then @@ -107,7 +109,6 @@ module Make list -> (unit, ([> error ] as 'err)) result Lwt.t - val upload_pack : - flow:Mimic.flow -> store -> unit Lwt.t + val upload_pack : flow:Mimic.flow -> store -> unit Lwt.t (** Answers a [git fetch] *) end From b275577f5b13e7074a2266ea15ac4ada711db4ed Mon Sep 17 00:00:00 2001 From: Paul-Elliot Date: Sun, 7 May 2023 12:51:09 +0200 Subject: [PATCH 11/14] Upload pack: send advertised refs Signed-off-by: Paul-Elliot --- src/git/sync.ml | 15 +++++++++++++++ src/not-so-smart/smart_git_intf.ml | 2 +- src/not-so-smart/upload_pack.ml | 22 +++++++++++++++++++++- test/smart/server.ml | 5 ++--- test/smart/store_backend.ml | 7 ++----- test/smart/store_backend.mli | 6 +----- 6 files changed, 42 insertions(+), 15 deletions(-) diff --git a/src/git/sync.ml b/src/git/sync.ml index 8c9cc50e9..c15b0649c 100644 --- a/src/git/sync.ml +++ b/src/git/sync.ml @@ -301,6 +301,21 @@ struct include Smart_git.Make_server (Scheduler) (Flow) (Pack) (Index) (Hash) (Reference) + let access = + Sigs. + { + get = + (fun uid t -> + Scheduler.inj (get_object_for_packer (Ministore.prj t) uid)); + parents = (fun _ _ -> assert false); + deref = + (fun t refname -> Scheduler.inj (deref (Ministore.prj t) refname)); + locals = (fun t -> Scheduler.inj (locals (Ministore.prj t))); + shallowed = (fun _ -> assert false); + shallow = (fun _ -> assert false); + unshallow = (fun _ -> assert false); + } + let upload_pack ~flow t = let ministore = Ministore.inj (t, Hashtbl.create 0x100) in upload_pack (Flow.make flow) diff --git a/src/not-so-smart/smart_git_intf.ml b/src/not-so-smart/smart_git_intf.ml index 83e04d2ca..c7cc12acf 100644 --- a/src/not-so-smart/smart_git_intf.ml +++ b/src/not-so-smart/smart_git_intf.ml @@ -183,7 +183,7 @@ module type SMART_GIT = sig (Ref : Sigs.REF) : sig val upload_pack : Flow.t -> - (Uid.t, 'b, Uid.t Pck.t, 'a, Scheduler.t) Sigs.access + (Uid.t, Ref.t, Uid.t Pck.t, 'a, Scheduler.t) Sigs.access * Uid.t Carton_lwt.Thin.light_load * Uid.t Carton_lwt.Thin.heavy_load -> (Uid.t, Uid.t Pck.t, 'a) Sigs.store -> diff --git a/src/not-so-smart/upload_pack.ml b/src/not-so-smart/upload_pack.ml index a2b5d7605..28fb733d1 100644 --- a/src/not-so-smart/upload_pack.ml +++ b/src/not-so-smart/upload_pack.ml @@ -85,9 +85,29 @@ struct let upload_pack flow (access, _light_load, _heavy_load) store pack = let my_caps = [ `Multi_ack; `Side_band_64k; `Ofs_delta; `Thin_pack ] in + + access.S.locals store |> prj >>= fun refs -> + let rec go refs acc head_acc = + match refs with + | [] -> ( + match head_acc with + | None -> return acc + | Some head -> return (head :: acc)) + | ref_ :: q -> ( + access.deref store ref_ |> prj >>= function + | None -> go refs acc head_acc + | Some uid -> ( + let ref_ = Ref.to_string ref_ in + let uid = to_hex uid in + let entry = uid, ref_, false in + match ref_ with + | "HEAD" -> go q acc (Some entry) + | _ -> go q (entry :: acc) head_acc)) + in + go refs [] None >>= fun refs -> let fiber ctx = let open Smart in - let adv_ref = Advertised_refs.v1 ~capabilities:[] [] (* TODO *) in + let adv_ref = Advertised_refs.v1 ~capabilities:my_caps refs in let* () = send ctx send_advertised_refs adv_ref in recv ctx recv_want in diff --git a/test/smart/server.ml b/test/smart/server.ml index 8db94a273..0265ec126 100644 --- a/test/smart/server.ml +++ b/test/smart/server.ml @@ -97,7 +97,7 @@ let create_new_git_push_store _sw = get = get_object_for_packer lwt; parents = (fun _uid _store -> assert false); deref = deref lwt; - locals = (fun _store -> assert false); + locals = locals lwt; shallowed = (fun _store -> assert false); shallow = (fun _store _uid -> assert false); unshallow = (fun _store _uid -> assert false); @@ -139,8 +139,7 @@ let test_cancelled_fetch () = let run () = (* let capabilities = [] in *) let payloads = [ "\x30\x30\x30\x30" (* 0000 *) ] in - create_new_git_push_store sw - >>= fun (access, store) -> + create_new_git_push_store sw >>= fun (access, store) -> flow_with_payloads (payloads, ignore) >>? fun flow -> Git_sync.upload_pack (Flow.make flow) access store >>= fun () -> Lwt.return (Ok ()) diff --git a/test/smart/store_backend.ml b/test/smart/store_backend.ml index 90b2ade98..705b5cb04 100644 --- a/test/smart/store_backend.ml +++ b/test/smart/store_backend.ml @@ -338,11 +338,8 @@ let deref : Log.err (fun m -> m "Got an error [deref]: %a" R.pp_msg err); failwithf "%a" R.pp_msg err -let locals : - type s. - s scheduler -> - (Uid.t, Uid.t * int ref * int64, git) store -> - (Ref.t list, s) io = +let locals : type s. s scheduler -> (Uid.t, _, git) store -> (Ref.t list, s) io + = fun { Sigs.return; _ } store -> let { path; _ } = Store.prj store in let fiber = diff --git a/test/smart/store_backend.mli b/test/smart/store_backend.mli index e08652cd4..ae4c5c26c 100644 --- a/test/smart/store_backend.mli +++ b/test/smart/store_backend.mli @@ -13,11 +13,7 @@ val parents : ((Uid.t * int ref * int64) list, 's) io val deref : 's scheduler -> (_, _, git) store -> Ref.t -> (Uid.t option, 's) io - -val locals : - 's scheduler -> - (Uid.t, Uid.t * int ref * int64, git) store -> - (Ref.t list, 's) io +val locals : 's scheduler -> (Uid.t, 'a, git) store -> (Ref.t list, 's) io val get_object_for_packer : 's scheduler -> From 43682d0b0c2fb3ce986e6a3d96f5c5ffe565aa83 Mon Sep 17 00:00:00 2001 From: Paul-Elliot Date: Sun, 7 May 2023 21:01:54 +0200 Subject: [PATCH 12/14] remove server-side Sideband capability as it does not seem to work Signed-off-by: Paul-Elliot --- src/not-so-smart/upload_pack.ml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/not-so-smart/upload_pack.ml b/src/not-so-smart/upload_pack.ml index 28fb733d1..ee7ceaa8b 100644 --- a/src/not-so-smart/upload_pack.ml +++ b/src/not-so-smart/upload_pack.ml @@ -84,7 +84,9 @@ struct end let upload_pack flow (access, _light_load, _heavy_load) store pack = - let my_caps = [ `Multi_ack; `Side_band_64k; `Ofs_delta; `Thin_pack ] in + let my_caps = + [ `Multi_ack (* ; `Side_band_64k *); `Ofs_delta; `Thin_pack ] + in access.S.locals store |> prj >>= fun refs -> let rec go refs acc head_acc = From 060cb34e8d30c46c1a8675596265e6a8e22b2da6 Mon Sep 17 00:00:00 2001 From: Paul-Elliot Date: Sun, 7 May 2023 21:02:58 +0200 Subject: [PATCH 13/14] WIP: fix duplicate NAKs This is just a hack to make the git clone work, need to be cleaned and verified! Signed-off-by: Paul-Elliot --- src/not-so-smart/upload_pack.ml | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/not-so-smart/upload_pack.ml b/src/not-so-smart/upload_pack.ml index ee7ceaa8b..9473cf7bc 100644 --- a/src/not-so-smart/upload_pack.ml +++ b/src/not-so-smart/upload_pack.ml @@ -131,16 +131,20 @@ struct match cmd with | `Done -> ( match Server_neg.last_common neg with - | Some ack -> - let ack = Smart.Negotiation.map ~f:to_hex ack in - Smart_flow.run sched fail io flow - Smart.(send ctx send_acks [ ack ]) - |> prj - >>= fun () -> return neg + | Some _ack -> + (* TODO: work out when to send NAKs to follow the protocol... *) + (* let ack = Smart.Negotiation.map ~f:to_hex ack in *) + (* Smart_flow.run sched fail io flow *) + (* Smart.(send ctx send_acks [ ack ]) *) + (* |> prj *) + (* >>= fun () -> return neg *) + return neg | None -> - Smart_flow.run sched fail io flow Smart.(send ctx send_acks []) - |> prj - >>= fun () -> return neg) + (* TODO: work out when to send NAKs to follow the protocol... *) + (* Smart_flow.run sched fail io flow Smart.(send ctx send_acks []) *) + (* |> prj *) + (* >>= fun () -> *) + return neg) | `Flush -> negotiate neg in negotiate Server_neg.empty >>= fun neg -> From 2448f3b8f9f21fca81507e569e3c65cd64877168 Mon Sep 17 00:00:00 2001 From: Calascibetta Romain Date: Wed, 22 Nov 2023 18:36:58 +0100 Subject: [PATCH 14/14] Handle the case when the user wants nothing --- src/not-so-smart/upload_pack.ml | 119 +++++++++++++++++--------------- 1 file changed, 62 insertions(+), 57 deletions(-) diff --git a/src/not-so-smart/upload_pack.ml b/src/not-so-smart/upload_pack.ml index 9473cf7bc..b7e940916 100644 --- a/src/not-so-smart/upload_pack.ml +++ b/src/not-so-smart/upload_pack.ml @@ -114,61 +114,66 @@ struct recv ctx recv_want in let ctx = Smart.Context.make ~my_caps in - Smart_flow.run sched fail io flow (fiber ctx) |> prj >>= fun wants -> - (* TODO: Check that all the [wants] are in the store and each are the tip of a ref. *) - Smart.Context.replace_their_caps ctx wants.Smart.Want.capabilities; - - let rec negotiate neg = - Smart_flow.run sched fail io flow Smart.(recv ctx recv_have) |> prj - >>= fun have -> - let h, cmd = - (Smart.Have.map ~f:of_hex have :> Uid.t list * [ `Done | `Flush ]) - in - Server_neg.ack store access ~wants neg h >>= fun (neg, acks) -> - let acks = List.map (Smart.Negotiation.map ~f:to_hex) acks in - Smart_flow.run sched fail io flow Smart.(send ctx send_acks acks) |> prj - >>= fun () -> - match cmd with - | `Done -> ( - match Server_neg.last_common neg with - | Some _ack -> - (* TODO: work out when to send NAKs to follow the protocol... *) - (* let ack = Smart.Negotiation.map ~f:to_hex ack in *) - (* Smart_flow.run sched fail io flow *) - (* Smart.(send ctx send_acks [ ack ]) *) - (* |> prj *) - (* >>= fun () -> return neg *) - return neg - | None -> - (* TODO: work out when to send NAKs to follow the protocol... *) - (* Smart_flow.run sched fail io flow Smart.(send ctx send_acks []) *) - (* |> prj *) - (* >>= fun () -> *) - return neg) - | `Flush -> negotiate neg - in - negotiate Server_neg.empty >>= fun neg -> - let sources = - let a, b = wants.wants in - List.map of_hex (a :: b) - in - Pck.get_uncommon_objects sched ~compare:Uid.compare access store - ~exclude:neg.haves ~sources - |> prj - >>= fun uids -> - Log.debug (fun m -> m "Prepare a pack of %d object(s)." (List.length uids)); - let stream = pack uids in - let side_band = - Smart.Context.is_cap_shared ctx `Side_band - || Smart.Context.is_cap_shared ctx `Side_band_64k - in - let pack = Smart.send_pack side_band in - let rec go () = - stream () >>= function - | None -> return () - | Some payload -> - Smart_flow.run sched fail io flow Smart.(send ctx pack payload) |> prj - >>= fun () -> go () - in - go () + Smart_flow.run sched fail io flow (fiber ctx) |> prj >>= function + | None -> return () + | Some wants -> + (* TODO: Check that all the [wants] are in the store and each are the tip of a ref. *) + Smart.Context.replace_their_caps ctx wants.Smart.Want.capabilities; + + let rec negotiate neg = + Smart_flow.run sched fail io flow Smart.(recv ctx recv_have) |> prj + >>= fun have -> + let h, cmd = + (Smart.Have.map ~f:of_hex have :> Uid.t list * [ `Done | `Flush ]) + in + Server_neg.ack store access ~wants neg h >>= fun (neg, acks) -> + let acks = List.map (Smart.Negotiation.map ~f:to_hex) acks in + Smart_flow.run sched fail io flow Smart.(send ctx send_acks acks) + |> prj + >>= fun () -> + match cmd with + | `Done -> ( + match Server_neg.last_common neg with + | Some _ack -> + (* TODO: work out when to send NAKs to follow the protocol... *) + (* let ack = Smart.Negotiation.map ~f:to_hex ack in *) + (* Smart_flow.run sched fail io flow *) + (* Smart.(send ctx send_acks [ ack ]) *) + (* |> prj *) + (* >>= fun () -> return neg *) + return neg + | None -> + (* TODO: work out when to send NAKs to follow the protocol... *) + (* Smart_flow.run sched fail io flow Smart.(send ctx send_acks []) *) + (* |> prj *) + (* >>= fun () -> *) + return neg) + | `Flush -> negotiate neg + in + negotiate Server_neg.empty >>= fun neg -> + let sources = + let a, b = wants.wants in + List.map of_hex (a :: b) + in + Pck.get_uncommon_objects sched ~compare:Uid.compare access store + ~exclude:neg.haves ~sources + |> prj + >>= fun uids -> + Log.debug (fun m -> + m "Prepare a pack of %d object(s)." (List.length uids)); + let stream = pack uids in + let side_band = + Smart.Context.is_cap_shared ctx `Side_band + || Smart.Context.is_cap_shared ctx `Side_band_64k + in + let pack = Smart.send_pack side_band in + let rec go () = + stream () >>= function + | None -> return () + | Some payload -> + Smart_flow.run sched fail io flow Smart.(send ctx pack payload) + |> prj + >>= fun () -> go () + in + go () end