Skip to content

Commit

Permalink
Keep passive clocks in a weak queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
toots committed Oct 31, 2024
1 parent 643ce29 commit a69db6e
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions src/core/clock.ml
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,16 @@ let _sync ?(pending = false) x =

let sync c = _sync (Unifier.deref c)
let cleanup_source s = try s#force_sleep with _ -> ()
let clocks = Queue.create ()
let passive_clocks = WeakQueue.create ()
let active_clocks = Queue.create ()

let rec _cleanup ~clock { outputs; passive_sources; active_sources } =
Queue.iter outputs cleanup_source;
WeakQueue.iter passive_sources cleanup_source;
WeakQueue.iter active_sources cleanup_source;
Queue.iter clock.sub_clocks stop;
Queue.filter clocks (fun c -> Unifier.deref c == clock)
WeakQueue.filter passive_clocks (fun c -> Unifier.deref c == clock);
Queue.filter active_clocks (fun c -> Unifier.deref c == clock)

and stop c =
let clock = Unifier.deref c in
Expand Down Expand Up @@ -259,7 +261,8 @@ let unify =
(Queue.push clock'.pending_activations);
Queue.flush_iter clock.sub_clocks (Queue.push clock'.sub_clocks);
Queue.flush_iter clock.on_error (Queue.push clock'.on_error);
Queue.filter clocks (fun el -> el != c);
Queue.filter active_clocks (fun el -> el != c);
WeakQueue.filter passive_clocks (fun el -> el != c);
Unifier.(clock.id <-- clock'.id);
Unifier.(c <-- c')
in
Expand All @@ -281,7 +284,7 @@ let unify =
let () =
Lifecycle.before_core_shutdown ~name:"Clocks stop" (fun () ->
Atomic.set global_stop true;
Queue.iter clocks (fun c -> if sync c <> `Passive then stop c))
Queue.iter active_clocks stop)

let _animated_sources { outputs; active_sources } =
Queue.elements outputs @ WeakQueue.elements active_sources
Expand Down Expand Up @@ -421,7 +424,7 @@ and _tick ~clock x =
check_stopped ();
_after_tick ~clock x;
check_stopped ();
Queue.iter clocks start
Queue.iter active_clocks start

and _clock_thread ~clock x =
let has_sources_to_process () =
Expand Down Expand Up @@ -527,7 +530,9 @@ let create ?(stack = []) ?on_error ?(id = "generic") ?(sub_ids = [])
on_error = on_error_queue;
}
in
Queue.push clocks c;
(match sync with
| `Passive -> WeakQueue.push passive_clocks c
| _ -> Queue.push active_clocks c);
c

let time c =
Expand All @@ -536,7 +541,7 @@ let time c =
Time.to_float (_time c)

let start_pending () =
let c = Queue.flush_elements clocks in
let c = Queue.flush_elements active_clocks in
let c = List.map (fun c -> (c, Unifier.deref c)) c in
let c = List.sort_uniq (fun (_, c) (_, c') -> Stdlib.compare c c') c in
List.iter
Expand All @@ -547,7 +552,7 @@ let start_pending () =
| `True sync -> _start ~sync clock
| `False -> ())
| _ -> ());
Queue.push clocks c)
Queue.push active_clocks c)
c

let () =
Expand Down Expand Up @@ -594,4 +599,4 @@ let create ?stack ?on_error ?id ?sync () = create ?stack ?on_error ?id ?sync ()
let clocks () =
List.sort_uniq
(fun c c' -> Stdlib.compare (Unifier.deref c) (Unifier.deref c'))
(Queue.elements clocks)
(Queue.elements active_clocks @ WeakQueue.elements passive_clocks)

0 comments on commit a69db6e

Please sign in to comment.