Skip to content

Commit 5247b99

Browse files
committed
Reimplement queue using a new two stack representation
1 parent 3af1f71 commit 5247b99

File tree

1 file changed

+179
-122
lines changed

1 file changed

+179
-122
lines changed

src/kcas_data/queue.ml

Lines changed: 179 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -1,162 +1,219 @@
11
open Kcas
22

3-
type 'a t = {
4-
front : 'a Elems.t Loc.t;
5-
middle : 'a Elems.t Loc.t;
6-
back : 'a Elems.t Loc.t;
7-
}
8-
9-
let alloc ~front ~middle ~back =
10-
(* We allocate locations in specific order to make most efficient use of the
11-
splay-tree based transaction log. *)
12-
let front = Loc.make ~padded:true front
13-
and middle = Loc.make ~padded:true middle
14-
and back = Loc.make ~padded:true back in
15-
Multicore_magic.copy_as_padded { back; middle; front }
3+
let unique = ref ()
4+
let null () = Obj.magic unique
165

17-
let create () = alloc ~front:Elems.empty ~middle:Elems.empty ~back:Elems.empty
6+
module Elems = struct
7+
type 'a t = { value : 'a; tl : 'a t; length : int }
188

19-
let copy q =
20-
let tx ~xt = (Xt.get ~xt q.front, Xt.get ~xt q.middle, Xt.get ~xt q.back) in
21-
let front, middle, back = Xt.commit { tx } in
22-
alloc ~front ~middle ~back
9+
let rec empty = { value = null (); tl = empty; length = 0 }
10+
let[@inline] length t = t.length lxor (t.length asr (Sys.int_size - 1))
2311

24-
module Xt = struct
25-
let is_empty ~xt t =
26-
(* We access locations in order of allocation to make most efficient use of
27-
the splay-tree based transaction log. *)
28-
Xt.get ~xt t.front == Elems.empty
29-
&& Xt.get ~xt t.middle == Elems.empty
30-
&& Xt.get ~xt t.back == Elems.empty
31-
32-
let length ~xt { back; middle; front } =
33-
Elems.length (Xt.get ~xt front)
34-
+ Elems.length (Xt.get ~xt middle)
35-
+ Elems.length (Xt.get ~xt back)
36-
37-
let add ~xt x q = Xt.unsafe_modify ~xt q.back @@ Elems.cons x
38-
let push = add
12+
let rec rev_append length t tl =
13+
if length = 0 then tl
14+
else rev_append (length - 1) t.tl { value = t.value; tl; length }
3915

40-
(** Cooperative helper to move elems from back to middle. *)
41-
let back_to_middle ~middle ~back =
42-
let tx ~xt =
43-
let xs = Xt.exchange ~xt back Elems.empty in
44-
if xs == Elems.empty || Xt.exchange ~xt middle xs != Elems.empty then
45-
raise_notrace Exit
46-
in
47-
try Xt.commit { tx } with Exit -> ()
16+
let tl_safe t = if -2 <= t.length then t.tl else t
17+
18+
let[@inline] tl res t =
19+
let length = t.length in
20+
if -2 <= length then begin
21+
if length <> 0 then res := t.value;
22+
t.tl
23+
end
24+
else
25+
let length = lnot length in
26+
let t =
27+
rev_append (length - 1) t.tl { value = t.value; tl = empty; length }
28+
in
29+
res := t.value;
30+
t.tl
31+
32+
let peek res t =
33+
let length = t.length in
34+
if -2 <= length then begin
35+
if length <> 0 then res := t.value;
36+
t
37+
end
38+
else
39+
let length = lnot length in
40+
let t =
41+
rev_append (length - 1) t.tl { value = t.value; tl = empty; length }
42+
in
43+
res := t.value;
44+
t
45+
46+
let rec prepend_to_seq t tl =
47+
if t == empty then tl
48+
else fun () -> Seq.Cons (t.value, prepend_to_seq t.tl tl)
49+
end
4850

49-
let take_opt_finish ~xt front elems =
50-
let elems = Elems.rev elems in
51-
Xt.set ~xt front (Elems.tl_safe elems);
52-
Elems.hd_opt elems
51+
module Back = struct
52+
type 'a t = { length : int; front : 'a; elems : 'a Elems.t }
5353

