Skip to content

Commit

Permalink
source.dynamic improvements.
Browse files Browse the repository at this point in the history
  • Loading branch information
toots committed Oct 4, 2024
1 parent 2e0f2a1 commit 5bb63b2
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 121 deletions.
9 changes: 9 additions & 0 deletions src/core/builtins/builtins_source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ let _ =
s#set_name n;
Lang.unit)

let _ =
Lang.add_builtin ~base:source "last_metadata" ~category:(`Source `Liquidsoap)
~descr:"Return the last metadata from the source."
[("", Lang.source_t (Lang.univ_t ()), None, None)]
(Lang.nullable_t Lang.metadata_t)
(fun p ->
let s = Lang.to_source (List.assoc "" p) in
match s#last_metadata with None -> Lang.null | Some m -> Lang.metadata m)

let _ =
Lang.add_builtin ~base:source "skip" ~category:(`Source `Liquidsoap)
~descr:"Skip to the next track."
Expand Down
131 changes: 67 additions & 64 deletions src/core/operators/dyn_op.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,84 +20,85 @@
*****************************************************************************)

class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f =
class dyn ~init ~track_sensitive ~infallible ~self_sync ~merge next_fn =
object (self)
inherit Source.source ~name:"source.dynamic" ()

inherit
Source.generate_from_multiple_sources
~merge:(fun () -> false)
~track_sensitive ()

inherit Source.generate_from_multiple_sources ~merge ~track_sensitive ()
method fallible = not infallible
val mutable activation = []
val source : Source.source option Atomic.t = Atomic.make init
val current_source : Source.source option Atomic.t = Atomic.make init
method current_source = Atomic.get current_source
val mutable last_select = Unix.gettimeofday ()
val proposed = Atomic.make None
method propose s = Atomic.set proposed (Some s)

