From a69db6eb80c782233508e27e311b865f33209c62 Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Thu, 31 Oct 2024 09:52:16 -0500 Subject: [PATCH 1/5] Keep passive clocks in a weak queue. --- src/core/clock.ml | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/core/clock.ml b/src/core/clock.ml index 4050dac02f..dd236092a6 100644 --- a/src/core/clock.ml +++ b/src/core/clock.ml @@ -213,14 +213,16 @@ let _sync ?(pending = false) x = let sync c = _sync (Unifier.deref c) let cleanup_source s = try s#force_sleep with _ -> () -let clocks = Queue.create () +let passive_clocks = WeakQueue.create () +let active_clocks = Queue.create () let rec _cleanup ~clock { outputs; passive_sources; active_sources } = Queue.iter outputs cleanup_source; WeakQueue.iter passive_sources cleanup_source; WeakQueue.iter active_sources cleanup_source; Queue.iter clock.sub_clocks stop; - Queue.filter clocks (fun c -> Unifier.deref c == clock) + WeakQueue.filter passive_clocks (fun c -> Unifier.deref c == clock); + Queue.filter active_clocks (fun c -> Unifier.deref c == clock) and stop c = let clock = Unifier.deref c in @@ -259,7 +261,8 @@ let unify = (Queue.push clock'.pending_activations); Queue.flush_iter clock.sub_clocks (Queue.push clock'.sub_clocks); Queue.flush_iter clock.on_error (Queue.push clock'.on_error); - Queue.filter clocks (fun el -> el != c); + Queue.filter active_clocks (fun el -> el != c); + WeakQueue.filter passive_clocks (fun el -> el != c); Unifier.(clock.id <-- clock'.id); Unifier.(c <-- c') in @@ -281,7 +284,7 @@ let unify = let () = Lifecycle.before_core_shutdown ~name:"Clocks stop" (fun () -> Atomic.set global_stop true; - Queue.iter clocks (fun c -> if sync c <> `Passive then stop c)) + Queue.iter active_clocks stop) let _animated_sources { outputs; active_sources } = Queue.elements outputs @ WeakQueue.elements active_sources @@ -421,7 +424,7 @@ and _tick ~clock x = check_stopped (); _after_tick ~clock x; check_stopped (); - Queue.iter clocks start + Queue.iter active_clocks start and _clock_thread ~clock x = let has_sources_to_process () = @@ -527,7 +530,9 @@ let create ?(stack = []) ?on_error ?(id = "generic") ?(sub_ids = []) on_error = on_error_queue; } in - Queue.push clocks c; + (match sync with + | `Passive -> WeakQueue.push passive_clocks c + | _ -> Queue.push active_clocks c); c let time c = @@ -536,7 +541,7 @@ let time c = Time.to_float (_time c) let start_pending () = - let c = Queue.flush_elements clocks in + let c = Queue.flush_elements active_clocks in let c = List.map (fun c -> (c, Unifier.deref c)) c in let c = List.sort_uniq (fun (_, c) (_, c') -> Stdlib.compare c c') c in List.iter @@ -547,7 +552,7 @@ let start_pending () = | `True sync -> _start ~sync clock | `False -> ()) | _ -> ()); - Queue.push clocks c) + Queue.push active_clocks c) c let () = @@ -594,4 +599,4 @@ let create ?stack ?on_error ?id ?sync () = create ?stack ?on_error ?id ?sync () let clocks () = List.sort_uniq (fun c c' -> Stdlib.compare (Unifier.deref c) (Unifier.deref c')) - (Queue.elements clocks) + (Queue.elements active_clocks @ WeakQueue.elements passive_clocks) From ee023411421808cd53803b26525a4a925bf74209 Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Fri, 1 Nov 2024 08:57:07 -0500 Subject: [PATCH 2/5] Stop passive clocks after all active clocks have been stopped. --- src/core/clock.ml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/clock.ml b/src/core/clock.ml index dd236092a6..762c125416 100644 --- a/src/core/clock.ml +++ b/src/core/clock.ml @@ -284,7 +284,8 @@ let unify = let () = Lifecycle.before_core_shutdown ~name:"Clocks stop" (fun () -> Atomic.set global_stop true; - Queue.iter active_clocks stop) + Queue.iter active_clocks stop; + WeakQueue.iter passive_clocks stop) let _animated_sources { outputs; active_sources } = Queue.elements outputs @ WeakQueue.elements active_sources From 1cb6ae9fbca2a0b172e3d7ac6edbe9f57c00c58a Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Fri, 1 Nov 2024 08:57:59 -0500 Subject: [PATCH 3/5] Actually.. --- src/core/clock.ml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/core/clock.ml b/src/core/clock.ml index 762c125416..dd236092a6 100644 --- a/src/core/clock.ml +++ b/src/core/clock.ml @@ -284,8 +284,7 @@ let unify = let () = Lifecycle.before_core_shutdown ~name:"Clocks stop" (fun () -> Atomic.set global_stop true; - Queue.iter active_clocks stop; - WeakQueue.iter passive_clocks stop) + Queue.iter active_clocks stop) let _animated_sources { outputs; active_sources } = Queue.elements outputs @ WeakQueue.elements active_sources From b24942ffbcfbbfedf9b2a48ad5dbfc826f1f32ae Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Fri, 1 Nov 2024 10:53:49 -0500 Subject: [PATCH 4/5] Don't flush on stop. --- src/core/tools/producer_consumer.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/tools/producer_consumer.ml b/src/core/tools/producer_consumer.ml index b16769b378..0eb3423410 100644 --- a/src/core/tools/producer_consumer.ml +++ b/src/core/tools/producer_consumer.ml @@ -46,7 +46,7 @@ class consumer ?(always_enabled = false) ~write_frame ~name ~source () = method set_output_enabled v = output_enabled <- v method! reset = () method start = () - method stop = write_frame producer_buffer `Flush + method stop = () method! output = if always_enabled || output_enabled then super#output method private send_frame frame = write_frame producer_buffer (`Frame frame) end From 91f2524a61a34bec025ae95ee494aecda0d52f61 Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Fri, 1 Nov 2024 16:57:12 -0500 Subject: [PATCH 5/5] Better. --- src/core/clock.ml | 23 +++++++++-------------- src/core/tools/producer_consumer.ml | 2 +- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/src/core/clock.ml b/src/core/clock.ml index dd236092a6..1b98d152c2 100644 --- a/src/core/clock.ml +++ b/src/core/clock.ml @@ -213,16 +213,14 @@ let _sync ?(pending = false) x = let sync c = _sync (Unifier.deref c) let cleanup_source s = try s#force_sleep with _ -> () -let passive_clocks = WeakQueue.create () -let active_clocks = Queue.create () +let clocks = Queue.create () let rec _cleanup ~clock { outputs; passive_sources; active_sources } = Queue.iter outputs cleanup_source; WeakQueue.iter passive_sources cleanup_source; WeakQueue.iter active_sources cleanup_source; Queue.iter clock.sub_clocks stop; - WeakQueue.filter passive_clocks (fun c -> Unifier.deref c == clock); - Queue.filter active_clocks (fun c -> Unifier.deref c == clock) + Queue.filter clocks (fun c -> Unifier.deref c == clock) and stop c = let clock = Unifier.deref c in @@ -261,8 +259,7 @@ let unify = (Queue.push clock'.pending_activations); Queue.flush_iter clock.sub_clocks (Queue.push clock'.sub_clocks); Queue.flush_iter clock.on_error (Queue.push clock'.on_error); - Queue.filter active_clocks (fun el -> el != c); - WeakQueue.filter passive_clocks (fun el -> el != c); + Queue.filter clocks (fun el -> el != c); Unifier.(clock.id <-- clock'.id); Unifier.(c <-- c') in @@ -284,7 +281,7 @@ let unify = let () = Lifecycle.before_core_shutdown ~name:"Clocks stop" (fun () -> Atomic.set global_stop true; - Queue.iter active_clocks stop) + Queue.iter clocks (fun c -> if sync c <> `Passive then stop c)) let _animated_sources { outputs; active_sources } = Queue.elements outputs @ WeakQueue.elements active_sources @@ -424,7 +421,7 @@ and _tick ~clock x = check_stopped (); _after_tick ~clock x; check_stopped (); - Queue.iter active_clocks start + Queue.iter clocks start and _clock_thread ~clock x = let has_sources_to_process () = @@ -530,9 +527,7 @@ let create ?(stack = []) ?on_error ?(id = "generic") ?(sub_ids = []) on_error = on_error_queue; } in - (match sync with - | `Passive -> WeakQueue.push passive_clocks c - | _ -> Queue.push active_clocks c); + if sync <> `Passive then Queue.push clocks c; c let time c = @@ -541,7 +536,7 @@ let time c = Time.to_float (_time c) let start_pending () = - let c = Queue.flush_elements active_clocks in + let c = Queue.flush_elements clocks in let c = List.map (fun c -> (c, Unifier.deref c)) c in let c = List.sort_uniq (fun (_, c) (_, c') -> Stdlib.compare c c') c in List.iter @@ -552,7 +547,7 @@ let start_pending () = | `True sync -> _start ~sync clock | `False -> ()) | _ -> ()); - Queue.push active_clocks c) + Queue.push clocks c) c let () = @@ -599,4 +594,4 @@ let create ?stack ?on_error ?id ?sync () = create ?stack ?on_error ?id ?sync () let clocks () = List.sort_uniq (fun c c' -> Stdlib.compare (Unifier.deref c) (Unifier.deref c')) - (Queue.elements active_clocks @ WeakQueue.elements passive_clocks) + (Queue.elements clocks) diff --git a/src/core/tools/producer_consumer.ml b/src/core/tools/producer_consumer.ml index 0eb3423410..b16769b378 100644 --- a/src/core/tools/producer_consumer.ml +++ b/src/core/tools/producer_consumer.ml @@ -46,7 +46,7 @@ class consumer ?(always_enabled = false) ~write_frame ~name ~source () = method set_output_enabled v = output_enabled <- v method! reset = () method start = () - method stop = () + method stop = write_frame producer_buffer `Flush method! output = if always_enabled || output_enabled then super#output method private send_frame frame = write_frame producer_buffer (`Frame frame) end