Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spmc queue #74

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ocamlformat
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
profile = default
version = 0.23.0
version = 0.25.1
1 change: 1 addition & 0 deletions src/lockfree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ module Treiber_stack = Treiber_stack
module Michael_scott_queue = Michael_scott_queue
module Backoff = Backoff
module Mpmc_relaxed_queue = Mpmc_relaxed_queue
module Spmc_queue = Spmc_queue
1 change: 1 addition & 0 deletions src/lockfree.mli
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ module Treiber_stack = Treiber_stack
module Michael_scott_queue = Michael_scott_queue
module Mpmc_relaxed_queue = Mpmc_relaxed_queue
module Backoff = Backoff
module Spmc_queue = Spmc_queue
70 changes: 70 additions & 0 deletions src/spmc_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
type 'a t = {
array : 'a Option.t Array.t;
head : int Atomic.t;
tail : int Atomic.t;
mask : int;
}

let create ~size_exponent () : 'a t =
let max_size = 1 lsl size_exponent in
let array = Array.init max_size (fun _ -> None) in
let mask = max_size - 1 in
let head = Atomic.make 0 in
let tail = Atomic.make 0 in
{ array; head; tail; mask }

let local_push { array; head; tail; mask } item =
let tail_val = Atomic.get tail in
let head_val = Atomic.get head in
let max_size = mask + 1 in

if tail_val - head_val >= max_size then false
else
let index = tail_val land mask in
Array.set array index (Some item);
Atomic.set tail (tail_val + 1);
true

let local_pop { array; head; tail; mask } =
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in

let size = tail_val - head_val in
assert (size >= 0);
if size = 0 then None
else
(* Local methods should be as fast as possible to get good
throughput under load. Here, we first increment the head
and then decide what do. This ensures we never have to
retry or race with stealers - we've either reserved an item
or overshot the queue.

This order works because [local_pop] does not have to
linearize with [local_push]. Stealers have to take a more
principled approach. *)
let reserved = Atomic.fetch_and_add head 1 in
assert (reserved <= tail_val);
if reserved = tail_val then (
(* A steal has succeeded in the meantime, and we've overshot
the queue. Fix it and return. *)
Atomic.set head tail_val;
None)
else
let index = reserved land mask in
let value = Array.get array index in
assert (Option.is_some value);
value

let rec steal_one ({ array; head; tail; mask } as t) =
let head_val = Atomic.get head in
let tail_val = Atomic.get tail in
let size = tail_val - head_val in
assert (size >= -1);
if size <= 0 then None
else
let index = head_val land mask in
let value = Array.get array index in
if Atomic.compare_and_set head head_val (head_val + 1) then (
assert (Option.is_some value);
value)
else steal_one t
27 changes: 27 additions & 0 deletions src/spmc_queue.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
(* A lock-free single-producer multi-consumer queue. It has been written
with work-stealing scheduling in mind.

The functions whose names start with local_ cannot be invoked by a domain
different to owner. That's because [local_push], [local_pop] linearize
with [steal_one] but not with each other. This assumption helps improve
performance but when broken the structure will misbehave in unexpected
ways. For a multi-producer multi-consumer FIFO structure see
Michael-Scott Queue.

[local_push] and [local_pop] are wait-free. [steal_one] is lock-free.
*)
type 'a t

(* Create queue of size 2^size_exponent. *)
val create : size_exponent:int -> unit -> 'a t

(* [local_push t v] insert item [v] into the queue. To be called by owner
domain only. *)
val local_push : 'a t -> 'a -> bool

(* [local_pop t] pops an item from the queue. To be called by owner domain
only. *)
val local_pop : 'a t -> 'a option

(* [steal_one t] pops one item from the queue. *)
val steal_one : 'a t -> 'a option
12 changes: 12 additions & 0 deletions test/spmc_queue/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
(rule
(copy ../../src/spmc_queue.ml spmc_queue.ml))

(test
(name spmc_queue_dscheck)
(libraries atomic dscheck alcotest)
(modules spmc_queue spmc_queue_dscheck))

