Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/monitor services #9

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
15 changes: 0 additions & 15 deletions .travis.yml

This file was deleted.

5 changes: 2 additions & 3 deletions apps/cronitex/lib/cronitex/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ defmodule Cronitex.Application do
# Start the Ecto repository
Cronitex.Repo,
# Start the PubSub system
{Phoenix.PubSub, name: Cronitex.PubSub}
# Start a worker by calling: Cronitex.Worker.start_link(arg)
# {Cronitex.Worker, arg}
{Phoenix.PubSub, name: Cronitex.PubSub},
Cronitex.MonitorServices.CronMonitorSupervisor
]

Supervisor.start_link(children, strategy: :one_for_one, name: Cronitex.Supervisor)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
defmodule Cronitex.MonitorServices.CronMonitorServer do
use GenServer
alias Cronitex.MonitorServices.LiveUpdates

def start_link(init_arg, options) do
GenServer.start_link(__MODULE__, init_arg, options)
end

@impl true
def init(state) do
state = update_monitor_state(state, :waiting)
state = schedule_work(state)
{:ok, state}
end

@impl true
def handle_info(:work, state) do
state = schedule_work(state)
state = update_monitor_state(state, :waiting)
{:noreply, state}
end

@impl true
def handle_info(:start_ping, state) do
# If we get a success ping, disregard the timeout
state = cancel_and_remove_timer(state, :timeout_timer)
state = update_monitor_state(state, :ok)
{:noreply, state}
end

@impl true
def handle_info(:timeout, state) do
state = update_monitor_state(state, :start_timeout)
{:noreply, state}
end

@impl true
def handle_info(:stop, state) do
state = cancel_and_remove_timer(state, :work_timer)
state = cancel_and_remove_timer(state, :timeout_timer)
state = update_monitor_state(state, :stopped)
{:noreply, state}
end

defp cancel_and_remove_timer(state, timer_key) when is_map_key(state, timer_key) do
Process.cancel_timer(state[timer_key])
Map.pop(state, timer_key)
end

defp cancel_and_remove_timer(state, _timer_key), do: state

defp update_monitor_state(state, monitor_state) do
state = Map.put(state, :monitor_state, monitor_state)
LiveUpdates.notify_live_view_for_monitor_id(state.config.token, monitor_state)
state
end


defp schedule_work(state) do
# Whenever we schedule work, we need to set two timers, one that executes with the crontab, and one that executes with the timeout
{:ok, next_rundate} = Crontab.Scheduler.get_next_run_date(state.config.cron_expression)
milliseconds_till_run = NaiveDateTime.diff(next_rundate, DateTime.to_naive(DateTime.utc_now()), :millisecond)
timer = Process.send_after(self(), :work, milliseconds_till_run)
state = Map.put(state, :work_timer, timer)

timeout_milliseconds = state.config.start_tolerance_seconds * 1000
timeout_timer = Process.send_after(self(), :timeout, milliseconds_till_run + timeout_milliseconds)
state = Map.put(state, :timeout_timer, timeout_timer)
state
end

end
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule Cronitex.MonitorServices.CronMonitorSupervisor do
use Supervisor

alias Cronitex.Monitors
alias Cronitex.MonitorServices.CronMonitorServer

def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end

@impl true
def init(_init_arg) do
Monitors.list_cronmonitors()
|> start_cron_monitor_servers()
end

def start_cron_monitor_servers(monitors) do
monitors
|> Enum.into([], &cron_monitor_to_child_map/1)
|> Supervisor.init(strategy: :one_for_one)
end

defp cron_monitor_to_child_map(model) do
%{
id: model.token,
start: {CronMonitorServer, :start_link, [%{config: model}, []]}
}
end
end
11 changes: 11 additions & 0 deletions apps/cronitex/lib/cronitex/monitor_services/live_updates.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Cronitex.MonitorServices.LiveUpdates do
alias Phoenix

def subscribe_live_view_for_monitor_id(monitor_id) do
Phoenix.PubSub.subscribe(Cronitex.PubSub, monitor_id, link: true)
end

