From 36395cedce00844dcd5a262a8b662cc745a699f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20W=C3=B6ginger?= Date: Thu, 14 Mar 2024 22:14:31 +0100 Subject: [PATCH] WIP: event consumer --- lib/radiator/application.ex | 4 ++- lib/radiator/outline/event_consumer.ex | 43 ++++++++++++++++++++++++++ lib/radiator/outline/event_producer.ex | 6 ++++ 3 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 lib/radiator/outline/event_consumer.ex diff --git a/lib/radiator/application.ex b/lib/radiator/application.ex index b5e5bde0..7920efbb 100644 --- a/lib/radiator/application.ex +++ b/lib/radiator/application.ex @@ -5,6 +5,7 @@ defmodule Radiator.Application do use Application + alias Radiator.Outline.EventConsumer alias Radiator.Outline.EventProducer @impl true @@ -20,7 +21,8 @@ defmodule Radiator.Application do # {Radiator.Worker, arg}, # Start to serve requests, typically the last entry RadiatorWeb.Endpoint, - {EventProducer, name: EventProducer} + {EventProducer, name: EventProducer}, + {EventConsumer, name: EventConsumer} ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/radiator/outline/event_consumer.ex b/lib/radiator/outline/event_consumer.ex new file mode 100644 index 00000000..8676870e --- /dev/null +++ b/lib/radiator/outline/event_consumer.ex @@ -0,0 +1,43 @@ +defmodule Radiator.Outline.EventConsumer do + use GenStage + alias Radiator.Outline.EventProducer + + def start_link(opts \\ []) do + GenStage.start_link(__MODULE__, opts, name: __MODULE__) + end + + def init(_opts) do + options = [] + {:consumer, :event_producer, subscribe_to: [{EventProducer, options}]} + end + + def handle_events(events, _from, state) do + IO.inspect(events, label: "EventConsumer handle_events") + + Enum.each(events, fn event -> + process_event(event, state) + IO.inspect(event, label: "EventConsumer handle_events event") + end) + + {:noreply, [], state} + end + + defp process_event(%InsertNodeEvent{} = event) do + # validate + # true-> + # database action: insert node() + # create && persist event (event contains all attributes, user, event_id, timestamps) + # broadcast event (topic: episode_id) + # false-> + # log error and return error (audit log) + end + + defp handle_result(:ok, event) do + persist_event(event) + broadcast_success(event) + end + + defp handle_result(:error, event) do + broadcast_error(event) + end +end diff --git a/lib/radiator/outline/event_producer.ex b/lib/radiator/outline/event_producer.ex index 54021c59..e21cfd32 100644 --- a/lib/radiator/outline/event_producer.ex +++ b/lib/radiator/outline/event_producer.ex @@ -15,6 +15,12 @@ defmodule Radiator.Outline.EventProducer do end def handle_cast({:enqueue, event}, state) do + IO.inspect(state, label: "EventProducer. handle_cast") {:noreply, [event], state} end + + def handle_demand(demand, state) do + IO.inspect(demand, label: "EventConsumer") + {:noreply, [], state} + end end