Skip to content

Commit 2a81fae

Browse files
committed
Change to use Picos instead of DLA and DLT
This adds support for cancelation through Picos and removes explicit support for timeouts, which simplifies the library. Support for DLA and DLT is removed. This basically also means that one can no longer use Kcas without a scheduler.
1 parent 97e6f09 commit 2a81fae

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+786
-1017
lines changed

CHANGES.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## Next version
2+
3+
- Changed to use [Picos](https://github.com/ocaml-multicore/picos/) instead of
4+
[DLA](https://github.com/ocaml-multicore/domain-local-await/) and
5+
[DLT](https://github.com/ocaml-multicore/domain-local-timeout/) (@polytypic)
6+
17
## 0.7.0
28

39
- Numerous minor internal improvements (@polytypic)

README.md

Lines changed: 40 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ Features and properties:
4747
read-only compare (CMP) operations that can be performed on overlapping
4848
locations in parallel without interference.
4949

50-
- **_Blocking await_**: The algorithm supports timeouts and awaiting for changes
51-
to any number of shared memory locations.
50+
- **_Blocking await_**: The algorithm supports cancelation and awaiting for
51+
changes to any number of shared memory locations.
5252

5353
- **_Composable_**: Independently developed transactions can be composed with
5454
ease sequentially, conjunctively, conditionally, and disjunctively.
@@ -76,7 +76,7 @@ is distributed under the [ISC license](LICENSE.md).
7676
- [A transactional lock-free queue](#a-transactional-lock-free-queue)
7777
- [Composing transactions](#composing-transactions)
7878
- [Blocking transactions](#blocking-transactions)
79-
- [Timeouts](#timeouts)
79+
- [Cancelation and Timeouts](#cancelation-and-timeouts)
8080
- [A transactional lock-free leftist heap](#a-transactional-lock-free-leftist-heap)
8181
- [Programming with transactional data structures](#programming-with-transactional-data-structures)
8282
- [The dining philosophers problem](#the-dining-philosophers-problem)
@@ -101,14 +101,7 @@ is distributed under the [ISC license](LICENSE.md).
101101

102102
To use the library
103103

104-
<!--
105104
```ocaml
106-
# #thread
107-
```
108-
-->
109-
110-
```ocaml
111-
# #require "kcas"
112105
# open Kcas
113106
```
114107

@@ -143,6 +136,7 @@ Block waiting for changes to locations:
143136

144137
```ocaml
145138
# let a_domain = Domain.spawn @@ fun () ->
139+
Scheduler.run @@ fun () ->
146140
let x = Loc.get_as (fun x -> Retry.unless (x <> 0); x) x in
147141
Printf.sprintf "The answer is %d!" x
148142
val a_domain : string Domain.t = <abstr>
@@ -551,13 +545,28 @@ and then spawn a domain that tries to atomically both pop and dequeue:
551545

552546
```ocaml
553547
# let a_domain = Domain.spawn @@ fun () ->
548+
Scheduler.run @@ fun () ->
554549
let tx ~xt = (pop ~xt a_stack, dequeue ~xt a_queue) in
555550
let (popped, dequeued) = Xt.commit { tx } in
556551
Printf.sprintf "I popped %d and dequeued %d!"
557552
popped dequeued
558553
val a_domain : string Domain.t = <abstr>
559554
```
560555

556+
**Kcas** uses the [Picos](https://github.com/ocaml-multicore/picos/) interface
557+
to implement blocking. Above `Scheduler.run` starts an effects based
558+
[Picos compatible](https://ocaml-multicore.github.io/picos/doc/picos/index.html#interoperability)
559+
scheduler, which allows **Kcas** to block in a scheduler friendly manner.
560+
561+
> **_Note_**: Typically your entire program would run inside a scheduler and you
562+
> should
563+
> [fork fibers](https://ocaml-multicore.github.io/picos/doc/picos_mux/index.html#examples)
564+
> rather than spawn domains and start schedulers. The
565+
> [MDX](https://github.com/realworldocaml/mdx) tool used for checking this
566+
> document does not allow one to start a scheduler once and run individual code
567+
> snippets within the scheduler, which is why individual examples spawn domains
568+
> and start schedulers.
569+
561570
The domain is now blocked waiting for changes to the stack and the queue. As
562571
long as we don't populate both at the same time
563572

@@ -585,7 +594,7 @@ The retry mechanism essentially allows a transaction to wait for an arbitrary
585594
condition and can function as a fairly expressive communication and
586595
synchronization mechanism.
587596

588-
#### Timeouts
597+
#### Cancelation and Timeouts
589598

590599
> If you block, will they come?
591600
@@ -605,43 +614,30 @@ val pop_or_raise_if :
605614
xt:'a Xt.t -> bool Loc.t -> 'b list Loc.t -> xt:'c Xt.t -> 'b = <fun>
606615
```
607616

608-
This works, but creating, checking, and canceling timeouts properly can be a lot
609-
of work. Therefore **Kcas** also directly supports an optional `timeoutf`
610-
argument for potentially blocking operations. For example, to perform a blocking
611-
pop with a timeout, one can simply explicitly pass the desired timeout in
612-
seconds:
613-
614-
```ocaml
615-
# let an_empty_stack = stack () in
616-
Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack }
617-
Exception: Failure "Domain_local_timeout.set_timeoutf not implemented".
618-
```
619-
620-
Oops! What happened above is that the
621-
[_domain local timeout_](https://github.com/ocaml-multicore/domain-local-timeout)
622-
mechanism used by **Kcas** was not implemented on the current domain. The idea
623-
is that, in the future, concurrent schedulers provide the mechanism out of the
624-
box, but there is also a default implementation using the Stdlib `Thread` and
625-
`Unix` modules that works on most platforms. However, to avoid direct
626-
dependencies to `Thread` and `Unix`, we need to explicitly tell the library that
627-
it can use those modules:
617+
This works, but creating, checking, and canceling timeouts properly in this
618+
manner can be a lot of work. Therefore **Kcas** also directly supports
619+
[cancelation](https://ocaml-multicore.github.io/picos/doc/picos_std/Picos_std_structured/index.html#understanding-cancelation)
620+
through the [Picos](https://github.com/ocaml-multicore/picos/) interface. This
621+
both allows **Kcas** transactions to be cleanly terminated in case the program
622+
has encountered an error and also allows one to simply use a timeout mechanism
623+
provided by the scheduler. For example, the sample
624+
[structured concurrency library](https://ocaml-multicore.github.io/picos/doc/picos_std/Picos_std_structured/index.html)
628625

629626
```ocaml
630-
# Domain_local_timeout.set_system (module Thread) (module Unix)
631-
- : unit = ()
627+
# open Picos_std_structured
632628
```
633629

634-
This initialization, if needed, should be done by application code rather than
635-
by libraries.
636-
637-
If we now retry the previous example we will get a
638-
[`Timeout`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Timeout/index.html#exception-Timeout)
639-
exception as expected:
630+
for Picos provides the
631+
[`Control.terminate_after`](https://ocaml-multicore.github.io/picos/doc/picos_std/Picos_std_structured/Control/index.html#val-terminate_after)
632+
operation, which allows one to easily run an operation with a timeout on the
633+
current fiber:
640634

641635
```ocaml
642636
# let an_empty_stack = stack () in
643-
Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack }
644-
Exception: Kcas.Timeout.Timeout.
637+
Scheduler.run @@ fun () ->
638+
Control.terminate_after ~seconds:0.1 @@ fun () ->
639+
Xt.commit { tx = pop an_empty_stack }
640+
Exception: Picos_std_structured__Control.Terminate.
645641
```
646642

647643
Besides
@@ -651,7 +647,7 @@ potentially blocking single location operations such as
651647
[`update`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Loc/index.html#val-update),
652648
and
653649
[`modify`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Loc/index.html#val-modify)
654-
support the optional `timeoutf` argument.
650+
support cancelation.
655651

656652
#### A transactional lock-free leftist heap
657653

@@ -839,10 +835,9 @@ structures.
839835
One source of ready-made data structures is
840836
[**Kcas_data**](https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/index.html).
841837
Let's explore how we can leverage those data structures. Of course, first we
842-
need to `#require` the package and we'll also open it for convenience:
838+
open `Kcas_data` for convenience:
843839

844840
```ocaml
845-
# #require "kcas_data"
846841
# open Kcas_data
847842
```
848843

@@ -915,6 +910,7 @@ the philosophers:
915910
in
916911
Array.iter Domain.join @@ Array.init philosophers @@ fun i ->
917912
Domain.spawn @@ fun () ->
913+
Scheduler.run @@ fun () ->
918914
let fork_lhs = forks.(i)
919915
and fork_rhs = forks.((i + 1) mod philosophers)
920916
and eaten = eaten.(i) in

bench/bench_accumulator.ml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 180 * Util.iter_factor) () =
99
let n_ops_todo = Countdown.create ~n_domains () in
1010

1111
let init _ = Countdown.non_atomic_set n_ops_todo n_ops in
12+
let wrap _ _ action = Scheduler.run action in
1213
let work domain_index () =
1314
let rec work () =
1415
let n = Countdown.alloc n_ops_todo ~domain_index ~batch:1000 in
@@ -31,7 +32,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 180 * Util.iter_factor) () =
3132
(if n_domains = 1 then "" else "s")
3233
in
3334

34-
Times.record ~budgetf ~n_domains ~init ~work ()
35+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
3536
|> Times.to_thruput_metrics ~n:n_ops ~config ~singular:"operation"
3637

3738
let run_suite ~budgetf =

bench/bench_dllist.ml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ let run_single ~budgetf ?(n_msgs = 15 * Util.iter_factor) () =
1212
assert (Dllist.is_empty t);
1313
Util.generate_push_and_pop_sequence n_msgs
1414
in
15+
let wrap _ _ action = Scheduler.run action in
1516
let work _ bits = Util.Bits.iter op bits in
1617

17-
Times.record ~budgetf ~n_domains:1 ~init ~work ()
18+
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
1819
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"
1920

2021
let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1)
@@ -31,6 +32,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1)
3132
Countdown.non_atomic_set n_msgs_to_take n_msgs;
3233
Countdown.non_atomic_set n_msgs_to_add n_msgs
3334
in
35+
let wrap _ _ action = Scheduler.run action in
3436
let work i () =
3537
if i < n_adders then
3638
let domain_index = i in
@@ -72,7 +74,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1)
7274
(format "taker" false n_takers)
7375
in
7476

75-
Times.record ~budgetf ~n_domains ~init ~work ()
77+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
7678
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
7779

7880
let run_suite ~budgetf =

bench/bench_hashtbl.ml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 40 * Util.iter_factor)
2424
Countdown.non_atomic_set n_ops_todo n_ops;
2525
Random.State.make_self_init ()
2626
in
27-
27+
let wrap _ _ action = Scheduler.run action in
2828
let work domain_index state =
2929
let rec work () =
3030
let n = Countdown.alloc n_ops_todo ~domain_index ~batch:1000 in
@@ -56,7 +56,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 40 * Util.iter_factor)
5656
percent_read
5757
in
5858

59-
Times.record ~budgetf ~n_domains ~init ~work ()
59+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
6060
|> Times.to_thruput_metrics ~n:n_ops ~singular:"operation" ~config
6161

6262
let run_suite ~budgetf =

bench/bench_mvar.ml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
1414
Countdown.non_atomic_set n_msgs_to_take n_msgs;
1515
Countdown.non_atomic_set n_msgs_to_add n_msgs
1616
in
17+
let wrap _ _ action = Scheduler.run action in
1718
let work i () =
1819
if i < n_adders then
1920
let domain_index = i in
@@ -81,7 +82,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
8182
(format "taker" blocking_take n_takers)
8283
in
8384

84-
Times.record ~budgetf ~n_domains ~init ~work ()
85+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
8586
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
8687

8788
let run_suite ~budgetf =

bench/bench_parallel_cmp.ml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 50 * Util.iter_factor) () =
1414
Countdown.non_atomic_set n_ops_todo n_ops;
1515
Array.unsafe_get xs i
1616
in
17+
let wrap _ _ action = Scheduler.run action in
1718
let work domain_index x =
1819
let tx1 ~xt =
1920
let a = Xt.get ~xt a in
@@ -41,7 +42,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 50 * Util.iter_factor) () =
4142
Printf.sprintf "%d worker%s" n_domains (if n_domains = 1 then "" else "s")
4243
in
4344

44-
Times.record ~budgetf ~n_domains ~init ~work ()
45+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
4546
|> Times.to_thruput_metrics ~n:n_ops ~singular:"transaction" ~config
4647

4748
let run_suite ~budgetf =

bench/bench_queue.ml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
1010
assert (Queue.is_empty t);
1111
Util.generate_push_and_pop_sequence n_msgs
1212
in
13+
let wrap _ _ action = Scheduler.run action in
1314
let work _ bits = Util.Bits.iter op bits in
1415

15-
Times.record ~budgetf ~n_domains:1 ~init ~work ()
16+
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
1617
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"
1718

1819
let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
@@ -29,6 +30,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
2930
Countdown.non_atomic_set n_msgs_to_take n_msgs;
3031
Countdown.non_atomic_set n_msgs_to_add n_msgs
3132
in
33+
let wrap _ _ action = Scheduler.run action in
3234
let work i () =
3335
if i < n_adders then
3436
let domain_index = i in
@@ -82,7 +84,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
8284
(format "taker" blocking_take n_takers)
8385
in
8486

85-
Times.record ~budgetf ~n_domains ~init ~work ()
87+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
8688
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
8789

8890
let run_suite ~budgetf =

bench/bench_stack.ml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
1010
assert (Stack.is_empty t);
1111
Util.generate_push_and_pop_sequence n_msgs
1212
in
13+
let wrap _ _ action = Scheduler.run action in
1314
let work _ bits = Util.Bits.iter op bits in
1415

15-
Times.record ~budgetf ~n_domains:1 ~init ~work ()
16+
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
1617
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"
1718

1819
let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
@@ -29,6 +30,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
2930
Countdown.non_atomic_set n_msgs_to_take n_msgs;
3031
Countdown.non_atomic_set n_msgs_to_add n_msgs
3132
in
33+
let wrap _ _ action = Scheduler.run action in
3234
let work i () =
3335
if i < n_adders then
3436
let domain_index = i in
@@ -82,7 +84,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
8284
(format "taker" blocking_take n_takers)
8385
in
8486

85-
Times.record ~budgetf ~n_domains ~init ~work ()
87+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
8688
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
8789

8890
let run_suite ~budgetf =

bench/bench_xt.ml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ let run_one ~budgetf ?(n_locs = 2)
1515
in
1616

1717
let init _ = () in
18+
let wrap _ _ action = Scheduler.run action in
1819
let work _ () =
1920
let rec loop i =
2021
if i > 0 then begin
@@ -27,7 +28,7 @@ let run_one ~budgetf ?(n_locs = 2)
2728

2829
let config = Printf.sprintf "%d loc tx" n_locs in
2930

30-
Times.record ~budgetf ~n_domains:1 ~init ~work ()
31+
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
3132
|> Times.to_thruput_metrics ~n:n_iter ~singular:"transaction" ~config
3233

3334
let run_suite ~budgetf =

bench/bench_xt_ro.ml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ let run_one ~budgetf ?(n_locs = 2)
1515
in
1616

1717
let init _ = () in
18+
let wrap _ _ action = Scheduler.run action in
1819
let work _ () =
1920
let rec loop i =
2021
if i > 0 then begin
@@ -27,7 +28,7 @@ let run_one ~budgetf ?(n_locs = 2)
2728

2829
let config = Printf.sprintf "%d loc tx" n_locs in
2930

30-
Times.record ~budgetf ~n_domains:1 ~init ~work ()
31+
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
3132
|> Times.to_thruput_metrics ~n:n_iter ~singular:"transaction" ~config
3233

3334
let run_suite ~budgetf =

bench/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,4 @@
1919
(run %{test} -brief "Kcas_data Mvar")
2020
(run %{test} -brief "Kcas_data Queue")
2121
(run %{test} -brief "Kcas_data Stack")))
22-
(libraries kcas_data multicore-bench backoff multicore-magic))
22+
(libraries backoff kcas_data multicore-bench multicore-magic scheduler))

doc/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@
55
(package kcas_data))
66
(enabled_if
77
(>= %{ocaml_version} 5.0.0))
8-
(files gkmz-with-read-only-cmp-ops.md scheduler-interop.md))
8+
(files gkmz-with-read-only-cmp-ops.md))

0 commit comments

Comments
 (0)