Skip to content

Commit

Permalink
Remove source.{dump, drop}, add request.{dump, process}.
Browse files Browse the repository at this point in the history
  • Loading branch information
toots committed Nov 19, 2024
1 parent cf3fd62 commit 7f0e7b9
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 188 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ Changed:
now returns `max_int` and `int(-infinity)` returns `min_int`. (#3407)
- Made default font a setting (#3507)
- Changed internal metadata format to be immutable (#3297).
- Removed `source.dump` and `source.drop` in favor safer `request.dump` and `request.drop`.
`source.{dump, drop}` can still be implemented manually when needed and with the proper
knowledge of what's going on.
- Allow a getter for the offset of `on_offset` and dropped the metadata
mechanism for updating it (#3355).
- `string.length` and `string.sub` now default to `utf8` encoding (#4109)
Expand Down
185 changes: 185 additions & 0 deletions src/core/builtins/builtins_request.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
*****************************************************************************)

let request = Modules.request
let should_stop = Atomic.make false

let () =
Lifecycle.before_core_shutdown ~name:"builtin source shutdown" (fun () ->
Atomic.set should_stop true)

let _ =
Lang.add_builtin ~base:request "all" ~category:`Liquidsoap
Expand Down Expand Up @@ -295,3 +300,183 @@ let _ =
| `Failed -> "failed"
in
Lang.string s)

exception Invalid

class flush ~name r =
object (self)
inherit
Request_dynamic.dynamic
~name ~priority:`Non_blocking
~retry_delay:(fun _ -> 0.1)
~available:(fun _ -> true)
(Lang.val_fun [] (fun _ -> Lang.null))
1 None

initializer
self#on_wake_up (fun () ->
match Request.get_decoder ~ctype:self#content_type r with
| Some _ -> self#set_queue [r]
| None | (exception _) -> raise Invalid)
end

let flush_request ~log ~name ~ratio ~timeout ~sleep_latency ~process r =
let module Time = (val Clock.time_implementation () : Liq_time.T) in
let open Time in
let start_time = Time.time () in
match Request.resolve ~timeout r with
| `Failed | `Timeout -> ()
| `Resolved ->
let timeout = Time.of_float timeout in
let timeout_time = Time.(start_time |+| timeout) in
let started = ref false in
let stopped = ref false in
let s = new flush ~name r in
let s = (process (s :> Source.source) :> Source.source) in
let clock =
Clock.create ~id:name ~sync:`Passive
~on_error:(fun exn bt ->
stopped := true;
Utils.log_exception ~log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while processing source: %s"
(Printexc.to_string exn)))
()
in
let _ =
new Output.dummy
~clock ~infallible:false
~on_start:(fun () -> started := true)
~on_stop:(fun () -> stopped := true)
~register_telnet:false ~autostart:true (Lang.source s)
in
Clock.start ~force:true clock;
log#info "Start streaming loop (ratio: %.02fx)" ratio;
let sleep_latency = Time.of_float sleep_latency in
let target_time () =
Time.(
start_time |+| sleep_latency |+| of_float (Clock.time clock /. ratio))
in
(try
while (not (Atomic.get should_stop)) && not !stopped do
if (not !started) && Time.(timeout_time |<=| start_time) then (
log#important "Timeout while waiting for the source to start!";
stopped := true)
else (
Clock.tick clock;
let target_time = target_time () in
if Time.(time () |<| (target_time |+| sleep_latency)) then
sleep_until target_time)
done
with Invalid | Clock.Has_stopped -> ());
let processing_time = Time.(to_float (time () |-| start_time)) in
let effective_ratio = Clock.time clock /. processing_time in
log#info
"Request processed. Total processing time: %.02fs, effective ratio: \
%.02fx"
processing_time effective_ratio;
Clock.stop clock

let _ =
let log = Log.make ["request"; "dump"] in
let kind = Lang.univ_t () in
Lang.add_builtin ~base:request "dump" ~category:(`Source `Liquidsoap)
~descr:"Immediately encode the whole contents of a request into a file."
~flags:[`Experimental]
[
("", Lang.format_t kind, None, Some "Encoding format.");
("", Lang.string_t, None, Some "Name of the file.");
("", Request.Value.t, None, Some "Request to encode.");
( "ratio",
Lang.float_t,
Some (Lang.float 50.),
Some
"Time ratio. A value of `50` means process data at `50x` real rate, \
when possible." );
( "timeout",
Lang.float_t,
Some (Lang.float 1.),
Some
"Stop processing the source if it has not started after the given \
timeout." );
( "sleep_latency",
Lang.float_t,
Some (Lang.float 0.1),
Some
"How much time ahead, in seconds, should we should be before pausing \
the processing." );
]
Lang.unit_t
(fun p ->
let proto =
let p = Pipe_output.file_proto (Lang.univ_t ()) in
List.filter_map (fun (l, _, v, _) -> Option.map (fun v -> (l, v)) v) p
in
let proto = ("fallible", Lang.bool true) :: proto in
let format = Lang.assoc "" 1 p in
let file = Lang.assoc "" 2 p in
let r = Request.Value.of_value (Lang.assoc "" 3 p) in
let process s =
let p =
("id", Lang.string "request.drop")
:: ("", format) :: ("", file)
:: ("", Lang.source s)
:: (p @ proto)
in
Pipe_output.new_file_output p
in
let ratio = Lang.to_float (List.assoc "ratio" p) in
let timeout = Lang.to_float (List.assoc "timeout" p) in
let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in
flush_request ~log ~name:"request.dump" ~ratio ~timeout ~sleep_latency
~process r;
log#info "Request dumped.";
Lang.unit)

let _ =
let log = Log.make ["request"; "process"] in
Lang.add_builtin ~base:request "process" ~category:(`Source `Liquidsoap)
~descr:
"Given a request and an optional function to process this request, \
animate the source as fast as possible until the request is fully \
processed."
[
("", Request.Value.t, None, Some "Request to process");
( "process",
Lang.fun_t
[(false, "", Lang.source_t (Lang.univ_t ()))]
(Lang.source_t (Lang.univ_t ())),
Some (Lang.val_fun [("", "", None)] (fun p -> List.assoc "" p)),
Some "Callback to create the source to animate." );
( "ratio",
Lang.float_t,
Some (Lang.float 50.),
Some
"Time ratio. A value of `50` means process data at `50x` real rate, \
when possible." );
( "timeout",
Lang.float_t,
Some (Lang.float 1.),
Some
"Stop processing the source if it has not started after the given \
timeout." );
( "sleep_latency",
Lang.float_t,
Some (Lang.float 0.1),
Some
"How much time ahead, in seconds, should we should be before pausing \
the processing." );
]
Lang.unit_t
(fun p ->
let r = Request.Value.of_value (List.assoc "" p) in
let process = List.assoc "process" p in
let process s =
Lang.to_source (Lang.apply process [("", Lang.source s)])
in
let ratio = Lang.to_float (List.assoc "ratio" p) in
let timeout = Lang.to_float (List.assoc "timeout" p) in
let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in
flush_request ~log ~name:"request.process" ~ratio ~timeout ~sleep_latency
~process r;
Lang.unit)
141 changes: 0 additions & 141 deletions src/core/builtins/builtins_source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@
*****************************************************************************)

let source = Muxer.source
let should_stop = Atomic.make false

let () =
Lifecycle.before_core_shutdown ~name:"builtin source shutdown" (fun () ->
Atomic.set should_stop true)

let _ =
Lang.add_builtin ~base:source "set_name" ~category:(`Source `Liquidsoap)
Expand Down Expand Up @@ -161,139 +156,3 @@ let _ =
let wrap_f () = ignore (Lang.apply f []) in
s#on_sleep wrap_f;
Lang.unit)

