From 24c8980a76d945f71c64f9f11de5ba4aabdf4954 Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Thu, 3 Oct 2024 09:40:27 -0500 Subject: [PATCH] source.dynamic improvements. --- src/core/builtins/builtins_source.ml | 9 +++ src/core/operators/dyn_op.ml | 87 +++++++++++++++++----------- src/libs/extra/native.liq | 8 +-- src/libs/request.liq | 24 ++++++-- src/libs/source.liq | 4 +- src/libs/video.liq | 79 +++++++++++++++---------- 6 files changed, 136 insertions(+), 75 deletions(-) diff --git a/src/core/builtins/builtins_source.ml b/src/core/builtins/builtins_source.ml index 1497693389..4bd5ad1f18 100644 --- a/src/core/builtins/builtins_source.ml +++ b/src/core/builtins/builtins_source.ml @@ -41,6 +41,15 @@ let _ = s#set_name n; Lang.unit) +let _ = + Lang.add_builtin ~base:source "last_metadata" ~category:(`Source `Liquidsoap) + ~descr:"Return the last metadata from the source." + [("", Lang.source_t (Lang.univ_t ()), None, None)] + (Lang.nullable_t Lang.metadata_t) + (fun p -> + let s = Lang.to_source (List.assoc "" p) in + match s#last_metadata with None -> Lang.null | Some m -> Lang.metadata m) + let _ = Lang.add_builtin ~base:source "skip" ~category:(`Source `Liquidsoap) ~descr:"Skip to the next track." diff --git a/src/core/operators/dyn_op.ml b/src/core/operators/dyn_op.ml index 426f65f874..d4358cb5bc 100644 --- a/src/core/operators/dyn_op.ml +++ b/src/core/operators/dyn_op.ml @@ -20,7 +20,8 @@ *****************************************************************************) -class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f = +class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync + next_fn = object (self) inherit Source.source ~name:"source.dynamic" () @@ -33,46 +34,62 @@ class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f = val mutable activation = [] val source : Source.source option Atomic.t = Atomic.make init val mutable last_select = Unix.gettimeofday () - val proposed = Atomic.make None - method propose s = Atomic.set proposed (Some s) - method private prepare s = + method private no_source = + if infallible then + Lang.raise_error ~pos:[] + ~message: + (Printf.sprintf + "Infallible source.dynamic %s was not able to prepare a source \ + in time! Make sure to eithe define infallible sources in the \ + source's dynamic function or mark the source as fallible.." + self#id) + "failure"; + None + + method prepare s = Typing.(s#frame_type <: self#frame_type); Clock.unify ~pos:self#pos s#clock self#clock; - s#wake_up; + s#wake_up + + method private exchange s = + self#prepare s; (match Atomic.exchange source (Some s) with | Some s -> s#sleep | None -> ()); - if s#is_ready then Some s else None + if s#is_ready then Some s else self#no_source method private get_next reselect = self#mutexify (fun () -> - match Atomic.exchange proposed None with - | Some s -> self#prepare s + last_select <- Unix.gettimeofday (); + let s = + Lang.apply next_fn + [ + ( "", + match Atomic.get source with + | None -> Lang.null + | Some s -> Lang.source s ); + ] + |> Lang.to_option |> Option.map Lang.to_source + in + match s with | None -> ( - last_select <- Unix.gettimeofday (); - let s = - Lang.apply f [] |> Lang.to_option |> Option.map Lang.to_source - in - match s with - | None -> ( - match Atomic.get source with - | Some s - when self#can_reselect - ~reselect: - (match reselect with - | `Force -> `Ok - | v -> v) - s -> - Some s - | _ -> None) - | Some s -> self#prepare s)) + match Atomic.get source with + | Some s + when self#can_reselect + ~reselect: + (match reselect with `Force -> `Ok | v -> v) + s -> + Some s + | _ -> self#no_source) + | Some s -> self#exchange s) () method private get_source ~reselect () = match (Atomic.get source, reselect) with - | None, _ | _, `Force -> self#get_next reselect + | None, _ | _, `Force | Some _, `After_position _ -> + self#get_next reselect | Some s, _ when self#can_reselect ~reselect s -> Some s | Some _, _ when Unix.gettimeofday () -. last_select < resurection_time -> @@ -83,7 +100,7 @@ class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f = self#on_wake_up (fun () -> Lang.iter_sources (fun s -> Typing.(s#frame_type <: self#frame_type)) - f; + next_fn; ignore (self#get_source ~reselect:`Force ())); self#on_sleep (fun () -> match Atomic.exchange source None with @@ -141,28 +158,30 @@ let _ = should wait before trying to update source again (`null` means \ never)." ); ( "", - Lang.fun_t [] (Lang.nullable_t (Lang.source_t frame_t)), - Some (Lang.val_fun [] (fun _ -> Lang.null)), + Lang.fun_t + [(false, "", Lang.nullable_t (Lang.source_t frame_t))] + (Lang.nullable_t (Lang.source_t frame_t)), + None, Some "Function returning the source to be used, `null` means keep current \ - source." ); + source. Argument is the currently set source." ); ] ~return_t:frame_t ~descr: "Dynamically change the underlying source: it can either be changed by \ the function given as argument, which returns the source to be played, \ or by calling the `set` method." - ~category:`Track ~flags:[`Experimental] + ~category:`Track ~meth: [ - ( "set", + ( "prepare", ([], Lang.fun_t [(false, "", Lang.source_t frame_t)] Lang.unit_t), - "Set the source.", + "Prepare a source that will be returned later.", fun s -> Lang.val_fun [("", "x", None)] (fun p -> - s#propose (List.assoc "x" p |> Lang.to_source); + s#prepare (List.assoc "x" p |> Lang.to_source); Lang.unit) ); ] (fun p -> diff --git a/src/libs/extra/native.liq b/src/libs/extra/native.liq index 4cfdfa5f86..af3ab4d7a4 100644 --- a/src/libs/extra/native.liq +++ b/src/libs/extra/native.liq @@ -19,7 +19,7 @@ end def native.fallback(~id=null(), ~track_sensitive=true, sources) = fail = (source.fail() : source) - def s() = + def s(_) = list.find(default=fail, source.is_ready, getter.get(sources)) end @@ -44,7 +44,7 @@ def native.sequence(~id=null(), sources) = n = ref(0) fail = source.fail() - def rec s() = + def rec s(v) = sn = list.nth(default=list.last(default=fail, sources), sources, n()) if source.is_ready(sn) or n() >= len - 1 @@ -52,7 +52,7 @@ def native.sequence(~id=null(), sources) = sn else ref.incr(n) - s() + s(v) end end @@ -173,7 +173,7 @@ def native.request.dynamic(%argsof(request.dynamic), f) = thread.run(queue=thread_queue, every=retry_delay, fill) # Source - def s() = + def s(_) = if not list.is_empty(queue()) then diff --git a/src/libs/request.liq b/src/libs/request.liq index 511d312b5b..6381fe2361 100644 --- a/src/libs/request.liq +++ b/src/libs/request.liq @@ -223,7 +223,7 @@ def request.once( done = ref(false) fail = fallback([]) - def next() = + def next(_) = if done() then @@ -330,7 +330,7 @@ def request.single( static_request := null() end - def next() = + def next(_) = if done() then @@ -423,7 +423,11 @@ def request.player(~simultaneous=true) = l := request.once(r)::l() end - source.dynamic({add(normalize=false, l())}).{ + def next(_) = + add(normalize=false, l()) + end + + source.dynamic(next).{ play=play, length= { @@ -432,10 +436,20 @@ def request.player(~simultaneous=true) = } } else - s = source.dynamic() + next_source = ref(null()) + + def next(_) = + s = next_source() + next_source := null() + s + end + + s = source.dynamic(next) def play(r) = - s.set(request.once(r)) + r = request.once(r) + s.prepare(s) + next_source := r end s.{play=play, length={1}} diff --git a/src/libs/source.liq b/src/libs/source.liq index 24526c93ea..bb92a214a4 100644 --- a/src/libs/source.liq +++ b/src/libs/source.liq @@ -190,7 +190,7 @@ def append(~id=null("append"), ~insert_missing=true, ~merge=false, s, f) = s = if insert_missing then source.on_track(s, f) else s end s = source.on_metadata(s, f) - def next() = + def next(_) = p = pending() pending := null() last_meta := null() @@ -248,7 +248,7 @@ def prepend(~id=null("prepend"), ~merge=false, s, f) = s = source.on_track(s, fun (m) -> m == [] ? last_meta := null() : on_meta(m) ) s = source.on_metadata(s, on_meta) - def next() = + def next(_) = if null.defined(last_meta) then diff --git a/src/libs/video.liq b/src/libs/video.liq index 9797af4d58..3677c5c518 100644 --- a/src/libs/video.liq +++ b/src/libs/video.liq @@ -34,27 +34,24 @@ def request.image( ~y=getter(0), req ) = - s = source.dynamic() - last_req = ref(null()) - def set_req() = + def next(_) = req = (getter.get(req) : request) if req != last_req() then + last_req := req image = request.single(id=id, fallible=fallible, req) image = video.crop(image) - image = video.resize(id=id, x=x, y=y, width=width, height=height, image) - s.set(image) - last_req := req + video.resize(id=id, x=x, y=y, width=width, height=height, image) + else + null() end end - s.on_wake_up(set_req) - - source.on_frame(s, set_req) + source.dynamic(id=id, next) end # Generate a source from an image file. @@ -197,34 +194,45 @@ end # @category Source / Video processing # @param s Audio source whose metadata contain cover-art. def video.cover(s) = - video = source.dynamic() + last_filename = ref(null()) + fail = (source.fail() : source) + + def next(_) = + m = source.last_metadata(s ?? fail) ?? [] - def read_cover(m) = filename = m["filename"] - cover = - if file.exists(filename) then file.cover(filename) else "".{mime=""} end if - null.defined(cover) + filename != last_filename() then - cover = null.get(cover) - ext = if cover.mime == "image/png" then ".png" else ".jpg" end - f = file.temp("cover", ext) - log.debug( - "Found cover for #{filename}." - ) - file.write(data=cover, f) - video.set(request.once(request.create(temporary=true, f))) + last_filename := filename + + cover = + if file.exists(filename) then file.cover(filename) else "".{mime=""} end + + if + null.defined(cover) + then + cover = null.get(cover) + ext = if cover.mime == "image/png" then ".png" else ".jpg" end + f = file.temp("cover", ext) + log.debug( + "Found cover for #{filename}." + ) + file.write(data=cover, f) + request.once(request.create(temporary=true, f)) + else + log.debug( + "No cover for #{filename}." + ) + fail + end else - log.debug( - "No cover for #{filename}." - ) - video.set(source.fail()) + null() end end - s.on_track(read_cover) - (video : source(video=canvas)) + source.dynamic(track_sensitive=false, next) end let output.youtube = () @@ -652,7 +660,16 @@ def video.slideshow( id = string.id.default(default="video.slideshow", id) l = ref(l) n = ref(-1) - s = source.dynamic() + + next_source = ref(null()) + + def next(_) = + s = next_source() + next_source := null() + s + end + + s = source.dynamic(next) def current() = list.nth(l(), n()) @@ -664,7 +681,9 @@ def video.slideshow( 0 <= n' and n' < list.length(l()) and n' != n() then n := n' - s.set(request.once(request.create(current()))) + new_source = request.once(request.create(current())) + s.prepare(new_source) + next_source := new_source end end