From 1dc6faca0e2544f89bfdb216558aab0c1be783f8 Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Fri, 4 Oct 2024 08:00:33 -0500 Subject: [PATCH] Cleanup source.dynamic, fix `append` (#4156) --- CHANGES.md | 3 + src/core/builtins/builtins_source.ml | 9 ++ src/core/operators/dyn_op.ml | 131 ++++++++++++++------------- src/libs/extra/source.liq | 4 +- src/libs/request.liq | 54 ++++++++--- src/libs/source.liq | 49 +++++----- src/libs/video.liq | 79 ++++++++++------ 7 files changed, 197 insertions(+), 132 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f47b5730a1..3fe09d0f5c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -62,6 +62,9 @@ New: Changed: +- Reimplemented `request.once`, `single` and more using `source.dynamic`. Removed experiment + flag on `source.dynamic`. The operator is considered stable enough to define advanced sources + but the user should be careful when using it. - Mute SDL startup messages (#2913). - `int` can optionally raises an error when passing `nan` or `infinity`, `int(infinity)` now returns `max_int` and `int(-infinity)` returns `min_int`. (#3407) diff --git a/src/core/builtins/builtins_source.ml b/src/core/builtins/builtins_source.ml index 1497693389..4bd5ad1f18 100644 --- a/src/core/builtins/builtins_source.ml +++ b/src/core/builtins/builtins_source.ml @@ -41,6 +41,15 @@ let _ = s#set_name n; Lang.unit) +let _ = + Lang.add_builtin ~base:source "last_metadata" ~category:(`Source `Liquidsoap) + ~descr:"Return the last metadata from the source." + [("", Lang.source_t (Lang.univ_t ()), None, None)] + (Lang.nullable_t Lang.metadata_t) + (fun p -> + let s = Lang.to_source (List.assoc "" p) in + match s#last_metadata with None -> Lang.null | Some m -> Lang.metadata m) + let _ = Lang.add_builtin ~base:source "skip" ~category:(`Source `Liquidsoap) ~descr:"Skip to the next track." diff --git a/src/core/operators/dyn_op.ml b/src/core/operators/dyn_op.ml index 426f65f874..01c60d48ac 100644 --- a/src/core/operators/dyn_op.ml +++ b/src/core/operators/dyn_op.ml @@ -20,84 +20,85 @@ *****************************************************************************) -class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f = +class dyn ~init ~track_sensitive ~infallible ~self_sync ~merge next_fn = object (self) inherit Source.source ~name:"source.dynamic" () - - inherit - Source.generate_from_multiple_sources - ~merge:(fun () -> false) - ~track_sensitive () - + inherit Source.generate_from_multiple_sources ~merge ~track_sensitive () method fallible = not infallible val mutable activation = [] - val source : Source.source option Atomic.t = Atomic.make init + val current_source : Source.source option Atomic.t = Atomic.make init + method current_source = Atomic.get current_source val mutable last_select = Unix.gettimeofday () - val proposed = Atomic.make None - method propose s = Atomic.set proposed (Some s) - method private prepare s = + method private no_source = + if infallible then + Lang.raise_error ~pos:[] + ~message: + (Printf.sprintf + "Infallible source.dynamic %s was not able to prepare a source \ + in time! Make sure to either define infallible sources in the \ + source's dynamic function or mark the source as fallible.." + self#id) + "failure"; + None + + method prepare s = Typing.(s#frame_type <: self#frame_type); Clock.unify ~pos:self#pos s#clock self#clock; - s#wake_up; - (match Atomic.exchange source (Some s) with - | Some s -> s#sleep - | None -> ()); - if s#is_ready then Some s else None + s#wake_up + + method private exchange s = + self#log#info "Switching to source %s" s#id; + self#prepare s; + Atomic.set current_source (Some s); + if s#is_ready then Some s else self#no_source method private get_next reselect = self#mutexify (fun () -> - match Atomic.exchange proposed None with - | Some s -> self#prepare s + last_select <- Unix.gettimeofday (); + let s = + Lang.apply next_fn [] |> Lang.to_option |> Option.map Lang.to_source + in + match s with | None -> ( - last_select <- Unix.gettimeofday (); - let s = - Lang.apply f [] |> Lang.to_option |> Option.map Lang.to_source - in - match s with - | None -> ( - match Atomic.get source with - | Some s - when self#can_reselect - ~reselect: - (match reselect with - | `Force -> `Ok - | v -> v) - s -> - Some s - | _ -> None) - | Some s -> self#prepare s)) + match self#current_source with + | Some s + when self#can_reselect + ~reselect: + (match reselect with `Force -> `Ok | v -> v) + s -> + Some s + | _ -> self#no_source) + | Some s -> self#exchange s) () method private get_source ~reselect () = - match (Atomic.get source, reselect) with - | None, _ | _, `Force -> self#get_next reselect + match (self#current_source, reselect) with + | None, _ | _, `Force | Some _, `After_position _ -> + self#get_next reselect | Some s, _ when self#can_reselect ~reselect s -> Some s - | Some _, _ when Unix.gettimeofday () -. last_select < resurection_time - -> - None | _ -> self#get_next reselect initializer self#on_wake_up (fun () -> Lang.iter_sources (fun s -> Typing.(s#frame_type <: self#frame_type)) - f; + next_fn; ignore (self#get_source ~reselect:`Force ())); self#on_sleep (fun () -> - match Atomic.exchange source None with + match Atomic.exchange current_source None with | Some s -> s#sleep | None -> ()) method remaining = - match Atomic.get source with Some s -> s#remaining | None -> -1 + match self#current_source with Some s -> s#remaining | None -> -1 method abort_track = - match Atomic.get source with Some s -> s#abort_track | None -> () + match self#current_source with Some s -> s#abort_track | None -> () method seek_source = - match Atomic.get source with + match self#current_source with | Some s -> s#seek_source | None -> (self :> Source.source) @@ -106,7 +107,7 @@ class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f = | Some v -> (`Static, self#source_sync v) | None -> ( ( `Dynamic, - match Atomic.get source with + match self#current_source with | Some s -> snd s#self_sync | None -> None )) end @@ -133,16 +134,13 @@ let _ = Lang.nullable_t Lang.bool_t, Some Lang.null, Some "For the source's `self_sync` property." ); - ( "resurection_time", - Lang.nullable_t Lang.float_t, - Some (Lang.float 1.), - Some - "When track sensitive and the source is unavailable, how long we \ - should wait before trying to update source again (`null` means \ - never)." ); + ( "merge", + Lang.getter_t Lang.bool_t, + Some (Lang.bool false), + Some "Set or return `true` to merge subsequent tracks." ); ( "", Lang.fun_t [] (Lang.nullable_t (Lang.source_t frame_t)), - Some (Lang.val_fun [] (fun _ -> Lang.null)), + None, Some "Function returning the source to be used, `null` means keep current \ source." ); @@ -152,17 +150,25 @@ let _ = "Dynamically change the underlying source: it can either be changed by \ the function given as argument, which returns the source to be played, \ or by calling the `set` method." - ~category:`Track ~flags:[`Experimental] + ~category:`Track ~meth: [ - ( "set", + ( "current_source", + ([], Lang.fun_t [] (Lang.nullable_t (Lang.source_t frame_t))), + "Return the source currently selected.", + fun s -> + Lang.val_fun [] (fun _ -> + match s#current_source with + | None -> Lang.null + | Some s -> Lang.source s) ); + ( "prepare", ([], Lang.fun_t [(false, "", Lang.source_t frame_t)] Lang.unit_t), - "Set the source.", + "Prepare a source that will be returned later.", fun s -> Lang.val_fun [("", "x", None)] (fun p -> - s#propose (List.assoc "x" p |> Lang.to_source); + s#prepare (List.assoc "x" p |> Lang.to_source); Lang.unit) ); ] (fun p -> @@ -172,13 +178,10 @@ let _ = let track_sensitive = List.assoc "track_sensitive" p |> Lang.to_getter in let track_sensitive () = Lang.to_bool (track_sensitive ()) in let infallible = List.assoc "infallible" p |> Lang.to_bool in - let resurection_time = - List.assoc "resurection_time" p |> Lang.to_valued_option Lang.to_float - in - let resurection_time = Option.value ~default:(-1.) resurection_time in + let merge = Lang.to_getter (List.assoc "merge" p) in + let merge () = Lang.to_bool (merge ()) in let self_sync = Lang.to_valued_option Lang.to_bool (List.assoc "self_sync" p) in let next = List.assoc "" p in - new dyn - ~init ~track_sensitive ~infallible ~resurection_time ~self_sync next) + new dyn ~init ~track_sensitive ~infallible ~merge ~self_sync next) diff --git a/src/libs/extra/source.liq b/src/libs/extra/source.liq index 9fe862053f..727dc5cf60 100644 --- a/src/libs/extra/source.liq +++ b/src/libs/extra/source.liq @@ -195,7 +195,7 @@ end # @param ~every Duration of a track (in seconds). # @param ~metadata Metadata for tracks. # @param s The stream. -def chop(~every=getter(3.), ~metadata=getter([]), s) = +def chop(~id=null(), ~every=getter(3.), ~metadata=getter([]), s) = s = insert_metadata(s) # Track time in the source's context: @@ -211,7 +211,7 @@ def chop(~every=getter(3.), ~metadata=getter([]), s) = end end - source.on_frame(s, f) + source.on_frame(id=id, s, f) end # Regularly skip tracks from a source (useful for testing skipping). diff --git a/src/libs/request.liq b/src/libs/request.liq index 511d312b5b..675c779481 100644 --- a/src/libs/request.liq +++ b/src/libs/request.liq @@ -43,9 +43,10 @@ def request.queue( ) = ignore(native) id = string.id.default(default="request.queue", id) - initial_queue = queue + initial_queue = ref(queue) queue = ref([]) fetch = ref(fun () -> true) + started = ref(false) def next() = if @@ -60,13 +61,23 @@ def request.queue( end def push(r) = - log.info( - label=id, - "Pushing #{r} on the queue." - ) - queue := [...queue(), r] - fn = fetch() - ignore(fn()) + if + started() + then + log.info( + label=id, + "Pushing #{r} on the queue." + ) + queue := [...queue(), r] + fn = fetch() + ignore(fn()) + else + log.info( + label=id, + "Pushing #{r} on the initial queue." + ) + initial_queue := [...initial_queue(), r] + end end def push_uri(uri) = @@ -110,12 +121,12 @@ def request.queue( end def set_queue(q) = - queue := q + if started() then queue := q else initial_queue := q end s.set_queue([]) end def get_queue() = - [...s.queue(), ...(queue())] + [...s.queue(), ...initial_queue(), ...queue()] end s = @@ -127,7 +138,14 @@ def request.queue( queue=get_queue } - s.on_wake_up({s.set_queue(initial_queue)}) + s.on_wake_up( + fun () -> + begin + started := true + s.set_queue(initial_queue()) + initial_queue := [] + end + ) source.set_name(s, "request.queue") fetch := s.fetch @@ -432,10 +450,20 @@ def request.player(~simultaneous=true) = } } else - s = source.dynamic() + next_source = ref(null()) + + def next() = + s = next_source() + next_source := null() + s + end + + s = source.dynamic(next) def play(r) = - s.set(request.once(r)) + r = request.once(r) + s.prepare(r) + next_source := r end s.{play=play, length={1}} diff --git a/src/libs/source.liq b/src/libs/source.liq index 24526c93ea..d5c0e87576 100644 --- a/src/libs/source.liq +++ b/src/libs/source.liq @@ -173,39 +173,42 @@ def append(~id=null("append"), ~insert_missing=true, ~merge=false, s, f) = last_meta = ref(null()) pending = ref(null()) - def f(m) = - if - m["liq_append"] == "false" - then - last_meta := null() - pending := null() - elsif - last_meta() != m - then - last_meta := m - pending := (f(m) : source?) - end - end - - s = if insert_missing then source.on_track(s, f) else s end - s = source.on_metadata(s, f) - def next() = p = pending() pending := null() last_meta := null() + if null.defined(p) then null.get(p) else (s : source) end + end + + d = + source.dynamic( + track_sensitive=true, merge={merge and null.defined(pending)}, next + ) + + def f(m) = if - null.defined(p) + d.current_source() == (s : source) then - p = null.get(p) - sequence(merge=merge, [p, s]) - else - null() + if + m["liq_append"] == "false" + then + last_meta := null() + pending := null() + elsif + last_meta() != m + then + last_meta := m + s = f(m) + d.prepare(s) + pending := s + end end end - d = source.dynamic(track_sensitive=false, next) + d = if insert_missing then source.on_track(d, f) else d end + d = source.on_metadata(d, f) + s = fallback(id=id, track_sensitive=false, [d, s]) s.{ pending= diff --git a/src/libs/video.liq b/src/libs/video.liq index 9797af4d58..b81e8cd1ff 100644 --- a/src/libs/video.liq +++ b/src/libs/video.liq @@ -34,27 +34,24 @@ def request.image( ~y=getter(0), req ) = - s = source.dynamic() - last_req = ref(null()) - def set_req() = + def next() = req = (getter.get(req) : request) if req != last_req() then + last_req := req image = request.single(id=id, fallible=fallible, req) image = video.crop(image) - image = video.resize(id=id, x=x, y=y, width=width, height=height, image) - s.set(image) - last_req := req + video.resize(id=id, x=x, y=y, width=width, height=height, image) + else + null() end end - s.on_wake_up(set_req) - - source.on_frame(s, set_req) + source.dynamic(id=id, track_sensitive=false, next) end # Generate a source from an image file. @@ -197,34 +194,45 @@ end # @category Source / Video processing # @param s Audio source whose metadata contain cover-art. def video.cover(s) = - video = source.dynamic() + last_filename = ref(null()) + fail = (source.fail() : source) + + def next() = + m = source.last_metadata(s ?? fail) ?? [] - def read_cover(m) = filename = m["filename"] - cover = - if file.exists(filename) then file.cover(filename) else "".{mime=""} end if - null.defined(cover) + filename != last_filename() then - cover = null.get(cover) - ext = if cover.mime == "image/png" then ".png" else ".jpg" end - f = file.temp("cover", ext) - log.debug( - "Found cover for #{filename}." - ) - file.write(data=cover, f) - video.set(request.once(request.create(temporary=true, f))) + last_filename := filename + + cover = + if file.exists(filename) then file.cover(filename) else "".{mime=""} end + + if + null.defined(cover) + then + cover = null.get(cover) + ext = if cover.mime == "image/png" then ".png" else ".jpg" end + f = file.temp("cover", ext) + log.debug( + "Found cover for #{filename}." + ) + file.write(data=cover, f) + request.once(request.create(temporary=true, f)) + else + log.debug( + "No cover for #{filename}." + ) + fail + end else - log.debug( - "No cover for #{filename}." - ) - video.set(source.fail()) + null() end end - s.on_track(read_cover) - (video : source(video=canvas)) + source.dynamic(track_sensitive=false, next) end let output.youtube = () @@ -652,7 +660,16 @@ def video.slideshow( id = string.id.default(default="video.slideshow", id) l = ref(l) n = ref(-1) - s = source.dynamic() + + next_source = ref(null()) + + def next() = + s = next_source() + next_source := null() + s + end + + s = source.dynamic(next) def current() = list.nth(l(), n()) @@ -664,7 +681,9 @@ def video.slideshow( 0 <= n' and n' < list.length(l()) and n' != n() then n := n' - s.set(request.once(request.create(current()))) + new_source = request.once(request.create(current())) + s.prepare(new_source) + next_source := new_source end end