Skip to content

Commit

Permalink
move 'run' fn in nss 'Neg'
Browse files Browse the repository at this point in the history
to its own sub-library 'git.nss.smart-flow'
  • Loading branch information
ulugbekna committed Jan 18, 2021
1 parent faeb743 commit 906ae7b
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 70 deletions.
12 changes: 10 additions & 2 deletions src/not-so-smart/dune
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
(modules sigs)
(libraries fmt cstruct))

(library
(name smart_flow)
(public_name git.nss.smart-flow)
(modules smart_flow)
(libraries cstruct fmt git.nss.sigs git.nss.smart logs))

(library
(name hkt)
(public_name git.nss.hkt)
Expand All @@ -27,7 +33,8 @@
(name neg)
(public_name git.nss.neg)
(modules neg find_common default)
(libraries stdlib-shims fmt rresult cstruct sigs logs psq smart))
(libraries stdlib-shims fmt rresult cstruct sigs logs psq smart
git.nss.smart-flow))

(library
(name pck)
Expand All @@ -39,7 +46,8 @@
(name nss)
(public_name git.nss)
(modules nss fetch push)
(libraries fmt result rresult logs ipaddr domain-name smart sigs neg pck))
(libraries fmt result rresult logs ipaddr domain-name smart sigs neg pck
git.nss.smart-flow))

