diff --git a/src/core/builtins/builtins_request.ml b/src/core/builtins/builtins_request.ml index 9a6daa8b5f..3829033ad3 100644 --- a/src/core/builtins/builtins_request.ml +++ b/src/core/builtins/builtins_request.ml @@ -110,6 +110,12 @@ let _ = Some "Limit in seconds to the duration of the request resolution. \ Defaults to `settings.request.timeout` when `null`." ); + ( "content_type", + Lang.nullable_t (Lang.source_t (Lang.univ_t ())), + Some Lang.null, + Some + "Check that the request can decode content suitable for the given \ + source." ); ("", Request.Value.t, None, None); ] Lang.bool_t @@ -121,8 +127,17 @@ let _ = let timeout = Lang.to_valued_option Lang.to_float (List.assoc "timeout" p) in + let source = + Lang.to_valued_option Lang.to_source (List.assoc "content_type" p) + in let r = Request.Value.of_value (List.assoc "" p) in - Lang.bool (try Request.resolve ?timeout r = `Resolved with _ -> false)) + Lang.bool + (match (Request.resolve ?timeout r, source) with + | `Resolved, Some s -> ( + try Request.get_decoder ~ctype:s#content_type r <> None + with _ -> false) + | `Resolved, None -> true + | _ | (exception _) -> false)) let _ = Lang.add_builtin ~base:request "metadata" ~category:`Liquidsoap @@ -310,8 +325,8 @@ class process ~name r = ~name ~priority:`Non_blocking ~retry_delay:(fun _ -> 0.1) ~available:(fun _ -> true) + ~prefetch:1 ~timeout:None ~synchronous:true (Lang.val_fun [] (fun _ -> Lang.null)) - 1 None initializer self#on_wake_up (fun () -> diff --git a/src/core/sources/request_dynamic.ml b/src/core/sources/request_dynamic.ml index b3aa692b2c..4f16c3d1c0 100644 --- a/src/core/sources/request_dynamic.ml +++ b/src/core/sources/request_dynamic.ml @@ -39,6 +39,8 @@ type handler = { close : unit -> unit; } +type task = { notify : unit -> unit; stop : unit -> unit } + let log_failed_request (log : Log.t) request ans = log#important "Could not resolve request %s: %s." (Request.initial_uri request) @@ -47,12 +49,6 @@ let log_failed_request (log : Log.t) request ans = | `Timeout -> "timeout" | `Resolved -> "file could not be decoded with the correct content") -let extract_queued_params p = - let l = Lang.to_valued_option Lang.to_int (List.assoc "prefetch" p) in - let l = Option.value ~default:conf_prefetch#get l in - let t = Lang.to_valued_option Lang.to_float (List.assoc "timeout" p) in - (l, t) - let should_fail = Atomic.make false let () = @@ -60,7 +56,7 @@ let () = Atomic.set should_fail true) class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available - (f : Lang.value) prefetch timeout = + ~prefetch ~synchronous ~timeout f = let available () = (not (Atomic.get should_fail)) && available () in object (self) inherit source ~name () @@ -180,17 +176,16 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available method seek_source = (self :> Source.source) method abort_track = Atomic.set should_skip true + method private is_request_ready = + self#current <> None || try self#fetch_request with _ -> false + method can_generate_frame = - let is_ready = - (fun () -> - self#current <> None || try self#fetch_request with _ -> false) - () - in - match is_ready with + match self#is_request_ready with | true -> true | false -> if available () then self#notify_new_request; - false + (* Try one more time in case a new request was queued above. *) + self#is_request_ready val retrieved : queue_item Queue.t = Queue.create () method private queue_size = Queue.length retrieved @@ -219,11 +214,24 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available initializer self#on_wake_up (fun () -> - let t = Duppy.Async.add Tutils.scheduler ~priority self#feed_queue in - Duppy.Async.wake_up t; + let task = + if synchronous then + { + notify = (fun () -> self#synchronous_feed_queue); + stop = (fun () -> ()); + } + else ( + let t = + Duppy.Async.add Tutils.scheduler ~priority self#feed_queue + in + { + notify = (fun () -> Duppy.Async.wake_up t); + stop = (fun () -> Duppy.Async.stop t); + }) + in assert ( Atomic.compare_and_set state `Sleeping - (`Started (Unix.gettimeofday (), t)))) + (`Started (Unix.gettimeofday (), task)))) method private clear_retrieved = let rec clear () = @@ -238,8 +246,8 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available initializer self#on_sleep (fun () -> match Atomic.exchange state `Sleeping with - | `Started (_, t) -> - Duppy.Async.stop t; + | `Started (_, { stop }) -> + stop (); (* No more feeding task, we can go to sleep. *) self#end_request; self#log#info "Cleaning up request queue..."; @@ -250,8 +258,7 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available opportunity to feed the queue, in case it is sleeping. *) method private notify_new_request = match Atomic.get state with - | `Started (d, t) when d <= Unix.gettimeofday () -> - Duppy.Async.wake_up t + | `Started (d, { notify }) when d <= Unix.gettimeofday () -> notify () | _ -> () (** The body of the feeding task *) @@ -266,6 +273,11 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available d) | _ -> -1. + method private synchronous_feed_queue = + match self#feed_queue () with + | 0. -> self#synchronous_feed_queue + | _ -> () + method fetch = try let r = @@ -350,6 +362,12 @@ let _ = Some "Whether some new requests are available (when set to false, it \ stops after current playing request)." ); + ( "synchronous", + Lang.bool_t, + Some (Lang.bool false), + Some + "If `true`, new requests are prepared as needed instead of using an \ + asynchronous queue." ); ( "prefetch", Lang.nullable_t Lang.int_t, Some Lang.null, @@ -435,5 +453,13 @@ let _ = | "non_blocking" -> `Non_blocking | n -> `Named n in - let l, t = extract_queued_params p in - new dynamic ~available ~priority ~retry_delay f l t) + let prefetch = + Lang.to_valued_option Lang.to_int (List.assoc "prefetch" p) + in + let prefetch = Option.value ~default:conf_prefetch#get prefetch in + let synchronous = Lang.to_bool (List.assoc "synchronous" p) in + let timeout = + Lang.to_valued_option Lang.to_float (List.assoc "timeout" p) + in + new dynamic + ~available ~priority ~retry_delay ~prefetch ~timeout ~synchronous f) diff --git a/src/libs/extra/native.liq b/src/libs/extra/native.liq index 4cfdfa5f86..a7ead829e5 100644 --- a/src/libs/extra/native.liq +++ b/src/libs/extra/native.liq @@ -89,6 +89,7 @@ def native.request.dynamic(%argsof(request.dynamic), f) = ignore(available) ignore(timeout) ignore(native) + ignore(synchronous) def f() = try diff --git a/src/libs/request.liq b/src/libs/request.liq index c2dc9ade09..130dc8de86 100644 --- a/src/libs/request.liq +++ b/src/libs/request.liq @@ -280,7 +280,7 @@ def request.single( ) = id = string.id.default(default="single", id) - fallible = fallible ?? getter.is_constant(r) + fallible = fallible ?? not getter.is_constant(r) infallible = if @@ -302,9 +302,20 @@ def request.single( false end + if + not fallible and not infallible + then + log.severe( + label=id, + "Source was marked a infallible but its request is not a static file. The \ + source is considered fallible for backward compatibility but this will \ + fail in future versions!" + ) + end + static_request = ref(null()) - def on_wake_up() = + def on_wake_up(s) = if infallible then @@ -315,8 +326,9 @@ def request.single( label=id, "#{uri} is static, resolving once for all..." ) + if - not request.resolve(initial_request, timeout=timeout) + not request.resolve(initial_request, timeout=timeout, content_type=s) then request.destroy(initial_request) error.raise( @@ -339,20 +351,35 @@ def request.single( end def next() = - def next() = - static_request() ?? getter.get(r) - end + static_request() ?? getter.get(r) + end - s = request.dynamic(prefetch=prefetch, thread_queue=thread_queue, next) - if infallible then s.set_queue([next()]) end - s + def mk_source(id) = + request.dynamic( + id=id, + prefetch=prefetch, + thread_queue=thread_queue, + synchronous=infallible, + next + ) end + # We want to mark infallible source as such. `source.dynamic` is a nice + # way to do it as it will raise a user-friendly error in case the underlying + # source does not respect the conditions for being infallible. s = - source.dynamic( - id=id, infallible=infallible, self_sync=false, track_sensitive=true, next - ) - s.on_wake_up(on_wake_up) + if + infallible + then + s = mk_source("#{id}.actual") + source.dynamic( + id=id, infallible=infallible, self_sync=false, track_sensitive=true, {s} + ) + else + mk_source(id) + end + + s.on_wake_up(fun () -> on_wake_up(s)) s.on_shutdown(on_shutdown) (s : source_methods) end