Skip to content

Commit

Permalink
Add multicore example.
Browse files Browse the repository at this point in the history
  • Loading branch information
lyrm committed Nov 23, 2024
1 parent ea7bf52 commit 43687e1
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 46 deletions.
2 changes: 1 addition & 1 deletion src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ let () =
(<> %{os_type} Win32)
(>= %{ocaml_version} 5.0.0)))
(libraries saturn)
(files treiber_stack.mli bounded_stack.mli))
(files treiber_stack.mli bounded_stack.mli mpsc_queue.mli))
|}
147 changes: 102 additions & 45 deletions src/mpsc_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,29 @@
(** {1 API} *)

type 'a t
(** Represents a single consumer queue of items of type ['a]. *)
(** Represents a single-consumer queue of items of type ['a]. *)

exception Closed

val create : unit -> 'a t
(** [create ()] returns a new empty queue. *)
(** [create ()] returns a new empty single-consumer queue. *)

val of_list : 'a list -> 'a t
(** [of_list l] returns a new empty queue.
{[
# open Saturn.Single_consumer_queue
# let t : int t = of_list [1; 2; 3]
val t : int t = <abstr>
# pop_opt t
- : int option = Some 1
# peek_opt t
- : int option = Some 2
# pop_opt t
- : int option = Some 3
# pop_opt t
- : int option = None
]}
(** [of_list l] creates a new single-consumer queue from list [l].
{[
# open Saturn.Single_consumer_queue
# let t : int t = of_list [1; 2; 3]
val t : int t = <abstr>
# pop_opt t
- : int option = Some 1
# peek_opt t
- : int option = Some 2
# pop_opt t
- : int option = Some 2
# pop_opt t
- : int option = Some 3
]}
*)

(** {2 Producer-only functions} *)
Expand All @@ -54,19 +54,21 @@ val push_all : 'a t -> 'a list -> unit
@raise Closed if [q] is closed.
{[
# open Saturn.Single_consumer_queue
# let t : int t = create ()
val t : int t = <abstr>
# push_all t [1; 2; 3]
-: unit = ()
# pop_opt t
- : int option = Some 1
# peek_opt t
- : int option = Some 2
# pop_opt t
- : int option = Some 3
# pop_opt t
- : int option = None
# open Saturn.Single_consumer_queue
# let t : int t = create ()
val t : int t = <abstr>
# push_all t [1; 2; 3]
- : unit = ()
# pop_opt t
- : int option = Some 1
# peek_opt t
- : int option = Some 2
# pop_opt t
- : int option = Some 2
# pop_opt t
- : int option = Some 3
# pop_exn t
Exception: Saturn__Mpsc_queue.Empty.
]}
*)

Expand Down Expand Up @@ -135,21 +137,76 @@ val push_head : 'a t -> 'a -> unit
(** {1 Examples}
An example top-level session:
{[
# open Saturn.Single_consumer_queue
# let t : int t = create ()
val t : int t = <abstr>
# push t 1
- : unit = ()
# push t 42
- : unit = ()
# pop_opt t
- : int option = Some 1
# peek_opt t
- : int option = Some 42
# pop_opt t
- : int option = None
# pop_exn t
Exception: Saturn__Single_consumer_queue.Empty.]}
# open Saturn.Single_consumer_queue
# let t : int t = create ()
val t : int t = <abstr>
# push t 1
- : unit = ()
# push t 42
- : unit = ()
# pop_opt t
- : int option = Some 1
# peek_opt t
- : int option = Some 42
# drop_exn t
- : unit = ()
# pop_exn t
Exception: Saturn__Mpsc_queue.Empty.]}
A multicore example:
{@ocaml non-deterministic[
# open Saturn.Single_consumer_queue
# let t : (string * int) t = create ()
val t : (string * int) t = <abstr>
# let barrier = Atomic.make 3
val barrier : int Atomic.t = <abstr>
# let n = 10
val n : int = 10
# let work_consumer () =
Atomic.decr barrier;
while Atomic.get barrier <> 0 do Domain.cpu_relax () done;
for i = 1 to n do
begin
match pop_opt t with
| None -> Printf.printf "Empty.\n%!"
| Some (s, n) ->
Printf.printf "Consumed ressource #%d from %s.\n%!" n s
end;
Domain.cpu_relax ()
done;
val work_consumer : unit -> unit = <fun>
# let work_producer id () =
Atomic.decr barrier;
while Atomic.get barrier <> 0 do Domain.cpu_relax () done;
List.init n Fun.id
|> List.iter (fun i -> push t (id , i);
Domain.cpu_relax ())
val work_producer : string -> unit -> unit = <fun>
# let consumer = Domain.spawn work_consumer
val consumer : unit Domain.t = <abstr>
# let producerA = Domain.spawn (work_producer "A")
val producerA : unit Domain.t = <abstr>
# let producerB = Domain.spawn (work_producer "B")
Empty.
Consumed ressource #0 from A.
Consumed ressource #0 from B.
Consumed ressource #1 from B.
Consumed ressource #2 from B.
Consumed ressource #3 from B.
Consumed ressource #4 from B.
Consumed ressource #5 from B.
Consumed ressource #6 from B.
Consumed ressource #7 from B.
val producerB : unit Domain.t = <abstr>
# Domain.join consumer
- : unit = ()
# Domain.join producerA
- : unit = ()
# Domain.join producerB
- : unit = ()
]}
*)

0 comments on commit 43687e1

Please sign in to comment.