(test
(name test_spmc_queue)
(libraries lockfree alcotest)
(modules test_spmc_queue))
89 changes: 89 additions & 0 deletions test/spmc_queue/spmc_queue_dscheck.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
let drain queue =
let remaining = ref 0 in
while Option.is_some (Spmc_queue.local_pop queue) do
remaining := !remaining + 1
done;
!remaining

let prepare_queue () =
let queue = Spmc_queue.create ~size_exponent:4 () in
for _ = 1 to 3 do
assert (Spmc_queue.local_push queue 0);
assert (Option.is_some (Spmc_queue.local_pop queue))
done;
queue

let push_pop_steal () =
Atomic.trace (fun () ->
let queue = prepare_queue () in

let popped = ref 0 in
let stolen = ref 0 in
let items_total = 3 in

(* owner *)
Atomic.spawn (fun () ->
for i = 1 to items_total do
assert (Spmc_queue.local_push queue i)
done;

for _ = 1 to items_total do
match Spmc_queue.local_pop queue with
| None -> ()
| Some _ -> stolen := !stolen + 1
done);

(* stealer *)
Atomic.spawn (fun () ->
for _ = 1 to items_total do
match Spmc_queue.steal_one queue with
| None -> ()
| Some _ -> stolen := !stolen + 1
done);

(* checks*)
Atomic.final (fun () ->
Atomic.check (fun () -> drain queue == 0);
Atomic.check (fun () -> !popped + !stolen = items_total)))

let push_pop_double_steal () =
Atomic.trace (fun () ->
let queue = prepare_queue () in
let items_total = 2 in

let popped = ref 0 in

(* owner *)
Atomic.spawn (fun () ->
for i = 1 to items_total do
assert (Spmc_queue.local_push queue i)
done;

match Spmc_queue.local_pop queue with
| None -> ()
| Some _ -> popped := !popped + 1);

(* stealers *)
for _ = 1 to 2 do
Atomic.spawn (fun () ->
match Spmc_queue.steal_one queue with
| None -> ()
| Some _ -> popped := !popped + 1)
done;

(* checks*)
Atomic.final (fun () ->
Atomic.check (fun () ->
let remaining = drain queue in
remaining + !popped = items_total)))

let () =
let open Alcotest in
run "spmc_queue_dscheck"
[
( "basic",
[
test_case "push-pop-1-stealer" `Slow push_pop_steal;
test_case "push-pop-2-stealers" `Slow push_pop_double_steal;
] );
]
77 changes: 77 additions & 0 deletions test/spmc_queue/test_spmc_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
open Lockfree

let push_steal () =
let queue = Spmc_queue.create ~size_exponent:5 () in
let num_of_elements = 200_000 in
(* start dequeuer *)
let dequeuer =
Domain.spawn (fun () ->
let i = ref 0 in
while !i < num_of_elements do
match Spmc_queue.steal_one queue with
| Some item ->
Alcotest.(check int)
"popped items should follow FIFO order" item !i;
i := !i + 1
| None -> ()
done)
in
(* enqueue *)
let i = ref 0 in
while !i < num_of_elements do
if Spmc_queue.local_push queue !i then i := !i + 1
done;
Domain.join dequeuer |> ignore;
()

let no_item_popped_twice () =
let make_once () =
let flag = Atomic.make true in
fun () -> assert (Atomic.exchange flag false)
in

let queue = Spmc_queue.create ~size_exponent:17 () in
let num_of_elements = 100_000 in
(* start dequeuers *)
let dequers =
let dequer () =
Domain.spawn (fun () ->
let i = ref 0 in
while !i < num_of_elements / 4 do
match Spmc_queue.steal_one queue with
| Some once ->
once ();
i := !i + 1
| None -> ()
done)
in
[ dequer (); dequer () ]
in
(* enqueue *)
let i = ref 0 in
while !i < num_of_elements do
let once = make_once () in
if Spmc_queue.local_push queue once then i := !i + 1
done;

let i = ref 0 in
while !i < num_of_elements / 2 do
match Spmc_queue.local_pop queue with
| Some once ->
once ();
i := !i + 1
| None -> ()
done;
List.iter Domain.join dequers;
()

let () =
let open Alcotest in
run "Spmc_queue"
[
( "multicore",
[
test_case "push, steal" `Quick push_steal;
test_case "no double pop" `Quick no_item_popped_twice;
] );
]