let flush_source ~log ~name ~ratio ~timeout ~sleep_latency s =
let module Time = (val Clock.time_implementation () : Liq_time.T) in
let open Time in
let started = ref false in
let stopped = ref false in
let clock =
Clock.create ~id:name ~sync:`Passive
~on_error:(fun exn bt ->
stopped := true;
Utils.log_exception ~log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while processing source: %s"
(Printexc.to_string exn)))
()
in
let _ =
new Output.dummy
~clock ~infallible:false
~on_start:(fun () -> started := true)
~on_stop:(fun () -> stopped := true)
~register_telnet:false ~autostart:true (Lang.source s)
in
Clock.start ~force:true clock;
log#info "Start source streaming loop (ratio: %.02fx)" ratio;
let start_time = Time.time () in
let timeout = Time.of_float timeout in
let timeout_time = Time.(start_time |+| timeout) in
let sleep_latency = Time.of_float sleep_latency in
let target_time () =
Time.(start_time |+| sleep_latency |+| of_float (Clock.time clock /. ratio))
in
(try
while (not (Atomic.get should_stop)) && not !stopped do
if (not !started) && Time.(timeout_time |<=| start_time) then (
log#important "Timeout while waiting for the source to start!";
stopped := true)
else (
Clock.tick clock;
let target_time = target_time () in
if Time.(time () |<| (target_time |+| sleep_latency)) then
sleep_until target_time)
done
with Clock.Has_stopped -> ());
let processing_time = Time.(to_float (time () |-| start_time)) in
let effective_ratio = Clock.time clock /. processing_time in
log#info
"Source processed. Total processing time: %.02fs, effective ratio: %.02fx"
processing_time effective_ratio;
Clock.stop clock

let flush_source ~log ~name ~ratio ~timeout ~sleep_latency s =
if Tutils.running () then
flush_source ~log ~name ~ratio ~timeout ~sleep_latency s
else log#important "Cannot run %s: scheduler not started!" name

let _ =
let log = Log.make ["source"; "dump"] in
let kind = Lang.univ_t () in
Lang.add_builtin ~base:source "dump" ~category:(`Source `Liquidsoap)
~descr:"Immediately encode the whole contents of a source into a file."
~flags:[`Experimental]
[
("", Lang.format_t kind, None, Some "Encoding format.");
("", Lang.string_t, None, Some "Name of the file.");
("", Lang.source_t kind, None, Some "Source to encode.");
( "ratio",
Lang.float_t,
Some (Lang.float 50.),
Some
"Time ratio. A value of `50` means process data at `50x` real rate, \
when possible." );
( "timeout",
Lang.float_t,
Some (Lang.float 1.),
Some
"Stop processing the source if it has not started after the given \
timeout." );
( "sleep_latency",
Lang.float_t,
Some (Lang.float 0.1),
Some
"How much time ahead, in seconds, should we should be before pausing \
the processing." );
]
Lang.unit_t
(fun p ->
let proto =
let p = Pipe_output.file_proto (Lang.univ_t ()) in
List.filter_map (fun (l, _, v, _) -> Option.map (fun v -> (l, v)) v) p
in
let proto = ("fallible", Lang.bool true) :: proto in
let p = (("id", Lang.string "source.drop") :: p) @ proto in
let s = Pipe_output.new_file_output p in
let ratio = Lang.to_float (List.assoc "ratio" p) in
let timeout = Lang.to_float (List.assoc "timeout" p) in
let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in
flush_source ~log ~name:"source.dump" ~ratio ~timeout ~sleep_latency
(s :> Source.source);
log#info "Source dumped.";
Lang.unit)

