Skip to content

Commit

Permalink
Better.
Browse files Browse the repository at this point in the history
  • Loading branch information
toots committed Nov 1, 2024
1 parent b24942f commit 91f2524
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 15 deletions.
23 changes: 9 additions & 14 deletions src/core/clock.ml
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,14 @@ let _sync ?(pending = false) x =

let sync c = _sync (Unifier.deref c)
let cleanup_source s = try s#force_sleep with _ -> ()
let passive_clocks = WeakQueue.create ()
let active_clocks = Queue.create ()
let clocks = Queue.create ()

let rec _cleanup ~clock { outputs; passive_sources; active_sources } =
Queue.iter outputs cleanup_source;
WeakQueue.iter passive_sources cleanup_source;
WeakQueue.iter active_sources cleanup_source;
Queue.iter clock.sub_clocks stop;
WeakQueue.filter passive_clocks (fun c -> Unifier.deref c == clock);
Queue.filter active_clocks (fun c -> Unifier.deref c == clock)
Queue.filter clocks (fun c -> Unifier.deref c == clock)

and stop c =
let clock = Unifier.deref c in
Expand Down Expand Up @@ -261,8 +259,7 @@ let unify =
(Queue.push clock'.pending_activations);
Queue.flush_iter clock.sub_clocks (Queue.push clock'.sub_clocks);
Queue.flush_iter clock.on_error (Queue.push clock'.on_error);
Queue.filter active_clocks (fun el -> el != c);
WeakQueue.filter passive_clocks (fun el -> el != c);
Queue.filter clocks (fun el -> el != c);
Unifier.(clock.id <-- clock'.id);
Unifier.(c <-- c')
in
Expand All @@ -284,7 +281,7 @@ let unify =
let () =
Lifecycle.before_core_shutdown ~name:"Clocks stop" (fun () ->
Atomic.set global_stop true;
Queue.iter active_clocks stop)
Queue.iter clocks (fun c -> if sync c <> `Passive then stop c))

let _animated_sources { outputs; active_sources } =
Queue.elements outputs @ WeakQueue.elements active_sources
Expand Down Expand Up @@ -424,7 +421,7 @@ and _tick ~clock x =
check_stopped ();
_after_tick ~clock x;
check_stopped ();
Queue.iter active_clocks start
Queue.iter clocks start

and _clock_thread ~clock x =
let has_sources_to_process () =
Expand Down Expand Up @@ -530,9 +527,7 @@ let create ?(stack = []) ?on_error ?(id = "generic") ?(sub_ids = [])
on_error = on_error_queue;
}
in
(match sync with
| `Passive -> WeakQueue.push passive_clocks c
| _ -> Queue.push active_clocks c);
if sync <> `Passive then Queue.push clocks c;
c

let time c =
Expand All @@ -541,7 +536,7 @@ let time c =
Time.to_float (_time c)

let start_pending () =
let c = Queue.flush_elements active_clocks in
let c = Queue.flush_elements clocks in
let c = List.map (fun c -> (c, Unifier.deref c)) c in
let c = List.sort_uniq (fun (_, c) (_, c') -> Stdlib.compare c c') c in
List.iter
Expand All @@ -552,7 +547,7 @@ let start_pending () =
| `True sync -> _start ~sync clock
| `False -> ())
| _ -> ());
Queue.push active_clocks c)
Queue.push clocks c)
c

let () =
Expand Down Expand Up @@ -599,4 +594,4 @@ let create ?stack ?on_error ?id ?sync () = create ?stack ?on_error ?id ?sync ()
let clocks () =
List.sort_uniq
(fun c c' -> Stdlib.compare (Unifier.deref c) (Unifier.deref c'))
(Queue.elements active_clocks @ WeakQueue.elements passive_clocks)
(Queue.elements clocks)
2 changes: 1 addition & 1 deletion src/core/tools/producer_consumer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class consumer ?(always_enabled = false) ~write_frame ~name ~source () =
method set_output_enabled v = output_enabled <- v
method! reset = ()
method start = ()
method stop = ()
method stop = write_frame producer_buffer `Flush
method! output = if always_enabled || output_enabled then super#output
method private send_frame frame = write_frame producer_buffer (`Frame frame)
end
Expand Down

0 comments on commit 91f2524

Please sign in to comment.