Remove source.{dump, drop}, add request.{dump, process}. #4216

Merged: 9 commits, Nov 19, 2024
3 changes: 3 additions & 0 deletions CHANGES.md
@@ -74,6 +74,9 @@ Changed:
now returns `max_int` and `int(-infinity)` returns `min_int`. (#3407)
- Made default font a setting (#3507)
- Changed internal metadata format to be immutable (#3297).
- Removed `source.dump` and `source.drop` in favor of the safer `request.dump` and `request.drop`.
  `source.{dump, drop}` can still be implemented manually when needed, provided you understand the
  underlying streaming mechanics.
- Allow a getter for the offset of `on_offset` and dropped the metadata
mechanism for updating it (#3355).
- `string.length` and `string.sub` now default to `utf8` encoding (#4109)
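
For reference, here is a minimal script-level sketch of the replacement workflow described in the changelog entry above. It assumes the `request.dump` signature registered in `builtins_request.ml` further down in this diff; the input path, output path, and `%mp3` encoder are illustrative placeholders.

```liquidsoap
# A sketch of the new request-based dump, assuming the signature registered in
# builtins_request.ml below: encoding format, target file, request. The paths
# and the %mp3 encoder are illustrative placeholders.
r = request.create("/path/to/input.wav")
request.dump(%mp3, "/tmp/output.mp3", r)
```

The builtin drives the request on a dedicated passive clock (see `process_request` below), which is what makes it safer than the removed `source.dump`.
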
6 changes: 2 additions & 4 deletions doc/content/migrating.md
@@ -78,8 +78,7 @@ end

### Thread queues

In order to improve issues with complex inter-dependent asynchronous tasks such as `autocue` data computation,
scheduler queues have been improved.
In order to improve issues with complex inter-dependent asynchronous tasks, scheduler queues have been updated.

User-provided named queues can now be created and used to send asynchronous tasks, making it possible to control
concurrency of certain classes of tasks and also to remedy any potential dependency between asynchronous tasks.
@@ -101,8 +100,7 @@ asynchronous tasks sent to.
Likewise, `request.dynamic`, `playlist`, `single` etc. have also been updated to accept a `thread_queue` argument controlling
which asynchronous queue their request resolution tasks should be sent to.

See [the original Pull Request)[https://github.com/savonet/liquidsoap/pull/4151) and [the threads page](threads.html)
for more details.
See [the threads page](threads.html) for more details.

### Replaygain

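
As a hedged illustration of the `thread_queue` argument mentioned in the migration notes above, the sketch below sends a playlist's request-resolution tasks to a dedicated queue; the queue name and playlist path are hypothetical.

```liquidsoap
# Hypothetical sketch: resolve this playlist's requests on a dedicated named
# queue rather than the generic one. The queue name and playlist path are
# placeholders.
s = playlist(thread_queue="resolution", "/path/to/playlist.m3u")
```

Creating the named queue itself is covered on [the threads page](threads.html).
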
11 changes: 1 addition & 10 deletions doc/content/threads.md
@@ -19,7 +19,7 @@ By default, there are two type of queues available in liquidsoap:
- `non_blocking` queues

By convention, tasks that are known to be executing very fast should be sent to the
`non_blockin` queues and all the other tasks should be sent to the `generic` queue.
`non_blocking` queues and all the other tasks should be sent to the `generic` queue.

You can decide which queue to send tasks to by using the `queue` parameter of the
`thread.run` functions. Some other operators that also use threads can have a similar
@@ -35,12 +35,3 @@ This is particularly useful for two applications:

- To control concurrent execution of specific tasks.
- To prevent deadlocks in case some tasks depend on other tasks.

Typically, `autocue` data resolution is executed inside a `request` resolution. To
control the concurrency with which this CPU-intensive task is executed, we place them
in a specific queue. The number of queues controls how many of these tasks can be executed
concurrently.

Also, this prevents a deadlock where all the request resolution fill up the available
`generic` queues, making it impossible for the autocue computation to finish, thus preventing
the request resolution from returning.
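
A short, hedged sketch of the `queue` parameter of `thread.run` described in the `threads.md` hunk above; the task body and log message are illustrative.

```liquidsoap
# Send a quick task to the non_blocking queue, following the convention above:
# fast tasks go to non_blocking, everything else to generic. The log message is
# illustrative.
thread.run(queue="non_blocking", fun () -> log("quick housekeeping task"))
```
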
189 changes: 189 additions & 0 deletions src/core/builtins/builtins_request.ml
@@ -21,6 +21,11 @@
*****************************************************************************)

let request = Modules.request
let should_stop = Atomic.make false

let () =
Lifecycle.before_core_shutdown ~name:"builtin source shutdown" (fun () ->
Atomic.set should_stop true)

let _ =
Lang.add_builtin ~base:request "all" ~category:`Liquidsoap
@@ -295,3 +300,187 @@ let _ =
| `Failed -> "failed"
in
Lang.string s)

exception Process_failed

class process ~name r =
object (self)
inherit
Request_dynamic.dynamic
~name ~priority:`Non_blocking
~retry_delay:(fun _ -> 0.1)
~available:(fun _ -> true)
(Lang.val_fun [] (fun _ -> Lang.null))
1 None

initializer
self#on_wake_up (fun () ->
match Request.get_decoder ~ctype:self#content_type r with
| Some _ -> self#set_queue [r]
| None | (exception _) -> raise Process_failed)
end

let process_request ~log ~name ~ratio ~timeout ~sleep_latency ~process r =
let module Time = (val Clock.time_implementation () : Liq_time.T) in
let open Time in
let start_time = Time.time () in
match Request.resolve ~timeout r with
| `Failed | `Timeout -> ()
| `Resolved -> (
let timeout = Time.of_float timeout in
let timeout_time = Time.(start_time |+| timeout) in
try
let s = new process ~name r in
let s = (process (s :> Source.source) :> Source.source) in
let clock =
Clock.create ~id:name ~sync:`Passive
~on_error:(fun exn bt ->
Utils.log_exception ~log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while processing source: %s"
(Printexc.to_string exn));
raise Process_failed)
()
in
Fun.protect
~finally:(fun () -> try Clock.stop clock with _ -> ())
(fun () ->
let started = ref false in
let stopped = ref false in
let _ =
new Output.dummy
~clock ~infallible:false ~register_telnet:false
~on_start:(fun () -> started := true)
~on_stop:(fun () -> stopped := true)
~autostart:true (Lang.source s)
in
Clock.start ~force:true clock;
log#info "Start streaming loop (ratio: %.02fx)" ratio;
let sleep_latency = Time.of_float sleep_latency in
let target_time () =
Time.(
start_time |+| sleep_latency
|+| of_float (Clock.time clock /. ratio))
in
while (not (Atomic.get should_stop)) && not !stopped do
if (not !started) && Time.(timeout_time |<=| Time.time ()) then (
log#important
"Timeout while waiting for the source to be ready!";
raise Process_failed)
else (
Clock.tick clock;
let target_time = target_time () in
if Time.(time () |<| (target_time |+| sleep_latency)) then
sleep_until target_time)
done;
let processing_time = Time.(to_float (time () |-| start_time)) in
let effective_ratio = Clock.time clock /. processing_time in
log#info
"Request processed. Total processing time: %.02fs, effective \
ratio: %.02fx"
processing_time effective_ratio)
with Process_failed | Clock.Has_stopped -> ())

let _ =
let log = Log.make ["request"; "dump"] in
let kind = Lang.univ_t () in
Lang.add_builtin ~base:request "dump" ~category:(`Source `Liquidsoap)
~descr:"Immediately encode the whole contents of a request into a file."
~flags:[`Experimental]
[
("", Lang.format_t kind, None, Some "Encoding format.");
("", Lang.string_t, None, Some "Name of the file.");
("", Request.Value.t, None, Some "Request to encode.");
( "ratio",
Lang.float_t,
Some (Lang.float 50.),
Some
"Time ratio. A value of `50` means process data at `50x` real rate, \
when possible." );
( "timeout",
Lang.float_t,
Some (Lang.float 1.),
Some
"Stop processing the source if it has not started after the given \
timeout." );
( "sleep_latency",
Lang.float_t,
Some (Lang.float 0.1),
Some
"How much time ahead, in seconds, should we should be before pausing \
the processing." );
]
Lang.unit_t
(fun p ->
let proto =
let p = Pipe_output.file_proto (Lang.univ_t ()) in
List.filter_map (fun (l, _, v, _) -> Option.map (fun v -> (l, v)) v) p
in
let proto = ("fallible", Lang.bool true) :: proto in
let format = Lang.assoc "" 1 p in
let file = Lang.assoc "" 2 p in
let r = Request.Value.of_value (Lang.assoc "" 3 p) in
let process s =
let p =
("id", Lang.string "request.drop")
:: ("", format) :: ("", file)
:: ("", Lang.source s)
:: (p @ proto)
in
Pipe_output.new_file_output p
in
let ratio = Lang.to_float (List.assoc "ratio" p) in
let timeout = Lang.to_float (List.assoc "timeout" p) in
let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in
process_request ~log ~name:"request.dump" ~ratio ~timeout ~sleep_latency
~process r;
log#info "Request dumped.";
Lang.unit)

let _ =
let log = Log.make ["request"; "process"] in
Lang.add_builtin ~base:request "process" ~category:(`Source `Liquidsoap)
~descr:
"Given a request and an optional function to process this request, \
animate the source as fast as possible until the request is fully \
processed."
[
("", Request.Value.t, None, Some "Request to process");
( "process",
Lang.fun_t
[(false, "", Lang.source_t (Lang.univ_t ()))]
(Lang.source_t (Lang.univ_t ())),
Some (Lang.val_fun [("", "", None)] (fun p -> List.assoc "" p)),
Some "Callback to create the source to animate." );
( "ratio",
Lang.float_t,
Some (Lang.float 50.),
Some
"Time ratio. A value of `50` means process data at `50x` real rate, \
when possible." );
( "timeout",
Lang.float_t,
Some (Lang.float 1.),
Some
"Stop processing the source if it has not started after the given \
timeout." );
( "sleep_latency",
Lang.float_t,
Some (Lang.float 0.1),
Some
"How much time ahead, in seconds, should we should be before pausing \
the processing." );
]
Lang.unit_t
(fun p ->
let r = Request.Value.of_value (List.assoc "" p) in
let process = List.assoc "process" p in
let process s =
Lang.to_source (Lang.apply process [("", Lang.source s)])
in
let ratio = Lang.to_float (List.assoc "ratio" p) in
let timeout = Lang.to_float (List.assoc "timeout" p) in
let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in
process_request ~log ~name:"request.process" ~ratio ~timeout
~sleep_latency ~process r;
Lang.unit)
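
Finally, a hedged usage sketch of the `request.process` builtin registered above. The callback simply returns the source unchanged, which matches the builtin's default `process` argument; the file path is illustrative.

```liquidsoap
# Animate a request as fast as possible (up to `ratio` times real time) until
# it is fully processed. The callback receives the source wrapping the request
# and must return the source to animate; here it is the identity, which is the
# builtin's default.
r = request.create("/path/to/file.wav")
request.process(r, process=fun (s) -> s)
```

Internally, `process_request` drives the source with a passive clock and a dummy output, ticking as fast as `ratio` and `sleep_latency` allow, and gives up if the source has not started before `timeout`.
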