Skip to content

Commit

Permalink
Add generic thread queues, use them to implement autocue-specific que…
Browse files Browse the repository at this point in the history
…ues.
  • Loading branch information
toots committed Oct 1, 2024
1 parent d6afe41 commit cb7e98b
Show file tree
Hide file tree
Showing 24 changed files with 194 additions and 130 deletions.
27 changes: 19 additions & 8 deletions src/core/builtins/builtins_settings.ml
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,32 @@ let settings_module =
| ty, true -> Lang.fun_t [] ty
| ty, false -> Lang.fun_t [] (Lang.nullable_t ty)
in
let rec get_type ?(sub = []) conf =
let rec get_type ?(sub = []) ~label conf =
let ty, has_default_value = get_conf_type conf in
Lang.method_t
(get_t ~has_default_value ty)
(set_t ty @ leaf_types conf @ sub)
(set_t ty @ leaf_types conf @ sub
@
if label = "scheduler" then
[
( "queues",
( [],
Lang.ref_t
(Lang.list_t (Lang.product_t Lang.string_t Lang.int_t)) ),
"Scheduler queue configuration." );
]
else [])
and leaf_types conf =
List.map
(fun label ->
let ty = get_type (conf#path [label]) in
let ty = get_type ~label (conf#path [label]) in
let label = Utils.normalize_parameter_string label in
( label,
([], ty),
Printf.sprintf "Entry for configuration key %s" label ))
conf#subs
in
let settings_t = get_type Configure.conf in
let settings_t = get_type ~label:"settings" Configure.conf in
let get_v fn conv_to conv_from conf =
let get =
Lang.val_fun [] (fun _ ->
Expand All @@ -122,7 +132,7 @@ let settings_module =
in
(get, Some set)
in
let rec get_value ?(sub = []) conf =
let rec get_value ?(sub = []) ~label conf =
let to_v fn conv_to conv_from =
try
ignore (fn conf);
Expand All @@ -144,7 +154,8 @@ let settings_module =
with Found v -> v
in
Lang.meth get_v
((if set_v <> None then [("set", Option.get set_v)] else [])
((if label = "scheduler" then [("queues", Tutils.queues_conf)] else [])
@ (if set_v <> None then [("set", Option.get set_v)] else [])
@ [
("description", Lang.string (String.trim conf#descr));
( "comments",
Expand All @@ -154,11 +165,11 @@ let settings_module =
and leaf_values conf =
List.map
(fun label ->
let v = get_value (conf#path [label]) in
let v = get_value ~label (conf#path [label]) in
(Utils.normalize_parameter_string label, v))
conf#subs
in
settings := get_value Configure.conf;
settings := get_value ~label:"settings" Configure.conf;
ignore
(Lang.add_builtin_value ~category:`Settings "settings"
~descr:"All settings." ~flags:[`Hidden] !settings settings_t))
Expand Down
6 changes: 1 addition & 5 deletions src/core/builtins/builtins_socket.ml
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,7 @@ module Socket_value = struct
[]
in
Duppy.Task.add Tutils.scheduler
{
Duppy.Task.priority = `Maybe_blocking;
events;
handler = fn;
};
{ Duppy.Task.priority = `Generic; events; handler = fn };
Lang.unit) );
]
in
Expand Down
22 changes: 11 additions & 11 deletions src/core/builtins/builtins_thread.ml
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,14 @@ let _ =
let _ =
Lang.add_builtin ~base:thread_run "recurrent" ~category:`Programming
[
( "fast",
Lang.bool_t,
Some (Lang.bool true),
( "queue",
Lang.string_t,
Some (Lang.string "generic"),
Some
"Whether the thread is supposed to return quickly or not. Typically, \
blocking tasks (e.g. fetching data over the internet) should not be \
considered to be fast. When set to `false` its priority will be \
lowered below that of request resolutions and fast timeouts. This \
is only effective if you set a dedicated queue for fast tasks, see \
the \"scheduler\" settings for more details." );
"Queue to use for the task. Should be one of: `\"generic\"` or \
`\"non_blocking\"`. Non blocking should be reserved for tasks that \
are known to complete quickly. Generic queues process all tasks. \
You can also use declared via `settings.scheduler.queues`." );
( "delay",
Lang.float_t,
Some (Lang.float 0.),
Expand All @@ -74,8 +72,10 @@ let _ =
let delay = Lang.to_float (List.assoc "delay" p) in
let f = List.assoc "" p in
let priority =
if Lang.to_bool (List.assoc "fast" p) then `Maybe_blocking
else `Blocking
match Lang.to_string (List.assoc "queue" p) with
| "generic" -> `Generic
| "non_blocking" -> `Non_blocking
| n -> `Named n
in
let on_error = Lang.to_option (List.assoc "on_error" p) in
let on_error =
Expand Down
2 changes: 1 addition & 1 deletion src/core/decoder/external_decoder.ml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ let external_input process input =
in
let log s = log#important "%s" s in
(* reading from input is blocking.. *)
let priority = `Blocking in
let priority = `Generic in
let process =
Process_handler.run ~priority ~on_stdin ~on_stderr ~log process
in
Expand Down
2 changes: 1 addition & 1 deletion src/core/file_watcher.inotify.ml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ let rec watchdog () =
events;
[watchdog ()])
in
{ Duppy.Task.priority = `Maybe_blocking; events = [`Read fd]; handler }
{ Duppy.Task.priority = `Generic; events = [`Read fd]; handler }

let watch : watch =
fun ~pos e file f ->
Expand Down
8 changes: 2 additions & 6 deletions src/core/file_watcher.mtime.ml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ let rec handler _ =
(Printf.sprintf "Error while executing file watcher callback: %s"
(Printexc.to_string exn)))
!watched;
[{ Duppy.Task.priority = `Maybe_blocking; events = [`Delay 1.]; handler }])
[{ Duppy.Task.priority = `Generic; events = [`Delay 1.]; handler }])
()

let watch : watch =
Expand All @@ -73,11 +73,7 @@ let watch : watch =
if not !launched then begin
launched := true;
Duppy.Task.add Tutils.scheduler
{
Duppy.Task.priority = `Maybe_blocking;
events = [`Delay 1.];
handler;
}
{ Duppy.Task.priority = `Generic; events = [`Delay 1.]; handler }
end;
let mtime = try file_mtime file with _ -> 0. in
watched := { file; mtime; callback } :: !watched;
Expand Down
10 changes: 5 additions & 5 deletions src/core/harbor/harbor.ml
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
simple_reply "No / mountpoint\r\n\r\n"
in
(* Authentication can be blocking. *)
Duppy.Monad.Io.exec ~priority:`Maybe_blocking h
Duppy.Monad.Io.exec ~priority:`Generic h
(let user, auth_f = s#login in
let user = if requested_user = "" then user else requested_user in
if auth_f ~socket:h.Duppy.Monad.Io.socket user password then
Expand Down Expand Up @@ -488,7 +488,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
Hashtbl.fold (fun lbl k query -> (lbl, k) :: query) query [])
args
in
Duppy.Monad.Io.exec ~priority:`Maybe_blocking h
Duppy.Monad.Io.exec ~priority:`Generic h
(http_auth_check ?query ~login h.Duppy.Monad.Io.socket headers)

(* We do not implement anything with this handler for now. *)
Expand Down Expand Up @@ -631,7 +631,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
(Bytes.of_string (Websocket.upgrade headers))
in
let* stype, huri, user, password =
Duppy.Monad.Io.exec ~priority:`Blocking h
Duppy.Monad.Io.exec ~priority:`Generic h
(read_hello h.Duppy.Monad.Io.socket)
in
log#info "Mime type: %s" stype;
Expand Down Expand Up @@ -899,7 +899,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
fun timeout -> fst (Http.read_chunked ~timeout socket)
| _ -> fun _ -> ""
in
Duppy.Monad.Io.exec ~priority:`Maybe_blocking h
Duppy.Monad.Io.exec ~priority:`Generic h
(handler ~protocol ~meth ~headers ~data
~socket:h.Duppy.Monad.Io.socket ~query base_uri)
| e ->
Expand Down Expand Up @@ -969,7 +969,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct
~priority:
(* ICY = true means that authentication has already
happened *)
`Maybe_blocking h
`Generic h
(let valid_user, auth_f = s#login in
if
not
Expand Down
2 changes: 1 addition & 1 deletion src/core/io/ffmpeg_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug
| Some t -> Duppy.Async.wake_up t
| None ->
let t =
Duppy.Async.add ~priority:`Blocking Tutils.scheduler
Duppy.Async.add ~priority:`Generic Tutils.scheduler
self#connect_task
in
Atomic.set connect_task (Some t);
Expand Down
4 changes: 2 additions & 2 deletions src/core/io/srt_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ module Poll = struct
(Printexc.to_string exn));
-1.

let task = Duppy.Async.add ~priority:`Blocking Tutils.scheduler process
let task = Duppy.Async.add ~priority:`Generic Tutils.scheduler process

let add_socket ~mode socket fn =
Srt.setsockflag socket Srt.sndsyn false;
Expand Down Expand Up @@ -529,7 +529,7 @@ class virtual caller ~enforced_encryption ~pbkeylen ~passphrase ~streamid
| Some t -> Duppy.Async.wake_up t
| None ->
let t =
Duppy.Async.add ~priority:`Blocking Tutils.scheduler
Duppy.Async.add ~priority:`Generic Tutils.scheduler
self#connect_fn
in
connect_task <- Some t;
Expand Down
2 changes: 1 addition & 1 deletion src/core/operators/pipe.ml
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ class pipe ~replay_delay ~data_len ~process ~bufferize ~max ~restart
Some
(Process_handler.run ~on_stop:self#on_stop ~on_start:self#on_start
~on_stdout:self#on_stdout ~on_stdin:self#on_stdin
~priority:`Blocking ~on_stderr:self#on_stderr ~log process))
~priority:`Generic ~on_stderr:self#on_stderr ~log process))

method! abort_track = source#abort_track

Expand Down
8 changes: 4 additions & 4 deletions src/core/outputs/harbor_output.ml
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ let add_meta c data =

let rec client_task c =
let* data =
Duppy.Monad.Io.exec ~priority:`Maybe_blocking c.handler
Duppy.Monad.Io.exec ~priority:`Generic c.handler
(Mutex_utils.mutexify c.mutex
(fun () ->
let buflen = Strings.Mutable.length c.buffer in
Expand All @@ -283,7 +283,7 @@ let rec client_task c =
c.handler (Strings.to_bytes data)
in
let* state =
Duppy.Monad.Io.exec ~priority:`Maybe_blocking c.handler
Duppy.Monad.Io.exec ~priority:`Generic c.handler
(let ret = Mutex_utils.mutexify c.mutex (fun () -> c.state) () in
Duppy.Monad.return ret)
in
Expand Down Expand Up @@ -521,7 +521,7 @@ class output p =
|| auth_function <> None
then (
let default_user = Option.value default_user ~default:"" in
Duppy.Monad.Io.exec ~priority:`Maybe_blocking handler
Duppy.Monad.Io.exec ~priority:`Generic handler
(Harbor.http_auth_check ~query ~login:(default_user, login) s
headers))
else Duppy.Monad.return ())
Expand All @@ -532,7 +532,7 @@ class output p =
Harbor.reply s
| _ -> assert false)
in
Duppy.Monad.Io.exec ~priority:`Maybe_blocking handler
Duppy.Monad.Io.exec ~priority:`Generic handler
(Harbor.relayed reply (fun () ->
self#log#info "Client %s connected" ip;
Mutex_utils.mutexify clients_m
Expand Down
18 changes: 13 additions & 5 deletions src/core/sources/request_dynamic.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ module Queue = Liquidsoap_lang.Queues.Queue
let conf_prefetch =
Dtools.Conf.int ~p:(Request.conf#plug "prefetch") ~d:1 "Default prefetch"

(* Scheduler priority for request resolutions. *)
let priority = `Maybe_blocking

type queue_item = {
request : Request.t;
(* in seconds *)
Expand Down Expand Up @@ -62,7 +59,8 @@ let () =
Lifecycle.before_core_shutdown ~name:"request.dynamic shutdown" (fun () ->
Atomic.set should_fail true)

class dynamic ~retry_delay ~available (f : Lang.value) prefetch timeout =
class dynamic ~priority ~retry_delay ~available (f : Lang.value) prefetch
timeout =
let available () = (not (Atomic.get should_fail)) && available () in
object (self)
inherit source ~name:"request.dynamic" ()
Expand Down Expand Up @@ -340,6 +338,10 @@ let _ =
~descr:"Play request dynamically created by a given function."
[
("", Lang.fun_t [] (Lang.nullable_t Request.Value.t), None, None);
( "thread_queue",
Lang.string_t,
Some (Lang.string "generic"),
Some "Queue used to resolve requests." );
( "retry_delay",
Lang.getter_t Lang.float_t,
Some (Lang.float 0.1),
Expand Down Expand Up @@ -429,5 +431,11 @@ let _ =
let f = List.assoc "" p in
let available = Lang.to_bool_getter (List.assoc "available" p) in
let retry_delay = Lang.to_float_getter (List.assoc "retry_delay" p) in
let priority =
match Lang.to_string (List.assoc "thread_queue" p) with
| "generic" -> `Generic
| "non_blocking" -> `Non_blocking
| n -> `Named n
in
let l, t = extract_queued_params p in
new dynamic ~available ~retry_delay f l t)
new dynamic ~available ~priority ~retry_delay f l t)
2 changes: 1 addition & 1 deletion src/core/tools/external_input.ml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class virtual base ~name ~restart ~restart_on_error ~on_data ?read_header
let log s = self#log#important "%s" s in
process <-
Some
(Process_handler.run ~priority:`Blocking ~on_stop ~on_stdout
(Process_handler.run ~priority:`Generic ~on_stop ~on_stdout
~on_stderr ~log (command ())));

self#on_sleep (fun () ->
Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/liqfm.ml
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ let init host =
reason (Printexc.to_string e);
-1.
in
let task = Duppy.Async.add ~priority:`Blocking Tutils.scheduler do_submit in
let task = Duppy.Async.add ~priority:`Generic Tutils.scheduler do_submit in
{ task; submit_m; submissions }

let submit (user, password) task length source stype songs =
Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ let handle_client socket ip =
| e -> Duppy.Monad.raise e
in
let* ans =
Duppy.Monad.Io.exec ~priority:`Maybe_blocking h (run (fun () -> exec req))
Duppy.Monad.Io.exec ~priority:`Generic h (run (fun () -> exec req))
in
let* () =
let* () =
Expand Down
Loading

0 comments on commit cb7e98b

Please sign in to comment.