Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: make it clear where callbacks run in switch/timer #13

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/sync/dune
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
(synopsis "Sync primitives")
(preprocess
(pps ppx_deriving.std))
(libraries containers threads.posix imandrakit))
(libraries containers moonpool threads.posix imandrakit))
33 changes: 25 additions & 8 deletions src/sync/switch.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,20 @@ type state =
}
| Off

type t = { st: state Atomic.t } [@@unboxed]
type run_on =
[ `Sync
| `Runner of Moonpool.Runner.t
]

type t = {
st: state Atomic.t;
run_on: run_on;
}

let run_on (r : run_on) f : unit =
match r with
| `Sync -> f ()
| `Runner runner -> Moonpool.Runner.run_async runner f

let update_ (type a) (self : t) (f : state -> a * state) : a =
let rec loop () =
Expand All @@ -27,7 +40,7 @@ let on_turn_off (self : t) (f : cb) : unit =
| Off -> true, Off
| On r -> false, On { r with l = f :: r.l })
in
if must_fire then (* call now *) f ()
if must_fire then (* call now *) run_on self.run_on f

let with_on_turn_off (self : t) (cb : cb) f =
let must_fire, cb_handle =
Expand All @@ -40,7 +53,7 @@ let with_on_turn_off (self : t) (cb : cb) f =

if must_fire then (
(* switch is already off, just call [cb] now and tailcall into [f] *)
cb ();
run_on self.run_on cb;
f ()
) else (
(* cleanup: remove the callback *)
Expand All @@ -53,22 +66,26 @@ let with_on_turn_off (self : t) (cb : cb) f =
Fun.protect f ~finally:remove_cb
)

let turn_off' ?(trace = true) self =
let turn_off_ ~trace self =
(* When calling turn_off' from a signal handler, Trace.message may cause the thread
to be killed. For this reason, we provide a way to disable tracing here. *)
if trace then Trace.message "switch.turn-off";
match Atomic.exchange self.st Off with
| Off -> `Was_off
| On { l; m; n = _ } ->
List.iter (fun f -> f ()) l;
Int_map.iter (fun _ f -> f ()) m;
List.iter (fun f -> run_on self.run_on f) l;
Int_map.iter (fun _ f -> run_on self.run_on f) m;
`Was_on

let turn_off' ?(trace = true) self = turn_off_ ~trace self

let[@inline] turn_off ?(trace = true) self =
ignore (turn_off' self ~trace : [> `Was_on ])

let create ?parent () : t =
let self = { st = Atomic.make (On { l = []; n = 0; m = Int_map.empty }) } in
let create ~run_on ?parent () : t =
let self =
{ run_on; st = Atomic.make (On { l = []; n = 0; m = Int_map.empty }) }
in
Option.iter (fun p -> on_turn_off p (fun () -> turn_off self)) parent;
self

Expand Down
4 changes: 3 additions & 1 deletion src/sync/switch.mli
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

type t [@@deriving show]

val create : ?parent:t -> unit -> t
val create :
run_on:[ `Sync | `Runner of Moonpool.Runner.t ] -> ?parent:t -> unit -> t
(** New switch.
@param run_on decides where callbacks will run when the switch is turned off.
@param parent inherit from this switch. It means that the result switches
off if the parent does, but conversely we can turn the result off
without affecting the parent.
Expand Down
2 changes: 1 addition & 1 deletion src/thread/background_thread.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ type t = Executor.t

let pp out _self = Fmt.string out "<background thread>"

let start ?(active = Switch.create ()) ?on_exn ~name () : t =
let start ?(active = Switch.create ~run_on:`Sync ()) ?on_exn ~name () : t =
let size_name_ = spf "%s.queue-size" name in
let around_task =
if Trace.enabled () then (
Expand Down
Loading