Skip to content

Commit

Permalink
Backport autocue improvements from 2.2.x (#4184)
Browse files Browse the repository at this point in the history
  • Loading branch information
toots committed Jan 3, 2025
1 parent 7387b07 commit 0705edb
Show file tree
Hide file tree
Showing 11 changed files with 445 additions and 71 deletions.
30 changes: 28 additions & 2 deletions src/core/builtins/builtins_resolvers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ let _ =
Lang.getter_t Lang.int_t,
Some (Lang.int 1),
Some "Resolver's priority." );
( "mime_types",
Lang.nullable_t (Lang.list_t Lang.string_t),
Some Lang.null,
Some
"Decode files that match the mime types in this list. Accept any \
file if `null`." );
( "file_extensions",
Lang.nullable_t (Lang.list_t Lang.string_t),
Some Lang.null,
Some
"Decode files that have the file extensions in this list. Accept any \
file if `null`." );
("", Lang.string_t, None, Some "Format/resolver's name.");
( "",
resolver_t,
Expand All @@ -47,11 +59,25 @@ let _ =
(fun p ->
let format = Lang.to_string (Lang.assoc "" 1 p) in
let f = Lang.assoc "" 2 p in
let mimes =
Lang.to_valued_option
(fun v -> List.map Lang.to_string (Lang.to_list v))
(List.assoc "mime_types" p)
in
let extensions =
Lang.to_valued_option
(fun v -> List.map Lang.to_string (Lang.to_list v))
(List.assoc "file_extensions" p)
in
let log = Log.make ["decoder"; "metadata"] in
let priority = Lang.to_int_getter (List.assoc "priority" p) in
let resolver ~metadata ~extension:_ ~mime:_ name =
let resolver ~metadata ~extension ~mime fname =
if
not (Decoder.test_file ~log ~extension ~mime ~mimes ~extensions fname)
then raise Metadata.Invalid;
let ret =
Lang.apply f
[("metadata", Lang.metadata metadata); ("", Lang.string name)]
[("metadata", Lang.metadata metadata); ("", Lang.string fname)]
in
let ret = Lang.to_list ret in
let ret = List.map Lang.to_product ret in
Expand Down
120 changes: 101 additions & 19 deletions src/core/builtins/builtins_source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,24 @@ let _ =
Some
"Time ratio. A value of `50` means process data at `50x` real rate, \
when possible." );
( "timeout",
Lang.float_t,
Some (Lang.float 1.),
Some
"Stop processing the source if it has not started after the given \
timeout." );
( "sleep_latency",
Lang.float_t,
Some (Lang.float 0.1),
Some
"How much time ahead, in seconds, should we should be before pausing \
the processing." );
]
Lang.unit_t
(fun p ->
let module Time = (val Clock.time_implementation () : Liq_time.T) in
let open Time in
let started = ref false in
let stopped = ref false in
let proto =
let p = Pipe_output.file_proto (Lang.univ_t ()) in
Expand All @@ -198,18 +211,44 @@ let _ =
p
in
let proto = ("fallible", Lang.bool true) :: proto in
let p = (("id", Lang.string "source_dumper") :: p) @ proto in
let clock = Clock.create ~id:"source_dumper" ~sync:`Passive () in
let _ = Pipe_output.new_file_output ~clock p in
let p = (("id", Lang.string "source.drop") :: p) @ proto in
let clock =
Clock.create ~id:"source.dump" ~sync:`Passive
~on_error:(fun exn bt ->
stopped := true;
Utils.log_exception ~log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while dropping source: %s"
(Printexc.to_string exn)))
()
in
let s = Pipe_output.new_file_output ~clock p in
let ratio = Lang.to_float (List.assoc "ratio" p) in
let latency = Time.of_float (Lazy.force Frame.duration /. ratio) in
Clock.start clock;
let timeout = Time.of_float (Lang.to_float (List.assoc "timeout" p)) in
let sleep_latency =
Time.of_float (Lang.to_float (List.assoc "sleep_latency" p))
in
Clock.start ~force:true clock;
log#info "Start dumping source (ratio: %.02fx)" ratio;
while (not (Atomic.get should_stop)) && not !stopped do
let start_time = Time.time () in
Clock.tick clock;
sleep_until (start_time |+| latency)
done;
let start_time = Time.time () in
let timeout_time = Time.(start_time |+| timeout) in
let target_time () =
Time.(
start_time |+| sleep_latency |+| of_float (Clock.time clock /. ratio))
in
(try
while (not (Atomic.get should_stop)) && not !stopped do
if not !started then started := s#is_ready;
if (not !started) && Time.(timeout_time |<=| start_time) then (
log#important "Timeout while waiting for the source to start!";
stopped := true)
else (
Clock.tick clock;
let target_time = target_time () in
if Time.(time () |<| (target_time |+| sleep_latency)) then
sleep_until target_time)
done
with Clock.Has_stopped -> ());
log#info "Source dumped.";
Clock.stop clock;
Lang.unit)
Expand All @@ -227,14 +266,36 @@ let _ =
Some
"Time ratio. A value of `50` means process data at `50x` real rate, \
when possible." );
( "timeout",
Lang.float_t,
Some (Lang.float 1.),
Some
"Stop processing the source if it has not started after the given \
timeout." );
( "sleep_latency",
Lang.float_t,
Some (Lang.float 0.1),
Some
"How much time ahead, in seconds, should we should be before pausing \
the processing." );
]
Lang.unit_t
(fun p ->
let module Time = (val Clock.time_implementation () : Liq_time.T) in
let open Time in
let s = List.assoc "" p |> Lang.to_source in
let started = ref false in
let stopped = ref false in
let clock = Clock.create ~id:"source_dumper" ~sync:`Passive () in
let clock =
Clock.create ~id:"source.dump" ~sync:`Passive
~on_error:(fun exn bt ->
stopped := true;
Utils.log_exception ~log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while dropping source: %s"
(Printexc.to_string exn)))
()
in
let _ =
new Output.dummy
~clock ~infallible:false
Expand All @@ -243,14 +304,35 @@ let _ =
~register_telnet:false ~autostart:true (Lang.source s)
in
let ratio = Lang.to_float (List.assoc "ratio" p) in
let latency = Time.of_float (Lazy.force Frame.duration /. ratio) in
Clock.start clock;
let timeout = Time.of_float (Lang.to_float (List.assoc "timeout" p)) in
let sleep_latency =
Time.of_float (Lang.to_float (List.assoc "sleep_latency" p))
in
Clock.start ~force:true clock;
log#info "Start dropping source (ratio: %.02fx)" ratio;
while (not (Atomic.get should_stop)) && not !stopped do
let start_time = Time.time () in
Clock.tick clock;
sleep_until (start_time |+| latency)
done;
log#info "Source dropped.";
let start_time = Time.time () in
let timeout_time = Time.(start_time |+| timeout) in
let target_time () =
Time.(start_time |+| of_float (Clock.time clock /. ratio))
in
(try
while (not (Atomic.get should_stop)) && not !stopped do
let start_time = Time.time () in
if not !started then started := s#is_ready;
if (not !started) && Time.(timeout_time |<=| start_time) then (
log#important "Timeout while waiting for the source to start!";
stopped := true)
else (
Clock.tick clock;
let target_time = target_time () in
if Time.(time () |<| (target_time |+| sleep_latency)) then
sleep_until target_time)
done
with Clock.Has_stopped -> ());
let processing_time = Time.(to_float (time () |-| start_time)) in
let effective_ratio = Clock.time clock /. processing_time in
log#info
"Source dropped. Total processing time: %.02fs, effective ratio: %.02fx"
processing_time effective_ratio;
Clock.stop clock;
Lang.unit)
21 changes: 15 additions & 6 deletions src/core/clock.ml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ let conf_clock_preferred =
let conf_clock_sleep_latency =
Dtools.Conf.int
~p:(conf_clock#plug "sleep_latency")
~d:1
~d:5
"How much time ahead (in frame duration) we should be until we let the \
streaming loop sleep."
~comments:
Expand Down Expand Up @@ -316,9 +316,13 @@ let ticks c =
| `Stopped _ -> 0
| `Stopping { ticks } | `Started { ticks } -> Atomic.get ticks

let _target_time { time_implementation; t0; frame_duration; ticks } =
let _time { time_implementation; frame_duration; ticks } =
let module Time = (val time_implementation : Liq_time.T) in
Time.(t0 |+| (frame_duration |*| of_float (float_of_int (Atomic.get ticks))))
Time.(frame_duration |*| of_float (float_of_int (Atomic.get ticks)))

let _target_time ({ time_implementation; t0 } as c) =
let module Time = (val time_implementation : Liq_time.T) in
Time.(t0 |+| _time c)

let _set_time { time_implementation; t0; frame_duration; ticks } t =
let module Time = (val time_implementation : Liq_time.T) in
Expand Down Expand Up @@ -464,7 +468,7 @@ and _can_start ?(force = false) clock =
`True sync
| _ -> `False

and _start ~sync clock =
and _start ?force ~sync clock =
Unifier.set clock.id (Lang_string.generate_id (Unifier.deref clock.id));
let id = _id clock in
log#important "Starting clock %s with %d source(s) and sync: %s" id
Expand Down Expand Up @@ -497,14 +501,14 @@ and _start ~sync clock =
ticks = Atomic.make 0;
}
in
Queue.iter clock.sub_clocks (fun c -> start c);
Queue.iter clock.sub_clocks (fun c -> start ?force c);
Atomic.set clock.state (`Started x);
if sync <> `Passive then _clock_thread ~clock x

and start ?force c =
let clock = Unifier.deref c in
match _can_start ?force clock with
| `True sync -> _start ~sync clock
| `True sync -> _start ?force ~sync clock
| `False -> ()

let create ?(stack = []) ?on_error ?(id = "generic") ?(sub_ids = [])
Expand All @@ -526,6 +530,11 @@ let create ?(stack = []) ?on_error ?(id = "generic") ?(sub_ids = [])
Queue.push clocks c;
c

let time c =
let ({ time_implementation } as c) = active_params c in
let module Time = (val time_implementation : Liq_time.T) in
Time.to_float (_time c)

let start_pending () =
let c = Queue.flush_elements clocks in
let c = List.map (fun c -> (c, Unifier.deref c)) c in
Expand Down
2 changes: 2 additions & 0 deletions src/core/clock.mli
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*****************************************************************************)

exception Invalid_state
exception Has_stopped

type t
type active_source = < reset : unit ; output : unit >
Expand Down Expand Up @@ -96,6 +97,7 @@ val started : t -> bool
val stop : t -> unit
val set_stack : t -> Liquidsoap_lang.Pos.t list -> unit
val self_sync : t -> bool
val time : t -> float
val unify : pos:Liquidsoap_lang.Pos.Option.t -> t -> t -> unit
val create_sub_clock : id:string -> t -> t
val attach : t -> source -> unit
Expand Down
Loading

0 comments on commit 0705edb

Please sign in to comment.