Skip to content

Commit 36689d8

Browse files
committed
Change to use Picos instead of DLA and DLT
This adds support for cancelation through Picos and removes explicit support for timeouts, which simplifies the library. This basically also means that one can no longer use Kcas without a scheduler.
1 parent 5715181 commit 36689d8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+786
-1022
lines changed

CHANGES.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## Next version
2+
3+
- Changed to use [Picos](https://github.com/ocaml-multicore/picos/) instead of
4+
[DLA](https://github.com/ocaml-multicore/domain-local-await/) and
5+
[DLT](https://github.com/ocaml-multicore/domain-local-timeout/) (@polytypic)
6+
17
## 0.7.0
28

39
- Numerous minor internal improvements (@polytypic)

README.md

Lines changed: 40 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ Features and properties:
4646
read-only compare (CMP) operations that can be performed on overlapping
4747
locations in parallel without interference.
4848

49-
- **_Blocking await_**: The algorithm supports timeouts and awaiting for changes
50-
to any number of shared memory locations.
49+
- **_Blocking await_**: The algorithm supports cancelation and awaiting for
50+
changes to any number of shared memory locations.
5151

5252
- **_Composable_**: Independently developed transactions can be composed with
5353
ease sequentially, conjunctively, conditionally, and disjunctively.
@@ -75,7 +75,7 @@ is distributed under the [ISC license](LICENSE.md).
7575
- [A transactional lock-free queue](#a-transactional-lock-free-queue)
7676
- [Composing transactions](#composing-transactions)
7777
- [Blocking transactions](#blocking-transactions)
78-
- [Timeouts](#timeouts)
78+
- [Cancelation and Timeouts](#cancelation-and-timeouts)
7979
- [A transactional lock-free leftist heap](#a-transactional-lock-free-leftist-heap)
8080
- [Programming with transactional data structures](#programming-with-transactional-data-structures)
8181
- [The dining philosophers problem](#the-dining-philosophers-problem)
@@ -100,14 +100,7 @@ is distributed under the [ISC license](LICENSE.md).
100100

101101
To use the library
102102

103-
<!--
104103
```ocaml
105-
# #thread
106-
```
107-
-->
108-
109-
```ocaml
110-
# #require "kcas"
111104
# open Kcas
112105
```
113106

@@ -142,6 +135,7 @@ Block waiting for changes to locations:
142135

143136
```ocaml
144137
# let a_domain = Domain.spawn @@ fun () ->
138+
Scheduler.run @@ fun () ->
145139
let x = Loc.get_as (fun x -> Retry.unless (x <> 0); x) x in
146140
Printf.sprintf "The answer is %d!" x
147141
val a_domain : string Domain.t = <abstr>
@@ -550,13 +544,28 @@ and then spawn a domain that tries to atomically both pop and dequeue:
550544

551545
```ocaml
552546
# let a_domain = Domain.spawn @@ fun () ->
547+
Scheduler.run @@ fun () ->
553548
let tx ~xt = (pop ~xt a_stack, dequeue ~xt a_queue) in
554549
let (popped, dequeued) = Xt.commit { tx } in
555550
Printf.sprintf "I popped %d and dequeued %d!"
556551
popped dequeued
557552
val a_domain : string Domain.t = <abstr>
558553
```
559554

555+
**Kcas** uses the [Picos](https://github.com/ocaml-multicore/picos/) interface
556+
to implement blocking. Above `Scheduler.run` starts an effects based
557+
[Picos compatible](https://ocaml-multicore.github.io/picos/doc/picos/index.html#interoperability)
558+
scheduler, which allows **Kcas** to block in a scheduler friendly manner.
559+
560+
> **_Note_**: Typically your entire program would run inside a scheduler and you
561+
> should
562+
> [fork fibers](https://ocaml-multicore.github.io/picos/doc/picos_mux/index.html#examples)
563+
> rather than spawn domains and start schedulers. The
564+
> [MDX](https://github.com/realworldocaml/mdx) tool used for checking this
565+
> document does not allow one to start a scheduler once and run individual code
566+
> snippets within the scheduler, which is why individual examples spawn domains
567+
> and start schedulers.
568+
560569
The domain is now blocked waiting for changes to the stack and the queue. As
561570
long as we don't populate both at the same time
562571

@@ -584,7 +593,7 @@ The retry mechanism essentially allows a transaction to wait for an arbitrary
584593
condition and can function as a fairly expressive communication and
585594
synchronization mechanism.
586595

587-
#### Timeouts
596+
#### Cancelation and Timeouts
588597

589598
> If you block, will they come?
590599
@@ -604,43 +613,30 @@ val pop_or_raise_if :
604613
xt:'a Xt.t -> bool Loc.t -> 'b list Loc.t -> xt:'c Xt.t -> 'b = <fun>
605614
```
606615

607-
This works, but creating, checking, and canceling timeouts properly can be a lot
608-
of work. Therefore **Kcas** also directly supports an optional `timeoutf`
609-
argument for potentially blocking operations. For example, to perform a blocking
610-
pop with a timeout, one can simply explicitly pass the desired timeout in
611-
seconds:
612-
613-
```ocaml
614-
# let an_empty_stack = stack () in
615-
Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack }
616-
Exception: Failure "Domain_local_timeout.set_timeoutf not implemented".
617-
```
618-
619-
Oops! What happened above is that the
620-
[_domain local timeout_](https://github.com/ocaml-multicore/domain-local-timeout)
621-
mechanism used by **Kcas** was not implemented on the current domain. The idea
622-
is that, in the future, concurrent schedulers provide the mechanism out of the
623-
box, but there is also a default implementation using the Stdlib `Thread` and
624-
`Unix` modules that works on most platforms. However, to avoid direct
625-
dependencies to `Thread` and `Unix`, we need to explicitly tell the library that
626-
it can use those modules:
616+
This works, but creating, checking, and canceling timeouts properly in this
617+
manner can be a lot of work. Therefore **Kcas** also directly supports
618+
[cancelation](https://ocaml-multicore.github.io/picos/doc/picos_std/Picos_std_structured/index.html#understanding-cancelation)
619+
through the [Picos](https://github.com/ocaml-multicore/picos/) interface. This
620+
both allows **Kcas** transactions to be cleanly terminated in case the program
621+
has encountered an error and also allows one to simply use a timeout mechanism
622+
provided by the scheduler. For example, the sample
623+
[structured concurrency library](https://ocaml-multicore.github.io/picos/doc/picos_std/Picos_std_structured/index.html)
627624

628625
```ocaml
629-
# Domain_local_timeout.set_system (module Thread) (module Unix)
630-
- : unit = ()
626+
# open Picos_std_structured
631627
```
632628

633-
This initialization, if needed, should be done by application code rather than
634-
by libraries.
635-
636-
If we now retry the previous example we will get a
637-
[`Timeout`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Timeout/index.html#exception-Timeout)
638-
exception as expected:
629+
for Picos provides the
630+
[`Control.terminate_after`](https://ocaml-multicore.github.io/picos/doc/picos_std/Picos_std_structured/Control/index.html#val-terminate_after)
631+
operation, which allows one to easily run an operation with a timeout on the
632+
current fiber:
639633

640634
```ocaml
641635
# let an_empty_stack = stack () in
642-
Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack }
643-
Exception: Kcas.Timeout.Timeout.
636+
Scheduler.run @@ fun () ->
637+
Control.terminate_after ~seconds:0.1 @@ fun () ->
638+
Xt.commit { tx = pop an_empty_stack }
639+
Exception: Picos_std_structured__Control.Terminate.
644640
```
645641

646642
Besides
@@ -650,7 +646,7 @@ potentially blocking single location operations such as
650646
[`update`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Loc/index.html#val-update),
651647
and
652648
[`modify`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Loc/index.html#val-modify)
653-
support the optional `timeoutf` argument.
649+
support cancelation.
654650

655651
#### A transactional lock-free leftist heap
656652

@@ -838,10 +834,9 @@ structures.
838834
One source of ready-made data structures is
839835
[**Kcas_data**](https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/index.html).
840836
Let's explore how we can leverage those data structures. Of course, first we
841-
need to `#require` the package and we'll also open it for convenience:
837+
open `Kcas_data` for convenience:
842838

843839
```ocaml
844-
# #require "kcas_data"
845840
# open Kcas_data
846841
```
847842

@@ -914,6 +909,7 @@ the philosophers:
914909
in
915910
Array.iter Domain.join @@ Array.init philosophers @@ fun i ->
916911
Domain.spawn @@ fun () ->
912+
Scheduler.run @@ fun () ->
917913
let fork_lhs = forks.(i)
918914
and fork_rhs = forks.((i + 1) mod philosophers)
919915
and eaten = eaten.(i) in

bench/bench_accumulator.ml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 180 * Util.iter_factor) () =
99
let n_ops_todo = Atomic.make n_ops |> Multicore_magic.copy_as_padded in
1010

1111
let init _ = () in
12-
12+
let wrap _ _ action = Scheduler.run action in
1313
let work _ () =
1414
let rec work () =
1515
let n = Util.alloc n_ops_todo in
@@ -34,7 +34,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 180 * Util.iter_factor) () =
3434
(if n_domains = 1 then "" else "s")
3535
in
3636

37-
Times.record ~budgetf ~n_domains ~init ~work ~after ()
37+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after ()
3838
|> Times.to_thruput_metrics ~n:n_ops ~config ~singular:"operation"
3939

4040
let run_suite ~budgetf =

bench/bench_dllist.ml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ let run_single ~budgetf ?(n_msgs = 15 * Util.iter_factor) () =
1212
assert (Dllist.is_empty t);
1313
Util.generate_push_and_pop_sequence n_msgs
1414
in
15+
let wrap _ _ action = Scheduler.run action in
1516
let work _ bits = Util.Bits.iter op bits in
1617

17-
Times.record ~budgetf ~n_domains:1 ~init ~work ()
18+
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
1819
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"
1920

2021
let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1)
@@ -31,6 +32,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1)
3132
Atomic.set n_msgs_to_take n_msgs;
3233
Atomic.set n_msgs_to_add n_msgs
3334
in
35+
let wrap _ _ action = Scheduler.run action in
3436
let work i () =
3537
if i < n_adders then
3638
let rec work () =
@@ -70,7 +72,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1)
7072
(format "taker" false n_takers)
7173
in
7274

73-
Times.record ~budgetf ~n_domains ~init ~work ()
75+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
7476
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
7577

7678
let run_suite ~budgetf =

bench/bench_hashtbl.ml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 40 * Util.iter_factor)
2424
Atomic.set n_ops_todo n_ops;
2525
Random.State.make_self_init ()
2626
in
27-
27+
let wrap _ _ action = Scheduler.run action in
2828
let work _ state =
2929
let rec work () =
3030
let n = Util.alloc n_ops_todo in
@@ -56,7 +56,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 40 * Util.iter_factor)
5656
percent_read
5757
in
5858

59-
Times.record ~budgetf ~n_domains ~init ~work ()
59+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
6060
|> Times.to_thruput_metrics ~n:n_ops ~singular:"operation" ~config
6161

6262
let run_suite ~budgetf =

bench/bench_mvar.ml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
1111
let n_msgs_to_add = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in
1212

1313
let init _ = () in
14+
let wrap _ _ action = Scheduler.run action in
1415
let work i () =
1516
if i < n_adders then
1617
if blocking_add then
@@ -79,7 +80,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
7980
(format "taker" blocking_take n_takers)
8081
in
8182

82-
Times.record ~budgetf ~n_domains ~init ~work ~after ()
83+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after ()
8384
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
8485

8586
let run_suite ~budgetf =

bench/bench_parallel_cmp.ml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 50 * Util.iter_factor) () =
1111
let n_ops_todo = Atomic.make n_ops |> Multicore_magic.copy_as_padded in
1212

1313
let init i = Array.unsafe_get xs i in
14-
14+
let wrap _ _ action = Scheduler.run action in
1515
let work _ x =
1616
let tx1 ~xt =
1717
let a = Xt.get ~xt a in
@@ -41,7 +41,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 50 * Util.iter_factor) () =
4141
Printf.sprintf "%d worker%s" n_domains (if n_domains = 1 then "" else "s")
4242
in
4343

44-
Times.record ~budgetf ~n_domains ~init ~work ~after ()
44+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after ()
4545
|> Times.to_thruput_metrics ~n:n_ops ~singular:"transaction" ~config
4646

4747
let run_suite ~budgetf =

bench/bench_queue.ml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
1010
assert (Queue.is_empty t);
1111
Util.generate_push_and_pop_sequence n_msgs
1212
in
13+
let wrap _ _ action = Scheduler.run action in
1314
let work _ bits = Util.Bits.iter op bits in
1415

15-
Times.record ~budgetf ~n_domains:1 ~init ~work ()
16+
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
1617
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"
1718

1819
let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
@@ -29,6 +30,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
2930
Atomic.set n_msgs_to_take n_msgs;
3031
Atomic.set n_msgs_to_add n_msgs
3132
in
33+
let wrap _ _ action = Scheduler.run action in
3234
let work i () =
3335
if i < n_adders then
3436
let rec work () =
@@ -79,7 +81,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
7981
(format "taker" blocking_take n_takers)
8082
in
8183

82-
Times.record ~budgetf ~n_domains ~init ~work ()
84+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
8385
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
8486

8587
let run_suite ~budgetf =

bench/bench_stack.ml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
1010
assert (Stack.is_empty t);
1111
Util.generate_push_and_pop_sequence n_msgs
1212
in
13+
let wrap _ _ action = Scheduler.run action in
1314
let work _ bits = Util.Bits.iter op bits in
1415

15-
Times.record ~budgetf ~n_domains:1 ~init ~work ()
16+
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
1617
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"
1718

1819
let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
@@ -29,6 +30,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
2930
Atomic.set n_msgs_to_take n_msgs;
3031
Atomic.set n_msgs_to_add n_msgs
3132
in
33+
let wrap _ _ action = Scheduler.run action in
3234
let work i () =
3335
if i < n_adders then
3436
let rec work () =
@@ -79,7 +81,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
7981
(format "taker" blocking_take n_takers)
8082
in
8183

82-
Times.record ~budgetf ~n_domains ~init ~work ()
84+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
8385
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
8486

8587
let run_suite ~budgetf =

bench/bench_xt.ml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ let run_one ~budgetf ?(n_locs = 2)
1515
in
1616

1717
let init _ = () in
18+
let wrap _ _ action = Scheduler.run action in
1819
let work _ () =
1920
let rec loop i =
2021
if i > 0 then begin
@@ -27,7 +28,7 @@ let run_one ~budgetf ?(n_locs = 2)
2728

2829
let config = Printf.sprintf "%d loc tx" n_locs in
2930

30-
Times.record ~budgetf ~n_domains:1 ~init ~work ()
31+
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
3132
|> Times.to_thruput_metrics ~n:n_iter ~singular:"transaction" ~config
3233

3334
let run_suite ~budgetf =

bench/bench_xt_ro.ml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ let run_one ~budgetf ?(n_locs = 2)
1515
in
1616

1717
let init _ = () in
18+
let wrap _ _ action = Scheduler.run action in
1819
let work _ () =
1920
let rec loop i =
2021
if i > 0 then begin
@@ -27,7 +28,7 @@ let run_one ~budgetf ?(n_locs = 2)
2728

2829
let config = Printf.sprintf "%d loc tx" n_locs in
2930

30-
Times.record ~budgetf ~n_domains:1 ~init ~work ()
31+
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
3132
|> Times.to_thruput_metrics ~n:n_iter ~singular:"transaction" ~config
3233

3334
let run_suite ~budgetf =

bench/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ let () =
1414
(action
1515
(run %{test} -brief))
1616
(libraries
17+
scheduler
1718
kcas_data
1819
multicore-bench
1920
backoff

doc/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@
55
(package kcas_data))
66
(enabled_if
77
(>= %{ocaml_version} 5.0.0))
8-
(files gkmz-with-read-only-cmp-ops.md scheduler-interop.md))
8+
(files gkmz-with-read-only-cmp-ops.md))

0 commit comments

Comments
 (0)