From 7f0e7b90315c371248b1d24a30219f1a1acb4726 Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Mon, 18 Nov 2024 23:43:46 -0600 Subject: [PATCH] Remove `source.{dump, drop}`, add `request.{dump, process}`. --- CHANGES.md | 3 + src/core/builtins/builtins_request.ml | 185 ++++++++++++++++++++++++++ src/core/builtins/builtins_source.ml | 141 -------------------- src/core/lang_source.ml | 7 + src/core/sources/request_dynamic.ml | 6 +- src/libs/autocue.liq | 55 +++----- src/libs/protocols.liq | 15 ++- src/libs/replaygain.liq | 14 +- src/libs/tracks.liq | 2 + 9 files changed, 240 insertions(+), 188 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 633777dd1e..f87a953d7f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,6 +74,9 @@ Changed: now returns `max_int` and `int(-infinity)` returns `min_int`. (#3407) - Made default font a setting (#3507) - Changed internal metadata format to be immutable (#3297). +- Removed `source.dump` and `source.drop` in favor safer `request.dump` and `request.drop`. + `source.{dump, drop}` can still be implemented manually when needed and with the proper + knowledge of what's going on. - Allow a getter for the offset of `on_offset` and dropped the metadata mechanism for updating it (#3355). - `string.length` and `string.sub` now default to `utf8` encoding (#4109) diff --git a/src/core/builtins/builtins_request.ml b/src/core/builtins/builtins_request.ml index 5ac113a6a4..1b1d6e87c0 100644 --- a/src/core/builtins/builtins_request.ml +++ b/src/core/builtins/builtins_request.ml @@ -21,6 +21,11 @@ *****************************************************************************) let request = Modules.request +let should_stop = Atomic.make false + +let () = + Lifecycle.before_core_shutdown ~name:"builtin source shutdown" (fun () -> + Atomic.set should_stop true) let _ = Lang.add_builtin ~base:request "all" ~category:`Liquidsoap @@ -295,3 +300,183 @@ let _ = | `Failed -> "failed" in Lang.string s) + +exception Invalid + +class flush ~name r = + object (self) + inherit + Request_dynamic.dynamic + ~name ~priority:`Non_blocking + ~retry_delay:(fun _ -> 0.1) + ~available:(fun _ -> true) + (Lang.val_fun [] (fun _ -> Lang.null)) + 1 None + + initializer + self#on_wake_up (fun () -> + match Request.get_decoder ~ctype:self#content_type r with + | Some _ -> self#set_queue [r] + | None | (exception _) -> raise Invalid) + end + +let flush_request ~log ~name ~ratio ~timeout ~sleep_latency ~process r = + let module Time = (val Clock.time_implementation () : Liq_time.T) in + let open Time in + let start_time = Time.time () in + match Request.resolve ~timeout r with + | `Failed | `Timeout -> () + | `Resolved -> + let timeout = Time.of_float timeout in + let timeout_time = Time.(start_time |+| timeout) in + let started = ref false in + let stopped = ref false in + let s = new flush ~name r in + let s = (process (s :> Source.source) :> Source.source) in + let clock = + Clock.create ~id:name ~sync:`Passive + ~on_error:(fun exn bt -> + stopped := true; + Utils.log_exception ~log + ~bt:(Printexc.raw_backtrace_to_string bt) + (Printf.sprintf "Error while processing source: %s" + (Printexc.to_string exn))) + () + in + let _ = + new Output.dummy + ~clock ~infallible:false + ~on_start:(fun () -> started := true) + ~on_stop:(fun () -> stopped := true) + ~register_telnet:false ~autostart:true (Lang.source s) + in + Clock.start ~force:true clock; + log#info "Start streaming loop (ratio: %.02fx)" ratio; + let sleep_latency = Time.of_float sleep_latency in + let target_time () = + Time.( + start_time |+| sleep_latency |+| of_float (Clock.time clock /. ratio)) + in + (try + while (not (Atomic.get should_stop)) && not !stopped do + if (not !started) && Time.(timeout_time |<=| start_time) then ( + log#important "Timeout while waiting for the source to start!"; + stopped := true) + else ( + Clock.tick clock; + let target_time = target_time () in + if Time.(time () |<| (target_time |+| sleep_latency)) then + sleep_until target_time) + done + with Invalid | Clock.Has_stopped -> ()); + let processing_time = Time.(to_float (time () |-| start_time)) in + let effective_ratio = Clock.time clock /. processing_time in + log#info + "Request processed. Total processing time: %.02fs, effective ratio: \ + %.02fx" + processing_time effective_ratio; + Clock.stop clock + +let _ = + let log = Log.make ["request"; "dump"] in + let kind = Lang.univ_t () in + Lang.add_builtin ~base:request "dump" ~category:(`Source `Liquidsoap) + ~descr:"Immediately encode the whole contents of a request into a file." + ~flags:[`Experimental] + [ + ("", Lang.format_t kind, None, Some "Encoding format."); + ("", Lang.string_t, None, Some "Name of the file."); + ("", Request.Value.t, None, Some "Request to encode."); + ( "ratio", + Lang.float_t, + Some (Lang.float 50.), + Some + "Time ratio. A value of `50` means process data at `50x` real rate, \ + when possible." ); + ( "timeout", + Lang.float_t, + Some (Lang.float 1.), + Some + "Stop processing the source if it has not started after the given \ + timeout." ); + ( "sleep_latency", + Lang.float_t, + Some (Lang.float 0.1), + Some + "How much time ahead, in seconds, should we should be before pausing \ + the processing." ); + ] + Lang.unit_t + (fun p -> + let proto = + let p = Pipe_output.file_proto (Lang.univ_t ()) in + List.filter_map (fun (l, _, v, _) -> Option.map (fun v -> (l, v)) v) p + in + let proto = ("fallible", Lang.bool true) :: proto in + let format = Lang.assoc "" 1 p in + let file = Lang.assoc "" 2 p in + let r = Request.Value.of_value (Lang.assoc "" 3 p) in + let process s = + let p = + ("id", Lang.string "request.drop") + :: ("", format) :: ("", file) + :: ("", Lang.source s) + :: (p @ proto) + in + Pipe_output.new_file_output p + in + let ratio = Lang.to_float (List.assoc "ratio" p) in + let timeout = Lang.to_float (List.assoc "timeout" p) in + let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in + flush_request ~log ~name:"request.dump" ~ratio ~timeout ~sleep_latency + ~process r; + log#info "Request dumped."; + Lang.unit) + +let _ = + let log = Log.make ["request"; "process"] in + Lang.add_builtin ~base:request "process" ~category:(`Source `Liquidsoap) + ~descr: + "Given a request and an optional function to process this request, \ + animate the source as fast as possible until the request is fully \ + processed." + [ + ("", Request.Value.t, None, Some "Request to process"); + ( "process", + Lang.fun_t + [(false, "", Lang.source_t (Lang.univ_t ()))] + (Lang.source_t (Lang.univ_t ())), + Some (Lang.val_fun [("", "", None)] (fun p -> List.assoc "" p)), + Some "Callback to create the source to animate." ); + ( "ratio", + Lang.float_t, + Some (Lang.float 50.), + Some + "Time ratio. A value of `50` means process data at `50x` real rate, \ + when possible." ); + ( "timeout", + Lang.float_t, + Some (Lang.float 1.), + Some + "Stop processing the source if it has not started after the given \ + timeout." ); + ( "sleep_latency", + Lang.float_t, + Some (Lang.float 0.1), + Some + "How much time ahead, in seconds, should we should be before pausing \ + the processing." ); + ] + Lang.unit_t + (fun p -> + let r = Request.Value.of_value (List.assoc "" p) in + let process = List.assoc "process" p in + let process s = + Lang.to_source (Lang.apply process [("", Lang.source s)]) + in + let ratio = Lang.to_float (List.assoc "ratio" p) in + let timeout = Lang.to_float (List.assoc "timeout" p) in + let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in + flush_request ~log ~name:"request.process" ~ratio ~timeout ~sleep_latency + ~process r; + Lang.unit) diff --git a/src/core/builtins/builtins_source.ml b/src/core/builtins/builtins_source.ml index b1f7db39c0..db5614a960 100644 --- a/src/core/builtins/builtins_source.ml +++ b/src/core/builtins/builtins_source.ml @@ -21,11 +21,6 @@ *****************************************************************************) let source = Muxer.source -let should_stop = Atomic.make false - -let () = - Lifecycle.before_core_shutdown ~name:"builtin source shutdown" (fun () -> - Atomic.set should_stop true) let _ = Lang.add_builtin ~base:source "set_name" ~category:(`Source `Liquidsoap) @@ -161,139 +156,3 @@ let _ = let wrap_f () = ignore (Lang.apply f []) in s#on_sleep wrap_f; Lang.unit) - -let flush_source ~log ~name ~ratio ~timeout ~sleep_latency s = - let module Time = (val Clock.time_implementation () : Liq_time.T) in - let open Time in - let started = ref false in - let stopped = ref false in - let clock = - Clock.create ~id:name ~sync:`Passive - ~on_error:(fun exn bt -> - stopped := true; - Utils.log_exception ~log - ~bt:(Printexc.raw_backtrace_to_string bt) - (Printf.sprintf "Error while processing source: %s" - (Printexc.to_string exn))) - () - in - let _ = - new Output.dummy - ~clock ~infallible:false - ~on_start:(fun () -> started := true) - ~on_stop:(fun () -> stopped := true) - ~register_telnet:false ~autostart:true (Lang.source s) - in - Clock.start ~force:true clock; - log#info "Start source streaming loop (ratio: %.02fx)" ratio; - let start_time = Time.time () in - let timeout = Time.of_float timeout in - let timeout_time = Time.(start_time |+| timeout) in - let sleep_latency = Time.of_float sleep_latency in - let target_time () = - Time.(start_time |+| sleep_latency |+| of_float (Clock.time clock /. ratio)) - in - (try - while (not (Atomic.get should_stop)) && not !stopped do - if (not !started) && Time.(timeout_time |<=| start_time) then ( - log#important "Timeout while waiting for the source to start!"; - stopped := true) - else ( - Clock.tick clock; - let target_time = target_time () in - if Time.(time () |<| (target_time |+| sleep_latency)) then - sleep_until target_time) - done - with Clock.Has_stopped -> ()); - let processing_time = Time.(to_float (time () |-| start_time)) in - let effective_ratio = Clock.time clock /. processing_time in - log#info - "Source processed. Total processing time: %.02fs, effective ratio: %.02fx" - processing_time effective_ratio; - Clock.stop clock - -let flush_source ~log ~name ~ratio ~timeout ~sleep_latency s = - if Tutils.running () then - flush_source ~log ~name ~ratio ~timeout ~sleep_latency s - else log#important "Cannot run %s: scheduler not started!" name - -let _ = - let log = Log.make ["source"; "dump"] in - let kind = Lang.univ_t () in - Lang.add_builtin ~base:source "dump" ~category:(`Source `Liquidsoap) - ~descr:"Immediately encode the whole contents of a source into a file." - ~flags:[`Experimental] - [ - ("", Lang.format_t kind, None, Some "Encoding format."); - ("", Lang.string_t, None, Some "Name of the file."); - ("", Lang.source_t kind, None, Some "Source to encode."); - ( "ratio", - Lang.float_t, - Some (Lang.float 50.), - Some - "Time ratio. A value of `50` means process data at `50x` real rate, \ - when possible." ); - ( "timeout", - Lang.float_t, - Some (Lang.float 1.), - Some - "Stop processing the source if it has not started after the given \ - timeout." ); - ( "sleep_latency", - Lang.float_t, - Some (Lang.float 0.1), - Some - "How much time ahead, in seconds, should we should be before pausing \ - the processing." ); - ] - Lang.unit_t - (fun p -> - let proto = - let p = Pipe_output.file_proto (Lang.univ_t ()) in - List.filter_map (fun (l, _, v, _) -> Option.map (fun v -> (l, v)) v) p - in - let proto = ("fallible", Lang.bool true) :: proto in - let p = (("id", Lang.string "source.drop") :: p) @ proto in - let s = Pipe_output.new_file_output p in - let ratio = Lang.to_float (List.assoc "ratio" p) in - let timeout = Lang.to_float (List.assoc "timeout" p) in - let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in - flush_source ~log ~name:"source.dump" ~ratio ~timeout ~sleep_latency - (s :> Source.source); - log#info "Source dumped."; - Lang.unit) - -let _ = - let log = Log.make ["source"; "drop"] in - Lang.add_builtin ~base:source "drop" ~category:(`Source `Liquidsoap) - ~descr:"Animate the source as fast as possible, dropping its output." - ~flags:[`Experimental] - [ - ("", Lang.source_t (Lang.univ_t ()), None, Some "Source to animate."); - ( "ratio", - Lang.float_t, - Some (Lang.float 50.), - Some - "Time ratio. A value of `50` means process data at `50x` real rate, \ - when possible." ); - ( "timeout", - Lang.float_t, - Some (Lang.float 1.), - Some - "Stop processing the source if it has not started after the given \ - timeout." ); - ( "sleep_latency", - Lang.float_t, - Some (Lang.float 0.1), - Some - "How much time ahead, in seconds, should we should be before pausing \ - the processing." ); - ] - Lang.unit_t - (fun p -> - let s = List.assoc "" p |> Lang.to_source in - let ratio = Lang.to_float (List.assoc "ratio" p) in - let timeout = Lang.to_float (List.assoc "timeout" p) in - let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in - flush_source ~log ~name:"source.dump" ~ratio ~timeout ~sleep_latency s; - Lang.unit) diff --git a/src/core/lang_source.ml b/src/core/lang_source.ml index b34070636d..cbea5ebe1f 100644 --- a/src/core/lang_source.ml +++ b/src/core/lang_source.ml @@ -94,6 +94,13 @@ module ClockValue = struct let c' = of_value (List.assoc "" p) in Clock.unify ~pos c c'; Lang.unit) ); + ( "tick", + Lang.fun_t [] Lang.unit_t, + "Animate the clock and run one tick", + fun c -> + Lang.val_fun [] (fun _ -> + Clock.tick c; + Lang.unit) ); ( "ticks", Lang.fun_t [] Lang.int_t, "The total number of times the clock has ticked.", diff --git a/src/core/sources/request_dynamic.ml b/src/core/sources/request_dynamic.ml index 8995487d85..b3aa692b2c 100644 --- a/src/core/sources/request_dynamic.ml +++ b/src/core/sources/request_dynamic.ml @@ -59,11 +59,11 @@ let () = Lifecycle.before_core_shutdown ~name:"request.dynamic shutdown" (fun () -> Atomic.set should_fail true) -class dynamic ~priority ~retry_delay ~available (f : Lang.value) prefetch - timeout = +class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available + (f : Lang.value) prefetch timeout = let available () = (not (Atomic.get should_fail)) && available () in object (self) - inherit source ~name:"request.dynamic" () + inherit source ~name () method fallible = true val mutable remaining = 0 method remaining = remaining diff --git a/src/libs/autocue.liq b/src/libs/autocue.liq index ca98462130..b262242965 100644 --- a/src/libs/autocue.liq +++ b/src/libs/autocue.liq @@ -59,25 +59,6 @@ let settings.autocue.internal.metadata_override = ] ) -let settings.autocue.internal.queues_count = - settings.make( - description= - "Number of dedicated queues for resolving autocue data using the internal \ - implementation", - 1 - ) - -def settings.autocue.internal.queues_count.set(c) = - settings.scheduler.queues := - [ - ...list.assoc.remove("autocue", settings.scheduler.queues()), - ("autocue", c) - ] - settings.autocue.internal.queues_count.set(c) -end - -settings.autocue.internal.queues_count.set(1) - let settings.autocue.internal.lufs_target = settings.make( description= @@ -206,30 +187,30 @@ def autocue.internal.ebur128(~duration, ~ratio=50., ~timeout=10., filename) = ) [] else - s = - request.once( - thread_queue="autocue", request.create(resolve_metadata=false, filename) - ) + r = request.create(resolve_metadata=false, filename) frames = ref([]) - def ebur128(s) = - def mk_filter(graph) = - let {audio = a} = source.tracks(s) - a = ffmpeg.filter.audio.input(graph, a) - let ([a], _) = ffmpeg.filter.ebur128(metadata=true, graph, a) + def process(s) = + def ebur128(s) = + def mk_filter(graph) = + let {audio = a} = source.tracks(s) + a = ffmpeg.filter.audio.input(graph, a) + let ([a], _) = ffmpeg.filter.ebur128(metadata=true, graph, a) + + # ebur filter seems to generate invalid PTS. + a = ffmpeg.filter.asetpts(expr="N/SR/TB", graph, a) + a = ffmpeg.filter.audio.output(id="filter_output", graph, a) + source({audio=a, metadata=track.metadata(a)}) + end - # ebur filter seems to generate invalid PTS. - a = ffmpeg.filter.asetpts(expr="N/SR/TB", graph, a) - a = ffmpeg.filter.audio.output(id="filter_output", graph, a) - source({audio=a, metadata=track.metadata(a)}) + ffmpeg.filter.create(mk_filter) end - ffmpeg.filter.create(mk_filter) + s = ebur128(s) + source.on_metadata(s, fun (m) -> frames := [...frames(), m]) end - s = ebur128(s) - s = source.on_metadata(s, fun (m) -> frames := [...frames(), m]) - source.drop(ratio=ratio, s) + request.process(ratio=ratio, process=process, r) frames() end @@ -794,7 +775,7 @@ def autocue.internal.implementation( fade_out_type?: string, fade_out_curve?: float, start_next?: float, - extra_metadata?: [(string*string)] + extra_metadata?: [(string * string)] } ) end diff --git a/src/libs/protocols.liq b/src/libs/protocols.liq index afa8d7b1c4..eef02bade5 100644 --- a/src/libs/protocols.liq +++ b/src/libs/protocols.liq @@ -660,9 +660,7 @@ def protocol.stereo(~rlog=_, ~maxtime=_, arg) = ) null() else - # TODO: the following sometimes hangs, so we resolve twice... - # source.dump(%wav, file, source.stereo(once(request.queue(queue=[r])))) - source.dump(%wav, file, stereo(once(single(arg)))) + request.dump(%wav, file, request.create(arg)) file end end @@ -1066,7 +1064,16 @@ def synth_protocol(~rlog=_, ~maxtime=_, text) = label="synth", "Synthesizing #{shape()} in #{file}." ) - source.dump(%wav, file, once(s)) + + clock.assign_new(sync="passive", [s]) + + output.file(%wav, file, once(s)) + + c = clock(s.clock) + c.start() + while source.is_ready(s) do c.tick() end + c.stop() + file end diff --git a/src/libs/replaygain.liq b/src/libs/replaygain.liq index 14d28a9f89..65da5042f7 100644 --- a/src/libs/replaygain.liq +++ b/src/libs/replaygain.liq @@ -22,9 +22,17 @@ def file.replaygain.compute(~ratio=50., file_name) = if request.resolve(_request) then - _source = source.replaygain.compute(request.once(_request)) - source.drop(ratio=ratio, _source) - _source.gain() + get_gain = ref(fun () -> null()) + def process(s) = + s = source.replaygain.compute(s) + get_gain := {s.gain()} + s + end + + request.process(ratio=ratio, process=process, _request) + + fn = get_gain() + fn() else null() end diff --git a/src/libs/tracks.liq b/src/libs/tracks.liq index 0eaf8f243e..9d0affa81c 100644 --- a/src/libs/tracks.liq +++ b/src/libs/tracks.liq @@ -34,6 +34,8 @@ def source.mux.midi(~id=null(), ~(midi:source), s) = source(id=id, source.tracks(s).{midi=source.tracks(midi).midi}) end +let source.drop = () + # Remove the audio track of a source. # @category Source / Track processing def source.drop.audio(~id=null(), s) =