(library
(name unixiz)
Expand Down
7 changes: 4 additions & 3 deletions src/not-so-smart/fetch.ml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ struct
let ctx = Smart.Context.make capabilities in
let negotiator = Neg.make ~compare:Uid.compare in
Neg.tips sched access store negotiator |> prj >>= fun () ->
Neg.run sched fail io flow (prelude ctx) |> prj >>= fun (uids, refs) ->
Smart_flow.run sched fail io flow (prelude ctx) |> prj
>>= fun (uids, refs) ->
let hex =
{ Neg.to_hex = Uid.to_hex; of_hex = Uid.of_hex; compare = Uid.compare }
in
Expand All @@ -121,8 +122,8 @@ struct
if res < 0 then Log.warn (fun m -> m "No common commits");
let rec go () =
Log.debug (fun m -> m "Read PACK file.");
Neg.run sched fail io flow (pack ctx) |> prj >>= fun continue ->
if continue then go () else return ()
Smart_flow.run sched fail io flow (pack ctx) |> prj
>>= fun continue -> if continue then go () else return ()
in
Log.debug (fun m -> m "Start to download PACK file.");
go () >>= fun () -> return (List.combine refs uids)
Expand Down
49 changes: 1 addition & 48 deletions src/not-so-smart/find_common.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
open Sigs
open Smart_flow

let ( <.> ) f g x = f (g x)

Expand All @@ -10,54 +11,6 @@ let _max_in_vain = 256
let _large_flush = 16384
let _pipe_safe_flush = 32

type ('a, 's) raise = exn -> ('a, 's) io

let io_buffer_size = 65536

let run :
type fl s.
s scheduler ->
('a, s) raise ->
(fl, 'error, s) flow ->
fl ->
('res, [ `Protocol of Smart.error ]) Smart.t ->
('res, s) io =
fun { bind; return } raise { recv; send; pp_error } flow fiber ->
let ( >>= ) = bind in
let tmp = Cstruct.create io_buffer_size in
let failwithf fmt = Format.kasprintf (fun err -> raise (Failure err)) fmt in
let rec go = function
| Smart.Read { k; buffer; off; len; eof } -> (
let max = min (Cstruct.len tmp) len in
Log.debug (fun m -> m "Start to read %d byte(s)." max);
recv flow (Cstruct.sub tmp 0 max) >>= function
| Ok `End_of_flow ->
Log.debug (fun m -> m "Got end of input.");
go (eof ())
| Ok (`Input len) ->
Log.debug (fun m -> m "Got %d/%d byte(s)." len max);
Cstruct.blit_to_bytes tmp 0 buffer off len;
go (k len)
| Error err ->
Log.err (fun m -> m "Got an error: %a." pp_error err);
failwithf "%a" pp_error err)
| Smart.Write { k; buffer; off; len } ->
let rec loop tmp =
if Cstruct.len tmp = 0 then go (k len)
else
send flow tmp >>= function
| Ok shift -> loop (Cstruct.shift tmp shift)
| Error err -> failwithf "%a" pp_error err
in
Log.debug (fun m -> m "Write %d byte(s)." len);
loop (Cstruct.of_string buffer ~off ~len)
| Smart.Return v -> return v
| Smart.Error (`Protocol err) ->
Log.err (fun m -> m "Got a protocol error: %a." Smart.pp_error err);
failwithf "%a" Smart.pp_error err
in
go fiber

(* XXX(dinosaure): this part is really **ugly**! But we must follow the same
behaviour of [git]. Instead to understand the synchronisation process of [git]
with Smart.v1 and implement a state of the art synchronisation algorithm, I
Expand Down
3 changes: 0 additions & 3 deletions src/not-so-smart/neg.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
type nonrec ('a, 's) raise = ('a, 's) Find_common.raise

type nonrec configuration = Find_common.configuration = {
stateless : bool;
mutable multi_ack : [ `None | `Some | `Detailed ];
Expand All @@ -15,6 +13,5 @@ type nonrec 'uid hex = 'uid Find_common.hex = {
type 'uid negotiator = 'uid Default.t

let make ~compare = Default.make ~compare
let run = Find_common.run
let find_common = Find_common.find_common
let tips = Find_common.tips
9 changes: 0 additions & 9 deletions src/not-so-smart/neg.mli
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,10 @@ type 'uid hex = {
compare : 'uid -> 'uid -> int;
}

type ('a, 's) raise = exn -> ('a, 's) io
type 'uid negotiator

val make : compare:('uid -> 'uid -> int) -> 'uid negotiator

val run :
's scheduler ->
('res, 's) raise ->
('flow, 'error, 's) flow ->
'flow ->
('res, [ `Protocol of Smart.error ]) Smart.t ->
('res, 's) io

val find_common :
's scheduler ->
('flow, 'error, 's) flow ->
Expand Down
12 changes: 7 additions & 5 deletions src/not-so-smart/push.ml
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,19 @@ struct
return (Smart.Advertised_refs.map ~fuid:Uid.of_hex ~fref:Ref.v v)
in
let ctx = Smart.Context.make caps in
Neg.run sched fail io flow (fiber ctx) |> prj >>= fun advertised_refs ->
Smart_flow.run sched fail io flow (fiber ctx) |> prj
>>= fun advertised_refs ->
Pck.commands sched
~capabilities:(Smart.Advertised_refs.capabilities advertised_refs)
~equal:Ref.equal ~deref:access.Sigs.deref store cmds
(Smart.Advertised_refs.refs advertised_refs)
|> prj
>>= function
| None ->
Neg.run sched fail io flow Smart.(send ctx flush ()) |> prj
Smart_flow.run sched fail io flow Smart.(send ctx flush ()) |> prj
>>= fun () -> return ()
| Some cmds -> (
Neg.run sched fail io flow
Smart_flow.run sched fail io flow
Smart.(
send ctx commands
(Commands.map ~fuid:Uid.to_hex ~fref:Ref.to_string cmds))
Expand Down Expand Up @@ -97,14 +98,15 @@ struct
Log.debug (fun m ->
m "report-status capability: %b." report_status);
if report_status then
Neg.run sched fail io flow Smart.(recv ctx status)
Smart_flow.run sched fail io flow Smart.(recv ctx status)
|> prj
>>| Smart.Status.map ~f:Ref.v
else
let cmds = List.map R.ok (Smart.Commands.commands cmds) in
return (Smart.Status.v cmds)
| Some payload ->
Neg.run sched fail io flow Smart.(send ctx pack payload) |> prj
Smart_flow.run sched fail io flow Smart.(send ctx pack payload)
|> prj
>>= fun () -> go ()
in
go () >>= fun status ->
Expand Down
52 changes: 52 additions & 0 deletions src/not-so-smart/smart_flow.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
open Sigs

module Log = (val let src = Logs.Src.create "smart_flow" in
Logs.src_log src : Logs.LOG)

let io_buffer_size = 65536

type ('a, 's) raise = exn -> ('a, 's) io

let run :
type fl s.
s scheduler ->
('a, s) raise ->
(fl, 'error, s) flow ->
fl ->
('res, [ `Protocol of Smart.error ]) Smart.t ->
('res, s) io =
fun { bind; return } raise { recv; send; pp_error } flow fiber ->
let ( >>= ) = bind in
let tmp = Cstruct.create io_buffer_size in
let failwithf fmt = Format.kasprintf (fun err -> raise (Failure err)) fmt in
let rec go = function
| Smart.Read { k; buffer; off; len; eof } -> (
let max = min (Cstruct.len tmp) len in
Log.debug (fun m -> m "Start to read %d byte(s)." max);
recv flow (Cstruct.sub tmp 0 max) >>= function
| Ok `End_of_flow ->
Log.debug (fun m -> m "Got end of input.");
go (eof ())
| Ok (`Input len) ->
Log.debug (fun m -> m "Got %d/%d byte(s)." len max);
Cstruct.blit_to_bytes tmp 0 buffer off len;
go (k len)
| Error err ->
Log.err (fun m -> m "Got an error: %a." pp_error err);
failwithf "%a" pp_error err)
| Smart.Write { k; buffer; off; len } ->
let rec loop tmp =
if Cstruct.len tmp = 0 then go (k len)
else
send flow tmp >>= function
| Ok shift -> loop (Cstruct.shift tmp shift)
| Error err -> failwithf "%a" pp_error err
in
Log.debug (fun m -> m "Write %d byte(s)." len);
loop (Cstruct.of_string buffer ~off ~len)
| Smart.Return v -> return v
| Smart.Error (`Protocol err) ->
Log.err (fun m -> m "Got a protocol error: %a." Smart.pp_error err);
failwithf "%a" Smart.pp_error err
in
go fiber
13 changes: 13 additions & 0 deletions src/not-so-smart/smart_flow.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
open Sigs

val io_buffer_size : int

type ('a, 's) raise = exn -> ('a, 's) Sigs.io

val run :
's scheduler ->
('res, 's) raise ->
('flow, 'error, 's) flow ->
'flow ->
('res, [ `Protocol of Smart.error ]) Smart.t ->
('res, 's) io

0 comments on commit 906ae7b

Please sign in to comment.