Skip to content

Commit

Permalink
Factor out source.dump/source.drop (#4207)
Browse files Browse the repository at this point in the history
  • Loading branch information
toots authored Nov 9, 2024
1 parent 1882a1f commit ccbd0ef
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 103 deletions.
163 changes: 60 additions & 103 deletions src/core/builtins/builtins_source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,57 @@ let _ =
s#on_sleep wrap_f;
Lang.unit)

let flush_source ~log ~name ~ratio ~timeout ~sleep_latency s =
let module Time = (val Clock.time_implementation () : Liq_time.T) in
let open Time in
let started = ref false in
let stopped = ref false in
let clock =
Clock.create ~id:name ~sync:`Passive
~on_error:(fun exn bt ->
stopped := true;
Utils.log_exception ~log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while processing source: %s"
(Printexc.to_string exn)))
()
in
let _ =
new Output.dummy
~clock ~infallible:false
~on_start:(fun () -> ())
~on_stop:(fun () -> stopped := true)
~register_telnet:false ~autostart:true (Lang.source s)
in
Clock.start ~force:true clock;
log#info "Start source streaming loop (ratio: %.02fx)" ratio;
let start_time = Time.time () in
let timeout = Time.of_float timeout in
let timeout_time = Time.(start_time |+| timeout) in
let sleep_latency = Time.of_float sleep_latency in
let target_time () =
Time.(start_time |+| sleep_latency |+| of_float (Clock.time clock /. ratio))
in
(try
while (not (Atomic.get should_stop)) && not !stopped do
if not !started then started := s#is_ready;
if (not !started) && Time.(timeout_time |<=| start_time) then (
log#important "Timeout while waiting for the source to start!";
stopped := true)
else (
Clock.tick clock;
let target_time = target_time () in
if Time.(time () |<| (target_time |+| sleep_latency)) then
sleep_until target_time)
done
with Clock.Has_stopped -> ());
let processing_time = Time.(to_float (time () |-| start_time)) in
let effective_ratio = Clock.time clock /. processing_time in
log#info
"Source processed. Total processing time: %.02fs, effective ratio: %.02fx"
processing_time effective_ratio;
Clock.stop clock

let _ =
let log = Log.make ["source"; "dump"] in
let kind = Lang.univ_t () in
Expand Down Expand Up @@ -193,64 +244,19 @@ let _ =
]
Lang.unit_t
(fun p ->
let module Time = (val Clock.time_implementation () : Liq_time.T) in
let open Time in
let started = ref false in
let stopped = ref false in
let proto =
let p = Pipe_output.file_proto (Lang.univ_t ()) in
List.filter_map
(fun (l, _, v, _) ->
if l <> "on_stop" then Option.map (fun v -> (l, v)) v
else
Some
( "on_stop",
Lang.val_fun [] (fun _ ->
stopped := true;
Lang.unit) ))
p
List.filter_map (fun (l, _, v, _) -> Option.map (fun v -> (l, v)) v) p
in
let proto = ("fallible", Lang.bool true) :: proto in
let p = (("id", Lang.string "source.drop") :: p) @ proto in
let clock =
Clock.create ~id:"source.dump" ~sync:`Passive
~on_error:(fun exn bt ->
stopped := true;
Utils.log_exception ~log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while dropping source: %s"
(Printexc.to_string exn)))
()
in
let s = Pipe_output.new_file_output ~clock p in
let s = Pipe_output.new_file_output p in
let ratio = Lang.to_float (List.assoc "ratio" p) in
let timeout = Time.of_float (Lang.to_float (List.assoc "timeout" p)) in
let sleep_latency =
Time.of_float (Lang.to_float (List.assoc "sleep_latency" p))
in
Clock.start ~force:true clock;
log#info "Start dumping source (ratio: %.02fx)" ratio;
let start_time = Time.time () in
let timeout_time = Time.(start_time |+| timeout) in
let target_time () =
Time.(
start_time |+| sleep_latency |+| of_float (Clock.time clock /. ratio))
in
(try
while (not (Atomic.get should_stop)) && not !stopped do
if not !started then started := s#is_ready;
if (not !started) && Time.(timeout_time |<=| start_time) then (
log#important "Timeout while waiting for the source to start!";
stopped := true)
else (
Clock.tick clock;
let target_time = target_time () in
if Time.(time () |<| (target_time |+| sleep_latency)) then
sleep_until target_time)
done
with Clock.Has_stopped -> ());
let timeout = Lang.to_float (List.assoc "timeout" p) in
let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in
flush_source ~log ~name:"source.dump" ~ratio ~timeout ~sleep_latency
(s :> Source.source);
log#info "Source dumped.";
Clock.stop clock;
Lang.unit)

