Skip to content

Commit

Permalink
Cleanup source.dynamic, fix append (#4156)
Browse files Browse the repository at this point in the history
  • Loading branch information
toots authored Oct 4, 2024
1 parent 2e0f2a1 commit 1dc6fac
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 132 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ New:

Changed:

- Reimplemented `request.once`, `single` and more using `source.dynamic`. Removed experiment
flag on `source.dynamic`. The operator is considered stable enough to define advanced sources
but the user should be careful when using it.
- Mute SDL startup messages (#2913).
- `int` can optionally raises an error when passing `nan` or `infinity`, `int(infinity)`
now returns `max_int` and `int(-infinity)` returns `min_int`. (#3407)
Expand Down
9 changes: 9 additions & 0 deletions src/core/builtins/builtins_source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ let _ =
s#set_name n;
Lang.unit)

let _ =
Lang.add_builtin ~base:source "last_metadata" ~category:(`Source `Liquidsoap)
~descr:"Return the last metadata from the source."
[("", Lang.source_t (Lang.univ_t ()), None, None)]
(Lang.nullable_t Lang.metadata_t)
(fun p ->
let s = Lang.to_source (List.assoc "" p) in
match s#last_metadata with None -> Lang.null | Some m -> Lang.metadata m)

let _ =
Lang.add_builtin ~base:source "skip" ~category:(`Source `Liquidsoap)
~descr:"Skip to the next track."
Expand Down
131 changes: 67 additions & 64 deletions src/core/operators/dyn_op.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,84 +20,85 @@
*****************************************************************************)

class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f =
class dyn ~init ~track_sensitive ~infallible ~self_sync ~merge next_fn =
object (self)
inherit Source.source ~name:"source.dynamic" ()

inherit
Source.generate_from_multiple_sources
~merge:(fun () -> false)
~track_sensitive ()

inherit Source.generate_from_multiple_sources ~merge ~track_sensitive ()
method fallible = not infallible
val mutable activation = []
val source : Source.source option Atomic.t = Atomic.make init
val current_source : Source.source option Atomic.t = Atomic.make init
method current_source = Atomic.get current_source
val mutable last_select = Unix.gettimeofday ()
val proposed = Atomic.make None
method propose s = Atomic.set proposed (Some s)

method private prepare s =
method private no_source =
if infallible then
Lang.raise_error ~pos:[]
~message:
(Printf.sprintf
"Infallible source.dynamic %s was not able to prepare a source \
in time! Make sure to either define infallible sources in the \
source's dynamic function or mark the source as fallible.."
self#id)
"failure";
None

method prepare s =
Typing.(s#frame_type <: self#frame_type);
Clock.unify ~pos:self#pos s#clock self#clock;
s#wake_up;
(match Atomic.exchange source (Some s) with
| Some s -> s#sleep
| None -> ());
if s#is_ready then Some s else None
s#wake_up

method private exchange s =
self#log#info "Switching to source %s" s#id;
self#prepare s;
Atomic.set current_source (Some s);
if s#is_ready then Some s else self#no_source

method private get_next reselect =
self#mutexify
(fun () ->
match Atomic.exchange proposed None with
| Some s -> self#prepare s
last_select <- Unix.gettimeofday ();
let s =
Lang.apply next_fn [] |> Lang.to_option |> Option.map Lang.to_source
in
match s with
| None -> (
last_select <- Unix.gettimeofday ();
let s =
Lang.apply f [] |> Lang.to_option |> Option.map Lang.to_source
in
match s with
| None -> (
match Atomic.get source with
| Some s
when self#can_reselect
~reselect:
(match reselect with
| `Force -> `Ok
| v -> v)
s ->
Some s
| _ -> None)
| Some s -> self#prepare s))
match self#current_source with
| Some s
when self#can_reselect
~reselect:
(match reselect with `Force -> `Ok | v -> v)
s ->
Some s
| _ -> self#no_source)
| Some s -> self#exchange s)
()

method private get_source ~reselect () =
match (Atomic.get source, reselect) with
| None, _ | _, `Force -> self#get_next reselect
match (self#current_source, reselect) with
| None, _ | _, `Force | Some _, `After_position _ ->
self#get_next reselect
| Some s, _ when self#can_reselect ~reselect s -> Some s
| Some _, _ when Unix.gettimeofday () -. last_select < resurection_time
->
None
| _ -> self#get_next reselect

initializer
self#on_wake_up (fun () ->
Lang.iter_sources
(fun s -> Typing.(s#frame_type <: self#frame_type))
f;
next_fn;
ignore (self#get_source ~reselect:`Force ()));
self#on_sleep (fun () ->
match Atomic.exchange source None with
match Atomic.exchange current_source None with
| Some s -> s#sleep
| None -> ())

method remaining =
match Atomic.get source with Some s -> s#remaining | None -> -1
match self#current_source with Some s -> s#remaining | None -> -1

method abort_track =
match Atomic.get source with Some s -> s#abort_track | None -> ()
match self#current_source with Some s -> s#abort_track | None -> ()

method seek_source =
match Atomic.get source with
match self#current_source with
| Some s -> s#seek_source
| None -> (self :> Source.source)

Expand All @@ -106,7 +107,7 @@ class dyn ~init ~track_sensitive ~infallible ~resurection_time ~self_sync f =
| Some v -> (`Static, self#source_sync v)
| None -> (
( `Dynamic,
match Atomic.get source with
match self#current_source with
| Some s -> snd s#self_sync
| None -> None ))
end
Expand All @@ -133,16 +134,13 @@ let _ =
Lang.nullable_t Lang.bool_t,
Some Lang.null,
Some "For the source's `self_sync` property." );
( "resurection_time",
Lang.nullable_t Lang.float_t,
Some (Lang.float 1.),
Some
"When track sensitive and the source is unavailable, how long we \
should wait before trying to update source again (`null` means \
never)." );
( "merge",
Lang.getter_t Lang.bool_t,
Some (Lang.bool false),
Some "Set or return `true` to merge subsequent tracks." );
( "",
Lang.fun_t [] (Lang.nullable_t (Lang.source_t frame_t)),
Some (Lang.val_fun [] (fun _ -> Lang.null)),
None,
Some
"Function returning the source to be used, `null` means keep current \
source." );
Expand All @@ -152,17 +150,25 @@ let _ =
"Dynamically change the underlying source: it can either be changed by \
the function given as argument, which returns the source to be played, \
or by calling the `set` method."
~category:`Track ~flags:[`Experimental]
~category:`Track
~meth:
[
( "set",
( "current_source",
([], Lang.fun_t [] (Lang.nullable_t (Lang.source_t frame_t))),
"Return the source currently selected.",
fun s ->
Lang.val_fun [] (fun _ ->
match s#current_source with
| None -> Lang.null
| Some s -> Lang.source s) );
( "prepare",
([], Lang.fun_t [(false, "", Lang.source_t frame_t)] Lang.unit_t),
"Set the source.",
"Prepare a source that will be returned later.",
fun s ->
Lang.val_fun
[("", "x", None)]
(fun p ->
s#propose (List.assoc "x" p |> Lang.to_source);
s#prepare (List.assoc "x" p |> Lang.to_source);
Lang.unit) );
]
(fun p ->
Expand All @@ -172,13 +178,10 @@ let _ =
let track_sensitive = List.assoc "track_sensitive" p |> Lang.to_getter in
let track_sensitive () = Lang.to_bool (track_sensitive ()) in
let infallible = List.assoc "infallible" p |> Lang.to_bool in
let resurection_time =
List.assoc "resurection_time" p |> Lang.to_valued_option Lang.to_float
in
let resurection_time = Option.value ~default:(-1.) resurection_time in
let merge = Lang.to_getter (List.assoc "merge" p) in
let merge () = Lang.to_bool (merge ()) in
let self_sync =
Lang.to_valued_option Lang.to_bool (List.assoc "self_sync" p)
in
let next = List.assoc "" p in
new dyn
~init ~track_sensitive ~infallible ~resurection_time ~self_sync next)
new dyn ~init ~track_sensitive ~infallible ~merge ~self_sync next)
4 changes: 2 additions & 2 deletions src/libs/extra/source.liq
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ end
# @param ~every Duration of a track (in seconds).
# @param ~metadata Metadata for tracks.
# @param s The stream.
def chop(~every=getter(3.), ~metadata=getter([]), s) =
def chop(~id=null(), ~every=getter(3.), ~metadata=getter([]), s) =
s = insert_metadata(s)

# Track time in the source's context:
Expand All @@ -211,7 +211,7 @@ def chop(~every=getter(3.), ~metadata=getter([]), s) =
end
end

source.on_frame(s, f)
source.on_frame(id=id, s, f)
end

# Regularly skip tracks from a source (useful for testing skipping).
Expand Down
54 changes: 41 additions & 13 deletions src/libs/request.liq
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ def request.queue(
) =
ignore(native)
id = string.id.default(default="request.queue", id)
initial_queue = queue
initial_queue = ref(queue)
queue = ref([])
fetch = ref(fun () -> true)
started = ref(false)

def next() =
if
Expand All @@ -60,13 +61,23 @@ def request.queue(
end

def push(r) =
log.info(
label=id,
"Pushing #{r} on the queue."
)
queue := [...queue(), r]
fn = fetch()
ignore(fn())
if
started()
then
log.info(
label=id,
"Pushing #{r} on the queue."
)
queue := [...queue(), r]
fn = fetch()
ignore(fn())
else
log.info(
label=id,
"Pushing #{r} on the initial queue."
)
initial_queue := [...initial_queue(), r]
end
end

def push_uri(uri) =
Expand Down Expand Up @@ -110,12 +121,12 @@ def request.queue(
end

def set_queue(q) =
queue := q
if started() then queue := q else initial_queue := q end
s.set_queue([])
end

def get_queue() =
[...s.queue(), ...(queue())]
[...s.queue(), ...initial_queue(), ...queue()]
end

s =
Expand All @@ -127,7 +138,14 @@ def request.queue(
queue=get_queue
}

s.on_wake_up({s.set_queue(initial_queue)})
s.on_wake_up(
fun () ->
begin
started := true
s.set_queue(initial_queue())
initial_queue := []
end
)

source.set_name(s, "request.queue")
fetch := s.fetch
Expand Down Expand Up @@ -432,10 +450,20 @@ def request.player(~simultaneous=true) =
}
}
else
s = source.dynamic()
next_source = ref(null())

def next() =
s = next_source()
next_source := null()
s
end

s = source.dynamic(next)

def play(r) =
s.set(request.once(r))
r = request.once(r)
s.prepare(r)
next_source := r
end

s.{play=play, length={1}}
Expand Down
Loading

0 comments on commit 1dc6fac

Please sign in to comment.