let _ =
let log = Log.make ["source"; "drop"] in
Lang.add_builtin ~base:source "drop" ~category:(`Source `Liquidsoap)
~descr:"Animate the source as fast as possible, dropping its output."
~flags:[`Experimental]
[
("", Lang.source_t (Lang.univ_t ()), None, Some "Source to animate.");
( "ratio",
Lang.float_t,
Some (Lang.float 50.),
Some
"Time ratio. A value of `50` means process data at `50x` real rate, \
when possible." );
( "timeout",
Lang.float_t,
Some (Lang.float 1.),
Some
"Stop processing the source if it has not started after the given \
timeout." );
( "sleep_latency",
Lang.float_t,
Some (Lang.float 0.1),
Some
"How much time ahead, in seconds, should we should be before pausing \
the processing." );
]
Lang.unit_t
(fun p ->
let s = List.assoc "" p |> Lang.to_source in
let ratio = Lang.to_float (List.assoc "ratio" p) in
let timeout = Lang.to_float (List.assoc "timeout" p) in
let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in
flush_source ~log ~name:"source.dump" ~ratio ~timeout ~sleep_latency s;
Lang.unit)
7 changes: 7 additions & 0 deletions src/core/lang_source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ module ClockValue = struct
let c' = of_value (List.assoc "" p) in
Clock.unify ~pos c c';
Lang.unit) );
( "tick",
Lang.fun_t [] Lang.unit_t,
"Animate the clock and run one tick",
fun c ->
Lang.val_fun [] (fun _ ->
Clock.tick c;
Lang.unit) );
( "ticks",
Lang.fun_t [] Lang.int_t,
"The total number of times the clock has ticked.",
Expand Down
6 changes: 3 additions & 3 deletions src/core/sources/request_dynamic.ml
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ let () =
Lifecycle.before_core_shutdown ~name:"request.dynamic shutdown" (fun () ->
Atomic.set should_fail true)

class dynamic ~priority ~retry_delay ~available (f : Lang.value) prefetch
timeout =
class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available
(f : Lang.value) prefetch timeout =
let available () = (not (Atomic.get should_fail)) && available () in
object (self)
inherit source ~name:"request.dynamic" ()
inherit source ~name ()
method fallible = true
val mutable remaining = 0
method remaining = remaining
Expand Down
Loading

0 comments on commit 7f0e7b9

Please sign in to comment.