From 906ae7b07769de1c16b9df773ef048d40b867e20 Mon Sep 17 00:00:00 2001 From: Ulugbek Abdullaev Date: Fri, 15 Jan 2021 00:46:25 +0500 Subject: [PATCH] move 'run' fn in nss 'Neg' to its own sub-library 'git.nss.smart-flow' --- src/not-so-smart/dune | 12 ++++++-- src/not-so-smart/fetch.ml | 7 +++-- src/not-so-smart/find_common.ml | 49 +------------------------------ src/not-so-smart/neg.ml | 3 -- src/not-so-smart/neg.mli | 9 ------ src/not-so-smart/push.ml | 12 ++++---- src/not-so-smart/smart_flow.ml | 52 +++++++++++++++++++++++++++++++++ src/not-so-smart/smart_flow.mli | 13 +++++++++ 8 files changed, 87 insertions(+), 70 deletions(-) create mode 100644 src/not-so-smart/smart_flow.ml create mode 100644 src/not-so-smart/smart_flow.mli diff --git a/src/not-so-smart/dune b/src/not-so-smart/dune index d73fafd8a..b8eb41ca1 100644 --- a/src/not-so-smart/dune +++ b/src/not-so-smart/dune @@ -17,6 +17,12 @@ (modules sigs) (libraries fmt cstruct)) +(library + (name smart_flow) + (public_name git.nss.smart-flow) + (modules smart_flow) + (libraries cstruct fmt git.nss.sigs git.nss.smart logs)) + (library (name hkt) (public_name git.nss.hkt) @@ -27,7 +33,8 @@ (name neg) (public_name git.nss.neg) (modules neg find_common default) - (libraries stdlib-shims fmt rresult cstruct sigs logs psq smart)) + (libraries stdlib-shims fmt rresult cstruct sigs logs psq smart + git.nss.smart-flow)) (library (name pck) @@ -39,7 +46,8 @@ (name nss) (public_name git.nss) (modules nss fetch push) - (libraries fmt result rresult logs ipaddr domain-name smart sigs neg pck)) + (libraries fmt result rresult logs ipaddr domain-name smart sigs neg pck + git.nss.smart-flow)) (library (name unixiz) diff --git a/src/not-so-smart/fetch.ml b/src/not-so-smart/fetch.ml index 761ce4e43..9c804a893 100644 --- a/src/not-so-smart/fetch.ml +++ b/src/not-so-smart/fetch.ml @@ -99,7 +99,8 @@ struct let ctx = Smart.Context.make capabilities in let negotiator = Neg.make ~compare:Uid.compare in Neg.tips sched access store negotiator |> prj >>= fun () -> - Neg.run sched fail io flow (prelude ctx) |> prj >>= fun (uids, refs) -> + Smart_flow.run sched fail io flow (prelude ctx) |> prj + >>= fun (uids, refs) -> let hex = { Neg.to_hex = Uid.to_hex; of_hex = Uid.of_hex; compare = Uid.compare } in @@ -121,8 +122,8 @@ struct if res < 0 then Log.warn (fun m -> m "No common commits"); let rec go () = Log.debug (fun m -> m "Read PACK file."); - Neg.run sched fail io flow (pack ctx) |> prj >>= fun continue -> - if continue then go () else return () + Smart_flow.run sched fail io flow (pack ctx) |> prj + >>= fun continue -> if continue then go () else return () in Log.debug (fun m -> m "Start to download PACK file."); go () >>= fun () -> return (List.combine refs uids) diff --git a/src/not-so-smart/find_common.ml b/src/not-so-smart/find_common.ml index fb6bf5835..5289b4df1 100644 --- a/src/not-so-smart/find_common.ml +++ b/src/not-so-smart/find_common.ml @@ -1,4 +1,5 @@ open Sigs +open Smart_flow let ( <.> ) f g x = f (g x) @@ -10,54 +11,6 @@ let _max_in_vain = 256 let _large_flush = 16384 let _pipe_safe_flush = 32 -type ('a, 's) raise = exn -> ('a, 's) io - -let io_buffer_size = 65536 - -let run : - type fl s. - s scheduler -> - ('a, s) raise -> - (fl, 'error, s) flow -> - fl -> - ('res, [ `Protocol of Smart.error ]) Smart.t -> - ('res, s) io = - fun { bind; return } raise { recv; send; pp_error } flow fiber -> - let ( >>= ) = bind in - let tmp = Cstruct.create io_buffer_size in - let failwithf fmt = Format.kasprintf (fun err -> raise (Failure err)) fmt in - let rec go = function - | Smart.Read { k; buffer; off; len; eof } -> ( - let max = min (Cstruct.len tmp) len in - Log.debug (fun m -> m "Start to read %d byte(s)." max); - recv flow (Cstruct.sub tmp 0 max) >>= function - | Ok `End_of_flow -> - Log.debug (fun m -> m "Got end of input."); - go (eof ()) - | Ok (`Input len) -> - Log.debug (fun m -> m "Got %d/%d byte(s)." len max); - Cstruct.blit_to_bytes tmp 0 buffer off len; - go (k len) - | Error err -> - Log.err (fun m -> m "Got an error: %a." pp_error err); - failwithf "%a" pp_error err) - | Smart.Write { k; buffer; off; len } -> - let rec loop tmp = - if Cstruct.len tmp = 0 then go (k len) - else - send flow tmp >>= function - | Ok shift -> loop (Cstruct.shift tmp shift) - | Error err -> failwithf "%a" pp_error err - in - Log.debug (fun m -> m "Write %d byte(s)." len); - loop (Cstruct.of_string buffer ~off ~len) - | Smart.Return v -> return v - | Smart.Error (`Protocol err) -> - Log.err (fun m -> m "Got a protocol error: %a." Smart.pp_error err); - failwithf "%a" Smart.pp_error err - in - go fiber - (* XXX(dinosaure): this part is really **ugly**! But we must follow the same behaviour of [git]. Instead to understand the synchronisation process of [git] with Smart.v1 and implement a state of the art synchronisation algorithm, I diff --git a/src/not-so-smart/neg.ml b/src/not-so-smart/neg.ml index 38e4c5b98..cb2ff9a8f 100644 --- a/src/not-so-smart/neg.ml +++ b/src/not-so-smart/neg.ml @@ -1,5 +1,3 @@ -type nonrec ('a, 's) raise = ('a, 's) Find_common.raise - type nonrec configuration = Find_common.configuration = { stateless : bool; mutable multi_ack : [ `None | `Some | `Detailed ]; @@ -15,6 +13,5 @@ type nonrec 'uid hex = 'uid Find_common.hex = { type 'uid negotiator = 'uid Default.t let make ~compare = Default.make ~compare -let run = Find_common.run let find_common = Find_common.find_common let tips = Find_common.tips diff --git a/src/not-so-smart/neg.mli b/src/not-so-smart/neg.mli index 2691a53ac..d62ae0509 100644 --- a/src/not-so-smart/neg.mli +++ b/src/not-so-smart/neg.mli @@ -31,19 +31,10 @@ type 'uid hex = { compare : 'uid -> 'uid -> int; } -type ('a, 's) raise = exn -> ('a, 's) io type 'uid negotiator val make : compare:('uid -> 'uid -> int) -> 'uid negotiator -val run : - 's scheduler -> - ('res, 's) raise -> - ('flow, 'error, 's) flow -> - 'flow -> - ('res, [ `Protocol of Smart.error ]) Smart.t -> - ('res, 's) io - val find_common : 's scheduler -> ('flow, 'error, 's) flow -> diff --git a/src/not-so-smart/push.ml b/src/not-so-smart/push.ml index 8e269ec84..fbdd29ca0 100644 --- a/src/not-so-smart/push.ml +++ b/src/not-so-smart/push.ml @@ -54,7 +54,8 @@ struct return (Smart.Advertised_refs.map ~fuid:Uid.of_hex ~fref:Ref.v v) in let ctx = Smart.Context.make caps in - Neg.run sched fail io flow (fiber ctx) |> prj >>= fun advertised_refs -> + Smart_flow.run sched fail io flow (fiber ctx) |> prj + >>= fun advertised_refs -> Pck.commands sched ~capabilities:(Smart.Advertised_refs.capabilities advertised_refs) ~equal:Ref.equal ~deref:access.Sigs.deref store cmds @@ -62,10 +63,10 @@ struct |> prj >>= function | None -> - Neg.run sched fail io flow Smart.(send ctx flush ()) |> prj + Smart_flow.run sched fail io flow Smart.(send ctx flush ()) |> prj >>= fun () -> return () | Some cmds -> ( - Neg.run sched fail io flow + Smart_flow.run sched fail io flow Smart.( send ctx commands (Commands.map ~fuid:Uid.to_hex ~fref:Ref.to_string cmds)) @@ -97,14 +98,15 @@ struct Log.debug (fun m -> m "report-status capability: %b." report_status); if report_status then - Neg.run sched fail io flow Smart.(recv ctx status) + Smart_flow.run sched fail io flow Smart.(recv ctx status) |> prj >>| Smart.Status.map ~f:Ref.v else let cmds = List.map R.ok (Smart.Commands.commands cmds) in return (Smart.Status.v cmds) | Some payload -> - Neg.run sched fail io flow Smart.(send ctx pack payload) |> prj + Smart_flow.run sched fail io flow Smart.(send ctx pack payload) + |> prj >>= fun () -> go () in go () >>= fun status -> diff --git a/src/not-so-smart/smart_flow.ml b/src/not-so-smart/smart_flow.ml new file mode 100644 index 000000000..1d70336ba --- /dev/null +++ b/src/not-so-smart/smart_flow.ml @@ -0,0 +1,52 @@ +open Sigs + +module Log = (val let src = Logs.Src.create "smart_flow" in + Logs.src_log src : Logs.LOG) + +let io_buffer_size = 65536 + +type ('a, 's) raise = exn -> ('a, 's) io + +let run : + type fl s. + s scheduler -> + ('a, s) raise -> + (fl, 'error, s) flow -> + fl -> + ('res, [ `Protocol of Smart.error ]) Smart.t -> + ('res, s) io = + fun { bind; return } raise { recv; send; pp_error } flow fiber -> + let ( >>= ) = bind in + let tmp = Cstruct.create io_buffer_size in + let failwithf fmt = Format.kasprintf (fun err -> raise (Failure err)) fmt in + let rec go = function + | Smart.Read { k; buffer; off; len; eof } -> ( + let max = min (Cstruct.len tmp) len in + Log.debug (fun m -> m "Start to read %d byte(s)." max); + recv flow (Cstruct.sub tmp 0 max) >>= function + | Ok `End_of_flow -> + Log.debug (fun m -> m "Got end of input."); + go (eof ()) + | Ok (`Input len) -> + Log.debug (fun m -> m "Got %d/%d byte(s)." len max); + Cstruct.blit_to_bytes tmp 0 buffer off len; + go (k len) + | Error err -> + Log.err (fun m -> m "Got an error: %a." pp_error err); + failwithf "%a" pp_error err) + | Smart.Write { k; buffer; off; len } -> + let rec loop tmp = + if Cstruct.len tmp = 0 then go (k len) + else + send flow tmp >>= function + | Ok shift -> loop (Cstruct.shift tmp shift) + | Error err -> failwithf "%a" pp_error err + in + Log.debug (fun m -> m "Write %d byte(s)." len); + loop (Cstruct.of_string buffer ~off ~len) + | Smart.Return v -> return v + | Smart.Error (`Protocol err) -> + Log.err (fun m -> m "Got a protocol error: %a." Smart.pp_error err); + failwithf "%a" Smart.pp_error err + in + go fiber diff --git a/src/not-so-smart/smart_flow.mli b/src/not-so-smart/smart_flow.mli new file mode 100644 index 000000000..fbcdcde6d --- /dev/null +++ b/src/not-so-smart/smart_flow.mli @@ -0,0 +1,13 @@ +open Sigs + +val io_buffer_size : int + +type ('a, 's) raise = exn -> ('a, 's) Sigs.io + +val run : + 's scheduler -> + ('res, 's) raise -> + ('flow, 'error, 's) flow -> + 'flow -> + ('res, [ `Protocol of Smart.error ]) Smart.t -> + ('res, 's) io