From fcd8bf48818ad00c8da2c5222f6c6c97baf97eaa Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Wed, 2 Oct 2024 10:31:14 -0500 Subject: [PATCH] Add generic thread queues, use them to implement autocue-specific queue (#4151) --- CHANGES.md | 2 + doc/content/liq/radiopi.liq | 6 - doc/content/liq/task-example.liq | 5 + doc/content/liq/task-with-queue.liq | 12 + doc/content/migrating.md | 28 +++ doc/content/threads.md | 45 ++++ doc/dune.inc | 297 +++++++++++++++++++++++++ src/core/builtins/builtins_settings.ml | 27 ++- src/core/builtins/builtins_socket.ml | 6 +- src/core/builtins/builtins_thread.ml | 22 +- src/core/decoder/external_decoder.ml | 2 +- src/core/file_watcher.inotify.ml | 2 +- src/core/file_watcher.mtime.ml | 8 +- src/core/harbor/harbor.ml | 10 +- src/core/io/ffmpeg_io.ml | 2 +- src/core/io/srt_io.ml | 4 +- src/core/operators/pipe.ml | 2 +- src/core/outputs/harbor_output.ml | 8 +- src/core/sources/request_dynamic.ml | 18 +- src/core/tools/external_input.ml | 2 +- src/core/tools/liqfm.ml | 2 +- src/core/tools/server.ml | 2 +- src/core/tools/tutils.ml | 108 +++++---- src/core/tools/tutils.mli | 7 +- src/libs/autocue.liq | 24 +- src/libs/extra/deprecations.liq | 9 +- src/libs/extra/native.liq | 2 +- src/libs/extra/visualization.liq | 4 +- src/libs/playlist.liq | 14 +- src/libs/request.liq | 21 +- src/libs/thread.liq | 16 +- tests/language/error.liq | 2 +- 32 files changed, 582 insertions(+), 137 deletions(-) create mode 100644 doc/content/liq/task-example.liq create mode 100644 doc/content/liq/task-with-queue.liq create mode 100644 doc/content/threads.md diff --git a/CHANGES.md b/CHANGES.md index 881114a78a..f47b5730a1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -34,6 +34,8 @@ New: - Added `video.canvas` to make it possible to position video elements independently of the rendered video size ([#3656](https://github.com/savonet/liquidsoap/pull/3656), [blog post](https://www.liquidsoap.info/blog/2024-02-10-video-canvas-and-ai/)) - Added cover manager from an original code by @vitoyucepi (#3651) +- Reworked scheduler queues logic, allow user-defined queues, add options to pick + the queue to send asynchronous tasks to (#4151) - Added non-interleaved API to `%ffmpeg` encoder, enabled by default when only one stream is encoded. - Allow trailing commas in record definition (#3300). diff --git a/doc/content/liq/radiopi.liq b/doc/content/liq/radiopi.liq index 06553b29bd..9dd5e57b70 100644 --- a/doc/content/liq/radiopi.liq +++ b/doc/content/liq/radiopi.liq @@ -16,12 +16,6 @@ settings.harbor.bind_addrs.set(["0.0.0.0"]) # Verbose logs log.level.set(4) -# We use the scheduler intensively, -# therefore we create many queues. -settings.scheduler.generic_queues.set(5) -settings.scheduler.fast_queues.set(3) -settings.scheduler.non_blocking_queues.set(3) - # === Settings === # The host to request files diff --git a/doc/content/liq/task-example.liq b/doc/content/liq/task-example.liq new file mode 100644 index 0000000000..b667f0083d --- /dev/null +++ b/doc/content/liq/task-example.liq @@ -0,0 +1,5 @@ +def connect_callback() = + ignore(http.post("http://host/on_connect")) +end + +thread.run(connect_callback) diff --git a/doc/content/liq/task-with-queue.liq b/doc/content/liq/task-with-queue.liq new file mode 100644 index 0000000000..d07692d499 --- /dev/null +++ b/doc/content/liq/task-with-queue.liq @@ -0,0 +1,12 @@ +# Add 3 foo queue +settings.scheduler.queues.set([ + ...settings.scheduler.queues(), + ("foo", 3) +]) + +def connect_callback() = + ignore(http.post("http://host/on_connect")) +end + +# Execute inside the foo queue +thread.run(queue="foo", connect_callback) diff --git a/doc/content/migrating.md b/doc/content/migrating.md index 5ec412fbb0..6f6aca2361 100644 --- a/doc/content/migrating.md +++ b/doc/content/migrating.md @@ -76,6 +76,34 @@ def transition(old, new) = end ``` +### Thread queues + +In order to improve issues with complex inter-dependent asynchronous tasks such as `autocue` data computation, +scheduler queues have been improved. + +User-provided named queues can now be created and used to send asynchronous tasks, making it possible to control +concurrency of certain classes of tasks and also to remedy any potential dependency between asynchronous tasks. + +Settings for queues have thus changed and now will look like this: + +```liquidsoap +# Add a custom queue with 4 workers, increase generic queues to 4: +settings.scheduler.queues.set([ + ...list.assoc,remove("generic", settings.scheduler.queues()), + ("generic", 4), + ("custom", 4) +] +``` + +The `fast` argument of the `thread.run.*` functions has been replaced by `queue`, telling the operator which queue should the +asynchronous tasks sent to. + +Likewise, `request.dynamic`, `playlist`, `single` etc. have also been updated to accept a `thread_queue` argument controlling +which asynchronous queue their request resolution tasks should be sent to. + +See [the original Pull Request)[https://github.com/savonet/liquidsoap/pull/4151) and [the threads page](threads.html) +for more details. + ### Replaygain - There is a new `metadata.replaygain` function that extracts the replay gain value in _dB_ from the metadata. diff --git a/doc/content/threads.md b/doc/content/threads.md new file mode 100644 index 0000000000..c30fc25d7b --- /dev/null +++ b/doc/content/threads.md @@ -0,0 +1,45 @@ +# Threads + +The main purpose of liquidsoap is to create real time media streams. When streams are created, everything that +is needed to compute them needs to happen very quickly so that we make sure that the stream can in fact +be created in real time. + +When a tasks is required that may take some time and whose result is not required for the stream generation, +for instance when executing a `on_stop` or `on_connect` callback, it can be useful to execute this task in a _thread_. + +Threads in liquidsoap are callback functions that are executed by an asynchronous queue. Here's an example: + +```{.liquidsoap include="task-example.liq"} + +``` + +By default, there are two type of queues available in liquidsoap: + +- `generic` queues +- `non_blocking` queues + +By convention, tasks that are known to be executing very fast should be sent to the +`non_blockin` queues and all the other tasks should be sent to the `generic` queue. + +You can decide which queue to send tasks to by using the `queue` parameter of the +`thread.run` functions. Some other operators who also use threads can have a similar +parameter such as `thread_queue` for `request.dynamic` and `playlist`. + +```{.liquidsoap include="task-with-queue.liq"} + +``` + +You can also define your own named queue using the `settings.scheduler.queues` setting. +This is particularly useful for two applications: + +- To control concurrent execution of specific tasks. +- To prevent deadlocks in cases some tasks depends on other tasks. + +Typically, `autocue` data resolution is executed inside a `request` resolution. To +control the concurrency with which this CPU-intensive task is executed, we place them +in specific queues. The number of queues controls how many of these tasks can be executed +concurrently. + +Also, this prevents a deadlock where all the request resolution fill up the available +`generic` queues, making it impossible for the autocue computation to finish, thus preventing +the request resolution from returning. diff --git a/doc/dune.inc b/doc/dune.inc index 0f1f2fdff0..eab98186d9 100644 --- a/doc/dune.inc +++ b/doc/dune.inc @@ -167,6 +167,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -295,6 +297,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -423,6 +427,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -551,6 +557,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -679,6 +687,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -807,6 +817,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -935,6 +947,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1063,6 +1077,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1191,6 +1207,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1319,6 +1337,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1447,6 +1467,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1575,6 +1597,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1703,6 +1727,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1831,6 +1857,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1959,6 +1987,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2087,6 +2117,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2215,6 +2247,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2343,6 +2377,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2471,6 +2507,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2599,6 +2637,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2727,6 +2767,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2855,6 +2897,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2983,6 +3027,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3111,6 +3157,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3239,6 +3287,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3367,6 +3417,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3495,6 +3547,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3623,6 +3677,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3751,6 +3807,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3879,6 +3937,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4007,6 +4067,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4135,6 +4197,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4263,6 +4327,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4391,6 +4457,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4519,6 +4587,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4647,6 +4717,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4775,6 +4847,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4903,6 +4977,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5031,6 +5107,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5159,6 +5237,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5287,6 +5367,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5415,6 +5497,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5543,6 +5627,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5671,6 +5757,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5799,6 +5887,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5927,6 +6017,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6055,6 +6147,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6183,6 +6277,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6311,6 +6407,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6439,6 +6537,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6567,6 +6667,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6695,6 +6797,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6823,6 +6927,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6951,6 +7057,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7079,6 +7187,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7207,6 +7317,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7335,6 +7447,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7463,6 +7577,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7591,6 +7707,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7719,6 +7837,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7847,6 +7967,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7975,6 +8097,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8103,6 +8227,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8231,6 +8357,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8359,6 +8487,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8487,6 +8617,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8615,6 +8747,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8743,6 +8877,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8871,6 +9007,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8999,6 +9137,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -9025,6 +9165,136 @@ ) ) +(rule + (alias doc) + (package liquidsoap) + (enabled_if (not %{bin-available:pandoc})) + (deps (:no_pandoc no-pandoc)) + (target threads.html) + (action (run cp %{no_pandoc} %{target})) +) + +(rule + (alias doc) + (package liquidsoap) + (enabled_if %{bin-available:pandoc}) + (deps + liquidsoap.xml + language.dtd + template.html + content/liq/append-silence.liq + content/liq/archive-cleaner.liq + content/liq/basic-radio.liq + content/liq/beets-amplify.liq + content/liq/beets-protocol-short.liq + content/liq/beets-protocol.liq + content/liq/beets-source.liq + content/liq/blank-detect.liq + content/liq/blank-sorry.liq + content/liq/complete-case.liq + content/liq/cross.custom.liq + content/liq/crossfade.liq + content/liq/decoder-faad.liq + content/liq/decoder-flac.liq + content/liq/decoder-metaflac.liq + content/liq/dump-hourly.liq + content/liq/dump-hourly2.liq + content/liq/dynamic-source.liq + content/liq/external-output.file.liq + content/liq/fallback.liq + content/liq/ffmpeg-filter-dynamic-volume.liq + content/liq/ffmpeg-filter-flanger-highpass.liq + content/liq/ffmpeg-filter-hflip.liq + content/liq/ffmpeg-filter-hflip2.liq + content/liq/ffmpeg-filter-parallel-flanger-highpass.liq + content/liq/ffmpeg-live-switch.liq + content/liq/ffmpeg-relay-ondemand.liq + content/liq/ffmpeg-relay.liq + content/liq/ffmpeg-shared-encoding-rtmp.liq + content/liq/ffmpeg-shared-encoding.liq + content/liq/fixed-time1.liq + content/liq/fixed-time2.liq + content/liq/frame-size.liq + content/liq/harbor-auth.liq + content/liq/harbor-dynamic.liq + content/liq/harbor-insert-metadata.liq + content/liq/harbor-metadata.liq + content/liq/harbor-redirect.liq + content/liq/harbor-simple.liq + content/liq/harbor-usage.liq + content/liq/harbor.http.register.liq + content/liq/harbor.http.response.liq + content/liq/hls-metadata.liq + content/liq/hls-mp4.liq + content/liq/http-input.liq + content/liq/icy-update.liq + content/liq/input.mplayer.liq + content/liq/jingle-hour.liq + content/liq/json-ex.liq + content/liq/json-stringify.liq + content/liq/json1.liq + content/liq/live-switch.liq + content/liq/medialib-predicate.liq + content/liq/medialib.liq + content/liq/medialib.sqlite.liq + content/liq/multitrack-add-video-track.liq + content/liq/multitrack-add-video-track2.liq + content/liq/multitrack-default-video-track.liq + content/liq/multitrack.liq + content/liq/multitrack2.liq + content/liq/multitrack3.liq + content/liq/output.file.hls.liq + content/liq/playlists.liq + content/liq/prometheus-callback.liq + content/liq/prometheus-settings.liq + content/liq/radiopi.liq + content/liq/re-encode.liq + content/liq/regular.liq + content/liq/replaygain-metadata.liq + content/liq/replaygain-playlist.liq + content/liq/request.dynamic.liq + content/liq/rtmp.liq + content/liq/samplerate3.liq + content/liq/scheduling.liq + content/liq/seek-telnet.liq + content/liq/settings.liq + content/liq/shoutcast.liq + content/liq/single.liq + content/liq/source-cue.liq + content/liq/space_overhead.liq + content/liq/split-cue.liq + content/liq/sqlite.liq + content/liq/srt-receiver.liq + content/liq/srt-sender.liq + content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq + content/liq/transcoding.liq + content/liq/video-anonymizer.liq + content/liq/video-bluescreen.liq + content/liq/video-canvas-example.liq + content/liq/video-default-canvas.liq + content/liq/video-in-video.liq + content/liq/video-logo.liq + content/liq/video-osc.liq + content/liq/video-simple.liq + content/liq/video-static.liq + content/liq/video-text.liq + content/liq/video-transition.liq + content/liq/video-weather.liq + content/liq/video-webcam.liq + (:md content/threads.md) + ) + (target threads.html) + (action + (pipe-stdout + (run pandoc %{md} -t json) + (run pandoc-include --directory content/liq) + (run pandoc -f json --syntax-definition=liquidsoap.xml --highlight=pygments --metadata pagetitle=threads --template=template.html -o %{target}) + ) + ) +) + (rule (alias doc) (package liquidsoap) @@ -9127,6 +9397,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -9255,6 +9527,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -9383,6 +9657,8 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq + content/liq/task-example.liq + content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -10259,6 +10535,26 @@ (action (run %{bin:liquidsoap} --check --no-fallible-check content/liq/switch-show.liq)) ) +(rule + (alias doctest) + (package liquidsoap) + (deps + (source_tree ../src/libs) + (:test_liq content/liq/task-example.liq) + ) + (action (run %{bin:liquidsoap} --check --no-fallible-check content/liq/task-example.liq)) +) + +(rule + (alias doctest) + (package liquidsoap) + (deps + (source_tree ../src/libs) + (:test_liq content/liq/task-with-queue.liq) + ) + (action (run %{bin:liquidsoap} --check --no-fallible-check content/liq/task-with-queue.liq)) +) + (rule (alias doctest) (package liquidsoap) @@ -10494,6 +10790,7 @@ (stereotool.html as html/stereotool.html) (stream_content.html as html/stream_content.html) (strings_encoding.html as html/strings_encoding.html) + (threads.html as html/threads.html) (video-static.html as html/video-static.html) (video.html as html/video.html) (yaml.html as html/yaml.html) diff --git a/src/core/builtins/builtins_settings.ml b/src/core/builtins/builtins_settings.ml index db1a4e1d20..9ba65972d3 100644 --- a/src/core/builtins/builtins_settings.ml +++ b/src/core/builtins/builtins_settings.ml @@ -92,22 +92,32 @@ let settings_module = | ty, true -> Lang.fun_t [] ty | ty, false -> Lang.fun_t [] (Lang.nullable_t ty) in - let rec get_type ?(sub = []) conf = + let rec get_type ?(sub = []) ~label conf = let ty, has_default_value = get_conf_type conf in Lang.method_t (get_t ~has_default_value ty) - (set_t ty @ leaf_types conf @ sub) + (set_t ty @ leaf_types conf @ sub + @ + if label = "scheduler" then + [ + ( "queues", + ( [], + Lang.ref_t + (Lang.list_t (Lang.product_t Lang.string_t Lang.int_t)) ), + "Scheduler queue configuration." ); + ] + else []) and leaf_types conf = List.map (fun label -> - let ty = get_type (conf#path [label]) in + let ty = get_type ~label (conf#path [label]) in let label = Utils.normalize_parameter_string label in ( label, ([], ty), Printf.sprintf "Entry for configuration key %s" label )) conf#subs in - let settings_t = get_type Configure.conf in + let settings_t = get_type ~label:"settings" Configure.conf in let get_v fn conv_to conv_from conf = let get = Lang.val_fun [] (fun _ -> @@ -122,7 +132,7 @@ let settings_module = in (get, Some set) in - let rec get_value ?(sub = []) conf = + let rec get_value ?(sub = []) ~label conf = let to_v fn conv_to conv_from = try ignore (fn conf); @@ -144,7 +154,8 @@ let settings_module = with Found v -> v in Lang.meth get_v - ((if set_v <> None then [("set", Option.get set_v)] else []) + ((if label = "scheduler" then [("queues", Tutils.queues_conf)] else []) + @ (if set_v <> None then [("set", Option.get set_v)] else []) @ [ ("description", Lang.string (String.trim conf#descr)); ( "comments", @@ -154,11 +165,11 @@ let settings_module = and leaf_values conf = List.map (fun label -> - let v = get_value (conf#path [label]) in + let v = get_value ~label (conf#path [label]) in (Utils.normalize_parameter_string label, v)) conf#subs in - settings := get_value Configure.conf; + settings := get_value ~label:"settings" Configure.conf; ignore (Lang.add_builtin_value ~category:`Settings "settings" ~descr:"All settings." ~flags:[`Hidden] !settings settings_t)) diff --git a/src/core/builtins/builtins_socket.ml b/src/core/builtins/builtins_socket.ml index f001c4a8c6..4b278e8523 100644 --- a/src/core/builtins/builtins_socket.ml +++ b/src/core/builtins/builtins_socket.ml @@ -230,11 +230,7 @@ module Socket_value = struct [] in Duppy.Task.add Tutils.scheduler - { - Duppy.Task.priority = `Maybe_blocking; - events; - handler = fn; - }; + { Duppy.Task.priority = `Generic; events; handler = fn }; Lang.unit) ); ] in diff --git a/src/core/builtins/builtins_thread.ml b/src/core/builtins/builtins_thread.ml index d567c0f898..0866e9d3fc 100644 --- a/src/core/builtins/builtins_thread.ml +++ b/src/core/builtins/builtins_thread.ml @@ -40,16 +40,14 @@ let _ = let _ = Lang.add_builtin ~base:thread_run "recurrent" ~category:`Programming [ - ( "fast", - Lang.bool_t, - Some (Lang.bool true), + ( "queue", + Lang.string_t, + Some (Lang.string "generic"), Some - "Whether the thread is supposed to return quickly or not. Typically, \ - blocking tasks (e.g. fetching data over the internet) should not be \ - considered to be fast. When set to `false` its priority will be \ - lowered below that of request resolutions and fast timeouts. This \ - is only effective if you set a dedicated queue for fast tasks, see \ - the \"scheduler\" settings for more details." ); + "Queue to use for the task. Should be one of: `\"generic\"` or \ + `\"non_blocking\"`. Non blocking should be reserved for tasks that \ + are known to complete quickly. You can also use declared via \ + `settings.scheduler.queues`." ); ( "delay", Lang.float_t, Some (Lang.float 0.), @@ -74,8 +72,10 @@ let _ = let delay = Lang.to_float (List.assoc "delay" p) in let f = List.assoc "" p in let priority = - if Lang.to_bool (List.assoc "fast" p) then `Maybe_blocking - else `Blocking + match Lang.to_string (List.assoc "queue" p) with + | "generic" -> `Generic + | "non_blocking" -> `Non_blocking + | n -> `Named n in let on_error = Lang.to_option (List.assoc "on_error" p) in let on_error = diff --git a/src/core/decoder/external_decoder.ml b/src/core/decoder/external_decoder.ml index 2a60670808..efca33e872 100644 --- a/src/core/decoder/external_decoder.ml +++ b/src/core/decoder/external_decoder.ml @@ -49,7 +49,7 @@ let external_input process input = in let log s = log#important "%s" s in (* reading from input is blocking.. *) - let priority = `Blocking in + let priority = `Generic in let process = Process_handler.run ~priority ~on_stdin ~on_stderr ~log process in diff --git a/src/core/file_watcher.inotify.ml b/src/core/file_watcher.inotify.ml index 8ed0e8da5e..cf36f66de0 100644 --- a/src/core/file_watcher.inotify.ml +++ b/src/core/file_watcher.inotify.ml @@ -51,7 +51,7 @@ let rec watchdog () = events; [watchdog ()]) in - { Duppy.Task.priority = `Maybe_blocking; events = [`Read fd]; handler } + { Duppy.Task.priority = `Generic; events = [`Read fd]; handler } let watch : watch = fun ~pos e file f -> diff --git a/src/core/file_watcher.mtime.ml b/src/core/file_watcher.mtime.ml index 58a89af598..2df0c6ea9a 100644 --- a/src/core/file_watcher.mtime.ml +++ b/src/core/file_watcher.mtime.ml @@ -61,7 +61,7 @@ let rec handler _ = (Printf.sprintf "Error while executing file watcher callback: %s" (Printexc.to_string exn))) !watched; - [{ Duppy.Task.priority = `Maybe_blocking; events = [`Delay 1.]; handler }]) + [{ Duppy.Task.priority = `Generic; events = [`Delay 1.]; handler }]) () let watch : watch = @@ -73,11 +73,7 @@ let watch : watch = if not !launched then begin launched := true; Duppy.Task.add Tutils.scheduler - { - Duppy.Task.priority = `Maybe_blocking; - events = [`Delay 1.]; - handler; - } + { Duppy.Task.priority = `Generic; events = [`Delay 1.]; handler } end; let mtime = try file_mtime file with _ -> 0. in watched := { file; mtime; callback } :: !watched; diff --git a/src/core/harbor/harbor.ml b/src/core/harbor/harbor.ml index a2809de4a3..3c62adde0e 100644 --- a/src/core/harbor/harbor.ml +++ b/src/core/harbor/harbor.ml @@ -374,7 +374,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct simple_reply "No / mountpoint\r\n\r\n" in (* Authentication can be blocking. *) - Duppy.Monad.Io.exec ~priority:`Maybe_blocking h + Duppy.Monad.Io.exec ~priority:`Generic h (let user, auth_f = s#login in let user = if requested_user = "" then user else requested_user in if auth_f ~socket:h.Duppy.Monad.Io.socket user password then @@ -488,7 +488,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct Hashtbl.fold (fun lbl k query -> (lbl, k) :: query) query []) args in - Duppy.Monad.Io.exec ~priority:`Maybe_blocking h + Duppy.Monad.Io.exec ~priority:`Generic h (http_auth_check ?query ~login h.Duppy.Monad.Io.socket headers) (* We do not implement anything with this handler for now. *) @@ -631,7 +631,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct (Bytes.of_string (Websocket.upgrade headers)) in let* stype, huri, user, password = - Duppy.Monad.Io.exec ~priority:`Blocking h + Duppy.Monad.Io.exec ~priority:`Generic h (read_hello h.Duppy.Monad.Io.socket) in log#info "Mime type: %s" stype; @@ -899,7 +899,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct fun timeout -> fst (Http.read_chunked ~timeout socket) | _ -> fun _ -> "" in - Duppy.Monad.Io.exec ~priority:`Maybe_blocking h + Duppy.Monad.Io.exec ~priority:`Generic h (handler ~protocol ~meth ~headers ~data ~socket:h.Duppy.Monad.Io.socket ~query base_uri) | e -> @@ -969,7 +969,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct ~priority: (* ICY = true means that authentication has already happened *) - `Maybe_blocking h + `Generic h (let valid_user, auth_f = s#login in if not diff --git a/src/core/io/ffmpeg_io.ml b/src/core/io/ffmpeg_io.ml index 0be9875102..052690de3b 100644 --- a/src/core/io/ffmpeg_io.ml +++ b/src/core/io/ffmpeg_io.ml @@ -200,7 +200,7 @@ class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug | Some t -> Duppy.Async.wake_up t | None -> let t = - Duppy.Async.add ~priority:`Blocking Tutils.scheduler + Duppy.Async.add ~priority:`Generic Tutils.scheduler self#connect_task in Atomic.set connect_task (Some t); diff --git a/src/core/io/srt_io.ml b/src/core/io/srt_io.ml index fe177dafe8..f870441e6d 100644 --- a/src/core/io/srt_io.ml +++ b/src/core/io/srt_io.ml @@ -355,7 +355,7 @@ module Poll = struct (Printexc.to_string exn)); -1. - let task = Duppy.Async.add ~priority:`Blocking Tutils.scheduler process + let task = Duppy.Async.add ~priority:`Generic Tutils.scheduler process let add_socket ~mode socket fn = Srt.setsockflag socket Srt.sndsyn false; @@ -529,7 +529,7 @@ class virtual caller ~enforced_encryption ~pbkeylen ~passphrase ~streamid | Some t -> Duppy.Async.wake_up t | None -> let t = - Duppy.Async.add ~priority:`Blocking Tutils.scheduler + Duppy.Async.add ~priority:`Generic Tutils.scheduler self#connect_fn in connect_task <- Some t; diff --git a/src/core/operators/pipe.ml b/src/core/operators/pipe.ml index 480ef0649b..d371255dba 100644 --- a/src/core/operators/pipe.ml +++ b/src/core/operators/pipe.ml @@ -254,7 +254,7 @@ class pipe ~replay_delay ~data_len ~process ~bufferize ~max ~restart Some (Process_handler.run ~on_stop:self#on_stop ~on_start:self#on_start ~on_stdout:self#on_stdout ~on_stdin:self#on_stdin - ~priority:`Blocking ~on_stderr:self#on_stderr ~log process)) + ~priority:`Generic ~on_stderr:self#on_stderr ~log process)) method! abort_track = source#abort_track diff --git a/src/core/outputs/harbor_output.ml b/src/core/outputs/harbor_output.ml index fab2ffca67..ff3534c27c 100644 --- a/src/core/outputs/harbor_output.ml +++ b/src/core/outputs/harbor_output.ml @@ -261,7 +261,7 @@ let add_meta c data = let rec client_task c = let* data = - Duppy.Monad.Io.exec ~priority:`Maybe_blocking c.handler + Duppy.Monad.Io.exec ~priority:`Generic c.handler (Mutex_utils.mutexify c.mutex (fun () -> let buflen = Strings.Mutable.length c.buffer in @@ -283,7 +283,7 @@ let rec client_task c = c.handler (Strings.to_bytes data) in let* state = - Duppy.Monad.Io.exec ~priority:`Maybe_blocking c.handler + Duppy.Monad.Io.exec ~priority:`Generic c.handler (let ret = Mutex_utils.mutexify c.mutex (fun () -> c.state) () in Duppy.Monad.return ret) in @@ -521,7 +521,7 @@ class output p = || auth_function <> None then ( let default_user = Option.value default_user ~default:"" in - Duppy.Monad.Io.exec ~priority:`Maybe_blocking handler + Duppy.Monad.Io.exec ~priority:`Generic handler (Harbor.http_auth_check ~query ~login:(default_user, login) s headers)) else Duppy.Monad.return ()) @@ -532,7 +532,7 @@ class output p = Harbor.reply s | _ -> assert false) in - Duppy.Monad.Io.exec ~priority:`Maybe_blocking handler + Duppy.Monad.Io.exec ~priority:`Generic handler (Harbor.relayed reply (fun () -> self#log#info "Client %s connected" ip; Mutex_utils.mutexify clients_m diff --git a/src/core/sources/request_dynamic.ml b/src/core/sources/request_dynamic.ml index 4408f94b98..7dddd79245 100644 --- a/src/core/sources/request_dynamic.ml +++ b/src/core/sources/request_dynamic.ml @@ -26,9 +26,6 @@ module Queue = Liquidsoap_lang.Queues.Queue let conf_prefetch = Dtools.Conf.int ~p:(Request.conf#plug "prefetch") ~d:1 "Default prefetch" -(* Scheduler priority for request resolutions. *) -let priority = `Maybe_blocking - type queue_item = { request : Request.t; (* in seconds *) @@ -62,7 +59,8 @@ let () = Lifecycle.before_core_shutdown ~name:"request.dynamic shutdown" (fun () -> Atomic.set should_fail true) -class dynamic ~retry_delay ~available (f : Lang.value) prefetch timeout = +class dynamic ~priority ~retry_delay ~available (f : Lang.value) prefetch + timeout = let available () = (not (Atomic.get should_fail)) && available () in object (self) inherit source ~name:"request.dynamic" () @@ -340,6 +338,10 @@ let _ = ~descr:"Play request dynamically created by a given function." [ ("", Lang.fun_t [] (Lang.nullable_t Request.Value.t), None, None); + ( "thread_queue", + Lang.string_t, + Some (Lang.string "generic"), + Some "Queue used to resolve requests." ); ( "retry_delay", Lang.getter_t Lang.float_t, Some (Lang.float 0.1), @@ -429,5 +431,11 @@ let _ = let f = List.assoc "" p in let available = Lang.to_bool_getter (List.assoc "available" p) in let retry_delay = Lang.to_float_getter (List.assoc "retry_delay" p) in + let priority = + match Lang.to_string (List.assoc "thread_queue" p) with + | "generic" -> `Generic + | "non_blocking" -> `Non_blocking + | n -> `Named n + in let l, t = extract_queued_params p in - new dynamic ~available ~retry_delay f l t) + new dynamic ~available ~priority ~retry_delay f l t) diff --git a/src/core/tools/external_input.ml b/src/core/tools/external_input.ml index 4df1a70239..e4a6c155bb 100644 --- a/src/core/tools/external_input.ml +++ b/src/core/tools/external_input.ml @@ -67,7 +67,7 @@ class virtual base ~name ~restart ~restart_on_error ~on_data ?read_header let log s = self#log#important "%s" s in process <- Some - (Process_handler.run ~priority:`Blocking ~on_stop ~on_stdout + (Process_handler.run ~priority:`Generic ~on_stop ~on_stdout ~on_stderr ~log (command ()))); self#on_sleep (fun () -> diff --git a/src/core/tools/liqfm.ml b/src/core/tools/liqfm.ml index d4c0eeddad..8882fb1b3b 100644 --- a/src/core/tools/liqfm.ml +++ b/src/core/tools/liqfm.ml @@ -231,7 +231,7 @@ let init host = reason (Printexc.to_string e); -1. in - let task = Duppy.Async.add ~priority:`Blocking Tutils.scheduler do_submit in + let task = Duppy.Async.add ~priority:`Generic Tutils.scheduler do_submit in { task; submit_m; submissions } let submit (user, password) task length source stype songs = diff --git a/src/core/tools/server.ml b/src/core/tools/server.ml index c0d495830f..bcb19e0386 100644 --- a/src/core/tools/server.ml +++ b/src/core/tools/server.ml @@ -317,7 +317,7 @@ let handle_client socket ip = | e -> Duppy.Monad.raise e in let* ans = - Duppy.Monad.Io.exec ~priority:`Maybe_blocking h (run (fun () -> exec req)) + Duppy.Monad.Io.exec ~priority:`Generic h (run (fun () -> exec req)) in let* () = let* () = diff --git a/src/core/tools/tutils.ml b/src/core/tools/tutils.ml index 2257120b15..cef2b88e49 100644 --- a/src/core/tools/tutils.ml +++ b/src/core/tools/tutils.ml @@ -20,6 +20,9 @@ *****************************************************************************) +module Methods = Liquidsoap_lang.Methods +module Lang = Liquidsoap_lang.Lang + let conf_scheduler = Dtools.Conf.void ~p:(Configure.conf#plug "scheduler") @@ -68,43 +71,6 @@ let exit () = | `Done (`Error (bt, err)) -> Printexc.raise_with_backtrace err bt | _ -> exit (exit_code ()) -let generic_queues = - Dtools.Conf.int - ~p:(conf_scheduler#plug "generic_queues") - ~d:5 "Generic queues" - ~comments: - [ - "Number of event queues accepting any kind of task."; - "There should at least be one. Having more can be useful to make sure"; - "that trivial request resolutions (local files) are not delayed"; - "because of a stalled download. But N stalled download can block"; - "N queues anyway."; - ] - -let fast_queues = - Dtools.Conf.int - ~p:(conf_scheduler#plug "fast_queues") - ~d:0 "Fast queues" - ~comments: - [ - "Number of queues that are dedicated to fast tasks."; - "It might be useful to create some if your request resolutions,"; - "or some user defined tasks (cf `thread.run`), are"; - "delayed too much because of slow tasks blocking the generic queues,"; - "such as last.fm submissions or slow `thread.run` handlers."; - ] - -let non_blocking_queues = - Dtools.Conf.int - ~p:(conf_scheduler#plug "non_blocking_queues") - ~d:2 "Non-blocking queues" - ~comments: - [ - "Number of queues dedicated to internal non-blocking tasks."; - "These are only started if such tasks are needed."; - "There should be at least one."; - ] - let scheduler_log = Dtools.Conf.bool ~p:(conf_scheduler#plug "log") @@ -133,6 +99,26 @@ end) let all = ref Set.empty let queues = ref Set.empty +let queues_conf_ref = Atomic.make [("generic", 2); ("non_blocking", 2)] + +let queues_conf = + Lang.reference + (fun () -> + let v = Atomic.get queues_conf_ref in + Lang.list + (List.map + (fun (lbl, c) -> Lang.product (Lang.string lbl) (Lang.int c)) + v)) + (fun v -> + let v = Lang.to_list v in + let v = + List.map + (fun v -> + let lbl, c = Lang.to_product v in + (Lang.to_string lbl, Lang.to_int c)) + v + in + Atomic.set queues_conf_ref v) let join_all ~set () = let rec f () = @@ -226,8 +212,8 @@ let create ~queue f x s = () type priority = - [ `Blocking (** For example a last.fm submission. *) - | `Maybe_blocking (** Request resolutions vary a lot. *) + [ `Generic (** Generic queues accept all tasks. *) + | `Named of string (** Named queues only accept tasks with their priority. *) | `Non_blocking (** Non-blocking tasks like the server. *) ] let error_handlers = Stack.create () @@ -266,13 +252,14 @@ let scheduler_log n = fun m -> log#info "%s" m) else fun _ -> () -let new_queue ?priorities ~name () = +let new_queue ~priority ~name () = let qlog = scheduler_log name in - let queue () = - match priorities with - | None -> Duppy.queue scheduler ~log:qlog name - | Some priorities -> Duppy.queue scheduler ~log:qlog ~priorities name + let priorities p = + match (priority, p) with + | `Generic, (`Generic | `Non_blocking) -> true + | v, v' -> v = v' in + let queue () = Duppy.queue scheduler ~log:qlog ~priorities name in ignore (create ~queue:true queue () name) let create f x name = create ~queue:false f x name @@ -280,18 +267,27 @@ let join_all () = join_all ~set:all () let start () = if Atomic.compare_and_set state `Idle `Starting then ( - for i = 1 to generic_queues#get do - let name = Printf.sprintf "generic queue #%d" i in - new_queue ~name () - done; - for i = 1 to fast_queues#get do - let name = Printf.sprintf "fast queue #%d" i in - new_queue ~name ~priorities:(fun x -> x = `Maybe_blocking) () - done; - for i = 1 to non_blocking_queues#get do - let name = Printf.sprintf "non-blocking queue #%d" i in - new_queue ~priorities:(fun x -> x = `Non_blocking) ~name () - done) + let queues = Methods.from_list (Atomic.get queues_conf_ref) in + Methods.iter + (fun priority count -> + let priority = + match priority with + | "generic" -> `Generic + | "non_blocking" -> `Non_blocking + | n -> `Named n + in + for i = 1 to count do + let name = + Printf.sprintf "%s queue #%d" + (match priority with + | `Generic -> "generic" + | `Named n -> n + | `Non_blocking -> "non-blocking") + i + in + new_queue ~priority ~name () + done) + queues) (** Waits for [f()] to become true on condition [c]. *) let wait c m f = diff --git a/src/core/tools/tutils.mli b/src/core/tools/tutils.mli index c9d91f9e49..2059b28777 100644 --- a/src/core/tools/tutils.mli +++ b/src/core/tools/tutils.mli @@ -55,10 +55,13 @@ val join_all : unit -> unit (** Priorities for the different scheduler usages. *) type priority = - [ `Blocking (** For example a last.fm submission. *) - | `Maybe_blocking (** Request resolutions vary a lot. *) + [ `Generic (** Generic queues accept all tasks. *) + | `Named of string (** Named queues only accept tasks with their priority. *) | `Non_blocking (** Non-blocking tasks like the server. *) ] +(** Queues configuration. *) +val queues_conf : Liquidsoap_lang.Lang.value + (** task scheduler *) val scheduler : priority Duppy.scheduler diff --git a/src/libs/autocue.liq b/src/libs/autocue.liq index 52beda3bc1..c5738937ee 100644 --- a/src/libs/autocue.liq +++ b/src/libs/autocue.liq @@ -58,6 +58,25 @@ let settings.autocue.internal.metadata_override = ] ) +let settings.autocue.internal.queues_count = + settings.make( + description= + "Number of dedicated queues for resolving autocue data using the internal \ + implementation", + 1 + ) + +def settings.autocue.internal.queues_count.set(c) = + settings.scheduler.queues := + [ + ...list.assoc.remove("autocue", settings.scheduler.queues()), + ("autocue", c) + ] + settings.autocue.internal.queues_count.set(c) +end + +settings.autocue.internal.queues_count.set(1) + let settings.autocue.internal.lufs_target = settings.make( description= @@ -187,7 +206,10 @@ def autocue.internal.ebur128(~ratio=50., ~timeout=10., filename) = ) [] else - s = request.once(request.create(resolve_metadata=false, filename)) + s = + request.once( + thread_queue="autocue", request.create(resolve_metadata=false, filename) + ) frames = ref([]) def ebur128(s) = diff --git a/src/libs/extra/deprecations.liq b/src/libs/extra/deprecations.liq index 9ac8638b4d..5337a4555c 100644 --- a/src/libs/extra/deprecations.liq +++ b/src/libs/extra/deprecations.liq @@ -76,9 +76,10 @@ end # Deprecated: this function has been replaced by `thread.run.recurrent`. # @flag deprecated -def add_timeout(~fast=true, delay, f) = +def add_timeout(~fast, delay, f) = deprecated("add_timeout", "thread.run.recurrent") - thread.run.recurrent(fast=fast, delay=delay, f) + ignore(fast or true) + thread.run.recurrent(queue="generic", delay=delay, f) end # Deprecated: this function has been replaced by `thread.when`. @@ -314,7 +315,7 @@ def register_flow( ping_period end - thread.run.recurrent(fast=false, delay=ping_period, ping) + thread.run.recurrent(delay=ping_period, ping) # Register streams def register_stream(format_url) = @@ -339,7 +340,7 @@ def register_flow( artist = m["artist"] title = m["title"] params = [("m_title", title), ("m_artist", artist)] - thread.run(fast=false, {request(cmd="metadata", params=params)}) + thread.run({request(cmd="metadata", params=params)}) end s.on_metadata(metadata) diff --git a/src/libs/extra/native.liq b/src/libs/extra/native.liq index 3f7b1f359b..4cfdfa5f86 100644 --- a/src/libs/extra/native.liq +++ b/src/libs/extra/native.liq @@ -170,7 +170,7 @@ def native.request.dynamic(%argsof(request.dynamic), f) = if list.length(queue()) < prefetch then ignore(fetch()) end end - thread.run(every=retry_delay, fill) + thread.run(queue=thread_queue, every=retry_delay, fill) # Source def s() = diff --git a/src/libs/extra/visualization.liq b/src/libs/extra/visualization.liq index 20a662ca9b..93b441e8d1 100644 --- a/src/libs/extra/visualization.liq +++ b/src/libs/extra/visualization.liq @@ -28,7 +28,7 @@ def vumeter(~rms_min=-25., ~rms_max=-5., ~window=0.5, ~scroll=false, s) = print(newline=false, bar()) end - thread.run(fast=true, every=window, display) + thread.run(queue="non_blocking", every=window, display) s end @@ -62,7 +62,7 @@ def video.vumeter( width := int_of_float(x * float_of_int(video.frame.width())) end - thread.run(fast=true, every=window, update) + thread.run(queue="non_blocking", every=window, update) s = video.add_rectangle(width=width, height=height, color=color, s) video.persistence(duration=persistence, s) end diff --git a/src/libs/playlist.liq b/src/libs/playlist.liq index 2a011104cb..2117b7ed90 100644 --- a/src/libs/playlist.liq +++ b/src/libs/playlist.liq @@ -185,6 +185,7 @@ let stdlib_native = native # whole round ("randomize" mode), or pick a random file in the playlist each time \ # ("random" mode). # @param ~native Use native implementation, when available. +# @param ~thread_queue Queue used to resolve requests. # @param ~on_loop Function executed when the playlist is about to loop. # @param ~on_done Function executed when the playlist is finished. # @param ~max_fail When this number of requests fail to resolve, the whole playlists is considered as failed and `on_fail` is called. @@ -197,6 +198,7 @@ let stdlib_native = native # @method remaining_files Songs remaining to be played. def playlist.list( ~id=null(), + ~thread_queue="generic", ~check_next=null(), ~prefetch=null(), ~loop=true, @@ -263,6 +265,7 @@ def playlist.list( fun () -> request.dynamic( id=id, + thread_queue=thread_queue, prefetch=prefetch, timeout=timeout, retry_delay=1., @@ -272,7 +275,13 @@ def playlist.list( s = %ifdef native - if native then stdlib_native.request.dynamic(id=id, next) else default() end + if + native + then + stdlib_native.request.dynamic(id=id, thread_queue=thread_queue, next) + else + default() + end %else default() %endif @@ -455,6 +464,7 @@ end # the playlist), "rounds", "seconds" or "watch" (reload the file whenever it is \ # changed). # @param ~timeout Timeout (in sec.) for a single download. +# @param ~thread_queue Queue used to resolve requests. # @param ~cue_in_metadata Metadata for cue in points. Disabled if `null`. # @param ~cue_out_metadata Metadata for cue out points. Disabled if `null`. # @param uri Playlist URI. @@ -463,6 +473,7 @@ end # @method remaining_files Songs remaining to be played. def replaces playlist( ~id=null(), + ~thread_queue="generic", ~check_next=null(), ~prefetch=null(), ~loop=true, @@ -582,6 +593,7 @@ def replaces playlist( s = playlist.list( id=id, + thread_queue=thread_queue, check_next=check_next, prefetch=prefetch, loop=loop, diff --git a/src/libs/request.liq b/src/libs/request.liq index fc006dfbf7..511d312b5b 100644 --- a/src/libs/request.liq +++ b/src/libs/request.liq @@ -27,6 +27,7 @@ end # @param ~prefetch How many requests should be queued in advance. # @param ~native Use native implementation, when available. # @param ~queue Initial queue of requests. +# @param ~thread_queue Queue used to resolve requests. # @param ~timeout Timeout (in sec.) for a single download. # @method add This method is internal and should not be used. Consider using `push` instead. # @method push Push a request on the request queue. @@ -37,6 +38,7 @@ def request.queue( ~prefetch=null(), ~native=false, ~queue=[], + ~thread_queue="generic", ~timeout=20. ) = ignore(native) @@ -77,6 +79,7 @@ def request.queue( request.dynamic( id=id, prefetch=prefetch, + thread_queue=thread_queue, timeout=timeout, available={not list.is_empty(queue())}, next @@ -87,7 +90,9 @@ def request.queue( if native then - stdlib_native.request.dynamic(id=id, timeout=timeout, next) + stdlib_native.request.dynamic( + id=id, thread_queue=thread_queue, timeout=timeout, next + ) else default() end @@ -204,9 +209,15 @@ end # Play a request once and become unavailable. # @category Source / Input +# @param ~thread_queue Queue used to resolve requests. # @param ~timeout Timeout in seconds for resolving the request. # @param r Request to play. -def request.once(~id=null("request.once"), ~timeout=20., r) = +def request.once( + ~id=null("request.once"), + ~thread_queue="generic", + ~timeout=20., + r +) = id = string.id.default(default="request.once", id) done = ref(false) @@ -222,7 +233,7 @@ def request.once(~id=null("request.once"), ~timeout=20., r) = if request.resolve(r, timeout=timeout) then - request.queue(queue=[r]) + request.queue(thread_queue=thread_queue, queue=[r]) else log.critical( label=id, @@ -246,6 +257,7 @@ end # static, and time is not. # @category Source / Input # @param ~prefetch How many requests should be queued in advance. +# @param ~thread_queue Queue used to resolve requests. # @param ~timeout Timeout (in sec.) for a single download. # @param ~fallible Enforce fallibility of the request. # @param r Request @@ -253,6 +265,7 @@ def request.single( ~id=null("request.single"), ~prefetch=null(), ~timeout=20., + ~thread_queue="generic", ~fallible=null(), r ) = @@ -329,7 +342,7 @@ def request.single( static_request() ?? getter.get(r) end - s = request.dynamic(prefetch=prefetch, next) + s = request.dynamic(prefetch=prefetch, thread_queue=thread_queue, next) if infallible then s.set_queue([next()]) end s end diff --git a/src/libs/thread.liq b/src/libs/thread.liq index d623577108..45c4dbe699 100644 --- a/src/libs/thread.liq +++ b/src/libs/thread.liq @@ -1,13 +1,15 @@ # Run a function in a separate thread. # @category Programming -# @param ~fast Whether the thread is supposed to return quickly or not. Typically, blocking tasks (e.g. fetching data over the internet) should not be considered to be fast. When set to `false` its priority will be lowered below that of request resolutions and fast timeouts. This is only effective if you set a dedicated queue for fast tasks, see the "scheduler" settings for more details. +# @param ~queue Queue to use for the task. Should be one of: `"generic"` or `"non_blocking"`. \ +# Non blocking should be reserved for tasks that are known to complete quickly. \ +# You can also use a dedicated queue name declared via `settings.scheduler.queues`. # @param ~delay Delay (in seconds) after which the thread should be launched. # @param ~every How often (in seconds) the thread should be run. If negative or `null`, run once. # @param ~on_error Error callback executed when an error occurred while running the given function. When passed, \ # all raised errors are silenced unless re-raised by the callback. # @param f Function to execute. def replaces thread.run( - ~fast=true, + ~queue="generic", ~delay=0., ~on_error=null(), ~every=null(), @@ -31,7 +33,7 @@ def replaces thread.run( on_error ) - thread.run.recurrent(fast=fast, delay=delay, on_error=on_error, f) + thread.run.recurrent(queue=queue, delay=delay, on_error=on_error, f) end # Execute a callback when a predicate is `true`. The predicate @@ -39,7 +41,9 @@ end # called when the predicate returns `true` after having been # `false`, following the same semantics as `predicate.activates`. # @category Programming -# @param ~fast Whether the callback is supposed to return quickly or not. +# @param ~queue Queue to use for the task. Should be one of: `"generic"` or `"non_blocking"`. \ +# Non blocking should be reserved for tasks that are known to complete quickly. \ +# You can also use a dedicated queue name declared via `settings.scheduler.queues`. # @param ~init Detect at beginning. # @param ~every How often (in sec.) to check for the predicate. # @param ~once Execute the function only once. @@ -49,7 +53,7 @@ end # @param p Predicate indicating when to execute the function, typically a time interval such as `{10h-10h30}`. # @param f Function to execute when the predicate is true. def thread.when( - ~fast=true, + ~queue="generic", ~init=true, ~every=getter(0.5), ~once=false, @@ -71,7 +75,7 @@ def thread.when( end end - thread.run.recurrent(fast=fast, delay=0., on_error=on_error, check) + thread.run.recurrent(queue=queue, delay=0., on_error=on_error, check) end # @flag hidden diff --git a/tests/language/error.liq b/tests/language/error.liq index 76e177412e..67b11c3c51 100644 --- a/tests/language/error.liq +++ b/tests/language/error.liq @@ -70,7 +70,7 @@ def f() = ) test.equal(r/error.liq, line 58 char 4 - line 63 char 7/.test(pos), true) - test.equal(r/thread.liq, line 19, char 11-14/.test(pos), true) + test.equal(r/thread.liq, line 21, char 11-14/.test(pos), true) e' = error.register("bla") test.equal(false, (e == e'))