Skip to content

Commit

Permalink
revert rename
Browse files Browse the repository at this point in the history
  • Loading branch information
electronicbites committed Sep 6, 2024
1 parent 8a97650 commit 54242d1
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 8 deletions.
9 changes: 8 additions & 1 deletion lib/radiator/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ defmodule Radiator.Application do

@impl true
def start(_type, _args) do
job_runner_config = [
strategy: :one_for_one,
max_seconds: 30,
name: Radiator.JobRunner
]

children = [
RadiatorWeb.Telemetry,
Radiator.Repo,
Expand All @@ -25,7 +31,8 @@ defmodule Radiator.Application do
{EventProducer, name: EventProducer},
{CommandProcessor, name: CommandProcessor, subscribe_to: [{EventProducer, max_demand: 1}]},
{NodeChangeListener, name: NodeChangeListener},
{DynamicSupervisor, strategy: :one_for_one, name: Radiator.JobProcessor}
{Registry, keys: :unique, name: Radiator.JobRegistry},
{DynamicSupervisor, job_runner_config}
]

# See https://hexdocs.pm/elixir/Supervisor.html
Expand Down
31 changes: 24 additions & 7 deletions lib/radiator/job_processor.ex → lib/radiator/job.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# test with
# GenServer.start(Radiator.JobProcessor, work: fn -> Process.sleep(5000);{:ok,[]} end)
defmodule Radiator.JobProcessor do
# GenServer.start(Radiator.Job, work: fn -> Process.sleep(5000);{:ok,[]} end)
defmodule Radiator.Job do
@moduledoc """
WIP: Job module to handle work in a GenServer
idea taken from https://pragprog.com/titles/sgdpelixir/concurrent-data-processing-in-elixir/
Expand All @@ -9,9 +9,26 @@ defmodule Radiator.JobProcessor do
require Logger

alias __MODULE__
alias Radiator.JobRunner
alias Radiator.JobSupervisor

defstruct [:work, :id, :max_retries, retries: 0, status: "new"]

def start_job(args) do
if Enum.count(running_imports()) >= 5 do
{:error, :import_quota_reached}
else
DynamicSupervisor.start_child(JobRunner, {JobSupervisor, args})
end
end

def running_imports() do
match_all = {:"$1", :"$2", :"$3"}
guards = [{:==, :"$3", "import"}]
map_result = [%{id: :"$1", pid: :"$2", type: :"$3"}]
Registry.select(Radiator.JobRegistry, [{match_all, guards, map_result}])
end

def start_link(args) do
GenServer.start_link(__MODULE__, args)
end
Expand All @@ -21,7 +38,7 @@ defmodule Radiator.JobProcessor do
id = Keyword.get(args, :id, random_job_id())
max_retries = Keyword.get(args, :max_retries, 3)

state = %JobProcessor{id: id, work: work, max_retries: max_retries}
state = %Job{id: id, work: work, max_retries: max_retries}
{:ok, state, {:continue, :run}}
end

Expand All @@ -44,20 +61,20 @@ defmodule Radiator.JobProcessor do

defp handle_job_result({:ok, _data}, state) do
Logger.info("Job completed #{state.id}")
%JobProcessor{state | status: "done"}
%Job{state | status: "done"}
end

defp handle_job_result(:error, %{status: "new"} = state) do
Logger.warning("Job errored #{state.id}")
%JobProcessor{state | status: "errored"}
%Job{state | status: "errored"}
end

defp handle_job_result(:error, %{status: "errored"} = state) do
Logger.warning("Job retry failed #{state.id}")
new_state = %JobProcessor{state | retries: state.retries + 1}
new_state = %Job{state | retries: state.retries + 1}

if new_state.retries == state.max_retries do
%JobProcessor{new_state | status: "failed"}
%Job{new_state | status: "failed"}
else
new_state
end
Expand Down
21 changes: 21 additions & 0 deletions lib/radiator/job_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

defmodule Radiator.JobSupervisor do
use Supervisor, restart: :temporary

def start_link(args) do
Supervisor.start_link(__MODULE__, args)
end

def init(args) do
children = [
{Radiator.Job, args}
]

options = [
strategy: :one_for_one,
max_seconds: 30
]

Supervisor.init(children, options)
end
end

0 comments on commit 54242d1

Please sign in to comment.