diff --git a/src/core/builtins/builtins_source.ml b/src/core/builtins/builtins_source.ml index 675d50aa56..fe367166f1 100644 --- a/src/core/builtins/builtins_source.ml +++ b/src/core/builtins/builtins_source.ml @@ -162,6 +162,57 @@ let _ = s#on_sleep wrap_f; Lang.unit) +let flush_source ~log ~name ~ratio ~timeout ~sleep_latency s = + let module Time = (val Clock.time_implementation () : Liq_time.T) in + let open Time in + let started = ref false in + let stopped = ref false in + let clock = + Clock.create ~id:name ~sync:`Passive + ~on_error:(fun exn bt -> + stopped := true; + Utils.log_exception ~log + ~bt:(Printexc.raw_backtrace_to_string bt) + (Printf.sprintf "Error while processing source: %s" + (Printexc.to_string exn))) + () + in + let _ = + new Output.dummy + ~clock ~infallible:false + ~on_start:(fun () -> ()) + ~on_stop:(fun () -> stopped := true) + ~register_telnet:false ~autostart:true (Lang.source s) + in + Clock.start ~force:true clock; + log#info "Start source streaming loop (ratio: %.02fx)" ratio; + let start_time = Time.time () in + let timeout = Time.of_float timeout in + let timeout_time = Time.(start_time |+| timeout) in + let sleep_latency = Time.of_float sleep_latency in + let target_time () = + Time.(start_time |+| sleep_latency |+| of_float (Clock.time clock /. ratio)) + in + (try + while (not (Atomic.get should_stop)) && not !stopped do + if not !started then started := s#is_ready; + if (not !started) && Time.(timeout_time |<=| start_time) then ( + log#important "Timeout while waiting for the source to start!"; + stopped := true) + else ( + Clock.tick clock; + let target_time = target_time () in + if Time.(time () |<| (target_time |+| sleep_latency)) then + sleep_until target_time) + done + with Clock.Has_stopped -> ()); + let processing_time = Time.(to_float (time () |-| start_time)) in + let effective_ratio = Clock.time clock /. processing_time in + log#info + "Source processed. Total processing time: %.02fs, effective ratio: %.02fx" + processing_time effective_ratio; + Clock.stop clock + let _ = let log = Log.make ["source"; "dump"] in let kind = Lang.univ_t () in @@ -193,64 +244,19 @@ let _ = ] Lang.unit_t (fun p -> - let module Time = (val Clock.time_implementation () : Liq_time.T) in - let open Time in - let started = ref false in - let stopped = ref false in let proto = let p = Pipe_output.file_proto (Lang.univ_t ()) in - List.filter_map - (fun (l, _, v, _) -> - if l <> "on_stop" then Option.map (fun v -> (l, v)) v - else - Some - ( "on_stop", - Lang.val_fun [] (fun _ -> - stopped := true; - Lang.unit) )) - p + List.filter_map (fun (l, _, v, _) -> Option.map (fun v -> (l, v)) v) p in let proto = ("fallible", Lang.bool true) :: proto in let p = (("id", Lang.string "source.drop") :: p) @ proto in - let clock = - Clock.create ~id:"source.dump" ~sync:`Passive - ~on_error:(fun exn bt -> - stopped := true; - Utils.log_exception ~log - ~bt:(Printexc.raw_backtrace_to_string bt) - (Printf.sprintf "Error while dropping source: %s" - (Printexc.to_string exn))) - () - in - let s = Pipe_output.new_file_output ~clock p in + let s = Pipe_output.new_file_output p in let ratio = Lang.to_float (List.assoc "ratio" p) in - let timeout = Time.of_float (Lang.to_float (List.assoc "timeout" p)) in - let sleep_latency = - Time.of_float (Lang.to_float (List.assoc "sleep_latency" p)) - in - Clock.start ~force:true clock; - log#info "Start dumping source (ratio: %.02fx)" ratio; - let start_time = Time.time () in - let timeout_time = Time.(start_time |+| timeout) in - let target_time () = - Time.( - start_time |+| sleep_latency |+| of_float (Clock.time clock /. ratio)) - in - (try - while (not (Atomic.get should_stop)) && not !stopped do - if not !started then started := s#is_ready; - if (not !started) && Time.(timeout_time |<=| start_time) then ( - log#important "Timeout while waiting for the source to start!"; - stopped := true) - else ( - Clock.tick clock; - let target_time = target_time () in - if Time.(time () |<| (target_time |+| sleep_latency)) then - sleep_until target_time) - done - with Clock.Has_stopped -> ()); + let timeout = Lang.to_float (List.assoc "timeout" p) in + let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in + flush_source ~log ~name:"source.dump" ~ratio ~timeout ~sleep_latency + (s :> Source.source); log#info "Source dumped."; - Clock.stop clock; Lang.unit) let _ = @@ -281,58 +287,9 @@ let _ = ] Lang.unit_t (fun p -> - let module Time = (val Clock.time_implementation () : Liq_time.T) in - let open Time in let s = List.assoc "" p |> Lang.to_source in - let started = ref false in - let stopped = ref false in - let clock = - Clock.create ~id:"source.dump" ~sync:`Passive - ~on_error:(fun exn bt -> - stopped := true; - Utils.log_exception ~log - ~bt:(Printexc.raw_backtrace_to_string bt) - (Printf.sprintf "Error while dropping source: %s" - (Printexc.to_string exn))) - () - in - let _ = - new Output.dummy - ~clock ~infallible:false - ~on_start:(fun () -> ()) - ~on_stop:(fun () -> stopped := true) - ~register_telnet:false ~autostart:true (Lang.source s) - in let ratio = Lang.to_float (List.assoc "ratio" p) in - let timeout = Time.of_float (Lang.to_float (List.assoc "timeout" p)) in - let sleep_latency = - Time.of_float (Lang.to_float (List.assoc "sleep_latency" p)) - in - Clock.start ~force:true clock; - log#info "Start dropping source (ratio: %.02fx)" ratio; - let start_time = Time.time () in - let timeout_time = Time.(start_time |+| timeout) in - let target_time () = - Time.(start_time |+| of_float (Clock.time clock /. ratio)) - in - (try - while (not (Atomic.get should_stop)) && not !stopped do - let start_time = Time.time () in - if not !started then started := s#is_ready; - if (not !started) && Time.(timeout_time |<=| start_time) then ( - log#important "Timeout while waiting for the source to start!"; - stopped := true) - else ( - Clock.tick clock; - let target_time = target_time () in - if Time.(time () |<| (target_time |+| sleep_latency)) then - sleep_until target_time) - done - with Clock.Has_stopped -> ()); - let processing_time = Time.(to_float (time () |-| start_time)) in - let effective_ratio = Clock.time clock /. processing_time in - log#info - "Source dropped. Total processing time: %.02fs, effective ratio: %.02fx" - processing_time effective_ratio; - Clock.stop clock; + let timeout = Lang.to_float (List.assoc "timeout" p) in + let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in + flush_source ~log ~name:"source.dump" ~ratio ~timeout ~sleep_latency s; Lang.unit) diff --git a/tests/core/dune.inc b/tests/core/dune.inc index 07893e27a4..2a45450f8b 100644 --- a/tests/core/dune.inc +++ b/tests/core/dune.inc @@ -41,6 +41,20 @@ (action (run %{ffmpeg_quality} ))) +(executable + (name flush_test) + (modules flush_test) + (libraries liquidsoap_core liquidsoap_optionals)) + +(rule + (alias citest) + (package liquidsoap) + (deps + + (:flush_test flush_test.exe)) + (action (run %{flush_test} ))) + + (executable (name frame_test) (modules frame_test) diff --git a/tests/core/flush_test.ml b/tests/core/flush_test.ml new file mode 100644 index 0000000000..80914e42c4 --- /dev/null +++ b/tests/core/flush_test.ml @@ -0,0 +1,29 @@ +(* This can be used to manually benchmark memory usage. Otherwise, it simply exists. *) + +let () = exit 0 + +let _ = + Stdlib.Lazy.force Builtins_settings.settings_module; + Lang.eval ~cache:true ~typecheck:false ~stdlib:`Disabled + {| +%include "../../src/libs/stdlib.liq" +enable_autocue_metadata() +|} + +let () = + Frame_settings.lazy_config_eval := true; + Dtools.Log.conf_level#set 4; + Dtools.Log.conf_stdout#set true; + Dtools.Log.conf_file#set false; + Dtools.Init.exec Dtools.Log.start; + Tutils.start (); + for _ = 0 to 10 do + let r = + Request.create ~cue_in_metadata:None ~cue_out_metadata:None "/tmp/bla.mp3" + in + ignore (Request.resolve r); + Request.destroy r; + Gc.compact () + done; + Dtools.Init.exec Dtools.Log.stop; + Tutils.shutdown 0