-
Notifications
You must be signed in to change notification settings - Fork 10
/
mtq.ml
27 lines (16 loc) · 937 Bytes
/
mtq.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
(* ExtThread.locked, duplicated to break internal circular dependency in ExtThread *)
let locked mutex f = Mutex.lock mutex; Std.finally (fun () -> Mutex.unlock mutex) f ()
type 'a t = { mutex : Mutex.t; cond : Condition.t; q : 'a Queue.t; }
let create () = { mutex = Mutex.create (); cond = Condition.create (); q = Queue.create (); }
let put q v = locked q.mutex (fun () -> Queue.push v q.q; Condition.signal q.cond)
let get q = locked q.mutex (fun () ->
while Queue.is_empty q.q do Condition.wait q.cond q.mutex done;
Queue.pop q.q)
let peek q = locked q.mutex (fun () ->
while Queue.is_empty q.q do Condition.wait q.cond q.mutex done;
Queue.peek q.q)
let junk q = locked q.mutex (fun () ->
let _ = Exn.catch Queue.pop q.q in ())
let try_get q = locked q.mutex (fun () -> Exn.catch Queue.pop q.q)
let length q = locked q.mutex (fun () -> Queue.length q.q)
let clear q = locked q.mutex (fun () -> Queue.clear q.q)