Skip to content

Commit

Permalink
source.dynamic improvements.
Browse files Browse the repository at this point in the history
  • Loading branch information
toots committed Oct 3, 2024
1 parent 2e0f2a1 commit 24c8980
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 75 deletions.
9 changes: 9 additions & 0 deletions src/core/builtins/builtins_source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ let _ =
s#set_name n;
Lang.unit)

let _ =
Lang.add_builtin ~base:source "last_metadata" ~category:(`Source `Liquidsoap)
~descr:"Return the last metadata from the source."
[("", Lang.source_t (Lang.univ_t ()), None, None)]
(Lang.nullable_t Lang.metadata_t)
(fun p ->
let s = Lang.to_source (List.assoc "" p) in
match s#last_metadata with None -> Lang.null | Some m -> Lang.metadata m)

let _ =
Lang.add_builtin ~base:source "skip" ~category:(`Source `Liquidsoap)
~descr:"Skip to the next track."
Expand Down
87 changes: 53 additions & 34 deletions src/core/operators/dyn_op.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
*****************************************************************************)

class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f =
class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync
next_fn =
object (self)
inherit Source.source ~name:"source.dynamic" ()

Expand All @@ -33,46 +34,62 @@ class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f =
val mutable activation = []
val source : Source.source option Atomic.t = Atomic.make init
val mutable last_select = Unix.gettimeofday ()
val proposed = Atomic.make None
method propose s = Atomic.set proposed (Some s)

