Skip to content

Commit

Permalink
WIP: event consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
electronicbites committed Mar 14, 2024
1 parent 3f0f00b commit 36395ce
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 1 deletion.
4 changes: 3 additions & 1 deletion lib/radiator/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Radiator.Application do

use Application

alias Radiator.Outline.EventConsumer
alias Radiator.Outline.EventProducer

@impl true
Expand All @@ -20,7 +21,8 @@ defmodule Radiator.Application do
# {Radiator.Worker, arg},
# Start to serve requests, typically the last entry
RadiatorWeb.Endpoint,
{EventProducer, name: EventProducer}
{EventProducer, name: EventProducer},
{EventConsumer, name: EventConsumer}
]

# See https://hexdocs.pm/elixir/Supervisor.html
Expand Down
43 changes: 43 additions & 0 deletions lib/radiator/outline/event_consumer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
defmodule Radiator.Outline.EventConsumer do
use GenStage
alias Radiator.Outline.EventProducer

def start_link(opts \\ []) do
GenStage.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(_opts) do
options = []
{:consumer, :event_producer, subscribe_to: [{EventProducer, options}]}
end

def handle_events(events, _from, state) do
IO.inspect(events, label: "EventConsumer handle_events")

Enum.each(events, fn event ->
process_event(event, state)
IO.inspect(event, label: "EventConsumer handle_events event")
end)

{:noreply, [], state}
end

defp process_event(%InsertNodeEvent{} = event) do
# validate
# true->
# database action: insert node()
# create && persist event (event contains all attributes, user, event_id, timestamps)
# broadcast event (topic: episode_id)
# false->
# log error and return error (audit log)
end

defp handle_result(:ok, event) do
persist_event(event)
broadcast_success(event)
end

defp handle_result(:error, event) do
broadcast_error(event)
end
end
6 changes: 6 additions & 0 deletions lib/radiator/outline/event_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ defmodule Radiator.Outline.EventProducer do
end

def handle_cast({:enqueue, event}, state) do
IO.inspect(state, label: "EventProducer. handle_cast")
{:noreply, [event], state}
end

def handle_demand(demand, state) do
IO.inspect(demand, label: "EventConsumer")
{:noreply, [], state}
end
end

0 comments on commit 36395ce

Please sign in to comment.