Skip to content

Commit f8ecbba

Browse files
committed
WIP: Two stack queue
1 parent 969ddad commit f8ecbba

13 files changed

+328
-23
lines changed

bench/bench_two_stack_queue.ml

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
open Multicore_bench
2+
module Queue = Kcas_data.Two_stack_queue
3+
4+
let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
5+
let t = Queue.create () in
6+
7+
let op push = if push then Queue.push t 101 else Queue.pop_opt t |> ignore in
8+
9+
let init _ =
10+
assert (Queue.pop_opt t == None);
11+
Util.generate_push_and_pop_sequence n_msgs
12+
in
13+
let work _ bits = Util.Bits.iter op bits in
14+
15+
Times.record ~budgetf ~n_domains:1 ~init ~work ()
16+
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"
17+
18+
let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
19+
?(n_msgs = 100 * Util.iter_factor) () =
20+
let n_domains = n_adders + n_takers in
21+
22+
let t = Queue.create () in
23+
24+
let n_msgs_to_take = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in
25+
let n_msgs_to_add = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in
26+
27+
let init _ = () in
28+
let work i () =
29+
if i < n_adders then
30+
let rec work () =
31+
let n = Util.alloc n_msgs_to_add in
32+
if 0 < n then begin
33+
for i = 1 to n do
34+
Queue.push t i
35+
done;
36+
work ()
37+
end
38+
in
39+
work ()
40+
else
41+
let rec work () =
42+
let n = Util.alloc n_msgs_to_take in
43+
if n <> 0 then begin
44+
for _ = 1 to n do
45+
while Option.is_none (Queue.pop_opt t) do
46+
Domain.cpu_relax ()
47+
done
48+
done;
49+
work ()
50+
end
51+
in
52+
work ()
53+
in
54+
let after () =
55+
Atomic.set n_msgs_to_take n_msgs;
56+
Atomic.set n_msgs_to_add n_msgs
57+
in
58+
59+
let config =
60+
let format role blocking n =
61+
Printf.sprintf "%d %s%s%s" n
62+
(if blocking then "" else "nb ")
63+
role
64+
(if n = 1 then "" else "s")
65+
in
66+
Printf.sprintf "%s, %s"
67+
(format "adder" false n_adders)
68+
(format "taker" false n_takers)
69+
in
70+
Times.record ~budgetf ~n_domains ~init ~work ~after ()
71+
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
72+
73+
let run_suite ~budgetf =
74+
run_one_domain ~budgetf ()
75+
@ (Util.cross [ 1; 2 ] [ 1; 2 ]
76+
|> List.concat_map @@ fun (n_adders, n_takers) ->
77+
run_one ~budgetf ~n_adders ~n_takers ())

bench/main.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ let benchmarks =
88
("Kcas_data Dllist", Bench_dllist.run_suite);
99
("Kcas_data Hashtbl", Bench_hashtbl.run_suite);
1010
("Kcas_data Mvar", Bench_mvar.run_suite);
11+
("Kcas_data Two_stack_queue", Bench_two_stack_queue.run_suite);
1112
("Kcas_data Queue", Bench_queue.run_suite);
1213
("Kcas_data Stack", Bench_stack.run_suite);
1314
]