54-
let take_opt ~xt t =
55-
let front = t.front in
56-
let elems = Xt.unsafe_update ~xt front Elems.tl_safe in
57-
if elems != Elems.empty then Elems.hd_opt elems
54+
let empty = { length = -1; front = null (); elems = Elems.empty }
55+
let[@inline] length t = lnot t.length
56+
57+
let[@inline] snoc x t =
58+
let length = t.length in
59+
if length = -1 then { length = length - 1; front = x; elems = Elems.empty }
5860
else
59-
let middle = t.middle and back = t.back in
60-
if not (Xt.is_in_log ~xt middle || Xt.is_in_log ~xt back) then
61-
back_to_middle ~middle ~back;
62-
let elems = Xt.exchange ~xt middle Elems.empty in
63-
if elems != Elems.empty then take_opt_finish ~xt front elems
61+
{
62+
length = length - 1;
63+
front = t.front;
64+
elems = { value = x; tl = t.elems; length };
65+
}
66+
67+
let rev_prepend_to_seq t tl =
68+
let tl =
69+
if t.length >= -2 then Elems.prepend_to_seq t.elems tl
6470
else
65-
let elems = Xt.exchange ~xt back Elems.empty in
66-
if elems != Elems.empty then take_opt_finish ~xt front elems else None
71+
let t = ref (Either.Left t.elems) in
72+
fun () ->
73+
let t =
74+
match !t with
75+
| Left t' ->
76+
(* This is parallelism safe as the result is always equivalent. *)
77+
let t' = Elems.rev_append (lnot t'.length) t' Elems.empty in
78+
t := Right t';
79+
t'
80+
| Right t' -> t'
81+
in
82+
Elems.prepend_to_seq t tl ()
83+
in
84+
if t.length <= -2 then Seq.cons t.front tl else tl
85+
end
86+
87+
type 'a t = { front : 'a Elems.t Loc.t; back : 'a Back.t Loc.t }
88+
89+
let alloc ~front ~back =
90+
let front = Loc.make ~padded:true front
91+
and back = Loc.make ~padded:true back in
92+
Multicore_magic.copy_as_padded { front; back }
93+
94+
let create () = alloc ~front:Elems.empty ~back:Back.empty
95+
96+
let copy t =
97+
let tx ~xt = (Xt.get ~xt t.front, Xt.get ~xt t.back) in
98+
let front, back = Xt.commit { tx } in
99+
alloc ~front ~back
67100

68-
let take_blocking ~xt q = Xt.to_blocking ~xt (take_opt q)
101+
module Xt = struct
102+
let is_empty ~xt t =
103+
Xt.get ~xt t.front == Elems.empty && Xt.get ~xt t.back == Back.empty
69104

70-
let peek_opt_finish ~xt front elems =
71-
let elems = Elems.rev elems in
72-
Xt.set ~xt front elems;
73-
Elems.hd_opt elems
105+
let length ~xt t =
106+
Elems.length (Xt.get ~xt t.front) + Back.length (Xt.get ~xt t.back)
107+
108+
let add ~xt x t = Xt.unsafe_modify ~xt t.back @@ Back.snoc x
109+
let push = add
74110

75111
let peek_opt ~xt t =
76-
let front = t.front in
77-
let elems = Xt.get ~xt front in
78-
if elems != Elems.empty then Elems.hd_opt elems
79-
else
80-
let middle = t.middle and back = t.back in
81-
if not (Xt.is_in_log ~xt middle || Xt.is_in_log ~xt back) then
82-
back_to_middle ~middle ~back;
83-
let elems = Xt.exchange ~xt middle Elems.empty in
84-
if elems != Elems.empty then peek_opt_finish ~xt front elems
85-
else
86-
let elems = Xt.exchange ~xt back Elems.empty in
87-
if elems != Elems.empty then peek_opt_finish ~xt front elems else None
112+
let res = ref (null ()) in
113+
Xt.unsafe_modify ~xt t.front @@ Elems.peek res;
114+
let res = !res in
115+
if res == null () then
116+
let back = Xt.get ~xt t.back in
117+
if back.length = -1 then None else Some back.front
118+
else Some res
119+
120+
let peek_blocking ~xt t =
121+
let res = ref (null ()) in
122+
Xt.unsafe_modify ~xt t.front @@ Elems.peek res;
123+
let res = !res in
124+
if res == null () then
125+
let back = Xt.get ~xt t.back in
126+
if back.length = -1 then Retry.later () else back.front
127+
else res
88128

89-
let peek_blocking ~xt q = Xt.to_blocking ~xt (peek_opt q)
129+
let take_opt ~xt t =
130+
let res = ref (null ()) in
131+
Xt.unsafe_modify ~xt t.front @@ Elems.tl res;
132+
let res = !res in
133+
if res == null () then
134+
let back = Xt.exchange ~xt t.back Back.empty in
135+
if back.length = -1 then None
136+
else begin
137+
Xt.set ~xt t.front back.elems;
138+
Some back.front
139+
end
140+
else Some res
141+
142+
let take_blocking ~xt t =
143+
let res = ref (null ()) in
144+
Xt.unsafe_modify ~xt t.front @@ Elems.tl res;
145+
let res = !res in
146+
if res == null () then
147+
let back = Xt.exchange ~xt t.back Back.empty in
148+
if back.length = -1 then Retry.later ()
149+
else begin
150+
Xt.set ~xt t.front back.elems;
151+
back.front
152+
end
153+
else res
90154

