From 54242d11a356d72cb008105bea78b76627fa077f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20W=C3=B6ginger?= Date: Fri, 6 Sep 2024 21:01:45 +0200 Subject: [PATCH] revert rename --- lib/radiator/application.ex | 9 ++++++- lib/radiator/{job_processor.ex => job.ex} | 31 ++++++++++++++++++----- lib/radiator/job_supervisor.ex | 21 +++++++++++++++ 3 files changed, 53 insertions(+), 8 deletions(-) rename lib/radiator/{job_processor.ex => job.ex} (65%) create mode 100644 lib/radiator/job_supervisor.ex diff --git a/lib/radiator/application.ex b/lib/radiator/application.ex index 27065f49..ca8da6a3 100644 --- a/lib/radiator/application.ex +++ b/lib/radiator/application.ex @@ -11,6 +11,12 @@ defmodule Radiator.Application do @impl true def start(_type, _args) do + job_runner_config = [ + strategy: :one_for_one, + max_seconds: 30, + name: Radiator.JobRunner + ] + children = [ RadiatorWeb.Telemetry, Radiator.Repo, @@ -25,7 +31,8 @@ defmodule Radiator.Application do {EventProducer, name: EventProducer}, {CommandProcessor, name: CommandProcessor, subscribe_to: [{EventProducer, max_demand: 1}]}, {NodeChangeListener, name: NodeChangeListener}, - {DynamicSupervisor, strategy: :one_for_one, name: Radiator.JobProcessor} + {Registry, keys: :unique, name: Radiator.JobRegistry}, + {DynamicSupervisor, job_runner_config} ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/radiator/job_processor.ex b/lib/radiator/job.ex similarity index 65% rename from lib/radiator/job_processor.ex rename to lib/radiator/job.ex index d54b7122..982f6a96 100644 --- a/lib/radiator/job_processor.ex +++ b/lib/radiator/job.ex @@ -1,6 +1,6 @@ # test with -# GenServer.start(Radiator.JobProcessor, work: fn -> Process.sleep(5000);{:ok,[]} end) -defmodule Radiator.JobProcessor do +# GenServer.start(Radiator.Job, work: fn -> Process.sleep(5000);{:ok,[]} end) +defmodule Radiator.Job do @moduledoc """ WIP: Job module to handle work in a GenServer idea taken from https://pragprog.com/titles/sgdpelixir/concurrent-data-processing-in-elixir/ @@ -9,9 +9,26 @@ defmodule Radiator.JobProcessor do require Logger alias __MODULE__ + alias Radiator.JobRunner + alias Radiator.JobSupervisor defstruct [:work, :id, :max_retries, retries: 0, status: "new"] + def start_job(args) do + if Enum.count(running_imports()) >= 5 do + {:error, :import_quota_reached} + else + DynamicSupervisor.start_child(JobRunner, {JobSupervisor, args}) + end + end + + def running_imports() do + match_all = {:"$1", :"$2", :"$3"} + guards = [{:==, :"$3", "import"}] + map_result = [%{id: :"$1", pid: :"$2", type: :"$3"}] + Registry.select(Radiator.JobRegistry, [{match_all, guards, map_result}]) + end + def start_link(args) do GenServer.start_link(__MODULE__, args) end @@ -21,7 +38,7 @@ defmodule Radiator.JobProcessor do id = Keyword.get(args, :id, random_job_id()) max_retries = Keyword.get(args, :max_retries, 3) - state = %JobProcessor{id: id, work: work, max_retries: max_retries} + state = %Job{id: id, work: work, max_retries: max_retries} {:ok, state, {:continue, :run}} end @@ -44,20 +61,20 @@ defmodule Radiator.JobProcessor do defp handle_job_result({:ok, _data}, state) do Logger.info("Job completed #{state.id}") - %JobProcessor{state | status: "done"} + %Job{state | status: "done"} end defp handle_job_result(:error, %{status: "new"} = state) do Logger.warning("Job errored #{state.id}") - %JobProcessor{state | status: "errored"} + %Job{state | status: "errored"} end defp handle_job_result(:error, %{status: "errored"} = state) do Logger.warning("Job retry failed #{state.id}") - new_state = %JobProcessor{state | retries: state.retries + 1} + new_state = %Job{state | retries: state.retries + 1} if new_state.retries == state.max_retries do - %JobProcessor{new_state | status: "failed"} + %Job{new_state | status: "failed"} else new_state end diff --git a/lib/radiator/job_supervisor.ex b/lib/radiator/job_supervisor.ex new file mode 100644 index 00000000..5e0c3320 --- /dev/null +++ b/lib/radiator/job_supervisor.ex @@ -0,0 +1,21 @@ + +defmodule Radiator.JobSupervisor do + use Supervisor, restart: :temporary + + def start_link(args) do + Supervisor.start_link(__MODULE__, args) + end + + def init(args) do + children = [ + {Radiator.Job, args} + ] + + options = [ + strategy: :one_for_one, + max_seconds: 30 + ] + + Supervisor.init(children, options) + end +end