From ea109edb71eadca0d018d02b1ca45061922bce85 Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Thu, 21 Nov 2024 11:54:32 -0600 Subject: [PATCH 1/8] Cleanup single fallible case. --- src/libs/request.liq | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/src/libs/request.liq b/src/libs/request.liq index c2dc9ade09..b3b928dde0 100644 --- a/src/libs/request.liq +++ b/src/libs/request.liq @@ -338,20 +338,36 @@ def request.single( static_request := null() end - def next() = + def make_source(id) = def next() = static_request() ?? getter.get(r) end - s = request.dynamic(prefetch=prefetch, thread_queue=thread_queue, next) - if infallible then s.set_queue([next()]) end + s = + request.dynamic(id=id, prefetch=prefetch, thread_queue=thread_queue, next) + s.set_queue([next()]) s end + # request.dynamic can have gaps in their song fetching + # infallible sources should always be available so we + # re-create it each time. Otherwise, we can just stick + # with the request.dynamic. s = - source.dynamic( - id=id, infallible=infallible, self_sync=false, track_sensitive=true, next - ) + if + infallible + then + source.dynamic( + id=id, + infallible=infallible, + self_sync=false, + track_sensitive=true, + fun () -> make_source("#{id}.actual") + ) + else + make_source(id) + end + s.on_wake_up(on_wake_up) s.on_shutdown(on_shutdown) (s : source_methods) From c71f105adbcab476189ad2669ae8f13eaa761f06 Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Thu, 21 Nov 2024 12:26:44 -0600 Subject: [PATCH 2/8] More. --- src/core/builtins/builtins_request.ml | 17 ++++++++++++++++- src/libs/request.liq | 20 ++++++++++++++++---- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/core/builtins/builtins_request.ml b/src/core/builtins/builtins_request.ml index 9a6daa8b5f..c5100a3fb1 100644 --- a/src/core/builtins/builtins_request.ml +++ b/src/core/builtins/builtins_request.ml @@ -110,6 +110,12 @@ let _ = Some "Limit in seconds to the duration of the request resolution. \ Defaults to `settings.request.timeout` when `null`." ); + ( "content_type", + Lang.nullable_t (Lang.source_t (Lang.univ_t ())), + Some Lang.null, + Some + "Check that the request can decode content suitable for the given \ + source." ); ("", Request.Value.t, None, None); ] Lang.bool_t @@ -121,8 +127,17 @@ let _ = let timeout = Lang.to_valued_option Lang.to_float (List.assoc "timeout" p) in + let source = + Lang.to_valued_option Lang.to_source (List.assoc "content_type" p) + in let r = Request.Value.of_value (List.assoc "" p) in - Lang.bool (try Request.resolve ?timeout r = `Resolved with _ -> false)) + Lang.bool + (match (Request.resolve ?timeout r, source) with + | `Resolved, Some s -> ( + try Request.get_decoder ~ctype:s#content_type r <> None + with _ -> false) + | `Resolved, None -> true + | _ | (exception _) -> false)) let _ = Lang.add_builtin ~base:request "metadata" ~category:`Liquidsoap diff --git a/src/libs/request.liq b/src/libs/request.liq index b3b928dde0..543c941aae 100644 --- a/src/libs/request.liq +++ b/src/libs/request.liq @@ -280,7 +280,7 @@ def request.single( ) = id = string.id.default(default="single", id) - fallible = fallible ?? getter.is_constant(r) + fallible = fallible ?? not getter.is_constant(r) infallible = if @@ -302,9 +302,20 @@ def request.single( false end + if + not fallible and not infallible + then + log.severe( + label=id, + "Source was marked a infallible but its request is not a static file. The \ + source is considered fallible for backward compatibility but this will \ + fail in future versions!" + ) + end + static_request = ref(null()) - def on_wake_up() = + def on_wake_up(s) = if infallible then @@ -315,8 +326,9 @@ def request.single( label=id, "#{uri} is static, resolving once for all..." ) + if - not request.resolve(initial_request, timeout=timeout) + not request.resolve(initial_request, timeout=timeout, content_type=s) then request.destroy(initial_request) error.raise( @@ -368,7 +380,7 @@ def request.single( make_source(id) end - s.on_wake_up(on_wake_up) + s.on_wake_up(fun () -> on_wake_up(s)) s.on_shutdown(on_shutdown) (s : source_methods) end From 141e381742c0a47dde19b9091c718b6e207e9dfe Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Thu, 21 Nov 2024 12:31:41 -0600 Subject: [PATCH 3/8] Bring this back --- src/libs/request.liq | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libs/request.liq b/src/libs/request.liq index 543c941aae..d6481a4ec3 100644 --- a/src/libs/request.liq +++ b/src/libs/request.liq @@ -357,7 +357,7 @@ def request.single( s = request.dynamic(id=id, prefetch=prefetch, thread_queue=thread_queue, next) - s.set_queue([next()]) + if infallible then s.set_queue([next()]) end s end From f2aab03ef29411efcdf5034a69922f833b33457a Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Thu, 21 Nov 2024 13:45:39 -0600 Subject: [PATCH 4/8] Try this. --- src/libs/request.liq | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/libs/request.liq b/src/libs/request.liq index d6481a4ec3..77f58192d0 100644 --- a/src/libs/request.liq +++ b/src/libs/request.liq @@ -350,15 +350,12 @@ def request.single( static_request := null() end - def make_source(id) = - def next() = - static_request() ?? getter.get(r) - end + def next() = + static_request() ?? getter.get(r) + end - s = - request.dynamic(id=id, prefetch=prefetch, thread_queue=thread_queue, next) - if infallible then s.set_queue([next()]) end - s + def make_source(id) = + request.dynamic(id=id, prefetch=prefetch, thread_queue=thread_queue, next) end # request.dynamic can have gaps in their song fetching @@ -369,12 +366,16 @@ def request.single( if infallible then + s = make_source("#{id}.actual") source.dynamic( id=id, infallible=infallible, self_sync=false, track_sensitive=true, - fun () -> make_source("#{id}.actual") + { + s.set_queue([next()]) + s + } ) else make_source(id) From e945565a10fdf75c7b565f630228902f4f43e04d Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Thu, 21 Nov 2024 13:47:39 -0600 Subject: [PATCH 5/8] Raise. --- src/libs/request.liq | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/libs/request.liq b/src/libs/request.liq index 77f58192d0..f6bfb39d5d 100644 --- a/src/libs/request.liq +++ b/src/libs/request.liq @@ -305,11 +305,9 @@ def request.single( if not fallible and not infallible then - log.severe( - label=id, - "Source was marked a infallible but its request is not a static file. The \ - source is considered fallible for backward compatibility but this will \ - fail in future versions!" + error.raise( + error.invalid, + "Source was marked a infallible but its request is not a static file." ) end From 0298f97ad43eff0b65b0bf1ec91b3e354b04c109 Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Thu, 21 Nov 2024 14:05:30 -0600 Subject: [PATCH 6/8] Revert "Raise." This reverts commit e945565a10fdf75c7b565f630228902f4f43e04d. --- src/libs/request.liq | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/libs/request.liq b/src/libs/request.liq index f6bfb39d5d..77f58192d0 100644 --- a/src/libs/request.liq +++ b/src/libs/request.liq @@ -305,9 +305,11 @@ def request.single( if not fallible and not infallible then - error.raise( - error.invalid, - "Source was marked a infallible but its request is not a static file." + log.severe( + label=id, + "Source was marked a infallible but its request is not a static file. The \ + source is considered fallible for backward compatibility but this will \ + fail in future versions!" ) end From 821ff4cabebdfb54d3aa62595342ec7a57ad25a4 Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Thu, 21 Nov 2024 16:48:36 -0600 Subject: [PATCH 7/8] Add synchronous mode to request.dynamic. --- src/core/builtins/builtins_request.ml | 2 +- src/core/sources/request_dynamic.ml | 59 +++++++++++++++++++-------- src/libs/extra/native.liq | 1 + src/libs/request.liq | 33 ++++----------- 4 files changed, 52 insertions(+), 43 deletions(-) diff --git a/src/core/builtins/builtins_request.ml b/src/core/builtins/builtins_request.ml index c5100a3fb1..3829033ad3 100644 --- a/src/core/builtins/builtins_request.ml +++ b/src/core/builtins/builtins_request.ml @@ -325,8 +325,8 @@ class process ~name r = ~name ~priority:`Non_blocking ~retry_delay:(fun _ -> 0.1) ~available:(fun _ -> true) + ~prefetch:1 ~timeout:None ~synchronous:true (Lang.val_fun [] (fun _ -> Lang.null)) - 1 None initializer self#on_wake_up (fun () -> diff --git a/src/core/sources/request_dynamic.ml b/src/core/sources/request_dynamic.ml index b3aa692b2c..1b2147f029 100644 --- a/src/core/sources/request_dynamic.ml +++ b/src/core/sources/request_dynamic.ml @@ -39,6 +39,8 @@ type handler = { close : unit -> unit; } +type task = { notify : unit -> unit; stop : unit -> unit } + let log_failed_request (log : Log.t) request ans = log#important "Could not resolve request %s: %s." (Request.initial_uri request) @@ -47,12 +49,6 @@ let log_failed_request (log : Log.t) request ans = | `Timeout -> "timeout" | `Resolved -> "file could not be decoded with the correct content") -let extract_queued_params p = - let l = Lang.to_valued_option Lang.to_int (List.assoc "prefetch" p) in - let l = Option.value ~default:conf_prefetch#get l in - let t = Lang.to_valued_option Lang.to_float (List.assoc "timeout" p) in - (l, t) - let should_fail = Atomic.make false let () = @@ -60,7 +56,7 @@ let () = Atomic.set should_fail true) class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available - (f : Lang.value) prefetch timeout = + ~prefetch ~synchronous ~timeout f = let available () = (not (Atomic.get should_fail)) && available () in object (self) inherit source ~name () @@ -219,11 +215,24 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available initializer self#on_wake_up (fun () -> - let t = Duppy.Async.add Tutils.scheduler ~priority self#feed_queue in - Duppy.Async.wake_up t; + let task = + if synchronous then + { + notify = (fun () -> self#synchronous_feed_queue); + stop = (fun () -> ()); + } + else ( + let t = + Duppy.Async.add Tutils.scheduler ~priority self#feed_queue + in + { + notify = (fun () -> Duppy.Async.wake_up t); + stop = (fun () -> Duppy.Async.stop t); + }) + in assert ( Atomic.compare_and_set state `Sleeping - (`Started (Unix.gettimeofday (), t)))) + (`Started (Unix.gettimeofday (), task)))) method private clear_retrieved = let rec clear () = @@ -238,8 +247,8 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available initializer self#on_sleep (fun () -> match Atomic.exchange state `Sleeping with - | `Started (_, t) -> - Duppy.Async.stop t; + | `Started (_, { stop }) -> + stop (); (* No more feeding task, we can go to sleep. *) self#end_request; self#log#info "Cleaning up request queue..."; @@ -250,8 +259,7 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available opportunity to feed the queue, in case it is sleeping. *) method private notify_new_request = match Atomic.get state with - | `Started (d, t) when d <= Unix.gettimeofday () -> - Duppy.Async.wake_up t + | `Started (d, { notify }) when d <= Unix.gettimeofday () -> notify () | _ -> () (** The body of the feeding task *) @@ -266,6 +274,11 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available d) | _ -> -1. + method private synchronous_feed_queue = + match self#feed_queue () with + | 0. -> self#synchronous_feed_queue + | _ -> () + method fetch = try let r = @@ -350,6 +363,12 @@ let _ = Some "Whether some new requests are available (when set to false, it \ stops after current playing request)." ); + ( "synchronous", + Lang.bool_t, + Some (Lang.bool false), + Some + "If `true`, new requests are prepared as needed instead of using an \ + asynchronous queue." ); ( "prefetch", Lang.nullable_t Lang.int_t, Some Lang.null, @@ -435,5 +454,13 @@ let _ = | "non_blocking" -> `Non_blocking | n -> `Named n in - let l, t = extract_queued_params p in - new dynamic ~available ~priority ~retry_delay f l t) + let prefetch = + Lang.to_valued_option Lang.to_int (List.assoc "prefetch" p) + in + let prefetch = Option.value ~default:conf_prefetch#get prefetch in + let synchronous = Lang.to_bool (List.assoc "synchronous" p) in + let timeout = + Lang.to_valued_option Lang.to_float (List.assoc "timeout" p) + in + new dynamic + ~available ~priority ~retry_delay ~prefetch ~timeout ~synchronous f) diff --git a/src/libs/extra/native.liq b/src/libs/extra/native.liq index 4cfdfa5f86..a7ead829e5 100644 --- a/src/libs/extra/native.liq +++ b/src/libs/extra/native.liq @@ -89,6 +89,7 @@ def native.request.dynamic(%argsof(request.dynamic), f) = ignore(available) ignore(timeout) ignore(native) + ignore(synchronous) def f() = try diff --git a/src/libs/request.liq b/src/libs/request.liq index 77f58192d0..ce86f9244d 100644 --- a/src/libs/request.liq +++ b/src/libs/request.liq @@ -354,32 +354,13 @@ def request.single( static_request() ?? getter.get(r) end - def make_source(id) = - request.dynamic(id=id, prefetch=prefetch, thread_queue=thread_queue, next) - end - - # request.dynamic can have gaps in their song fetching - # infallible sources should always be available so we - # re-create it each time. Otherwise, we can just stick - # with the request.dynamic. - s = - if - infallible - then - s = make_source("#{id}.actual") - source.dynamic( - id=id, - infallible=infallible, - self_sync=false, - track_sensitive=true, - { - s.set_queue([next()]) - s - } - ) - else - make_source(id) - end + s = request.dynamic( + id=id, + prefetch=prefetch, + thread_queue=thread_queue, + synchronous=infallible, + next + ) s.on_wake_up(fun () -> on_wake_up(s)) s.on_shutdown(on_shutdown) From de8683f7f563ff4de89ed3594da5112e136cce7a Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Thu, 21 Nov 2024 17:01:12 -0600 Subject: [PATCH 8/8] Better. --- src/core/sources/request_dynamic.ml | 13 ++++++------ src/libs/request.liq | 31 ++++++++++++++++++++++------- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/core/sources/request_dynamic.ml b/src/core/sources/request_dynamic.ml index 1b2147f029..4f16c3d1c0 100644 --- a/src/core/sources/request_dynamic.ml +++ b/src/core/sources/request_dynamic.ml @@ -176,17 +176,16 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available method seek_source = (self :> Source.source) method abort_track = Atomic.set should_skip true + method private is_request_ready = + self#current <> None || try self#fetch_request with _ -> false + method can_generate_frame = - let is_ready = - (fun () -> - self#current <> None || try self#fetch_request with _ -> false) - () - in - match is_ready with + match self#is_request_ready with | true -> true | false -> if available () then self#notify_new_request; - false + (* Try one more time in case a new request was queued above. *) + self#is_request_ready val retrieved : queue_item Queue.t = Queue.create () method private queue_size = Queue.length retrieved diff --git a/src/libs/request.liq b/src/libs/request.liq index ce86f9244d..130dc8de86 100644 --- a/src/libs/request.liq +++ b/src/libs/request.liq @@ -354,13 +354,30 @@ def request.single( static_request() ?? getter.get(r) end - s = request.dynamic( - id=id, - prefetch=prefetch, - thread_queue=thread_queue, - synchronous=infallible, - next - ) + def mk_source(id) = + request.dynamic( + id=id, + prefetch=prefetch, + thread_queue=thread_queue, + synchronous=infallible, + next + ) + end + + # We want to mark infallible source as such. `source.dynamic` is a nice + # way to do it as it will raise a user-friendly error in case the underlying + # source does not respect the conditions for being infallible. + s = + if + infallible + then + s = mk_source("#{id}.actual") + source.dynamic( + id=id, infallible=infallible, self_sync=false, track_sensitive=true, {s} + ) + else + mk_source(id) + end s.on_wake_up(fun () -> on_wake_up(s)) s.on_shutdown(on_shutdown)