From d34fffb9058af4ad2bdcc2676cda27b7c64c7968 Mon Sep 17 00:00:00 2001 From: Romain Beauxis Date: Sat, 23 Nov 2024 09:05:10 -0600 Subject: [PATCH] Revert scheduler changes. --- CHANGES.md | 2 - doc/content/liq/radiopi.liq | 6 + doc/content/liq/task-example.liq | 5 - doc/content/liq/task-with-queue.liq | 12 - doc/content/migrating.md | 26 --- doc/content/threads.md | 37 --- doc/dune.inc | 297 ------------------------- src/core/builtins/builtins_request.ml | 2 +- src/core/builtins/builtins_settings.ml | 27 +-- src/core/builtins/builtins_socket.ml | 6 +- src/core/builtins/builtins_thread.ml | 22 +- src/core/decoder/external_decoder.ml | 2 +- src/core/file_watcher.inotify.ml | 2 +- src/core/file_watcher.mtime.ml | 8 +- src/core/harbor/harbor.ml | 10 +- src/core/io/ffmpeg_io.ml | 2 +- src/core/io/srt_io.ml | 4 +- src/core/operators/pipe.ml | 2 +- src/core/outputs/harbor_output.ml | 8 +- src/core/sources/request_dynamic.ml | 20 +- src/core/tools/external_input.ml | 2 +- src/core/tools/liqfm.ml | 2 +- src/core/tools/server.ml | 2 +- src/core/tools/tutils.ml | 108 ++++----- src/core/tools/tutils.mli | 7 +- src/libs/extra/deprecations.liq | 9 +- src/libs/extra/native.liq | 2 +- src/libs/extra/visualization.liq | 4 +- src/libs/playlist.liq | 14 +- src/libs/request.liq | 14 +- src/libs/thread.liq | 16 +- tests/language/error.liq | 2 +- 32 files changed, 136 insertions(+), 546 deletions(-) delete mode 100644 doc/content/liq/task-example.liq delete mode 100644 doc/content/liq/task-with-queue.liq delete mode 100644 doc/content/threads.md diff --git a/CHANGES.md b/CHANGES.md index fde6df39d1..5c695618c0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -35,8 +35,6 @@ New: - Added `video.canvas` to make it possible to position video elements independently of the rendered video size ([#3656](https://github.com/savonet/liquidsoap/pull/3656), [blog post](https://www.liquidsoap.info/blog/2024-02-10-video-canvas-and-ai/)) - Added cover manager from an original code by @vitoyucepi (#3651) -- Reworked scheduler queues logic, allow user-defined queues, add options to pick - the queue to send asynchronous tasks to (#4151) - Added non-interleaved API to `%ffmpeg` encoder, enabled by default when only one stream is encoded. - Allow trailing commas in record definition (#3300). diff --git a/doc/content/liq/radiopi.liq b/doc/content/liq/radiopi.liq index 9dd5e57b70..06553b29bd 100644 --- a/doc/content/liq/radiopi.liq +++ b/doc/content/liq/radiopi.liq @@ -16,6 +16,12 @@ settings.harbor.bind_addrs.set(["0.0.0.0"]) # Verbose logs log.level.set(4) +# We use the scheduler intensively, +# therefore we create many queues. +settings.scheduler.generic_queues.set(5) +settings.scheduler.fast_queues.set(3) +settings.scheduler.non_blocking_queues.set(3) + # === Settings === # The host to request files diff --git a/doc/content/liq/task-example.liq b/doc/content/liq/task-example.liq deleted file mode 100644 index b667f0083d..0000000000 --- a/doc/content/liq/task-example.liq +++ /dev/null @@ -1,5 +0,0 @@ -def connect_callback() = - ignore(http.post("http://host/on_connect")) -end - -thread.run(connect_callback) diff --git a/doc/content/liq/task-with-queue.liq b/doc/content/liq/task-with-queue.liq deleted file mode 100644 index d07692d499..0000000000 --- a/doc/content/liq/task-with-queue.liq +++ /dev/null @@ -1,12 +0,0 @@ -# Add 3 foo queue -settings.scheduler.queues.set([ - ...settings.scheduler.queues(), - ("foo", 3) -]) - -def connect_callback() = - ignore(http.post("http://host/on_connect")) -end - -# Execute inside the foo queue -thread.run(queue="foo", connect_callback) diff --git a/doc/content/migrating.md b/doc/content/migrating.md index 78aae83543..1bffd2a362 100644 --- a/doc/content/migrating.md +++ b/doc/content/migrating.md @@ -76,32 +76,6 @@ def transition(old, new) = end ``` -### Thread queues - -In order to improve issues with complex inter-dependent asynchronous tasks, scheduler queues have been updated. - -User-provided named queues can now be created and used to send asynchronous tasks, making it possible to control -concurrency of certain classes of tasks and also to remedy any potential dependency between asynchronous tasks. - -Settings for queues have thus changed and now will look like this: - -```liquidsoap -# Add a custom queue with 4 workers, increase generic queues to 4: -settings.scheduler.queues.set([ - ...list.assoc,remove("generic", settings.scheduler.queues()), - ("generic", 4), - ("custom", 4) -] -``` - -The `fast` argument of the `thread.run.*` functions has been replaced by `queue`, telling the operator which queue should the -asynchronous tasks sent to. - -Likewise, `request.dynamic`, `playlist`, `single` etc. have also been updated to accept a `thread_queue` argument controlling -which asynchronous queue their request resolution tasks should be sent to. - -See [the threads page](threads.html) for more details. - ### Replaygain - There is a new `metadata.replaygain` function that extracts the replay gain value in _dB_ from the metadata. diff --git a/doc/content/threads.md b/doc/content/threads.md deleted file mode 100644 index 45eca25668..0000000000 --- a/doc/content/threads.md +++ /dev/null @@ -1,37 +0,0 @@ -# Threads - -The main purpose of liquidsoap is to create real time media streams. When streams are created, everything that -is needed to compute them needs to happen very quickly so that we make sure that the stream can in fact -be created in real time. - -When a task is required that may take some time and whose result is not required for the stream generation, -for instance when executing a `on_stop` or `on_connect` callback, it can be useful to execute this task in a _thread_. - -Threads in liquidsoap are callback functions that are executed by an asynchronous queue. Here's an example: - -```{.liquidsoap include="task-example.liq"} - -``` - -By default, there are two type of queues available in liquidsoap: - -- `generic` queues -- `non_blocking` queues - -By convention, tasks that are known to be executing very fast should be sent to the -`non_blocking` queues and all the other tasks should be sent to the `generic` queue. - -You can decide which queue to send tasks to by using the `queue` parameter of the -`thread.run` functions. Some other operators who also use threads can have a similar -parameter such as `thread_queue` for `request.dynamic` and `playlist`. - -You can also define your own named queue using the `settings.scheduler.queues` setting. - -```{.liquidsoap include="task-with-queue.liq"} - -``` - -This is particularly useful for two applications: - -- To control concurrent execution of specific tasks. -- To prevent deadlocks in case some tasks depends on other tasks. diff --git a/doc/dune.inc b/doc/dune.inc index eab98186d9..0f1f2fdff0 100644 --- a/doc/dune.inc +++ b/doc/dune.inc @@ -167,8 +167,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -297,8 +295,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -427,8 +423,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -557,8 +551,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -687,8 +679,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -817,8 +807,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -947,8 +935,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1077,8 +1063,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1207,8 +1191,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1337,8 +1319,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1467,8 +1447,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1597,8 +1575,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1727,8 +1703,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1857,8 +1831,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -1987,8 +1959,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2117,8 +2087,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2247,8 +2215,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2377,8 +2343,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2507,8 +2471,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2637,8 +2599,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2767,8 +2727,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -2897,8 +2855,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3027,8 +2983,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3157,8 +3111,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3287,8 +3239,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3417,8 +3367,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3547,8 +3495,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3677,8 +3623,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3807,8 +3751,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -3937,8 +3879,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4067,8 +4007,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4197,8 +4135,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4327,8 +4263,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4457,8 +4391,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4587,8 +4519,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4717,8 +4647,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4847,8 +4775,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -4977,8 +4903,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5107,8 +5031,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5237,8 +5159,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5367,8 +5287,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5497,8 +5415,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5627,8 +5543,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5757,8 +5671,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -5887,8 +5799,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6017,8 +5927,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6147,8 +6055,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6277,8 +6183,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6407,8 +6311,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6537,8 +6439,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6667,8 +6567,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6797,8 +6695,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -6927,8 +6823,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7057,8 +6951,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7187,8 +7079,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7317,8 +7207,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7447,8 +7335,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7577,8 +7463,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7707,8 +7591,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7837,8 +7719,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -7967,8 +7847,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8097,8 +7975,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8227,8 +8103,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8357,8 +8231,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8487,8 +8359,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8617,8 +8487,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8747,8 +8615,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -8877,8 +8743,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -9007,8 +8871,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -9137,8 +8999,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -9165,136 +9025,6 @@ ) ) -(rule - (alias doc) - (package liquidsoap) - (enabled_if (not %{bin-available:pandoc})) - (deps (:no_pandoc no-pandoc)) - (target threads.html) - (action (run cp %{no_pandoc} %{target})) -) - -(rule - (alias doc) - (package liquidsoap) - (enabled_if %{bin-available:pandoc}) - (deps - liquidsoap.xml - language.dtd - template.html - content/liq/append-silence.liq - content/liq/archive-cleaner.liq - content/liq/basic-radio.liq - content/liq/beets-amplify.liq - content/liq/beets-protocol-short.liq - content/liq/beets-protocol.liq - content/liq/beets-source.liq - content/liq/blank-detect.liq - content/liq/blank-sorry.liq - content/liq/complete-case.liq - content/liq/cross.custom.liq - content/liq/crossfade.liq - content/liq/decoder-faad.liq - content/liq/decoder-flac.liq - content/liq/decoder-metaflac.liq - content/liq/dump-hourly.liq - content/liq/dump-hourly2.liq - content/liq/dynamic-source.liq - content/liq/external-output.file.liq - content/liq/fallback.liq - content/liq/ffmpeg-filter-dynamic-volume.liq - content/liq/ffmpeg-filter-flanger-highpass.liq - content/liq/ffmpeg-filter-hflip.liq - content/liq/ffmpeg-filter-hflip2.liq - content/liq/ffmpeg-filter-parallel-flanger-highpass.liq - content/liq/ffmpeg-live-switch.liq - content/liq/ffmpeg-relay-ondemand.liq - content/liq/ffmpeg-relay.liq - content/liq/ffmpeg-shared-encoding-rtmp.liq - content/liq/ffmpeg-shared-encoding.liq - content/liq/fixed-time1.liq - content/liq/fixed-time2.liq - content/liq/frame-size.liq - content/liq/harbor-auth.liq - content/liq/harbor-dynamic.liq - content/liq/harbor-insert-metadata.liq - content/liq/harbor-metadata.liq - content/liq/harbor-redirect.liq - content/liq/harbor-simple.liq - content/liq/harbor-usage.liq - content/liq/harbor.http.register.liq - content/liq/harbor.http.response.liq - content/liq/hls-metadata.liq - content/liq/hls-mp4.liq - content/liq/http-input.liq - content/liq/icy-update.liq - content/liq/input.mplayer.liq - content/liq/jingle-hour.liq - content/liq/json-ex.liq - content/liq/json-stringify.liq - content/liq/json1.liq - content/liq/live-switch.liq - content/liq/medialib-predicate.liq - content/liq/medialib.liq - content/liq/medialib.sqlite.liq - content/liq/multitrack-add-video-track.liq - content/liq/multitrack-add-video-track2.liq - content/liq/multitrack-default-video-track.liq - content/liq/multitrack.liq - content/liq/multitrack2.liq - content/liq/multitrack3.liq - content/liq/output.file.hls.liq - content/liq/playlists.liq - content/liq/prometheus-callback.liq - content/liq/prometheus-settings.liq - content/liq/radiopi.liq - content/liq/re-encode.liq - content/liq/regular.liq - content/liq/replaygain-metadata.liq - content/liq/replaygain-playlist.liq - content/liq/request.dynamic.liq - content/liq/rtmp.liq - content/liq/samplerate3.liq - content/liq/scheduling.liq - content/liq/seek-telnet.liq - content/liq/settings.liq - content/liq/shoutcast.liq - content/liq/single.liq - content/liq/source-cue.liq - content/liq/space_overhead.liq - content/liq/split-cue.liq - content/liq/sqlite.liq - content/liq/srt-receiver.liq - content/liq/srt-sender.liq - content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq - content/liq/transcoding.liq - content/liq/video-anonymizer.liq - content/liq/video-bluescreen.liq - content/liq/video-canvas-example.liq - content/liq/video-default-canvas.liq - content/liq/video-in-video.liq - content/liq/video-logo.liq - content/liq/video-osc.liq - content/liq/video-simple.liq - content/liq/video-static.liq - content/liq/video-text.liq - content/liq/video-transition.liq - content/liq/video-weather.liq - content/liq/video-webcam.liq - (:md content/threads.md) - ) - (target threads.html) - (action - (pipe-stdout - (run pandoc %{md} -t json) - (run pandoc-include --directory content/liq) - (run pandoc -f json --syntax-definition=liquidsoap.xml --highlight=pygments --metadata pagetitle=threads --template=template.html -o %{target}) - ) - ) -) - (rule (alias doc) (package liquidsoap) @@ -9397,8 +9127,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -9527,8 +9255,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -9657,8 +9383,6 @@ content/liq/srt-receiver.liq content/liq/srt-sender.liq content/liq/switch-show.liq - content/liq/task-example.liq - content/liq/task-with-queue.liq content/liq/transcoding.liq content/liq/video-anonymizer.liq content/liq/video-bluescreen.liq @@ -10535,26 +10259,6 @@ (action (run %{bin:liquidsoap} --check --no-fallible-check content/liq/switch-show.liq)) ) -(rule - (alias doctest) - (package liquidsoap) - (deps - (source_tree ../src/libs) - (:test_liq content/liq/task-example.liq) - ) - (action (run %{bin:liquidsoap} --check --no-fallible-check content/liq/task-example.liq)) -) - -(rule - (alias doctest) - (package liquidsoap) - (deps - (source_tree ../src/libs) - (:test_liq content/liq/task-with-queue.liq) - ) - (action (run %{bin:liquidsoap} --check --no-fallible-check content/liq/task-with-queue.liq)) -) - (rule (alias doctest) (package liquidsoap) @@ -10790,7 +10494,6 @@ (stereotool.html as html/stereotool.html) (stream_content.html as html/stream_content.html) (strings_encoding.html as html/strings_encoding.html) - (threads.html as html/threads.html) (video-static.html as html/video-static.html) (video.html as html/video.html) (yaml.html as html/yaml.html) diff --git a/src/core/builtins/builtins_request.ml b/src/core/builtins/builtins_request.ml index 3829033ad3..559a355ddb 100644 --- a/src/core/builtins/builtins_request.ml +++ b/src/core/builtins/builtins_request.ml @@ -322,7 +322,7 @@ class process ~name r = object (self) inherit Request_dynamic.dynamic - ~name ~priority:`Non_blocking + ~name ~retry_delay:(fun _ -> 0.1) ~available:(fun _ -> true) ~prefetch:1 ~timeout:None ~synchronous:true diff --git a/src/core/builtins/builtins_settings.ml b/src/core/builtins/builtins_settings.ml index 9ba65972d3..db1a4e1d20 100644 --- a/src/core/builtins/builtins_settings.ml +++ b/src/core/builtins/builtins_settings.ml @@ -92,32 +92,22 @@ let settings_module = | ty, true -> Lang.fun_t [] ty | ty, false -> Lang.fun_t [] (Lang.nullable_t ty) in - let rec get_type ?(sub = []) ~label conf = + let rec get_type ?(sub = []) conf = let ty, has_default_value = get_conf_type conf in Lang.method_t (get_t ~has_default_value ty) - (set_t ty @ leaf_types conf @ sub - @ - if label = "scheduler" then - [ - ( "queues", - ( [], - Lang.ref_t - (Lang.list_t (Lang.product_t Lang.string_t Lang.int_t)) ), - "Scheduler queue configuration." ); - ] - else []) + (set_t ty @ leaf_types conf @ sub) and leaf_types conf = List.map (fun label -> - let ty = get_type ~label (conf#path [label]) in + let ty = get_type (conf#path [label]) in let label = Utils.normalize_parameter_string label in ( label, ([], ty), Printf.sprintf "Entry for configuration key %s" label )) conf#subs in - let settings_t = get_type ~label:"settings" Configure.conf in + let settings_t = get_type Configure.conf in let get_v fn conv_to conv_from conf = let get = Lang.val_fun [] (fun _ -> @@ -132,7 +122,7 @@ let settings_module = in (get, Some set) in - let rec get_value ?(sub = []) ~label conf = + let rec get_value ?(sub = []) conf = let to_v fn conv_to conv_from = try ignore (fn conf); @@ -154,8 +144,7 @@ let settings_module = with Found v -> v in Lang.meth get_v - ((if label = "scheduler" then [("queues", Tutils.queues_conf)] else []) - @ (if set_v <> None then [("set", Option.get set_v)] else []) + ((if set_v <> None then [("set", Option.get set_v)] else []) @ [ ("description", Lang.string (String.trim conf#descr)); ( "comments", @@ -165,11 +154,11 @@ let settings_module = and leaf_values conf = List.map (fun label -> - let v = get_value ~label (conf#path [label]) in + let v = get_value (conf#path [label]) in (Utils.normalize_parameter_string label, v)) conf#subs in - settings := get_value ~label:"settings" Configure.conf; + settings := get_value Configure.conf; ignore (Lang.add_builtin_value ~category:`Settings "settings" ~descr:"All settings." ~flags:[`Hidden] !settings settings_t)) diff --git a/src/core/builtins/builtins_socket.ml b/src/core/builtins/builtins_socket.ml index 4b278e8523..f001c4a8c6 100644 --- a/src/core/builtins/builtins_socket.ml +++ b/src/core/builtins/builtins_socket.ml @@ -230,7 +230,11 @@ module Socket_value = struct [] in Duppy.Task.add Tutils.scheduler - { Duppy.Task.priority = `Generic; events; handler = fn }; + { + Duppy.Task.priority = `Maybe_blocking; + events; + handler = fn; + }; Lang.unit) ); ] in diff --git a/src/core/builtins/builtins_thread.ml b/src/core/builtins/builtins_thread.ml index 0866e9d3fc..d567c0f898 100644 --- a/src/core/builtins/builtins_thread.ml +++ b/src/core/builtins/builtins_thread.ml @@ -40,14 +40,16 @@ let _ = let _ = Lang.add_builtin ~base:thread_run "recurrent" ~category:`Programming [ - ( "queue", - Lang.string_t, - Some (Lang.string "generic"), + ( "fast", + Lang.bool_t, + Some (Lang.bool true), Some - "Queue to use for the task. Should be one of: `\"generic\"` or \ - `\"non_blocking\"`. Non blocking should be reserved for tasks that \ - are known to complete quickly. You can also use declared via \ - `settings.scheduler.queues`." ); + "Whether the thread is supposed to return quickly or not. Typically, \ + blocking tasks (e.g. fetching data over the internet) should not be \ + considered to be fast. When set to `false` its priority will be \ + lowered below that of request resolutions and fast timeouts. This \ + is only effective if you set a dedicated queue for fast tasks, see \ + the \"scheduler\" settings for more details." ); ( "delay", Lang.float_t, Some (Lang.float 0.), @@ -72,10 +74,8 @@ let _ = let delay = Lang.to_float (List.assoc "delay" p) in let f = List.assoc "" p in let priority = - match Lang.to_string (List.assoc "queue" p) with - | "generic" -> `Generic - | "non_blocking" -> `Non_blocking - | n -> `Named n + if Lang.to_bool (List.assoc "fast" p) then `Maybe_blocking + else `Blocking in let on_error = Lang.to_option (List.assoc "on_error" p) in let on_error = diff --git a/src/core/decoder/external_decoder.ml b/src/core/decoder/external_decoder.ml index efca33e872..2a60670808 100644 --- a/src/core/decoder/external_decoder.ml +++ b/src/core/decoder/external_decoder.ml @@ -49,7 +49,7 @@ let external_input process input = in let log s = log#important "%s" s in (* reading from input is blocking.. *) - let priority = `Generic in + let priority = `Blocking in let process = Process_handler.run ~priority ~on_stdin ~on_stderr ~log process in diff --git a/src/core/file_watcher.inotify.ml b/src/core/file_watcher.inotify.ml index 3ac1c84a26..8cde27f23c 100644 --- a/src/core/file_watcher.inotify.ml +++ b/src/core/file_watcher.inotify.ml @@ -53,7 +53,7 @@ let rec watchdog () = events; [watchdog ()]) in - { Duppy.Task.priority = `Generic; events = [`Read fd]; handler } + { Duppy.Task.priority = `Maybe_blocking; events = [`Read fd]; handler } let watch : watch = fun ~pos e file f -> diff --git a/src/core/file_watcher.mtime.ml b/src/core/file_watcher.mtime.ml index 2df0c6ea9a..58a89af598 100644 --- a/src/core/file_watcher.mtime.ml +++ b/src/core/file_watcher.mtime.ml @@ -61,7 +61,7 @@ let rec handler _ = (Printf.sprintf "Error while executing file watcher callback: %s" (Printexc.to_string exn))) !watched; - [{ Duppy.Task.priority = `Generic; events = [`Delay 1.]; handler }]) + [{ Duppy.Task.priority = `Maybe_blocking; events = [`Delay 1.]; handler }]) () let watch : watch = @@ -73,7 +73,11 @@ let watch : watch = if not !launched then begin launched := true; Duppy.Task.add Tutils.scheduler - { Duppy.Task.priority = `Generic; events = [`Delay 1.]; handler } + { + Duppy.Task.priority = `Maybe_blocking; + events = [`Delay 1.]; + handler; + } end; let mtime = try file_mtime file with _ -> 0. in watched := { file; mtime; callback } :: !watched; diff --git a/src/core/harbor/harbor.ml b/src/core/harbor/harbor.ml index 83ffffef9a..ae6df292d8 100644 --- a/src/core/harbor/harbor.ml +++ b/src/core/harbor/harbor.ml @@ -372,7 +372,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct simple_reply "No / mountpoint\r\n\r\n" in (* Authentication can be blocking. *) - Duppy.Monad.Io.exec ~priority:`Generic h + Duppy.Monad.Io.exec ~priority:`Maybe_blocking h (let user, auth_f = s#login in let user = if requested_user = "" then user else requested_user in if auth_f ~socket:h.Duppy.Monad.Io.socket user password then @@ -484,7 +484,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct Hashtbl.fold (fun lbl k query -> (lbl, k) :: query) query []) args in - Duppy.Monad.Io.exec ~priority:`Generic h + Duppy.Monad.Io.exec ~priority:`Maybe_blocking h (http_auth_check ?query ~login h.Duppy.Monad.Io.socket headers) (* We do not implement anything with this handler for now. *) @@ -627,7 +627,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct (Bytes.of_string (Websocket.upgrade headers)) in let* stype, huri, user, password = - Duppy.Monad.Io.exec ~priority:`Generic h + Duppy.Monad.Io.exec ~priority:`Blocking h (read_hello h.Duppy.Monad.Io.socket) in log#info "Mime type: %s" stype; @@ -895,7 +895,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct fun timeout -> fst (Http.read_chunked ~timeout socket) | _ -> fun _ -> "" in - Duppy.Monad.Io.exec ~priority:`Generic h + Duppy.Monad.Io.exec ~priority:`Maybe_blocking h (handler ~protocol ~meth ~headers ~data ~socket:h.Duppy.Monad.Io.socket ~query base_uri) | e -> @@ -965,7 +965,7 @@ module Make (T : Transport_t) : T with type socket = T.socket = struct ~priority: (* ICY = true means that authentication has already happened *) - `Generic h + `Maybe_blocking h (let valid_user, auth_f = s#login in if not diff --git a/src/core/io/ffmpeg_io.ml b/src/core/io/ffmpeg_io.ml index 189821de85..ed2380f936 100644 --- a/src/core/io/ffmpeg_io.ml +++ b/src/core/io/ffmpeg_io.ml @@ -200,7 +200,7 @@ class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug | Some t -> Duppy.Async.wake_up t | None -> let t = - Duppy.Async.add ~priority:`Generic Tutils.scheduler + Duppy.Async.add ~priority:`Blocking Tutils.scheduler self#connect_task in Atomic.set connect_task (Some t); diff --git a/src/core/io/srt_io.ml b/src/core/io/srt_io.ml index 0d0ad63855..54bc148a30 100644 --- a/src/core/io/srt_io.ml +++ b/src/core/io/srt_io.ml @@ -358,7 +358,7 @@ module Poll = struct (Printexc.to_string exn)); -1. - let task = Duppy.Async.add ~priority:`Generic Tutils.scheduler process + let task = Duppy.Async.add ~priority:`Blocking Tutils.scheduler process let add_socket ~mode socket fn = Srt.setsockflag socket Srt.sndsyn false; @@ -532,7 +532,7 @@ class virtual caller ~enforced_encryption ~pbkeylen ~passphrase ~streamid | Some t -> Duppy.Async.wake_up t | None -> let t = - Duppy.Async.add ~priority:`Generic Tutils.scheduler + Duppy.Async.add ~priority:`Blocking Tutils.scheduler self#connect_fn in connect_task <- Some t; diff --git a/src/core/operators/pipe.ml b/src/core/operators/pipe.ml index d371255dba..480ef0649b 100644 --- a/src/core/operators/pipe.ml +++ b/src/core/operators/pipe.ml @@ -254,7 +254,7 @@ class pipe ~replay_delay ~data_len ~process ~bufferize ~max ~restart Some (Process_handler.run ~on_stop:self#on_stop ~on_start:self#on_start ~on_stdout:self#on_stdout ~on_stdin:self#on_stdin - ~priority:`Generic ~on_stderr:self#on_stderr ~log process)) + ~priority:`Blocking ~on_stderr:self#on_stderr ~log process)) method! abort_track = source#abort_track diff --git a/src/core/outputs/harbor_output.ml b/src/core/outputs/harbor_output.ml index ff3534c27c..fab2ffca67 100644 --- a/src/core/outputs/harbor_output.ml +++ b/src/core/outputs/harbor_output.ml @@ -261,7 +261,7 @@ let add_meta c data = let rec client_task c = let* data = - Duppy.Monad.Io.exec ~priority:`Generic c.handler + Duppy.Monad.Io.exec ~priority:`Maybe_blocking c.handler (Mutex_utils.mutexify c.mutex (fun () -> let buflen = Strings.Mutable.length c.buffer in @@ -283,7 +283,7 @@ let rec client_task c = c.handler (Strings.to_bytes data) in let* state = - Duppy.Monad.Io.exec ~priority:`Generic c.handler + Duppy.Monad.Io.exec ~priority:`Maybe_blocking c.handler (let ret = Mutex_utils.mutexify c.mutex (fun () -> c.state) () in Duppy.Monad.return ret) in @@ -521,7 +521,7 @@ class output p = || auth_function <> None then ( let default_user = Option.value default_user ~default:"" in - Duppy.Monad.Io.exec ~priority:`Generic handler + Duppy.Monad.Io.exec ~priority:`Maybe_blocking handler (Harbor.http_auth_check ~query ~login:(default_user, login) s headers)) else Duppy.Monad.return ()) @@ -532,7 +532,7 @@ class output p = Harbor.reply s | _ -> assert false) in - Duppy.Monad.Io.exec ~priority:`Generic handler + Duppy.Monad.Io.exec ~priority:`Maybe_blocking handler (Harbor.relayed reply (fun () -> self#log#info "Client %s connected" ip; Mutex_utils.mutexify clients_m diff --git a/src/core/sources/request_dynamic.ml b/src/core/sources/request_dynamic.ml index 4f16c3d1c0..00a29985d8 100644 --- a/src/core/sources/request_dynamic.ml +++ b/src/core/sources/request_dynamic.ml @@ -26,6 +26,9 @@ module Queue = Liquidsoap_lang.Queues.Queue let conf_prefetch = Dtools.Conf.int ~p:(Request.conf#plug "prefetch") ~d:1 "Default prefetch" +(* Scheduler priority for request resolutions. *) +let priority = `Maybe_blocking + type queue_item = { request : Request.t; (* in seconds *) @@ -55,8 +58,8 @@ let () = Lifecycle.before_core_shutdown ~name:"request.dynamic shutdown" (fun () -> Atomic.set should_fail true) -class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available - ~prefetch ~synchronous ~timeout f = +class dynamic ?(name = "request.dynamic") ~retry_delay ~available ~prefetch + ~synchronous ~timeout f = let available () = (not (Atomic.get should_fail)) && available () in object (self) inherit source ~name () @@ -346,10 +349,6 @@ let _ = ~descr:"Play request dynamically created by a given function." [ ("", Lang.fun_t [] (Lang.nullable_t Request.Value.t), None, None); - ( "thread_queue", - Lang.string_t, - Some (Lang.string "generic"), - Some "Queue used to resolve requests." ); ( "retry_delay", Lang.getter_t Lang.float_t, Some (Lang.float 0.1), @@ -447,12 +446,6 @@ let _ = let f = List.assoc "" p in let available = Lang.to_bool_getter (List.assoc "available" p) in let retry_delay = Lang.to_float_getter (List.assoc "retry_delay" p) in - let priority = - match Lang.to_string (List.assoc "thread_queue" p) with - | "generic" -> `Generic - | "non_blocking" -> `Non_blocking - | n -> `Named n - in let prefetch = Lang.to_valued_option Lang.to_int (List.assoc "prefetch" p) in @@ -461,5 +454,4 @@ let _ = let timeout = Lang.to_valued_option Lang.to_float (List.assoc "timeout" p) in - new dynamic - ~available ~priority ~retry_delay ~prefetch ~timeout ~synchronous f) + new dynamic ~available ~retry_delay ~prefetch ~timeout ~synchronous f) diff --git a/src/core/tools/external_input.ml b/src/core/tools/external_input.ml index e4a6c155bb..4df1a70239 100644 --- a/src/core/tools/external_input.ml +++ b/src/core/tools/external_input.ml @@ -67,7 +67,7 @@ class virtual base ~name ~restart ~restart_on_error ~on_data ?read_header let log s = self#log#important "%s" s in process <- Some - (Process_handler.run ~priority:`Generic ~on_stop ~on_stdout + (Process_handler.run ~priority:`Blocking ~on_stop ~on_stdout ~on_stderr ~log (command ()))); self#on_sleep (fun () -> diff --git a/src/core/tools/liqfm.ml b/src/core/tools/liqfm.ml index 8882fb1b3b..d4c0eeddad 100644 --- a/src/core/tools/liqfm.ml +++ b/src/core/tools/liqfm.ml @@ -231,7 +231,7 @@ let init host = reason (Printexc.to_string e); -1. in - let task = Duppy.Async.add ~priority:`Generic Tutils.scheduler do_submit in + let task = Duppy.Async.add ~priority:`Blocking Tutils.scheduler do_submit in { task; submit_m; submissions } let submit (user, password) task length source stype songs = diff --git a/src/core/tools/server.ml b/src/core/tools/server.ml index d35bc5474e..16350b55ed 100644 --- a/src/core/tools/server.ml +++ b/src/core/tools/server.ml @@ -319,7 +319,7 @@ let handle_client socket ip = | e -> Duppy.Monad.raise e in let* ans = - Duppy.Monad.Io.exec ~priority:`Generic h (run (fun () -> exec req)) + Duppy.Monad.Io.exec ~priority:`Maybe_blocking h (run (fun () -> exec req)) in let* () = let* () = diff --git a/src/core/tools/tutils.ml b/src/core/tools/tutils.ml index b7faea6678..666dd14406 100644 --- a/src/core/tools/tutils.ml +++ b/src/core/tools/tutils.ml @@ -20,9 +20,6 @@ *****************************************************************************) -module Methods = Liquidsoap_lang.Methods -module Lang = Liquidsoap_lang.Lang - let conf_scheduler = Dtools.Conf.void ~p:(Configure.conf#plug "scheduler") @@ -71,6 +68,43 @@ let exit () = | `Done (`Error (bt, err)) -> Printexc.raise_with_backtrace err bt | _ -> exit (exit_code ()) +let generic_queues = + Dtools.Conf.int + ~p:(conf_scheduler#plug "generic_queues") + ~d:5 "Generic queues" + ~comments: + [ + "Number of event queues accepting any kind of task."; + "There should at least be one. Having more can be useful to make sure"; + "that trivial request resolutions (local files) are not delayed"; + "because of a stalled download. But N stalled download can block"; + "N queues anyway."; + ] + +let fast_queues = + Dtools.Conf.int + ~p:(conf_scheduler#plug "fast_queues") + ~d:0 "Fast queues" + ~comments: + [ + "Number of queues that are dedicated to fast tasks."; + "It might be useful to create some if your request resolutions,"; + "or some user defined tasks (cf `thread.run`), are"; + "delayed too much because of slow tasks blocking the generic queues,"; + "such as last.fm submissions or slow `thread.run` handlers."; + ] + +let non_blocking_queues = + Dtools.Conf.int + ~p:(conf_scheduler#plug "non_blocking_queues") + ~d:2 "Non-blocking queues" + ~comments: + [ + "Number of queues dedicated to internal non-blocking tasks."; + "These are only started if such tasks are needed."; + "There should be at least one."; + ] + let scheduler_log = Dtools.Conf.bool ~p:(conf_scheduler#plug "log") @@ -99,26 +133,6 @@ end) let all = ref Set.empty let queues = ref Set.empty -let queues_conf_ref = Atomic.make [("generic", 2); ("non_blocking", 2)] - -let queues_conf = - Lang.reference - (fun () -> - let v = Atomic.get queues_conf_ref in - Lang.list - (List.map - (fun (lbl, c) -> Lang.product (Lang.string lbl) (Lang.int c)) - v)) - (fun v -> - let v = Lang.to_list v in - let v = - List.map - (fun v -> - let lbl, c = Lang.to_product v in - (Lang.to_string lbl, Lang.to_int c)) - v - in - Atomic.set queues_conf_ref v) let join_all ~set () = let rec f () = @@ -212,8 +226,8 @@ let create ~queue f x s = () type priority = - [ `Generic (** Generic queues accept all tasks. *) - | `Named of string (** Named queues only accept tasks with their priority. *) + [ `Blocking (** For example a last.fm submission. *) + | `Maybe_blocking (** Request resolutions vary a lot. *) | `Non_blocking (** Non-blocking tasks like the server. *) ] let error_handlers = Stack.create () @@ -252,14 +266,13 @@ let scheduler_log n = fun m -> log#info "%s" m) else fun _ -> () -let new_queue ~priority ~name () = +let new_queue ?priorities ~name () = let qlog = scheduler_log name in - let priorities p = - match (priority, p) with - | `Generic, (`Generic | `Non_blocking) -> true - | v, v' -> v = v' + let queue () = + match priorities with + | None -> Duppy.queue scheduler ~log:qlog name + | Some priorities -> Duppy.queue scheduler ~log:qlog ~priorities name in - let queue () = Duppy.queue scheduler ~log:qlog ~priorities name in ignore (create ~queue:true queue () name) let create f x name = create ~queue:false f x name @@ -267,27 +280,18 @@ let join_all () = join_all ~set:all () let start () = if Atomic.compare_and_set state `Idle `Starting then ( - let queues = Methods.from_list (Atomic.get queues_conf_ref) in - Methods.iter - (fun priority count -> - let priority = - match priority with - | "generic" -> `Generic - | "non_blocking" -> `Non_blocking - | n -> `Named n - in - for i = 1 to count do - let name = - Printf.sprintf "%s queue #%d" - (match priority with - | `Generic -> "generic" - | `Named n -> n - | `Non_blocking -> "non-blocking") - i - in - new_queue ~priority ~name () - done) - queues) + for i = 1 to generic_queues#get do + let name = Printf.sprintf "generic queue #%d" i in + new_queue ~name () + done; + for i = 1 to fast_queues#get do + let name = Printf.sprintf "fast queue #%d" i in + new_queue ~name ~priorities:(fun x -> x = `Maybe_blocking) () + done; + for i = 1 to non_blocking_queues#get do + let name = Printf.sprintf "non-blocking queue #%d" i in + new_queue ~priorities:(fun x -> x = `Non_blocking) ~name () + done) (** Waits for [f()] to become true on condition [c]. *) let wait c m f = diff --git a/src/core/tools/tutils.mli b/src/core/tools/tutils.mli index 2059b28777..c9d91f9e49 100644 --- a/src/core/tools/tutils.mli +++ b/src/core/tools/tutils.mli @@ -55,13 +55,10 @@ val join_all : unit -> unit (** Priorities for the different scheduler usages. *) type priority = - [ `Generic (** Generic queues accept all tasks. *) - | `Named of string (** Named queues only accept tasks with their priority. *) + [ `Blocking (** For example a last.fm submission. *) + | `Maybe_blocking (** Request resolutions vary a lot. *) | `Non_blocking (** Non-blocking tasks like the server. *) ] -(** Queues configuration. *) -val queues_conf : Liquidsoap_lang.Lang.value - (** task scheduler *) val scheduler : priority Duppy.scheduler diff --git a/src/libs/extra/deprecations.liq b/src/libs/extra/deprecations.liq index 5337a4555c..9ac8638b4d 100644 --- a/src/libs/extra/deprecations.liq +++ b/src/libs/extra/deprecations.liq @@ -76,10 +76,9 @@ end # Deprecated: this function has been replaced by `thread.run.recurrent`. # @flag deprecated -def add_timeout(~fast, delay, f) = +def add_timeout(~fast=true, delay, f) = deprecated("add_timeout", "thread.run.recurrent") - ignore(fast or true) - thread.run.recurrent(queue="generic", delay=delay, f) + thread.run.recurrent(fast=fast, delay=delay, f) end # Deprecated: this function has been replaced by `thread.when`. @@ -315,7 +314,7 @@ def register_flow( ping_period end - thread.run.recurrent(delay=ping_period, ping) + thread.run.recurrent(fast=false, delay=ping_period, ping) # Register streams def register_stream(format_url) = @@ -340,7 +339,7 @@ def register_flow( artist = m["artist"] title = m["title"] params = [("m_title", title), ("m_artist", artist)] - thread.run({request(cmd="metadata", params=params)}) + thread.run(fast=false, {request(cmd="metadata", params=params)}) end s.on_metadata(metadata) diff --git a/src/libs/extra/native.liq b/src/libs/extra/native.liq index a7ead829e5..f8256db410 100644 --- a/src/libs/extra/native.liq +++ b/src/libs/extra/native.liq @@ -171,7 +171,7 @@ def native.request.dynamic(%argsof(request.dynamic), f) = if list.length(queue()) < prefetch then ignore(fetch()) end end - thread.run(queue=thread_queue, every=retry_delay, fill) + thread.run(every=retry_delay, fill) # Source def s() = diff --git a/src/libs/extra/visualization.liq b/src/libs/extra/visualization.liq index 93b441e8d1..20a662ca9b 100644 --- a/src/libs/extra/visualization.liq +++ b/src/libs/extra/visualization.liq @@ -28,7 +28,7 @@ def vumeter(~rms_min=-25., ~rms_max=-5., ~window=0.5, ~scroll=false, s) = print(newline=false, bar()) end - thread.run(queue="non_blocking", every=window, display) + thread.run(fast=true, every=window, display) s end @@ -62,7 +62,7 @@ def video.vumeter( width := int_of_float(x * float_of_int(video.frame.width())) end - thread.run(queue="non_blocking", every=window, update) + thread.run(fast=true, every=window, update) s = video.add_rectangle(width=width, height=height, color=color, s) video.persistence(duration=persistence, s) end diff --git a/src/libs/playlist.liq b/src/libs/playlist.liq index 89251c94c9..8e347453b5 100644 --- a/src/libs/playlist.liq +++ b/src/libs/playlist.liq @@ -173,7 +173,6 @@ let stdlib_native = native # whole round ("randomize" mode), or pick a random file in the playlist each time \ # ("random" mode). # @param ~native Use native implementation, when available. -# @param ~thread_queue Queue used to resolve requests. # @param ~on_loop Function executed when the playlist is about to loop. # @param ~on_done Function executed when the playlist is finished. # @param ~max_fail When this number of requests fail to resolve, the whole playlists is considered as failed and `on_fail` is called. @@ -186,7 +185,6 @@ let stdlib_native = native # @method remaining_files Songs remaining to be played. def playlist.list( ~id=null(), - ~thread_queue="generic", ~check_next=null(), ~prefetch=null(), ~loop=true, @@ -253,7 +251,6 @@ def playlist.list( fun () -> request.dynamic( id=id, - thread_queue=thread_queue, prefetch=prefetch, timeout=timeout, retry_delay=1., @@ -263,13 +260,7 @@ def playlist.list( s = %ifdef native - if - native - then - stdlib_native.request.dynamic(id=id, thread_queue=thread_queue, next) - else - default() - end + if native then stdlib_native.request.dynamic(id=id, next) else default() end %else default() %endif @@ -453,7 +444,6 @@ end # changed). # @param ~register_server_commands Register corresponding server commands # @param ~timeout Timeout (in sec.) to resolve the request. Defaults to `settings.request.timeout` when `null`. -# @param ~thread_queue Queue used to resolve requests. # @param ~cue_in_metadata Metadata for cue in points. Disabled if `null`. # @param ~cue_out_metadata Metadata for cue out points. Disabled if `null`. # @param uri Playlist URI. @@ -462,7 +452,6 @@ end # @method remaining_files Songs remaining to be played. def replaces playlist( ~id=null(), - ~thread_queue="generic", ~check_next=null(), ~prefetch=null(), ~loop=true, @@ -578,7 +567,6 @@ def replaces playlist( s = playlist.list( id=id, - thread_queue=thread_queue, check_next=check_next, prefetch=prefetch, loop=loop, diff --git a/src/libs/request.liq b/src/libs/request.liq index 8ecf2172bf..c27328f3eb 100644 --- a/src/libs/request.liq +++ b/src/libs/request.liq @@ -27,7 +27,6 @@ end # @param ~prefetch How many requests should be queued in advance. # @param ~native Use native implementation, when available. # @param ~queue Initial queue of requests. -# @param ~thread_queue Queue used to resolve requests. # @param ~timeout Timeout (in sec.) to resolve the request. # @method add This method is internal and should not be used. Consider using `push` instead. # @method push Push a request on the request queue. @@ -38,7 +37,6 @@ def request.queue( ~prefetch=null(), ~native=false, ~queue=[], - ~thread_queue="generic", ~timeout=null() ) = ignore(native) @@ -90,7 +88,6 @@ def request.queue( request.dynamic( id=id, prefetch=prefetch, - thread_queue=thread_queue, timeout=timeout, available={not list.is_empty(queue())}, next @@ -101,9 +98,7 @@ def request.queue( if native then - stdlib_native.request.dynamic( - id=id, thread_queue=thread_queue, timeout=timeout, next - ) + stdlib_native.request.dynamic(id=id, timeout=timeout, next) else default() end @@ -218,12 +213,10 @@ end # Play a request once and become unavailable. # @category Source / Input -# @param ~thread_queue Queue used to resolve requests. # @param ~timeout Timeout in seconds for resolving the request. # @param r Request to play. def request.once( ~id=null("request.once"), - ~thread_queue="generic", ~timeout=null(), r ) = @@ -242,7 +235,7 @@ def request.once( if request.resolve(r, timeout=timeout) then - request.queue(thread_queue=thread_queue, queue=[r]) + request.queue(queue=[r]) else log.critical( label=id, @@ -266,7 +259,6 @@ end # static, and time is not. # @category Source / Input # @param ~prefetch How many requests should be queued in advance. -# @param ~thread_queue Queue used to resolve requests. # @param ~timeout Timeout (in sec.) to resolve the request. # @param ~fallible Enforce fallibility of the request. # @param r Request @@ -274,7 +266,6 @@ def request.single( ~id=null("request.single"), ~prefetch=null(), ~timeout=null(), - ~thread_queue="generic", ~fallible=null(), r ) = @@ -358,7 +349,6 @@ def request.single( request.dynamic( id=id, prefetch=prefetch, - thread_queue=thread_queue, synchronous=infallible, next ) diff --git a/src/libs/thread.liq b/src/libs/thread.liq index 45c4dbe699..d623577108 100644 --- a/src/libs/thread.liq +++ b/src/libs/thread.liq @@ -1,15 +1,13 @@ # Run a function in a separate thread. # @category Programming -# @param ~queue Queue to use for the task. Should be one of: `"generic"` or `"non_blocking"`. \ -# Non blocking should be reserved for tasks that are known to complete quickly. \ -# You can also use a dedicated queue name declared via `settings.scheduler.queues`. +# @param ~fast Whether the thread is supposed to return quickly or not. Typically, blocking tasks (e.g. fetching data over the internet) should not be considered to be fast. When set to `false` its priority will be lowered below that of request resolutions and fast timeouts. This is only effective if you set a dedicated queue for fast tasks, see the "scheduler" settings for more details. # @param ~delay Delay (in seconds) after which the thread should be launched. # @param ~every How often (in seconds) the thread should be run. If negative or `null`, run once. # @param ~on_error Error callback executed when an error occurred while running the given function. When passed, \ # all raised errors are silenced unless re-raised by the callback. # @param f Function to execute. def replaces thread.run( - ~queue="generic", + ~fast=true, ~delay=0., ~on_error=null(), ~every=null(), @@ -33,7 +31,7 @@ def replaces thread.run( on_error ) - thread.run.recurrent(queue=queue, delay=delay, on_error=on_error, f) + thread.run.recurrent(fast=fast, delay=delay, on_error=on_error, f) end # Execute a callback when a predicate is `true`. The predicate @@ -41,9 +39,7 @@ end # called when the predicate returns `true` after having been # `false`, following the same semantics as `predicate.activates`. # @category Programming -# @param ~queue Queue to use for the task. Should be one of: `"generic"` or `"non_blocking"`. \ -# Non blocking should be reserved for tasks that are known to complete quickly. \ -# You can also use a dedicated queue name declared via `settings.scheduler.queues`. +# @param ~fast Whether the callback is supposed to return quickly or not. # @param ~init Detect at beginning. # @param ~every How often (in sec.) to check for the predicate. # @param ~once Execute the function only once. @@ -53,7 +49,7 @@ end # @param p Predicate indicating when to execute the function, typically a time interval such as `{10h-10h30}`. # @param f Function to execute when the predicate is true. def thread.when( - ~queue="generic", + ~fast=true, ~init=true, ~every=getter(0.5), ~once=false, @@ -75,7 +71,7 @@ def thread.when( end end - thread.run.recurrent(queue=queue, delay=0., on_error=on_error, check) + thread.run.recurrent(fast=fast, delay=0., on_error=on_error, check) end # @flag hidden diff --git a/tests/language/error.liq b/tests/language/error.liq index 67b11c3c51..76e177412e 100644 --- a/tests/language/error.liq +++ b/tests/language/error.liq @@ -70,7 +70,7 @@ def f() = ) test.equal(r/error.liq, line 58 char 4 - line 63 char 7/.test(pos), true) - test.equal(r/thread.liq, line 21, char 11-14/.test(pos), true) + test.equal(r/thread.liq, line 19, char 11-14/.test(pos), true) e' = error.register("bla") test.equal(false, (e == e'))