def notify_live_view_for_monitor_id(monitor_id, status) do
Phoenix.PubSub.broadcast(Cronitex.PubSub, monitor_id, status)
end
end
9 changes: 0 additions & 9 deletions apps/cronitex/lib/cronitex/monitors/cron_monitor.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
defmodule Cronitex.Monitors.CronMonitor do
use Ecto.Schema
import Ecto.Changeset
import Crontab.CronExpression.Ecto.Type

schema "cronmonitors" do
field :name, :string
Expand All @@ -24,14 +23,6 @@ defmodule Cronitex.Monitors.CronMonitor do
|> put_token()
end

defp check_cron_expression(changeset) do
case Crontab.CronExpression.Parser.parse(changeset.data.cron_expression) do
{:ok, _expression} -> changeset
{:error, _error} -> add_error(changeset, :cron_expression, "Invalid Cron Expression.")
end

end

defp put_token(changeset) do
unless changeset.data.token do
put_change(changeset, :token, Ecto.UUID.generate())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule Cronitex.MonitorServices.CronMonitorSupervisorTests do
use Cronitex.DataCase

alias Cronitex.MonitorServices.CronMonitorSupervisor
alias Cronitex.TestHelpers
alias Cronitex.Monitors

test "supervisor spawns with correct children" do
pid = Process.whereis(CronMonitorSupervisor)
%{active: active} = Supervisor.count_children(pid)
assert active == 0
end

test "supervisor respawns with corrct children" do
user = TestHelpers.user_fixture()
{:ok, monitor} = Monitors.create_cron_monitor(user, %{name: "valid cronmon", cron_expression: "* * * * * *"})

pid = Process.whereis(CronMonitorSupervisor)
ref = Process.monitor(pid)
Process.exit(pid, :kill)
receive do
{:DOWN, ^ref, :process, ^pid, :killed} ->
:timer.sleep 1

# Now that we've created a monitor and restarted the process, we should expect that there's an active child of
# our supervisor
pid = Process.whereis(CronMonitorSupervisor)
%{active: active} = Supervisor.count_children(pid)
assert active == 1
after
1000 ->
raise :timeout
end

end

end
25 changes: 25 additions & 0 deletions apps/cronitex/test/cronitex/monitor_services/live_updates_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
defmodule Cronitex.MonitorServices.LiveUpdatesTest do
use Cronitex.DataCase, async: true

alias Cronitex.MonitorServices.LiveUpdates

@monitor_id "1234-1234-1234-1234"

test "nofify and subscribe for monitor token work correctly" do
# Subscribe to notifications
LiveUpdates.subscribe_live_view_for_monitor_id(@monitor_id)

# Make sure we have no messages
{:messages, messages} = Process.info(self(), :messages)
assert Enum.count(messages) == 0

# Notifiy on the same topic, and assert that we have one message with our payload
LiveUpdates.notify_live_view_for_monitor_id(@monitor_id, :hello)
{:messages, messages} = Process.info(self(), :messages)

assert Enum.count(messages) == 1
[message | _tail] = messages
assert message == :hello
end

end
2 changes: 1 addition & 1 deletion apps/cronitex/test/cronitex/monitors_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Cronitex.MonitorsTest do
use Cronitex.DataCase
use Cronitex.DataCase, async: true
import Crontab.CronExpression
alias Cronitex.Monitors

Expand Down
10 changes: 7 additions & 3 deletions apps/cronitex/test/support/data_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ defmodule Cronitex.DataCase do
end
end

setup _tags do
Ecto.Adapters.SQL.Sandbox.checkout(Cronitex.Repo)
setup tags do
:ok = Ecto.Adapters.SQL.Sandbox.checkout(Cronitex.Repo)

# Don't need handling for other tags here, will only be using postgres.
unless tags[:async] do
Ecto.Adapters.SQL.Sandbox.mode(Cronitex.Repo, {:shared, self()})
end

:ok
end

