diff --git a/CHANGES.md b/CHANGES.md index 633777dd1e..fde6df39d1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,6 +74,9 @@ Changed: now returns `max_int` and `int(-infinity)` returns `min_int`. (#3407) - Made default font a setting (#3507) - Changed internal metadata format to be immutable (#3297). +- Removed `source.dump` and `source.drop` in favor of safer `request.dump` and `request.drop`. + `source.{dump, drop}` can still be implemented manually when needed and with the proper + knowledge of what's going on. - Allow a getter for the offset of `on_offset` and dropped the metadata mechanism for updating it (#3355). - `string.length` and `string.sub` now default to `utf8` encoding (#4109) diff --git a/doc/content/migrating.md b/doc/content/migrating.md index 3a99da33b5..78aae83543 100644 --- a/doc/content/migrating.md +++ b/doc/content/migrating.md @@ -78,8 +78,7 @@ end ### Thread queues -In order to improve issues with complex inter-dependent asynchronous tasks such as `autocue` data computation, -scheduler queues have been improved. +In order to improve issues with complex inter-dependent asynchronous tasks, scheduler queues have been updated. User-provided named queues can now be created and used to send asynchronous tasks, making it possible to control concurrency of certain classes of tasks and also to remedy any potential dependency between asynchronous tasks. @@ -101,8 +100,7 @@ asynchronous tasks sent to. Likewise, `request.dynamic`, `playlist`, `single` etc. have also been updated to accept a `thread_queue` argument controlling which asynchronous queue their request resolution tasks should be sent to. -See [the original Pull Request)[https://github.com/savonet/liquidsoap/pull/4151) and [the threads page](threads.html) -for more details. +See [the threads page](threads.html) for more details. ### Replaygain diff --git a/doc/content/threads.md b/doc/content/threads.md index 68d07cb990..45eca25668 100644 --- a/doc/content/threads.md +++ b/doc/content/threads.md @@ -19,7 +19,7 @@ By default, there are two type of queues available in liquidsoap: - `non_blocking` queues By convention, tasks that are known to be executing very fast should be sent to the -`non_blockin` queues and all the other tasks should be sent to the `generic` queue. +`non_blocking` queues and all the other tasks should be sent to the `generic` queue. You can decide which queue to send tasks to by using the `queue` parameter of the `thread.run` functions. Some other operators who also use threads can have a similar @@ -35,12 +35,3 @@ This is particularly useful for two applications: - To control concurrent execution of specific tasks. - To prevent deadlocks in case some tasks depends on other tasks. - -Typically, `autocue` data resolution is executed inside a `request` resolution. To -control the concurrency with which this CPU-intensive task is executed, we place them -in a specific queue. The number of queues controls how many of these tasks can be executed -concurrently. - -Also, this prevents a deadlock where all the request resolution fill up the available -`generic` queues, making it impossible for the autocue computation to finish, thus preventing -the request resolution from returning. diff --git a/src/core/builtins/builtins_request.ml b/src/core/builtins/builtins_request.ml index 5ac113a6a4..9a6daa8b5f 100644 --- a/src/core/builtins/builtins_request.ml +++ b/src/core/builtins/builtins_request.ml @@ -21,6 +21,11 @@ *****************************************************************************) let request = Modules.request +let should_stop = Atomic.make false + +let () = + Lifecycle.before_core_shutdown ~name:"builtin source shutdown" (fun () -> + Atomic.set should_stop true) let _ = Lang.add_builtin ~base:request "all" ~category:`Liquidsoap @@ -295,3 +300,187 @@ let _ = | `Failed -> "failed" in Lang.string s) + +exception Process_failed + +class process ~name r = + object (self) + inherit + Request_dynamic.dynamic + ~name ~priority:`Non_blocking + ~retry_delay:(fun _ -> 0.1) + ~available:(fun _ -> true) + (Lang.val_fun [] (fun _ -> Lang.null)) + 1 None + + initializer + self#on_wake_up (fun () -> + match Request.get_decoder ~ctype:self#content_type r with + | Some _ -> self#set_queue [r] + | None | (exception _) -> raise Process_failed) + end + +let process_request ~log ~name ~ratio ~timeout ~sleep_latency ~process r = + let module Time = (val Clock.time_implementation () : Liq_time.T) in + let open Time in + let start_time = Time.time () in + match Request.resolve ~timeout r with + | `Failed | `Timeout -> () + | `Resolved -> ( + let timeout = Time.of_float timeout in + let timeout_time = Time.(start_time |+| timeout) in + try + let s = new process ~name r in + let s = (process (s :> Source.source) :> Source.source) in + let clock = + Clock.create ~id:name ~sync:`Passive + ~on_error:(fun exn bt -> + Utils.log_exception ~log + ~bt:(Printexc.raw_backtrace_to_string bt) + (Printf.sprintf "Error while processing source: %s" + (Printexc.to_string exn)); + raise Process_failed) + () + in + Fun.protect + ~finally:(fun () -> try Clock.stop clock with _ -> ()) + (fun () -> + let started = ref false in + let stopped = ref false in + let _ = + new Output.dummy + ~clock ~infallible:false ~register_telnet:false + ~on_start:(fun () -> started := true) + ~on_stop:(fun () -> stopped := true) + ~autostart:true (Lang.source s) + in + Clock.start ~force:true clock; + log#info "Start streaming loop (ratio: %.02fx)" ratio; + let sleep_latency = Time.of_float sleep_latency in + let target_time () = + Time.( + start_time |+| sleep_latency + |+| of_float (Clock.time clock /. ratio)) + in + while (not (Atomic.get should_stop)) && not !stopped do + if (not !started) && Time.(timeout_time |<=| Time.time ()) then ( + log#important + "Timeout while waiting for the source to be ready!"; + raise Process_failed) + else ( + Clock.tick clock; + let target_time = target_time () in + if Time.(time () |<| (target_time |+| sleep_latency)) then + sleep_until target_time) + done; + let processing_time = Time.(to_float (time () |-| start_time)) in + let effective_ratio = Clock.time clock /. processing_time in + log#info + "Request processed. Total processing time: %.02fs, effective \ + ratio: %.02fx" + processing_time effective_ratio) + with Process_failed | Clock.Has_stopped -> ()) + +let _ = + let log = Log.make ["request"; "dump"] in + let kind = Lang.univ_t () in + Lang.add_builtin ~base:request "dump" ~category:(`Source `Liquidsoap) + ~descr:"Immediately encode the whole contents of a request into a file." + ~flags:[`Experimental] + [ + ("", Lang.format_t kind, None, Some "Encoding format."); + ("", Lang.string_t, None, Some "Name of the file."); + ("", Request.Value.t, None, Some "Request to encode."); + ( "ratio", + Lang.float_t, + Some (Lang.float 50.), + Some + "Time ratio. A value of `50` means process data at `50x` real rate, \ + when possible." ); + ( "timeout", + Lang.float_t, + Some (Lang.float 1.), + Some + "Stop processing the source if it has not started after the given \ + timeout." ); + ( "sleep_latency", + Lang.float_t, + Some (Lang.float 0.1), + Some + "How much time ahead, in seconds, should we should be before pausing \ + the processing." ); + ] + Lang.unit_t + (fun p -> + let proto = + let p = Pipe_output.file_proto (Lang.univ_t ()) in + List.filter_map (fun (l, _, v, _) -> Option.map (fun v -> (l, v)) v) p + in + let proto = ("fallible", Lang.bool true) :: proto in + let format = Lang.assoc "" 1 p in + let file = Lang.assoc "" 2 p in + let r = Request.Value.of_value (Lang.assoc "" 3 p) in + let process s = + let p = + ("id", Lang.string "request.drop") + :: ("", format) :: ("", file) + :: ("", Lang.source s) + :: (p @ proto) + in + Pipe_output.new_file_output p + in + let ratio = Lang.to_float (List.assoc "ratio" p) in + let timeout = Lang.to_float (List.assoc "timeout" p) in + let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in + process_request ~log ~name:"request.dump" ~ratio ~timeout ~sleep_latency + ~process r; + log#info "Request dumped."; + Lang.unit) + +let _ = + let log = Log.make ["request"; "process"] in + Lang.add_builtin ~base:request "process" ~category:(`Source `Liquidsoap) + ~descr: + "Given a request and an optional function to process this request, \ + animate the source as fast as possible until the request is fully \ + processed." + [ + ("", Request.Value.t, None, Some "Request to process"); + ( "process", + Lang.fun_t + [(false, "", Lang.source_t (Lang.univ_t ()))] + (Lang.source_t (Lang.univ_t ())), + Some (Lang.val_fun [("", "", None)] (fun p -> List.assoc "" p)), + Some "Callback to create the source to animate." ); + ( "ratio", + Lang.float_t, + Some (Lang.float 50.), + Some + "Time ratio. A value of `50` means process data at `50x` real rate, \ + when possible." ); + ( "timeout", + Lang.float_t, + Some (Lang.float 1.), + Some + "Stop processing the source if it has not started after the given \ + timeout." ); + ( "sleep_latency", + Lang.float_t, + Some (Lang.float 0.1), + Some + "How much time ahead, in seconds, should we should be before pausing \ + the processing." ); + ] + Lang.unit_t + (fun p -> + let r = Request.Value.of_value (List.assoc "" p) in + let process = List.assoc "process" p in + let process s = + Lang.to_source (Lang.apply process [("", Lang.source s)]) + in + let ratio = Lang.to_float (List.assoc "ratio" p) in + let timeout = Lang.to_float (List.assoc "timeout" p) in + let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in + process_request ~log ~name:"request.process" ~ratio ~timeout + ~sleep_latency ~process r; + Lang.unit) diff --git a/src/core/builtins/builtins_source.ml b/src/core/builtins/builtins_source.ml index b1f7db39c0..db5614a960 100644 --- a/src/core/builtins/builtins_source.ml +++ b/src/core/builtins/builtins_source.ml @@ -21,11 +21,6 @@ *****************************************************************************) let source = Muxer.source -let should_stop = Atomic.make false - -let () = - Lifecycle.before_core_shutdown ~name:"builtin source shutdown" (fun () -> - Atomic.set should_stop true) let _ = Lang.add_builtin ~base:source "set_name" ~category:(`Source `Liquidsoap) @@ -161,139 +156,3 @@ let _ = let wrap_f () = ignore (Lang.apply f []) in s#on_sleep wrap_f; Lang.unit) - -let flush_source ~log ~name ~ratio ~timeout ~sleep_latency s = - let module Time = (val Clock.time_implementation () : Liq_time.T) in - let open Time in - let started = ref false in - let stopped = ref false in - let clock = - Clock.create ~id:name ~sync:`Passive - ~on_error:(fun exn bt -> - stopped := true; - Utils.log_exception ~log - ~bt:(Printexc.raw_backtrace_to_string bt) - (Printf.sprintf "Error while processing source: %s" - (Printexc.to_string exn))) - () - in - let _ = - new Output.dummy - ~clock ~infallible:false - ~on_start:(fun () -> started := true) - ~on_stop:(fun () -> stopped := true) - ~register_telnet:false ~autostart:true (Lang.source s) - in - Clock.start ~force:true clock; - log#info "Start source streaming loop (ratio: %.02fx)" ratio; - let start_time = Time.time () in - let timeout = Time.of_float timeout in - let timeout_time = Time.(start_time |+| timeout) in - let sleep_latency = Time.of_float sleep_latency in - let target_time () = - Time.(start_time |+| sleep_latency |+| of_float (Clock.time clock /. ratio)) - in - (try - while (not (Atomic.get should_stop)) && not !stopped do - if (not !started) && Time.(timeout_time |<=| start_time) then ( - log#important "Timeout while waiting for the source to start!"; - stopped := true) - else ( - Clock.tick clock; - let target_time = target_time () in - if Time.(time () |<| (target_time |+| sleep_latency)) then - sleep_until target_time) - done - with Clock.Has_stopped -> ()); - let processing_time = Time.(to_float (time () |-| start_time)) in - let effective_ratio = Clock.time clock /. processing_time in - log#info - "Source processed. Total processing time: %.02fs, effective ratio: %.02fx" - processing_time effective_ratio; - Clock.stop clock - -let flush_source ~log ~name ~ratio ~timeout ~sleep_latency s = - if Tutils.running () then - flush_source ~log ~name ~ratio ~timeout ~sleep_latency s - else log#important "Cannot run %s: scheduler not started!" name - -let _ = - let log = Log.make ["source"; "dump"] in - let kind = Lang.univ_t () in - Lang.add_builtin ~base:source "dump" ~category:(`Source `Liquidsoap) - ~descr:"Immediately encode the whole contents of a source into a file." - ~flags:[`Experimental] - [ - ("", Lang.format_t kind, None, Some "Encoding format."); - ("", Lang.string_t, None, Some "Name of the file."); - ("", Lang.source_t kind, None, Some "Source to encode."); - ( "ratio", - Lang.float_t, - Some (Lang.float 50.), - Some - "Time ratio. A value of `50` means process data at `50x` real rate, \ - when possible." ); - ( "timeout", - Lang.float_t, - Some (Lang.float 1.), - Some - "Stop processing the source if it has not started after the given \ - timeout." ); - ( "sleep_latency", - Lang.float_t, - Some (Lang.float 0.1), - Some - "How much time ahead, in seconds, should we should be before pausing \ - the processing." ); - ] - Lang.unit_t - (fun p -> - let proto = - let p = Pipe_output.file_proto (Lang.univ_t ()) in - List.filter_map (fun (l, _, v, _) -> Option.map (fun v -> (l, v)) v) p - in - let proto = ("fallible", Lang.bool true) :: proto in - let p = (("id", Lang.string "source.drop") :: p) @ proto in - let s = Pipe_output.new_file_output p in - let ratio = Lang.to_float (List.assoc "ratio" p) in - let timeout = Lang.to_float (List.assoc "timeout" p) in - let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in - flush_source ~log ~name:"source.dump" ~ratio ~timeout ~sleep_latency - (s :> Source.source); - log#info "Source dumped."; - Lang.unit) - -let _ = - let log = Log.make ["source"; "drop"] in - Lang.add_builtin ~base:source "drop" ~category:(`Source `Liquidsoap) - ~descr:"Animate the source as fast as possible, dropping its output." - ~flags:[`Experimental] - [ - ("", Lang.source_t (Lang.univ_t ()), None, Some "Source to animate."); - ( "ratio", - Lang.float_t, - Some (Lang.float 50.), - Some - "Time ratio. A value of `50` means process data at `50x` real rate, \ - when possible." ); - ( "timeout", - Lang.float_t, - Some (Lang.float 1.), - Some - "Stop processing the source if it has not started after the given \ - timeout." ); - ( "sleep_latency", - Lang.float_t, - Some (Lang.float 0.1), - Some - "How much time ahead, in seconds, should we should be before pausing \ - the processing." ); - ] - Lang.unit_t - (fun p -> - let s = List.assoc "" p |> Lang.to_source in - let ratio = Lang.to_float (List.assoc "ratio" p) in - let timeout = Lang.to_float (List.assoc "timeout" p) in - let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in - flush_source ~log ~name:"source.dump" ~ratio ~timeout ~sleep_latency s; - Lang.unit) diff --git a/src/core/clock.ml b/src/core/clock.ml index b1da4168a5..7089e9de5b 100644 --- a/src/core/clock.ml +++ b/src/core/clock.ml @@ -45,26 +45,6 @@ let log = Log.make ["clock"] let conf_clock = Dtools.Conf.void ~p:(Configure.conf#plug "clock") "Clock settings" -(** If true, a clock keeps running when an output fails. Other outputs may - * still be useful. But there may also be some useless inputs left. - * If no active output remains, the clock will exit without triggering - * shutdown. We may need some device to allow this (but active and passive - * clocks will have to be treated separately). *) -let allow_streaming_errors = - Dtools.Conf.bool - ~p:(conf_clock#plug "allow_streaming_errors") - ~d:false "Handling of streaming errors" - ~comments: - [ - "Control the behaviour of clocks when an error occurs during streaming."; - "This has no effect on errors occurring during source initializations."; - "By default, any error will cause liquidsoap to shutdown. If errors"; - "are allowed, faulty sources are simply removed and clocks keep \ - running."; - "Allowing errors can result in complex surprising situations;"; - "use at your own risk!"; - ] - let conf_log_delay = Dtools.Conf.float ~p:(conf_clock#plug "log_delay") @@ -380,6 +360,20 @@ let started c = | `Stopping _ | `Started _ -> true | `Stopped _ -> false +let wrap_errors clock fn s = + try fn s + with exn when exn <> Has_stopped -> + let bt = Printexc.get_raw_backtrace () in + Printf.printf "Error: %s\n%s\n%!" (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string bt); + log#severe "Source %s failed while streaming: %s!\n%s" s#id + (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string bt); + _detach clock s; + if Queue.length clock.on_error > 0 then + Queue.iter clock.on_error (fun fn -> fn exn bt) + else Printexc.raise_with_backtrace exn bt + let rec active_params c = match Atomic.get (Unifier.deref c).state with | `Stopping s | `Started s -> s @@ -387,13 +381,14 @@ let rec active_params c = | _ -> raise Invalid_state and _activate_pending_sources ~clock x = - Queue.flush_iter clock.pending_activations (fun s -> - check_stopped (); - s#wake_up; - match s#source_type with - | `Active _ -> WeakQueue.push x.active_sources s - | `Output _ -> Queue.push x.outputs s - | `Passive -> WeakQueue.push x.passive_sources s) + Queue.flush_iter clock.pending_activations + (wrap_errors clock (fun s -> + check_stopped (); + s#wake_up; + match s#source_type with + | `Active _ -> WeakQueue.push x.active_sources s + | `Output _ -> Queue.push x.outputs s + | `Passive -> WeakQueue.push x.passive_sources s)) and _tick ~clock x = _activate_pending_sources ~clock x; @@ -402,21 +397,11 @@ and _tick ~clock x = in let sources = _animated_sources x in List.iter - (fun s -> - check_stopped (); - try - match s#source_type with - | `Output s | `Active s -> s#output - | _ -> assert false - with exn when exn <> Has_stopped -> - let bt = Printexc.get_raw_backtrace () in - if Queue.is_empty clock.on_error then ( - log#severe "Source %s failed while streaming: %s!\n%s" s#id - (Printexc.to_string exn) - (Printexc.raw_backtrace_to_string bt); - if not allow_streaming_errors#get then Tutils.shutdown 1 - else _detach clock s) - else Queue.iter clock.on_error (fun fn -> fn exn bt)) + (wrap_errors clock (fun s -> + check_stopped (); + match s#source_type with + | `Output s | `Active s -> s#output + | _ -> assert false)) sources; Queue.flush_iter x.on_tick (fun fn -> check_stopped (); diff --git a/src/core/lang_source.ml b/src/core/lang_source.ml index b34070636d..f00e120bcb 100644 --- a/src/core/lang_source.ml +++ b/src/core/lang_source.ml @@ -55,15 +55,16 @@ module ClockValue = struct Lang.val_fun [] (fun _ -> Lang.string Clock.(string_of_sync_mode (sync c))) ); ( "start", - Lang.fun_t [] Lang.unit_t, + Lang.fun_t [(true, "force", Lang.bool_t)] Lang.unit_t, "Start the clock.", fun c -> Lang.val_fun - [("", "", Some (Lang.string "auto"))] + [("force", "force", Some (Lang.bool true))] (fun p -> let pos = Lang.pos p in + let force = Lang.to_bool (List.assoc "force" p) in try - Clock.start c; + Clock.start ~force c; Lang.unit with Clock.Invalid_state -> Runtime_error.raise @@ -94,6 +95,13 @@ module ClockValue = struct let c' = of_value (List.assoc "" p) in Clock.unify ~pos c c'; Lang.unit) ); + ( "tick", + Lang.fun_t [] Lang.unit_t, + "Animate the clock and run one tick", + fun c -> + Lang.val_fun [] (fun _ -> + Clock.tick c; + Lang.unit) ); ( "ticks", Lang.fun_t [] Lang.int_t, "The total number of times the clock has ticked.", diff --git a/src/core/outputs/output.ml b/src/core/outputs/output.ml index fdd892d212..a2622a88c1 100644 --- a/src/core/outputs/output.ml +++ b/src/core/outputs/output.ml @@ -182,7 +182,7 @@ class virtual output ~output_kind ?clock ?(name = "") ~infallible if not self#fallible then ( self#log#critical "Infallible source produced a partial frame!"; assert false); - self#log#important "Source ended (no more tracks) stopping output..."; + self#log#info "Source ended (no more tracks) stopping output..."; self#transition_to `Idle); if skip then ( diff --git a/src/core/source.ml b/src/core/source.ml index d3bd3103e0..8fe69c738d 100644 --- a/src/core/source.ml +++ b/src/core/source.ml @@ -242,11 +242,12 @@ class virtual operator ?(stack = []) ?clock ?(name = "src") sources = List.iter (fun fn -> fn ()) on_wake_up with exn -> Atomic.set is_up `Error; - let bt = Printexc.get_backtrace () in - Utils.log_exception ~log ~bt - (Printf.sprintf "Error when starting source %s: %s!" self#id + let bt = Printexc.get_raw_backtrace () in + Utils.log_exception ~log + ~bt:(Printexc.raw_backtrace_to_string bt) + (Printf.sprintf "Error while starting source %s: %s!" self#id (Printexc.to_string exn)); - Tutils.shutdown 1) + Printexc.raise_with_backtrace exn bt) val mutable on_sleep = [] method on_sleep fn = on_sleep <- fn :: on_sleep diff --git a/src/core/sources/request_dynamic.ml b/src/core/sources/request_dynamic.ml index 8995487d85..b3aa692b2c 100644 --- a/src/core/sources/request_dynamic.ml +++ b/src/core/sources/request_dynamic.ml @@ -59,11 +59,11 @@ let () = Lifecycle.before_core_shutdown ~name:"request.dynamic shutdown" (fun () -> Atomic.set should_fail true) -class dynamic ~priority ~retry_delay ~available (f : Lang.value) prefetch - timeout = +class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available + (f : Lang.value) prefetch timeout = let available () = (not (Atomic.get should_fail)) && available () in object (self) - inherit source ~name:"request.dynamic" () + inherit source ~name () method fallible = true val mutable remaining = 0 method remaining = remaining diff --git a/src/core/stream/content_base.ml b/src/core/stream/content_base.ml index 02146d373d..ad93d9bfde 100644 --- a/src/core/stream/content_base.ml +++ b/src/core/stream/content_base.ml @@ -330,7 +330,7 @@ module MkContentBase (C : ContentSpecs) : let consolidate_chunks = let consolidate_chunk ~buf pos ({ data; offset } as chunk) = let length = chunk_length chunk in - C.blit data offset buf pos length; + if length > 0 then C.blit data offset buf pos length; pos + length in fun ~copy d -> diff --git a/src/libs/autocue.liq b/src/libs/autocue.liq index ca98462130..b262242965 100644 --- a/src/libs/autocue.liq +++ b/src/libs/autocue.liq @@ -59,25 +59,6 @@ let settings.autocue.internal.metadata_override = ] ) -let settings.autocue.internal.queues_count = - settings.make( - description= - "Number of dedicated queues for resolving autocue data using the internal \ - implementation", - 1 - ) - -def settings.autocue.internal.queues_count.set(c) = - settings.scheduler.queues := - [ - ...list.assoc.remove("autocue", settings.scheduler.queues()), - ("autocue", c) - ] - settings.autocue.internal.queues_count.set(c) -end - -settings.autocue.internal.queues_count.set(1) - let settings.autocue.internal.lufs_target = settings.make( description= @@ -206,30 +187,30 @@ def autocue.internal.ebur128(~duration, ~ratio=50., ~timeout=10., filename) = ) [] else - s = - request.once( - thread_queue="autocue", request.create(resolve_metadata=false, filename) - ) + r = request.create(resolve_metadata=false, filename) frames = ref([]) - def ebur128(s) = - def mk_filter(graph) = - let {audio = a} = source.tracks(s) - a = ffmpeg.filter.audio.input(graph, a) - let ([a], _) = ffmpeg.filter.ebur128(metadata=true, graph, a) + def process(s) = + def ebur128(s) = + def mk_filter(graph) = + let {audio = a} = source.tracks(s) + a = ffmpeg.filter.audio.input(graph, a) + let ([a], _) = ffmpeg.filter.ebur128(metadata=true, graph, a) + + # ebur filter seems to generate invalid PTS. + a = ffmpeg.filter.asetpts(expr="N/SR/TB", graph, a) + a = ffmpeg.filter.audio.output(id="filter_output", graph, a) + source({audio=a, metadata=track.metadata(a)}) + end - # ebur filter seems to generate invalid PTS. - a = ffmpeg.filter.asetpts(expr="N/SR/TB", graph, a) - a = ffmpeg.filter.audio.output(id="filter_output", graph, a) - source({audio=a, metadata=track.metadata(a)}) + ffmpeg.filter.create(mk_filter) end - ffmpeg.filter.create(mk_filter) + s = ebur128(s) + source.on_metadata(s, fun (m) -> frames := [...frames(), m]) end - s = ebur128(s) - s = source.on_metadata(s, fun (m) -> frames := [...frames(), m]) - source.drop(ratio=ratio, s) + request.process(ratio=ratio, process=process, r) frames() end @@ -794,7 +775,7 @@ def autocue.internal.implementation( fade_out_type?: string, fade_out_curve?: float, start_next?: float, - extra_metadata?: [(string*string)] + extra_metadata?: [(string * string)] } ) end diff --git a/src/libs/playlist.liq b/src/libs/playlist.liq index 1885731c72..89251c94c9 100644 --- a/src/libs/playlist.liq +++ b/src/libs/playlist.liq @@ -100,17 +100,8 @@ end # @category File # @param ~mime_type Default MIME type for the playlist. `null` means automatic detection. # @param ~timeout Timeout for resolving the playlist -# @param ~cue_in_metadata Metadata for cue in points. Disabled if `null`. -# @param ~cue_out_metadata Metadata for cue out points. Disabled if `null`. # @param uri Path to the playlist -def playlist.files( - ~id=null(), - ~mime_type=null(), - ~timeout=null(), - ~cue_in_metadata=null("liq_cue_in"), - ~cue_out_metadata=null("liq_cue_out"), - uri -) = +def playlist.files(~id=null(), ~mime_type=null(), ~timeout=null(), uri) = id = id ?? playlist.id(default="playlist.files", uri) if @@ -124,10 +115,7 @@ def playlist.files( files = list.filter(fun (f) -> not (file.is_directory(f)), files) files else - pl = - request.create( - cue_in_metadata=cue_in_metadata, cue_out_metadata=cue_out_metadata, uri - ) + pl = request.create(resolve_metadata=false, uri) result = if request.resolve(timeout=timeout, pl) @@ -533,12 +521,7 @@ def replaces playlist( files = try playlist.files( - id=id, - mime_type=mime_type, - timeout=timeout, - cue_in_metadata=cue_in_metadata, - cue_out_metadata=cue_out_metadata, - playlist_uri + id=id, mime_type=mime_type, timeout=timeout, playlist_uri ) catch err do log.info( diff --git a/src/libs/protocols.liq b/src/libs/protocols.liq index afa8d7b1c4..4f4b92be4c 100644 --- a/src/libs/protocols.liq +++ b/src/libs/protocols.liq @@ -660,9 +660,7 @@ def protocol.stereo(~rlog=_, ~maxtime=_, arg) = ) null() else - # TODO: the following sometimes hangs, so we resolve twice... - # source.dump(%wav, file, source.stereo(once(request.queue(queue=[r])))) - source.dump(%wav, file, stereo(once(single(arg)))) + request.dump(%wav, file, request.create(arg)) file end end @@ -1066,7 +1064,19 @@ def synth_protocol(~rlog=_, ~maxtime=_, text) = label="synth", "Synthesizing #{shape()} in #{file}." ) - source.dump(%wav, file, once(s)) + + clock.assign_new(sync="passive", [s]) + + stopped = ref(false) + output.file( + fallible=true, on_stop={stopped.set(true)}, %wav, file, once(s) + ) + + c = clock(s.clock) + c.start() + while not stopped() do c.tick() end + c.stop() + file end diff --git a/src/libs/replaygain.liq b/src/libs/replaygain.liq index 14d28a9f89..65da5042f7 100644 --- a/src/libs/replaygain.liq +++ b/src/libs/replaygain.liq @@ -22,9 +22,17 @@ def file.replaygain.compute(~ratio=50., file_name) = if request.resolve(_request) then - _source = source.replaygain.compute(request.once(_request)) - source.drop(ratio=ratio, _source) - _source.gain() + get_gain = ref(fun () -> null()) + def process(s) = + s = source.replaygain.compute(s) + get_gain := {s.gain()} + s + end + + request.process(ratio=ratio, process=process, _request) + + fn = get_gain() + fn() else null() end diff --git a/src/libs/tracks.liq b/src/libs/tracks.liq index 0eaf8f243e..9d0affa81c 100644 --- a/src/libs/tracks.liq +++ b/src/libs/tracks.liq @@ -34,6 +34,8 @@ def source.mux.midi(~id=null(), ~(midi:source), s) = source(id=id, source.tracks(s).{midi=source.tracks(midi).midi}) end +let source.drop = () + # Remove the audio track of a source. # @category Source / Track processing def source.drop.audio(~id=null(), s) =