dune-project

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,7 @@
7373
(multicore-magic
7474
(>= 2.1.0))
7575
(backoff
76-
(and
77-
(>= 0.1.0)
78-
:with-test))
76+
(>= 0.1.0))
7977
(domain-local-await
8078
(and
8179
(>= 1.0.1)

kcas_data.opam

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ depends: [
1919
"dune" {>= "3.14"}
2020
"kcas" {= version}
2121
"multicore-magic" {>= "2.1.0"}
22-
"backoff" {>= "0.1.0" & with-test}
22+
"backoff" {>= "0.1.0"}
2323
"domain-local-await" {>= "1.0.1" & with-test}
2424
"domain_shims" {>= "0.1.0" & with-test}
2525
"multicore-bench" {>= "0.1.2" & with-test}

src/kcas/kcas.ml

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -539,22 +539,15 @@ let rec exchange_no_alloc backoff loc state =
539539
end
540540
else exchange_no_alloc (Backoff.once backoff) loc state
541541

542-
let[@inline] rec cas_with_state backoff loc before state state_old =
542+
let[@inline] cas_with_state loc before state state_old =
543543
before == eval state_old
544544
&& (before == state.after
545-
||
546-
if Atomic.compare_and_set (as_atomic loc) state_old state then begin
547-
resume_awaiters state_old.awaiters;
548-
true
549-
end
550-
else
551-
(* We must retry, because compare is by value rather than by state. In
552-
other words, we should not fail spuriously due to some other thread
553-
having installed or removed a waiter.
554-
555-
Fenceless is safe as there was a fence before. *)
556-
cas_with_state (Backoff.once backoff) loc before state
557-
(fenceless_get (as_atomic loc)))
545+
|| atomic_get (as_atomic loc) == state_old
546+
&& Atomic.compare_and_set (as_atomic loc) state_old state
547+
&& begin
548+
resume_awaiters state_old.awaiters;
549+
true
550+
end)
558551

559552
let inc x = x + 1
560553
let dec x = x - 1
@@ -607,10 +600,18 @@ module Loc = struct
607600
let[@inline] get_mode loc =
608601
if (to_loc loc).id < 0 then `Lock_free else `Obstruction_free
609602

610-
let compare_and_set ?(backoff = Backoff.default) loc before after =
611-
let state = new_state after in
603+
let compare_and_set loc before after =
612604
let state_old = atomic_get (as_atomic (to_loc loc)) in
613-
cas_with_state backoff (to_loc loc) before state state_old
605+
before == eval state_old
606+
&& (before == after
607+
|| atomic_get (as_atomic (to_loc loc)) == state_old
608+
&& Atomic.compare_and_set
609+
(as_atomic (to_loc loc))
610+
state_old (new_state after)
611+
&& begin
612+
resume_awaiters state_old.awaiters;
613+
true
614+
end)
614615

615616
let fenceless_update ?timeoutf ?(backoff = Backoff.default) loc f =
616617
let timeout = Timeout.alloc_opt timeoutf in
@@ -947,7 +948,7 @@ module Xt = struct
947948
(* Fenceless is safe inside transactions as each log update has a
948949
fence. *)
949950
let state_old = fenceless_get (as_atomic loc) in
950-
if cas_with_state Backoff.default loc before state state_old then
951+
if cas_with_state loc before state state_old then
951952
success xt result
952953
else commit_once_reuse backoff xt tx
953954
end

src/kcas/kcas.mli

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ module Loc : sig
232232
conditional load. It is also safe for the given function [f] to raise any
233233
other exception to abort the conditional load. *)
234234

235-
val compare_and_set : ?backoff:Backoff.t -> 'a t -> 'a -> 'a -> bool
235+
val compare_and_set : 'a t -> 'a -> 'a -> bool
236236
(** [compare_and_set r before after] atomically updates the shared memory
237237
location [r] to the [after] value if the current value of [r] is the
238238
[before] value. *)

src/kcas_data/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
(public_name kcas_data)
44
(libraries
55
(re_export kcas)
6+
backoff
67
multicore-magic))
78