@doc """
Expand Down
20 changes: 19 additions & 1 deletion apps/cronitex_web/assets/js/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,22 @@ import "../css/app.scss"
// import {Socket} from "phoenix"
// import socket from "./socket"
//
import "phoenix_html"
import "phoenix_html"

// assets/js/app.js
import {Socket} from "phoenix"
import LiveSocket from "phoenix_live_view"

let csrfToken = document.querySelector("meta[name='csrf-token']").getAttribute("content")
let liveSocket = new LiveSocket("/live", Socket, {params: {_csrf_token: csrfToken}})

// Connect if there are any LiveViews on the page
liveSocket.connect()

// Expose liveSocket on window for web console debug logs and latency simulation:
// >> liveSocket.enableDebug()
// >> liveSocket.enableLatencySim(1000)
// The latency simulator is enabled for the duration of the browser session.
// Call disableLatencySim() to disable:
// >> liveSocket.disableLatencySim()
window.liveSocket = liveSocket
3 changes: 3 additions & 0 deletions apps/cronitex_web/assets/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion apps/cronitex_web/assets/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
},
"dependencies": {
"phoenix": "file:../../../deps/phoenix",
"phoenix_html": "file:../../../deps/phoenix_html"
"phoenix_html": "file:../../../deps/phoenix_html",
"phoenix_live_view": "file:../../../deps/phoenix_live_view"
},
"devDependencies": {
"@babel/core": "^7.0.0",
Expand Down
4 changes: 3 additions & 1 deletion apps/cronitex_web/lib/cronitex_web.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule CronitexWeb do
import CronitexWeb.Gettext
import CronitexWeb.Auth, only: [authenticate_user: 2]
alias CronitexWeb.Router.Helpers, as: Routes
import Phoenix.LiveView.Controller
end
end

Expand All @@ -36,7 +37,7 @@ defmodule CronitexWeb do

# Import convenience functions from controllers
import Phoenix.Controller, only: [get_flash: 1, get_flash: 2, view_module: 1]

import Phoenix.LiveView.Helpers
# Include shared imports and aliases for views
unquote(view_helpers())
end
Expand All @@ -49,6 +50,7 @@ defmodule CronitexWeb do
import Plug.Conn
import Phoenix.Controller
import CronitexWeb.Auth, only: [authenticate_user: 2]
import Phoenix.LiveView.Router
end
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule CronitexWeb.CronMonitorStatusLive do
use Phoenix.LiveView
alias Cronitex.MonitorServices.LiveUpdates


def render(assigns) do
~L"""
<%= @status %>
"""
end

def handle_info(message, socket) do
{:noreply, assign(socket, :status, message)}
end

def mount(_params, %{"monitor_token" => monitor_token}, socket) do
LiveUpdates.subscribe_live_view_for_monitor_id(monitor_token)

{:ok, assign(socket, :status, "Ready!")}
end
end
5 changes: 3 additions & 2 deletions apps/cronitex_web/lib/cronitex_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ defmodule CronitexWeb.Router do
pipeline :browser do
plug :accepts, ["html"]
plug :fetch_session
plug :fetch_flash
plug :fetch_live_flash
plug :protect_from_forgery
plug :put_secure_browser_headers
plug :put_root_layout, {CronitexWeb.LayoutView, :root}
plug CronitexWeb.Auth
end

Expand All @@ -18,7 +19,7 @@ defmodule CronitexWeb.Router do
pipe_through :browser

get "/", PageController, :index
resources "/users", UserController, only: [:new, :create, :show, :delete]
resources "/users", UserController, only: [:index, :new, :create, :show, :delete, :edit]
resources "/sessions", SessionController, only: [:new, :create, :delete]
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<th>Token</th>
<th>Cron expression</th>
<th>Start tolerance seconds</th>

<th>Status</th>
<th></th>
</tr>
</thead>
Expand All @@ -18,6 +18,7 @@
<td><%= cron_monitor.token %></td>
<td><%= cron_str(cron_monitor.cron_expression) %></td>
<td><%= cron_monitor.start_tolerance_seconds %></td>
<td><%= live_render(@conn, CronitexWeb.CronMonitorStatusLive, session: %{"monitor_token" => cron_monitor.token}) %></td>

<td>
<span><%= link "Show", to: Routes.cron_monitor_path(@conn, :show, cron_monitor) %></span>
Expand Down
Loading