Skip to content

Commit

Permalink
Cleanup single fallible case. (#4218)
Browse files Browse the repository at this point in the history
  • Loading branch information
toots authored Nov 22, 2024
1 parent ef2a2db commit f823959
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 38 deletions.
19 changes: 17 additions & 2 deletions src/core/builtins/builtins_request.ml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ let _ =
Some
"Limit in seconds to the duration of the request resolution. \
Defaults to `settings.request.timeout` when `null`." );
( "content_type",
Lang.nullable_t (Lang.source_t (Lang.univ_t ())),
Some Lang.null,
Some
"Check that the request can decode content suitable for the given \
source." );
("", Request.Value.t, None, None);
]
Lang.bool_t
Expand All @@ -121,8 +127,17 @@ let _ =
let timeout =
Lang.to_valued_option Lang.to_float (List.assoc "timeout" p)
in
let source =
Lang.to_valued_option Lang.to_source (List.assoc "content_type" p)
in
let r = Request.Value.of_value (List.assoc "" p) in
Lang.bool (try Request.resolve ?timeout r = `Resolved with _ -> false))
Lang.bool
(match (Request.resolve ?timeout r, source) with
| `Resolved, Some s -> (
try Request.get_decoder ~ctype:s#content_type r <> None
with _ -> false)
| `Resolved, None -> true
| _ | (exception _) -> false))

let _ =
Lang.add_builtin ~base:request "metadata" ~category:`Liquidsoap
Expand Down Expand Up @@ -310,8 +325,8 @@ class process ~name r =
~name ~priority:`Non_blocking
~retry_delay:(fun _ -> 0.1)
~available:(fun _ -> true)
~prefetch:1 ~timeout:None ~synchronous:true
(Lang.val_fun [] (fun _ -> Lang.null))
1 None

initializer
self#on_wake_up (fun () ->
Expand Down
72 changes: 49 additions & 23 deletions src/core/sources/request_dynamic.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type handler = {
close : unit -> unit;
}

type task = { notify : unit -> unit; stop : unit -> unit }

let log_failed_request (log : Log.t) request ans =
log#important "Could not resolve request %s: %s."
(Request.initial_uri request)
Expand All @@ -47,20 +49,14 @@ let log_failed_request (log : Log.t) request ans =
| `Timeout -> "timeout"
| `Resolved -> "file could not be decoded with the correct content")

let extract_queued_params p =
let l = Lang.to_valued_option Lang.to_int (List.assoc "prefetch" p) in
let l = Option.value ~default:conf_prefetch#get l in
let t = Lang.to_valued_option Lang.to_float (List.assoc "timeout" p) in
(l, t)

let should_fail = Atomic.make false

let () =
Lifecycle.before_core_shutdown ~name:"request.dynamic shutdown" (fun () ->
Atomic.set should_fail true)

class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available
(f : Lang.value) prefetch timeout =
~prefetch ~synchronous ~timeout f =
let available () = (not (Atomic.get should_fail)) && available () in
object (self)
inherit source ~name ()
Expand Down Expand Up @@ -180,17 +176,16 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available
method seek_source = (self :> Source.source)
method abort_track = Atomic.set should_skip true

method private is_request_ready =
self#current <> None || try self#fetch_request with _ -> false

method can_generate_frame =
let is_ready =
(fun () ->
self#current <> None || try self#fetch_request with _ -> false)
()
in
match is_ready with
match self#is_request_ready with
| true -> true
| false ->
if available () then self#notify_new_request;
false
(* Try one more time in case a new request was queued above. *)
self#is_request_ready

val retrieved : queue_item Queue.t = Queue.create ()
method private queue_size = Queue.length retrieved
Expand Down Expand Up @@ -219,11 +214,24 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available

initializer
self#on_wake_up (fun () ->
let t = Duppy.Async.add Tutils.scheduler ~priority self#feed_queue in
Duppy.Async.wake_up t;
let task =
if synchronous then
{
notify = (fun () -> self#synchronous_feed_queue);
stop = (fun () -> ());
}
else (
let t =
Duppy.Async.add Tutils.scheduler ~priority self#feed_queue
in
{
notify = (fun () -> Duppy.Async.wake_up t);
stop = (fun () -> Duppy.Async.stop t);
})
in
assert (
Atomic.compare_and_set state `Sleeping
(`Started (Unix.gettimeofday (), t))))
(`Started (Unix.gettimeofday (), task))))