91155
let clear ~xt t =
92156
Xt.set ~xt t.front Elems.empty;
93-
Xt.set ~xt t.middle Elems.empty;
94-
Xt.set ~xt t.back Elems.empty
95-
96-
let swap ~xt q1 q2 =
97-
let front = Xt.get ~xt q1.front
98-
and middle = Xt.get ~xt q1.middle
99-
and back = Xt.get ~xt q1.back in
100-
let front = Xt.exchange ~xt q2.front front
101-
and middle = Xt.exchange ~xt q2.middle middle
102-
and back = Xt.exchange ~xt q2.back back in
103-
Xt.set ~xt q1.front front;
104-
Xt.set ~xt q1.middle middle;
105-
Xt.set ~xt q1.back back
106-
107-
let seq_of ~front ~middle ~back =
108-
(* Sequence construction is lazy, so this function is O(1). *)
109-
Seq.empty
110-
|> Elems.rev_prepend_to_seq back
111-
|> Elems.rev_prepend_to_seq middle
112-
|> Elems.prepend_to_seq front
157+
Xt.set ~xt t.back Back.empty
158+
159+
let swap ~xt t1 t2 =
160+
let front = Xt.get ~xt t1.front and back = Xt.get ~xt t1.back in
161+
let front = Xt.exchange ~xt t2.front front
162+
and back = Xt.exchange ~xt t2.back back in
163+
Xt.set ~xt t1.front front;
164+
Xt.set ~xt t1.back back
165+
166+
let seq_of ~front ~back =
167+
Seq.empty |> Back.rev_prepend_to_seq back |> Elems.prepend_to_seq front
113168

114169
let to_seq ~xt t =
115-
let front = Xt.get ~xt t.front
116-
and middle = Xt.get ~xt t.middle
117-
and back = Xt.get ~xt t.back in
118-
seq_of ~front ~middle ~back
170+
let front = Xt.get ~xt t.front and back = Xt.get ~xt t.back in
171+
seq_of ~front ~back
119172

120173
let take_all ~xt t =
121174
let front = Xt.exchange ~xt t.front Elems.empty
122-
and middle = Xt.exchange ~xt t.middle Elems.empty
123-
and back = Xt.exchange ~xt t.back Elems.empty in
124-
seq_of ~front ~middle ~back
175+
and back = Xt.exchange ~xt t.back Back.empty in
176+
seq_of ~front ~back
125177
end
126178

127179
let is_empty q = Kcas.Xt.commit { tx = Xt.is_empty q }
128180
let length q = Kcas.Xt.commit { tx = Xt.length q }
129181

130-
let add x q =
182+
let add x t =
131183
(* Fenceless is safe as we always update. *)
132-
Loc.fenceless_modify q.back @@ Elems.cons x
184+
Loc.fenceless_modify t.back @@ Back.snoc x
133185

134186
let push = add
135187

136-
let take_opt q =
188+
let take_opt t =
137189
(* Fenceless is safe as we revert to a transaction in case we didn't update. *)
138-
match Loc.fenceless_update q.front Elems.tl_safe |> Elems.hd_opt with
139-
| None -> Kcas.Xt.commit { tx = Xt.take_opt q }
140-
| some -> some
190+
let front = Loc.fenceless_update t.front Elems.tl_safe in
191+
let length = front.length in
192+
if 0 < length || length = -2 then Some front.value
193+
else Kcas.Xt.commit { tx = Xt.take_opt t }
141194

142-
let take_blocking ?timeoutf q =
195+
let take_blocking ?timeoutf t =
143196
(* Fenceless is safe as we revert to a transaction in case we didn't update. *)
144-
match Loc.fenceless_update q.front Elems.tl_safe |> Elems.hd_opt with
145-
| None -> Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking q }
146-
| Some elem -> elem
197+
let front = Loc.fenceless_update t.front Elems.tl_safe in
198+
let length = front.length in
199+
if 0 < length || length = -2 then front.value
200+
else Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking t }
201+
202+
let peek_opt t =
203+
let front = Loc.get t.front in
204+
let length = front.length in
205+
if 0 < length || length = -2 then Some front.value
206+
else Kcas.Xt.commit { tx = Xt.peek_opt t }
207+
208+
let peek_blocking ?timeoutf t =
209+
let front = Loc.get t.front in
210+
let length = front.length in
211+
if 0 < length || length = -2 then front.value
212+
else Kcas.Xt.commit ?timeoutf { tx = Xt.peek_blocking t }
147213

148214
let take_all q = Kcas.Xt.commit { tx = Xt.take_all q }
149-
150-
let peek_opt q =
151-
match Loc.get q.front |> Elems.hd_opt with
152-
| None -> Kcas.Xt.commit { tx = Xt.peek_opt q }
153-
| some -> some
154-
155-
let peek_blocking ?timeoutf q =
156-
Kcas.Xt.commit ?timeoutf { tx = Xt.peek_blocking q }
157-
158-
let clear q = Kcas.Xt.commit { tx = Xt.clear q }
159-
let swap q1 q2 = Kcas.Xt.commit { tx = Xt.swap q1 q2 }
215+
let clear t = Kcas.Xt.commit { tx = Xt.clear t }
216+
let swap t1 t2 = Kcas.Xt.commit { tx = Xt.swap t1 t2 }
160217
let to_seq q = Kcas.Xt.commit { tx = Xt.to_seq q }
161218
let iter f q = Seq.iter f @@ to_seq q
162219
let fold f a q = Seq.fold_left f a @@ to_seq q

0 commit comments

Comments
 (0)