let _ =
Expand Down Expand Up @@ -281,58 +287,9 @@ let _ =
]
Lang.unit_t
(fun p ->
let module Time = (val Clock.time_implementation () : Liq_time.T) in
let open Time in
let s = List.assoc "" p |> Lang.to_source in
let started = ref false in
let stopped = ref false in
let clock =
Clock.create ~id:"source.dump" ~sync:`Passive
~on_error:(fun exn bt ->
stopped := true;
Utils.log_exception ~log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while dropping source: %s"
(Printexc.to_string exn)))
()
in
let _ =
new Output.dummy
~clock ~infallible:false
~on_start:(fun () -> ())
~on_stop:(fun () -> stopped := true)
~register_telnet:false ~autostart:true (Lang.source s)
in
let ratio = Lang.to_float (List.assoc "ratio" p) in
let timeout = Time.of_float (Lang.to_float (List.assoc "timeout" p)) in
let sleep_latency =
Time.of_float (Lang.to_float (List.assoc "sleep_latency" p))
in
Clock.start ~force:true clock;
log#info "Start dropping source (ratio: %.02fx)" ratio;
let start_time = Time.time () in
let timeout_time = Time.(start_time |+| timeout) in
let target_time () =
Time.(start_time |+| of_float (Clock.time clock /. ratio))
in
(try
while (not (Atomic.get should_stop)) && not !stopped do
let start_time = Time.time () in
if not !started then started := s#is_ready;
if (not !started) && Time.(timeout_time |<=| start_time) then (
log#important "Timeout while waiting for the source to start!";
stopped := true)
else (
Clock.tick clock;
let target_time = target_time () in
if Time.(time () |<| (target_time |+| sleep_latency)) then
sleep_until target_time)
done
with Clock.Has_stopped -> ());
let processing_time = Time.(to_float (time () |-| start_time)) in
let effective_ratio = Clock.time clock /. processing_time in
log#info
"Source dropped. Total processing time: %.02fs, effective ratio: %.02fx"
processing_time effective_ratio;
Clock.stop clock;
let timeout = Lang.to_float (List.assoc "timeout" p) in
let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in
flush_source ~log ~name:"source.dump" ~ratio ~timeout ~sleep_latency s;
Lang.unit)
14 changes: 14 additions & 0 deletions tests/core/dune.inc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@
(action (run %{ffmpeg_quality} )))


(executable
(name flush_test)
(modules flush_test)
(libraries liquidsoap_core liquidsoap_optionals))

(rule
(alias citest)
(package liquidsoap)
(deps

(:flush_test flush_test.exe))
(action (run %{flush_test} )))


(executable
(name frame_test)
(modules frame_test)
Expand Down
29 changes: 29 additions & 0 deletions tests/core/flush_test.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
(* This can be used to manually benchmark memory usage. Otherwise, it simply exists. *)

let () = exit 0

let _ =
Stdlib.Lazy.force Builtins_settings.settings_module;
Lang.eval ~cache:true ~typecheck:false ~stdlib:`Disabled
{|
%include "../../src/libs/stdlib.liq"
enable_autocue_metadata()
|}

let () =
Frame_settings.lazy_config_eval := true;
Dtools.Log.conf_level#set 4;
Dtools.Log.conf_stdout#set true;
Dtools.Log.conf_file#set false;
Dtools.Init.exec Dtools.Log.start;
Tutils.start ();
for _ = 0 to 10 do
let r =
Request.create ~cue_in_metadata:None ~cue_out_metadata:None "/tmp/bla.mp3"
in
ignore (Request.resolve r);
Request.destroy r;
Gc.compact ()
done;
Dtools.Init.exec Dtools.Log.stop;
Tutils.shutdown 0

0 comments on commit ccbd0ef

Please sign in to comment.