method private prepare s =
method private no_source =
if infallible then
Lang.raise_error ~pos:[]
~message:
(Printf.sprintf
"Infallible source.dynamic %s was not able to prepare a source \
in time! Make sure to eithe define infallible sources in the \
source's dynamic function or mark the source as fallible.."
self#id)
"failure";
None

method prepare s =
Typing.(s#frame_type <: self#frame_type);
Clock.unify ~pos:self#pos s#clock self#clock;
s#wake_up;
s#wake_up

method private exchange s =
self#prepare s;
(match Atomic.exchange source (Some s) with
| Some s -> s#sleep
| None -> ());
if s#is_ready then Some s else None
if s#is_ready then Some s else self#no_source

method private get_next reselect =
self#mutexify
(fun () ->
match Atomic.exchange proposed None with
| Some s -> self#prepare s
last_select <- Unix.gettimeofday ();
let s =
Lang.apply next_fn
[
( "",
match Atomic.get source with
| None -> Lang.null
| Some s -> Lang.source s );
]
|> Lang.to_option |> Option.map Lang.to_source
in
match s with
| None -> (
last_select <- Unix.gettimeofday ();
let s =
Lang.apply f [] |> Lang.to_option |> Option.map Lang.to_source
in
match s with
| None -> (
match Atomic.get source with
| Some s
when self#can_reselect
~reselect:
(match reselect with
| `Force -> `Ok
| v -> v)
s ->
Some s
| _ -> None)
| Some s -> self#prepare s))
match Atomic.get source with
| Some s
when self#can_reselect
~reselect:
(match reselect with `Force -> `Ok | v -> v)
s ->
Some s
| _ -> self#no_source)
| Some s -> self#exchange s)
()

method private get_source ~reselect () =
match (Atomic.get source, reselect) with
| None, _ | _, `Force -> self#get_next reselect
| None, _ | _, `Force | Some _, `After_position _ ->
self#get_next reselect
| Some s, _ when self#can_reselect ~reselect s -> Some s
| Some _, _ when Unix.gettimeofday () -. last_select < resurection_time
->
Expand All @@ -83,7 +100,7 @@ class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f =
self#on_wake_up (fun () ->
Lang.iter_sources
(fun s -> Typing.(s#frame_type <: self#frame_type))
f;
next_fn;
ignore (self#get_source ~reselect:`Force ()));
self#on_sleep (fun () ->
match Atomic.exchange source None with
Expand Down Expand Up @@ -141,28 +158,30 @@ let _ =
should wait before trying to update source again (`null` means \
never)." );
( "",
Lang.fun_t [] (Lang.nullable_t (Lang.source_t frame_t)),
Some (Lang.val_fun [] (fun _ -> Lang.null)),
Lang.fun_t
[(false, "", Lang.nullable_t (Lang.source_t frame_t))]
(Lang.nullable_t (Lang.source_t frame_t)),
None,
Some
"Function returning the source to be used, `null` means keep current \
source." );
source. Argument is the currently set source." );
]
~return_t:frame_t
~descr:
"Dynamically change the underlying source: it can either be changed by \
the function given as argument, which returns the source to be played, \
or by calling the `set` method."
~category:`Track ~flags:[`Experimental]
~category:`Track
~meth:
[
( "set",
( "prepare",
([], Lang.fun_t [(false, "", Lang.source_t frame_t)] Lang.unit_t),
"Set the source.",
"Prepare a source that will be returned later.",
fun s ->
Lang.val_fun
[("", "x", None)]
(fun p ->
s#propose (List.assoc "x" p |> Lang.to_source);
s#prepare (List.assoc "x" p |> Lang.to_source);
Lang.unit) );
]
(fun p ->
Expand Down
8 changes: 4 additions & 4 deletions src/libs/extra/native.liq
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ end
def native.fallback(~id=null(), ~track_sensitive=true, sources) =
fail = (source.fail() : source)

def s() =
def s(_) =
list.find(default=fail, source.is_ready, getter.get(sources))
end

Expand All @@ -44,15 +44,15 @@ def native.sequence(~id=null(), sources) =
n = ref(0)
fail = source.fail()

def rec s() =
def rec s(v) =
sn = list.nth(default=list.last(default=fail, sources), sources, n())
if
source.is_ready(sn) or n() >= len - 1
then
sn
else
ref.incr(n)
s()
s(v)
end
end

Expand Down Expand Up @@ -173,7 +173,7 @@ def native.request.dynamic(%argsof(request.dynamic), f) =
thread.run(queue=thread_queue, every=retry_delay, fill)

# Source
def s() =
def s(_) =
if
not list.is_empty(queue())
then
Expand Down
24 changes: 19 additions & 5 deletions src/libs/request.liq
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def request.once(
done = ref(false)
fail = fallback([])

def next() =
def next(_) =
if
done()
then
Expand Down Expand Up @@ -330,7 +330,7 @@ def request.single(
static_request := null()
end

def next() =
def next(_) =
if
done()
then
Expand Down Expand Up @@ -423,7 +423,11 @@ def request.player(~simultaneous=true) =
l := request.once(r)::l()
end

source.dynamic({add(normalize=false, l())}).{
def next(_) =
add(normalize=false, l())
end

source.dynamic(next).{
play=play,
length=
{
Expand All @@ -432,10 +436,20 @@ def request.player(~simultaneous=true) =
}
}
else
s = source.dynamic()
next_source = ref(null())

def next(_) =
s = next_source()
next_source := null()
s
end

s = source.dynamic(next)

def play(r) =
s.set(request.once(r))
r = request.once(r)
s.prepare(s)
next_source := r
end

s.{play=play, length={1}}
Expand Down
4 changes: 2 additions & 2 deletions src/libs/source.liq
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def append(~id=null("append"), ~insert_missing=true, ~merge=false, s, f) =
s = if insert_missing then source.on_track(s, f) else s end
s = source.on_metadata(s, f)

def next() =
def next(_) =
p = pending()
pending := null()
last_meta := null()
Expand Down Expand Up @@ -248,7 +248,7 @@ def prepend(~id=null("prepend"), ~merge=false, s, f) =
s = source.on_track(s, fun (m) -> m == [] ? last_meta := null() : on_meta(m) )
s = source.on_metadata(s, on_meta)

def next() =
def next(_) =
if
null.defined(last_meta)
then
Expand Down
79 changes: 49 additions & 30 deletions src/libs/video.liq
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,24 @@ def request.image(
~y=getter(0),
req
) =
s = source.dynamic()

last_req = ref(null())

def set_req() =
def next(_) =
req = (getter.get(req) : request)

if
req != last_req()
then
last_req := req
image = request.single(id=id, fallible=fallible, req)
image = video.crop(image)
image = video.resize(id=id, x=x, y=y, width=width, height=height, image)
s.set(image)
last_req := req
video.resize(id=id, x=x, y=y, width=width, height=height, image)
else
null()
end
end

s.on_wake_up(set_req)

source.on_frame(s, set_req)
source.dynamic(id=id, next)
end

# Generate a source from an image file.
Expand Down Expand Up @@ -197,34 +194,45 @@ end
# @category Source / Video processing
# @param s Audio source whose metadata contain cover-art.
def video.cover(s) =
video = source.dynamic()
last_filename = ref(null())
fail = (source.fail() : source)

def next(_) =
m = source.last_metadata(s ?? fail) ?? []

def read_cover(m) =
filename = m["filename"]
cover =
if file.exists(filename) then file.cover(filename) else "".{mime=""} end

if
null.defined(cover)
filename != last_filename()
then
cover = null.get(cover)
ext = if cover.mime == "image/png" then ".png" else ".jpg" end
f = file.temp("cover", ext)
log.debug(
"Found cover for #{filename}."
)
file.write(data=cover, f)
video.set(request.once(request.create(temporary=true, f)))
last_filename := filename

cover =
if file.exists(filename) then file.cover(filename) else "".{mime=""} end

if
null.defined(cover)
then
cover = null.get(cover)
ext = if cover.mime == "image/png" then ".png" else ".jpg" end
f = file.temp("cover", ext)
log.debug(
"Found cover for #{filename}."
)
file.write(data=cover, f)
request.once(request.create(temporary=true, f))
else
log.debug(
"No cover for #{filename}."
)
fail
end
else
log.debug(
"No cover for #{filename}."
)
video.set(source.fail())
null()
end
end

s.on_track(read_cover)
(video : source(video=canvas))
source.dynamic(track_sensitive=false, next)
end

let output.youtube = ()
Expand Down Expand Up @@ -652,7 +660,16 @@ def video.slideshow(
id = string.id.default(default="video.slideshow", id)
l = ref(l)
n = ref(-1)
s = source.dynamic()

next_source = ref(null())

def next(_) =
s = next_source()
next_source := null()
s
end

s = source.dynamic(next)

def current() =
list.nth(l(), n())
Expand All @@ -664,7 +681,9 @@ def video.slideshow(
0 <= n' and n' < list.length(l()) and n' != n()
then
n := n'
s.set(request.once(request.create(current())))
new_source = request.once(request.create(current()))
s.prepare(new_source)
next_source := new_source
end
end

Expand Down

0 comments on commit 24c8980

Please sign in to comment.