method private clear_retrieved =
let rec clear () =
Expand All @@ -238,8 +246,8 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available
initializer
self#on_sleep (fun () ->
match Atomic.exchange state `Sleeping with
| `Started (_, t) ->
Duppy.Async.stop t;
| `Started (_, { stop }) ->
stop ();
(* No more feeding task, we can go to sleep. *)
self#end_request;
self#log#info "Cleaning up request queue...";
Expand All @@ -250,8 +258,7 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available
opportunity to feed the queue, in case it is sleeping. *)
method private notify_new_request =
match Atomic.get state with
| `Started (d, t) when d <= Unix.gettimeofday () ->
Duppy.Async.wake_up t
| `Started (d, { notify }) when d <= Unix.gettimeofday () -> notify ()
| _ -> ()

(** The body of the feeding task *)
Expand All @@ -266,6 +273,11 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available
d)
| _ -> -1.

method private synchronous_feed_queue =
match self#feed_queue () with
| 0. -> self#synchronous_feed_queue
| _ -> ()

method fetch =
try
let r =
Expand Down Expand Up @@ -350,6 +362,12 @@ let _ =
Some
"Whether some new requests are available (when set to false, it \
stops after current playing request)." );
( "synchronous",
Lang.bool_t,
Some (Lang.bool false),
Some
"If `true`, new requests are prepared as needed instead of using an \
asynchronous queue." );
( "prefetch",
Lang.nullable_t Lang.int_t,
Some Lang.null,
Expand Down Expand Up @@ -435,5 +453,13 @@ let _ =
| "non_blocking" -> `Non_blocking
| n -> `Named n
in
let l, t = extract_queued_params p in
new dynamic ~available ~priority ~retry_delay f l t)
let prefetch =
Lang.to_valued_option Lang.to_int (List.assoc "prefetch" p)
in
let prefetch = Option.value ~default:conf_prefetch#get prefetch in
let synchronous = Lang.to_bool (List.assoc "synchronous" p) in
let timeout =
Lang.to_valued_option Lang.to_float (List.assoc "timeout" p)
in
new dynamic
~available ~priority ~retry_delay ~prefetch ~timeout ~synchronous f)
1 change: 1 addition & 0 deletions src/libs/extra/native.liq
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def native.request.dynamic(%argsof(request.dynamic), f) =
ignore(available)
ignore(timeout)
ignore(native)
ignore(synchronous)

def f() =
try
Expand Down
53 changes: 40 additions & 13 deletions src/libs/request.liq
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def request.single(
) =
id = string.id.default(default="single", id)

fallible = fallible ?? getter.is_constant(r)
fallible = fallible ?? not getter.is_constant(r)

infallible =
if
Expand All @@ -302,9 +302,20 @@ def request.single(
false
end

if
not fallible and not infallible
then
log.severe(
label=id,
"Source was marked a infallible but its request is not a static file. The \
source is considered fallible for backward compatibility but this will \
fail in future versions!"
)
end

static_request = ref(null())

def on_wake_up() =
def on_wake_up(s) =
if
infallible
then
Expand All @@ -315,8 +326,9 @@ def request.single(
label=id,
"#{uri} is static, resolving once for all..."
)

if
not request.resolve(initial_request, timeout=timeout)
not request.resolve(initial_request, timeout=timeout, content_type=s)
then
request.destroy(initial_request)
error.raise(
Expand All @@ -339,20 +351,35 @@ def request.single(
end

def next() =
def next() =
static_request() ?? getter.get(r)
end
static_request() ?? getter.get(r)
end

s = request.dynamic(prefetch=prefetch, thread_queue=thread_queue, next)
if infallible then s.set_queue([next()]) end
s
def mk_source(id) =
request.dynamic(
id=id,
prefetch=prefetch,
thread_queue=thread_queue,
synchronous=infallible,
next
)
end

# We want to mark infallible source as such. `source.dynamic` is a nice
# way to do it as it will raise a user-friendly error in case the underlying
# source does not respect the conditions for being infallible.
s =
source.dynamic(
id=id, infallible=infallible, self_sync=false, track_sensitive=true, next
)
s.on_wake_up(on_wake_up)
if
infallible
then
s = mk_source("#{id}.actual")
source.dynamic(
id=id, infallible=infallible, self_sync=false, track_sensitive=true, {s}
)
else
mk_source(id)
end

s.on_wake_up(fun () -> on_wake_up(s))
s.on_shutdown(on_shutdown)
(s : source_methods)
end
Expand Down

0 comments on commit f823959

Please sign in to comment.