From 844456b6483b0733643c34511214eed8e39076a9 Mon Sep 17 00:00:00 2001 From: Bartosz Modelski Date: Fri, 2 Jun 2023 19:49:33 +0100 Subject: [PATCH] spmc queue --- .ocamlformat | 2 +- src/lockfree.ml | 1 + src/lockfree.mli | 1 + src/spmc_queue.ml | 70 +++++++++++++++++++++ src/spmc_queue.mli | 27 ++++++++ test/spmc_queue/dune | 12 ++++ test/spmc_queue/spmc_queue_dscheck.ml | 89 +++++++++++++++++++++++++++ test/spmc_queue/test_spmc_queue.ml | 77 +++++++++++++++++++++++ 8 files changed, 278 insertions(+), 1 deletion(-) create mode 100644 src/spmc_queue.ml create mode 100644 src/spmc_queue.mli create mode 100644 test/spmc_queue/dune create mode 100644 test/spmc_queue/spmc_queue_dscheck.ml create mode 100644 test/spmc_queue/test_spmc_queue.ml diff --git a/.ocamlformat b/.ocamlformat index daa1fc92..13941e32 100644 --- a/.ocamlformat +++ b/.ocamlformat @@ -1,2 +1,2 @@ profile = default -version = 0.23.0 +version = 0.25.1 diff --git a/src/lockfree.ml b/src/lockfree.ml index 57eb3b9d..5628cbf6 100644 --- a/src/lockfree.ml +++ b/src/lockfree.ml @@ -32,3 +32,4 @@ module Treiber_stack = Treiber_stack module Michael_scott_queue = Michael_scott_queue module Backoff = Backoff module Mpmc_relaxed_queue = Mpmc_relaxed_queue +module Spmc_queue = Spmc_queue \ No newline at end of file diff --git a/src/lockfree.mli b/src/lockfree.mli index 5b50ca60..13eda5e1 100644 --- a/src/lockfree.mli +++ b/src/lockfree.mli @@ -37,3 +37,4 @@ module Treiber_stack = Treiber_stack module Michael_scott_queue = Michael_scott_queue module Mpmc_relaxed_queue = Mpmc_relaxed_queue module Backoff = Backoff +module Spmc_queue = Spmc_queue \ No newline at end of file diff --git a/src/spmc_queue.ml b/src/spmc_queue.ml new file mode 100644 index 00000000..a2aaf603 --- /dev/null +++ b/src/spmc_queue.ml @@ -0,0 +1,70 @@ +type 'a t = { + array : 'a Option.t Array.t; + head : int Atomic.t; + tail : int Atomic.t; + mask : int; +} + +let create ~size_exponent () : 'a t = + let max_size = 1 lsl size_exponent in + let array = Array.init max_size (fun _ -> None) in + let mask = max_size - 1 in + let head = Atomic.make 0 in + let tail = Atomic.make 0 in + { array; head; tail; mask } + +let local_push { array; head; tail; mask } item = + let tail_val = Atomic.get tail in + let head_val = Atomic.get head in + let max_size = mask + 1 in + + if tail_val - head_val >= max_size then false + else + let index = tail_val land mask in + Array.set array index (Some item); + Atomic.set tail (tail_val + 1); + true + +let local_pop { array; head; tail; mask } = + let head_val = Atomic.get head in + let tail_val = Atomic.get tail in + + let size = tail_val - head_val in + assert (size >= 0); + if size = 0 then None + else + (* Local methods should be as fast as possible to get good + throughput under load. Here, we first increment the head + and then decide what do. This ensures we never have to + retry or race with stealers - we've either reserved an item + or overshot the queue. + + This order works because [local_pop] does not have to + linearize with [local_push]. Stealers have to take a more + principled approach. *) + let reserved = Atomic.fetch_and_add head 1 in + assert (reserved <= tail_val); + if reserved = tail_val then ( + (* A steal has succeeded in the meantime, and we've overshot + the queue. Fix it and return. *) + Atomic.set head tail_val; + None) + else + let index = reserved land mask in + let value = Array.get array index in + assert (Option.is_some value); + value + +let rec steal_one ({ array; head; tail; mask } as t) = + let head_val = Atomic.get head in + let tail_val = Atomic.get tail in + let size = tail_val - head_val in + assert (size >= -1); + if size <= 0 then None + else + let index = head_val land mask in + let value = Array.get array index in + if Atomic.compare_and_set head head_val (head_val + 1) then ( + assert (Option.is_some value); + value) + else steal_one t diff --git a/src/spmc_queue.mli b/src/spmc_queue.mli new file mode 100644 index 00000000..7155cfd0 --- /dev/null +++ b/src/spmc_queue.mli @@ -0,0 +1,27 @@ +(* A lock-free single-producer multi-consumer queue. It has been written + with work-stealing scheduling in mind. + + The functions whose names start with local_ cannot be invoked by a domain + different to owner. That's because [local_push], [local_pop] linearize + with [steal_one] but not with each other. This assumption helps improve + performance but when broken the structure will misbehave in unexpected + ways. For a multi-producer multi-consumer FIFO structure see + Michael-Scott Queue. + + [local_push] and [local_pop] are wait-free. [steal_one] is lock-free. +*) +type 'a t + +(* Create queue of size 2^size_exponent. *) +val create : size_exponent:int -> unit -> 'a t + +(* [local_push t v] insert item [v] into the queue. To be called by owner + domain only. *) +val local_push : 'a t -> 'a -> bool + +(* [local_pop t] pops an item from the queue. To be called by owner domain + only. *) +val local_pop : 'a t -> 'a option + +(* [steal_one t] pops one item from the queue. *) +val steal_one : 'a t -> 'a option diff --git a/test/spmc_queue/dune b/test/spmc_queue/dune new file mode 100644 index 00000000..f9ff29e1 --- /dev/null +++ b/test/spmc_queue/dune @@ -0,0 +1,12 @@ +(rule + (copy ../../src/spmc_queue.ml spmc_queue.ml)) + +(test + (name spmc_queue_dscheck) + (libraries atomic dscheck alcotest) + (modules spmc_queue spmc_queue_dscheck)) + +(test + (name test_spmc_queue) + (libraries lockfree alcotest) + (modules test_spmc_queue)) \ No newline at end of file diff --git a/test/spmc_queue/spmc_queue_dscheck.ml b/test/spmc_queue/spmc_queue_dscheck.ml new file mode 100644 index 00000000..4bab6a17 --- /dev/null +++ b/test/spmc_queue/spmc_queue_dscheck.ml @@ -0,0 +1,89 @@ +let drain queue = + let remaining = ref 0 in + while Option.is_some (Spmc_queue.local_pop queue) do + remaining := !remaining + 1 + done; + !remaining + +let prepare_queue () = + let queue = Spmc_queue.create ~size_exponent:4 () in + for _ = 1 to 3 do + assert (Spmc_queue.local_push queue 0); + assert (Option.is_some (Spmc_queue.local_pop queue)) + done; + queue + +let push_pop_steal () = + Atomic.trace (fun () -> + let queue = prepare_queue () in + + let popped = ref 0 in + let stolen = ref 0 in + let items_total = 3 in + + (* owner *) + Atomic.spawn (fun () -> + for i = 1 to items_total do + assert (Spmc_queue.local_push queue i) + done; + + for _ = 1 to items_total do + match Spmc_queue.local_pop queue with + | None -> () + | Some _ -> stolen := !stolen + 1 + done); + + (* stealer *) + Atomic.spawn (fun () -> + for _ = 1 to items_total do + match Spmc_queue.steal_one queue with + | None -> () + | Some _ -> stolen := !stolen + 1 + done); + + (* checks*) + Atomic.final (fun () -> + Atomic.check (fun () -> drain queue == 0); + Atomic.check (fun () -> !popped + !stolen = items_total))) + +let push_pop_double_steal () = + Atomic.trace (fun () -> + let queue = prepare_queue () in + let items_total = 2 in + + let popped = ref 0 in + + (* owner *) + Atomic.spawn (fun () -> + for i = 1 to items_total do + assert (Spmc_queue.local_push queue i) + done; + + match Spmc_queue.local_pop queue with + | None -> () + | Some _ -> popped := !popped + 1); + + (* stealers *) + for _ = 1 to 2 do + Atomic.spawn (fun () -> + match Spmc_queue.steal_one queue with + | None -> () + | Some _ -> popped := !popped + 1) + done; + + (* checks*) + Atomic.final (fun () -> + Atomic.check (fun () -> + let remaining = drain queue in + remaining + !popped = items_total))) + +let () = + let open Alcotest in + run "spmc_queue_dscheck" + [ + ( "basic", + [ + test_case "push-pop-1-stealer" `Slow push_pop_steal; + test_case "push-pop-2-stealers" `Slow push_pop_double_steal; + ] ); + ] diff --git a/test/spmc_queue/test_spmc_queue.ml b/test/spmc_queue/test_spmc_queue.ml new file mode 100644 index 00000000..eaa23230 --- /dev/null +++ b/test/spmc_queue/test_spmc_queue.ml @@ -0,0 +1,77 @@ +open Lockfree + +let push_steal () = + let queue = Spmc_queue.create ~size_exponent:5 () in + let num_of_elements = 200_000 in + (* start dequeuer *) + let dequeuer = + Domain.spawn (fun () -> + let i = ref 0 in + while !i < num_of_elements do + match Spmc_queue.steal_one queue with + | Some item -> + Alcotest.(check int) + "popped items should follow FIFO order" item !i; + i := !i + 1 + | None -> () + done) + in + (* enqueue *) + let i = ref 0 in + while !i < num_of_elements do + if Spmc_queue.local_push queue !i then i := !i + 1 + done; + Domain.join dequeuer |> ignore; + () + +let no_item_popped_twice () = + let make_once () = + let flag = Atomic.make true in + fun () -> assert (Atomic.exchange flag false) + in + + let queue = Spmc_queue.create ~size_exponent:17 () in + let num_of_elements = 100_000 in + (* start dequeuers *) + let dequers = + let dequer () = + Domain.spawn (fun () -> + let i = ref 0 in + while !i < num_of_elements / 4 do + match Spmc_queue.steal_one queue with + | Some once -> + once (); + i := !i + 1 + | None -> () + done) + in + [ dequer (); dequer () ] + in + (* enqueue *) + let i = ref 0 in + while !i < num_of_elements do + let once = make_once () in + if Spmc_queue.local_push queue once then i := !i + 1 + done; + + let i = ref 0 in + while !i < num_of_elements / 2 do + match Spmc_queue.local_pop queue with + | Some once -> + once (); + i := !i + 1 + | None -> () + done; + List.iter Domain.join dequers; + () + +let () = + let open Alcotest in + run "Spmc_queue" + [ + ( "multicore", + [ + test_case "push, steal" `Quick push_steal; + test_case "no double pop" `Quick no_item_popped_twice; + ] ); + ]