89
(rule

src/kcas_data/kcas_data.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
module Hashtbl = Hashtbl
22
module Queue = Queue
3+
module Two_stack_queue = Two_stack_queue
34
module Stack = Stack
45
module Mvar = Mvar
56
module Promise = Promise

src/kcas_data/kcas_data.mli

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122

123123
module Hashtbl = Hashtbl
124124
module Queue = Queue
125+
module Two_stack_queue = Two_stack_queue
125126
module Stack = Stack
126127

127128
(** {1 Communication and synchronization primitives} *)

src/kcas_data/two_stack_queue.ml

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
open Kcas
2+
3+
type 'a t = { head : 'a head Loc.t; tail : 'a tail Loc.t }
4+
5+
and ('a, _) tdt =
6+
| Cons : {
7+
counter : int;
8+
value : 'a;
9+
suffix : 'a head;
10+
}
11+
-> ('a, [> `Cons ]) tdt
12+
| Head : { counter : int } -> ('a, [> `Head ]) tdt
13+
| Snoc : {
14+
counter : int;
15+
prefix : 'a tail;
16+
value : 'a;
17+
}
18+
-> ('a, [> `Snoc ]) tdt
19+
| Tail : {
20+
counter : int;
21+
mutable move : ('a, [ `Snoc ]) tdt;
22+
}
23+
-> ('a, [> `Tail ]) tdt
24+
25+
and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed]
26+
and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed]
27+
28+
(* *)
29+
30+
let create () =
31+
let head = Loc.make ~padded:true (H (Head { counter = 1 })) in
32+
let tail =
33+
Loc.make ~padded:true (T (Tail { counter = 0; move = Obj.magic () }))
34+
in
35+
{ head; tail } |> Multicore_magic.copy_as_padded
36+
37+
(* *)
38+
39+
let rec rev (suffix : (_, [< `Cons ]) tdt) = function
40+
| T (Snoc { counter; prefix; value }) ->
41+
rev (Cons { counter; value; suffix = H suffix }) prefix
42+
| T (Tail _) -> suffix
43+
44+
let[@inline] rev = function
45+
| (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tdt) ->
46+
rev
47+
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
48+
prefix
49+
50+
(* *)
51+
52+
let rec push backoff t value =
53+
match Loc.fenceless_get t.tail with
54+
| T (Snoc snoc_r) as prefix -> push_with backoff t snoc_r.counter prefix value
55+
| T (Tail tail_r as tail) ->
56+
let move = tail_r.move in
57+
if move != Obj.magic () then begin
58+
let (Snoc move_r) = move in
59+
match Loc.fenceless_get t.head with
60+
| H (Head head_r as head) when head_r.counter < move_r.counter ->
61+
let after = rev move in
62+
if Loc.compare_and_set t.head (H head) (H after) then
63+
tail_r.move <- Obj.magic ()
64+
| _ -> ()
65+
end;
66+
push_with backoff t tail_r.counter (T tail) value
67+
68+
and push_with backoff t counter prefix value =
69+
let after = Snoc { counter = counter + 1; prefix; value } in
70+
if not (Loc.compare_and_set t.tail prefix (T after)) then
71+
push (Backoff.once backoff) t value
72+
73+
let[@inline] push t value = push Backoff.default t value
74+
75+
(* *)
76+
77+
exception Empty
78+
79+
let rec pop backoff t =
80+
match Loc.get t.head with
81+
| H (Cons cons_r) as before ->
82+
let after = cons_r.suffix in
83+
if Loc.compare_and_set t.head before after then cons_r.value
84+
else pop (Backoff.once backoff) t
85+
| H (Head head_r as head) -> begin
86+
match Loc.fenceless_get t.tail with
87+
| T (Snoc snoc_r as move) ->
88+
if head_r.counter = snoc_r.counter then
89+
if Loc.compare_and_set t.tail (T move) snoc_r.prefix then
90+
snoc_r.value
91+
else pop backoff t
92+
else
93+
let tail = Tail { counter = snoc_r.counter; move } in
94+
if
95+
Loc.fenceless_get t.head == H head
96+
&& Loc.compare_and_set t.tail (T move) (T tail)
97+
then pop_moving backoff t head move tail
98+
else pop backoff t
99+
| T (Tail tail_r as tail) ->
100+
let move = tail_r.move in
101+
if move == Obj.magic () then pop_emptyish backoff t head
102+
else pop_moving backoff t head move tail
103+
end
104+
105+
and pop_moving backoff t (Head head_r as head : (_, [< `Head ]) tdt)
106+
(Snoc move_r as move) (Tail tail_r : (_, [< `Tail ]) tdt) =
107+
if head_r.counter < move_r.counter then
108+
match rev move with
109+
| Cons cons_r ->
110+
if Loc.compare_and_set t.head (H head) cons_r.suffix then begin
111+
tail_r.move <- Obj.magic ();
112+
cons_r.value
113+
end
114+
else pop (Backoff.once backoff) t
115+
else pop_emptyish backoff t head
116+
117+
and pop_emptyish backoff t head =
118+
if Loc.get t.head == H head then raise_notrace Empty else pop backoff t
119+
120+
let[@inline] pop_opt t =
121+
match pop Backoff.default t with
122+
| value -> Some value
123+
| exception Empty -> None
124+
125+
let[@inline] pop t = pop Backoff.default t
126+
127+
(* *)
128+
129+
let rec length t =
130+
let head = Loc.get t.head in
131+
let tail = Loc.fenceless_get t.tail in
132+
if head != Loc.get t.head then length t
133+
else
134+
let head_at =
135+
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
136+
in
137+
let tail_at =
138+
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
139+
in
140+
tail_at - head_at + 1

src/kcas_data/two_stack_queue.mli

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
type !'a t
2+
(** *)
3+
4+
val create : unit -> 'a t
5+
(** *)
6+
7+
val push : 'a t -> 'a -> unit
8+
(** *)
9+
10+
exception Empty
11+
(** *)
12+
13+
val pop : 'a t -> 'a
14+
(** *)
15+
16+
val pop_opt : 'a t -> 'a option
17+
(** *)
18+
19+
val length : 'a t -> int
20+
(** *)

test/kcas_data/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
queue_test_stm
2424
stack_test
2525
stack_test_stm
26+
two_stack_queue_test_stm
2627
xt_test)
2728
(libraries
2829
alcotest

0 commit comments

Comments
 (0)