Skip to content

Commit

Permalink
Add generic thread queues, use them to implement autocue-specific que…
Browse files Browse the repository at this point in the history
…ue (#4151)
  • Loading branch information
toots committed Jan 3, 2025
1 parent c31fd7c commit 4a5f888
Show file tree
Hide file tree
Showing 32 changed files with 582 additions and 137 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ New:
- Added `video.canvas` to make it possible to position video elements independently
of the rendered video size ([#3656](https://github.com/savonet/liquidsoap/pull/3656), [blog post](https://www.liquidsoap.info/blog/2024-02-10-video-canvas-and-ai/))
- Added cover manager from an original code by @vitoyucepi (#3651)
- Reworked scheduler queues logic, allow user-defined queues, add options to pick
the queue to send asynchronous tasks to (#4151)
- Added non-interleaved API to `%ffmpeg` encoder, enabled by default when only
one stream is encoded.
- Allow trailing commas in record definition (#3300).
Expand Down
6 changes: 0 additions & 6 deletions doc/content/liq/radiopi.liq
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@ settings.harbor.bind_addrs.set(["0.0.0.0"])
# Verbose logs
log.level.set(4)

# We use the scheduler intensively,
# therefore we create many queues.
settings.scheduler.generic_queues.set(5)
settings.scheduler.fast_queues.set(3)
settings.scheduler.non_blocking_queues.set(3)

# === Settings ===

# The host to request files
Expand Down
5 changes: 5 additions & 0 deletions doc/content/liq/task-example.liq
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
def connect_callback() =
ignore(http.post("http://host/on_connect"))
end

thread.run(connect_callback)
12 changes: 12 additions & 0 deletions doc/content/liq/task-with-queue.liq
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Add 3 foo queue
settings.scheduler.queues.set([
...settings.scheduler.queues(),
("foo", 3)
])

def connect_callback() =
ignore(http.post("http://host/on_connect"))
end

# Execute inside the foo queue
thread.run(queue="foo", connect_callback)
28 changes: 28 additions & 0 deletions doc/content/migrating.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,34 @@ def transition(old, new) =
end
```

### Thread queues

In order to improve issues with complex inter-dependent asynchronous tasks such as `autocue` data computation,
scheduler queues have been improved.

User-provided named queues can now be created and used to send asynchronous tasks, making it possible to control
concurrency of certain classes of tasks and also to remedy any potential dependency between asynchronous tasks.

Settings for queues have thus changed and now will look like this:

```liquidsoap
# Add a custom queue with 4 workers, increase generic queues to 4:
settings.scheduler.queues.set([
...list.assoc,remove("generic", settings.scheduler.queues()),
("generic", 4),
("custom", 4)
]
```

The `fast` argument of the `thread.run.*` functions has been replaced by `queue`, telling the operator which queue should the
asynchronous tasks sent to.

Likewise, `request.dynamic`, `playlist`, `single` etc. have also been updated to accept a `thread_queue` argument controlling
which asynchronous queue their request resolution tasks should be sent to.

See [the original Pull Request)[https://github.com/savonet/liquidsoap/pull/4151) and [the threads page](threads.html)
for more details.

### Replaygain

- There is a new `metadata.replaygain` function that extracts the replay gain value in _dB_ from the metadata.
Expand Down
45 changes: 45 additions & 0 deletions doc/content/threads.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Threads

The main purpose of liquidsoap is to create real time media streams. When streams are created, everything that
is needed to compute them needs to happen very quickly so that we make sure that the stream can in fact
be created in real time.

When a tasks is required that may take some time and whose result is not required for the stream generation,
for instance when executing a `on_stop` or `on_connect` callback, it can be useful to execute this task in a _thread_.

Threads in liquidsoap are callback functions that are executed by an asynchronous queue. Here's an example:

```{.liquidsoap include="task-example.liq"}
```

By default, there are two type of queues available in liquidsoap:

- `generic` queues
- `non_blocking` queues

By convention, tasks that are known to be executing very fast should be sent to the
`non_blockin` queues and all the other tasks should be sent to the `generic` queue.

You can decide which queue to send tasks to by using the `queue` parameter of the
`thread.run` functions. Some other operators who also use threads can have a similar
parameter such as `thread_queue` for `request.dynamic` and `playlist`.

```{.liquidsoap include="task-with-queue.liq"}
```

You can also define your own named queue using the `settings.scheduler.queues` setting.
This is particularly useful for two applications:

- To control concurrent execution of specific tasks.
- To prevent deadlocks in cases some tasks depends on other tasks.

Typically, `autocue` data resolution is executed inside a `request` resolution. To
control the concurrency with which this CPU-intensive task is executed, we place them
in specific queues. The number of queues controls how many of these tasks can be executed
concurrently.

Also, this prevents a deadlock where all the request resolution fill up the available
`generic` queues, making it impossible for the autocue computation to finish, thus preventing
the request resolution from returning.
Loading

0 comments on commit 4a5f888

Please sign in to comment.