method private prepare s =
method private no_source =
if infallible then
Lang.raise_error ~pos:[]
~message:
(Printf.sprintf
"Infallible source.dynamic %s was not able to prepare a source \
in time! Make sure to either define infallible sources in the \
source's dynamic function or mark the source as fallible.."
self#id)
"failure";
None

method prepare s =
Typing.(s#frame_type <: self#frame_type);
Clock.unify ~pos:self#pos s#clock self#clock;
s#wake_up;
(match Atomic.exchange source (Some s) with
| Some s -> s#sleep
| None -> ());
if s#is_ready then Some s else None
s#wake_up

method private exchange s =
self#log#info "Switching to source %s" s#id;
self#prepare s;
Atomic.set current_source (Some s);
if s#is_ready then Some s else self#no_source

method private get_next reselect =
self#mutexify
(fun () ->
match Atomic.exchange proposed None with
| Some s -> self#prepare s
last_select <- Unix.gettimeofday ();
let s =
Lang.apply next_fn [] |> Lang.to_option |> Option.map Lang.to_source
in
match s with
| None -> (
last_select <- Unix.gettimeofday ();
let s =
Lang.apply f [] |> Lang.to_option |> Option.map Lang.to_source
in
match s with
| None -> (
match Atomic.get source with
| Some s
when self#can_reselect
~reselect:
(match reselect with
| `Force -> `Ok
| v -> v)
s ->
Some s
| _ -> None)
| Some s -> self#prepare s))
match self#current_source with
| Some s
when self#can_reselect
~reselect:
(match reselect with `Force -> `Ok | v -> v)
s ->
Some s
| _ -> self#no_source)
| Some s -> self#exchange s)
()

method private get_source ~reselect () =
match (Atomic.get source, reselect) with
| None, _ | _, `Force -> self#get_next reselect
match (self#current_source, reselect) with
| None, _ | _, `Force | Some _, `After_position _ ->
self#get_next reselect
| Some s, _ when self#can_reselect ~reselect s -> Some s
| Some _, _ when Unix.gettimeofday () -. last_select < resurection_time
->
None
| _ -> self#get_next reselect

initializer
self#on_wake_up (fun () ->
Lang.iter_sources
(fun s -> Typing.(s#frame_type <: self#frame_type))
f;
next_fn;
ignore (self#get_source ~reselect:`Force ()));
self#on_sleep (fun () ->
match Atomic.exchange source None with
match Atomic.exchange current_source None with
| Some s -> s#sleep
| None -> ())

method remaining =
match Atomic.get source with Some s -> s#remaining | None -> -1
match self#current_source with Some s -> s#remaining | None -> -1

method abort_track =
match Atomic.get source with Some s -> s#abort_track | None -> ()
match self#current_source with Some s -> s#abort_track | None -> ()

method seek_source =
match Atomic.get source with
match self#current_source with
| Some s -> s#seek_source
| None -> (self :> Source.source)

Expand All @@ -106,7 +107,7 @@ class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f =
| Some v -> (`Static, self#source_sync v)
| None -> (
( `Dynamic,
match Atomic.get source with
match self#current_source with
| Some s -> snd s#self_sync
| None -> None ))
end
Expand All @@ -133,16 +134,13 @@ let _ =
Lang.nullable_t Lang.bool_t,
Some Lang.null,
Some "For the source's `self_sync` property." );
( "resurection_time",
Lang.nullable_t Lang.float_t,
Some (Lang.float 1.),
Some
"When track sensitive and the source is unavailable, how long we \
should wait before trying to update source again (`null` means \
never)." );
( "merge",
Lang.getter_t Lang.bool_t,
Some (Lang.bool false),
Some "Set or return `true` to merge subsequent tracks." );
( "",
Lang.fun_t [] (Lang.nullable_t (Lang.source_t frame_t)),
Some (Lang.val_fun [] (fun _ -> Lang.null)),
None,
Some
"Function returning the source to be used, `null` means keep current \
source." );
Expand All @@ -152,17 +150,25 @@ let _ =
"Dynamically change the underlying source: it can either be changed by \
the function given as argument, which returns the source to be played, \
or by calling the `set` method."
~category:`Track ~flags:[`Experimental]
~category:`Track
~meth:
[
( "set",
( "current_source",
([], Lang.fun_t [] (Lang.nullable_t (Lang.source_t frame_t))),
"Return the source currently selected.",
fun s ->
Lang.val_fun [] (fun _ ->
match s#current_source with
| None -> Lang.null
| Some s -> Lang.source s) );
( "prepare",
([], Lang.fun_t [(false, "", Lang.source_t frame_t)] Lang.unit_t),
"Set the source.",
"Prepare a source that will be returned later.",
fun s ->
Lang.val_fun
[("", "x", None)]
(fun p ->
s#propose (List.assoc "x" p |> Lang.to_source);
s#prepare (List.assoc "x" p |> Lang.to_source);
Lang.unit) );
]
(fun p ->
Expand All @@ -172,13 +178,10 @@ let _ =
let track_sensitive = List.assoc "track_sensitive" p |> Lang.to_getter in
let track_sensitive () = Lang.to_bool (track_sensitive ()) in
let infallible = List.assoc "infallible" p |> Lang.to_bool in
let resurection_time =
List.assoc "resurection_time" p |> Lang.to_valued_option Lang.to_float
in
let resurection_time = Option.value ~default:(-1.) resurection_time in
let merge = Lang.to_getter (List.assoc "merge" p) in
let merge () = Lang.to_bool (merge ()) in
let self_sync =
Lang.to_valued_option Lang.to_bool (List.assoc "self_sync" p)
in
let next = List.assoc "" p in
new dyn
~init ~track_sensitive ~infallible ~resurection_time ~self_sync next)
new dyn ~init ~track_sensitive ~infallible ~merge ~self_sync next)
4 changes: 2 additions & 2 deletions src/libs/extra/source.liq
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ end
# @param ~every Duration of a track (in seconds).
# @param ~metadata Metadata for tracks.
# @param s The stream.
def chop(~every=getter(3.), ~metadata=getter([]), s) =
def chop(~id=null(), ~every=getter(3.), ~metadata=getter([]), s) =
s = insert_metadata(s)

# Track time in the source's context:
Expand All @@ -211,7 +211,7 @@ def chop(~every=getter(3.), ~metadata=getter([]), s) =
end
end

source.on_frame(s, f)
source.on_frame(id=id, s, f)
end

# Regularly skip tracks from a source (useful for testing skipping).
Expand Down
14 changes: 12 additions & 2 deletions src/libs/request.liq
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,20 @@ def request.player(~simultaneous=true) =
}
}
else
s = source.dynamic()
next_source = ref(null())

def next() =
s = next_source()
next_source := null()
s
end

s = source.dynamic(next)

def play(r) =
s.set(request.once(r))
r = request.once(r)
s.prepare(s)
next_source := r
end

s.{play=play, length={1}}
Expand Down
49 changes: 26 additions & 23 deletions src/libs/source.liq
Original file line number Diff line number Diff line change
Expand Up @@ -173,39 +173,42 @@ def append(~id=null("append"), ~insert_missing=true, ~merge=false, s, f) =
last_meta = ref(null())
pending = ref(null())

def f(m) =
if
m["liq_append"] == "false"
then
last_meta := null()
pending := null()
elsif
last_meta() != m
then
last_meta := m
pending := (f(m) : source?)
end
end

s = if insert_missing then source.on_track(s, f) else s end
s = source.on_metadata(s, f)

def next() =
p = pending()
pending := null()
last_meta := null()

if null.defined(p) then null.get(p) else (s : source) end
end

d =
source.dynamic(
track_sensitive=true, merge={merge and null.defined(pending)}, next
)

def f(m) =
if
null.defined(p)
d.current_source() == (s : source)
then
p = null.get(p)
sequence(merge=merge, [p, s])
else
null()
if
m["liq_append"] == "false"
then
last_meta := null()
pending := null()
elsif
last_meta() != m
then
last_meta := m
s = f(m)
d.prepare(s)
pending := s
end
end
end

d = source.dynamic(track_sensitive=false, next)
d = if insert_missing then source.on_track(d, f) else d end
d = source.on_metadata(d, f)

s = fallback(id=id, track_sensitive=false, [d, s])
s.{
pending=
Expand Down
Loading

0 comments on commit 5bb63b2

Please sign in to comment.