diff --git a/lib/radiator/application.ex b/lib/radiator/application.ex index 88e2e5cb..3a639c2c 100644 --- a/lib/radiator/application.ex +++ b/lib/radiator/application.ex @@ -24,7 +24,8 @@ defmodule Radiator.Application do RadiatorWeb.Endpoint, {EventProducer, name: EventProducer}, {CommandProcessor, name: CommandProcessor, subscribe_to: [{EventProducer, max_demand: 1}]}, - {NodeChangeListener, name: NodeChangeListener} + {NodeChangeListener, name: NodeChangeListener}, + {DynamicSupervisor, strategy: :one_for_one, name: Radiator.JobRunner} ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/radiator/job.ex b/lib/radiator/job.ex new file mode 100644 index 00000000..dae3c522 --- /dev/null +++ b/lib/radiator/job.ex @@ -0,0 +1,68 @@ +# test with +# GenServer.start(Radiator.Job, work: fn -> Process.sleep(5000);{:ok,[]} end) +defmodule Radiator.Job do + @moduledoc """ + WIP: Job module to handle work in a GenServer + rename: JobProcessor + idea taken from https://pragprog.com/titles/sgdpelixir/concurrent-data-processing-in-elixir/ + """ + use GenServer, restart: :transient + require Logger + + defstruct [:work, :id, :max_retries, retries: 0, status: "new"] + + def start_link(args) do + GenServer.start_link(__MODULE__, args) + end + + def init(args) do + work = Keyword.fetch!(args, :work) + id = Keyword.get(args, :id, random_job_id()) + max_retries = Keyword.get(args, :max_retries, 3) + + state = %Radiator.Job{id: id, work: work, max_retries: max_retries} + {:ok, state, {:continue, :run}} + end + + def handle_continue(:run, state) do + new_state = state.work.() |> handle_job_result(state) + + if new_state.status == "errored" do + Process.send_after(self(), :retry, 5000) + {:noreply, new_state} + else + Logger.info("Job exiting #{state.id}") + {:stop, :normal, new_state} + end + end + + def handle_info(:retry, state) do + # Delegate work to the `handle_continue/2` callback. + {:noreply, state, {:continue, :run}} + end + + defp handle_job_result({:ok, _data}, state) do + Logger.info("Job completed #{state.id}") + %Radiator.Job{state | status: "done"} + end + + defp handle_job_result(:error, %{status: "new"} = state) do + Logger.warning("Job errored #{state.id}") + %Radiator.Job{state | status: "errored"} + end + + defp handle_job_result(:error, %{status: "errored"} = state) do + Logger.warning("Job retry failed #{state.id}") + new_state = %Radiator.Job{state | retries: state.retries + 1} + + if new_state.retries == state.max_retries do + %Radiator.Job{new_state | status: "failed"} + else + new_state + end + end + + defp random_job_id do + :crypto.strong_rand_bytes(5) |> Base.url_encode64(padding: false) + end +end diff --git a/mix.exs b/mix.exs index eeff958f..79376c92 100644 --- a/mix.exs +++ b/mix.exs @@ -19,7 +19,7 @@ defmodule Radiator.MixProject do def application do [ mod: {Radiator.Application, []}, - extra_applications: [:logger, :runtime_tools] + extra_applications: [:logger, :runtime_tools, :crypto] ] end