From 3f0f00bede512d9078b46b2aa3358e0dcc9146c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20W=C3=B6ginger?= Date: Thu, 14 Mar 2024 21:38:26 +0100 Subject: [PATCH] start implementing Genstage server for producing outline events --- lib/radiator/application.ex | 5 ++++- lib/radiator/outline/event.ex | 16 +++++++++++++++ lib/radiator/outline/event_producer.ex | 20 ++++++++++++++++++ lib/radiator/outline/server.ex | 28 ++++++++++++++++++++++++++ mix.exs | 1 + mix.lock | 1 + 6 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 lib/radiator/outline/event.ex create mode 100644 lib/radiator/outline/event_producer.ex create mode 100644 lib/radiator/outline/server.ex diff --git a/lib/radiator/application.ex b/lib/radiator/application.ex index 5e50461c..b5e5bde0 100644 --- a/lib/radiator/application.ex +++ b/lib/radiator/application.ex @@ -5,6 +5,8 @@ defmodule Radiator.Application do use Application + alias Radiator.Outline.EventProducer + @impl true def start(_type, _args) do children = [ @@ -17,7 +19,8 @@ defmodule Radiator.Application do # Start a worker by calling: Radiator.Worker.start_link(arg) # {Radiator.Worker, arg}, # Start to serve requests, typically the last entry - RadiatorWeb.Endpoint + RadiatorWeb.Endpoint, + {EventProducer, name: EventProducer} ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/radiator/outline/event.ex b/lib/radiator/outline/event.ex new file mode 100644 index 00000000..3b20c63d --- /dev/null +++ b/lib/radiator/outline/event.ex @@ -0,0 +1,16 @@ +defmodule Radiator.Outline.Event do + alias Radiator.Outline.EventProducer + + def build(event_id, event_type, user_id, payload) do + %{ + event_id: event_id, + event_type: event_type, + user_id: user_id, + payload: payload + } + end + + def enqueue(event) do + EventProducer.enqueue(event) + end +end diff --git a/lib/radiator/outline/event_producer.ex b/lib/radiator/outline/event_producer.ex new file mode 100644 index 00000000..54021c59 --- /dev/null +++ b/lib/radiator/outline/event_producer.ex @@ -0,0 +1,20 @@ +defmodule Radiator.Outline.EventProducer do + use GenStage + + def start_link(opts \\ []) do + GenStage.start_link(__MODULE__, opts, name: __MODULE__) + end + + def init(_opts) do + {:producer, []} + end + + def enqueue(event) do + GenStage.cast(__MODULE__, {:enqueue, event}) + :ok + end + + def handle_cast({:enqueue, event}, state) do + {:noreply, [event], state} + end +end diff --git a/lib/radiator/outline/server.ex b/lib/radiator/outline/server.ex new file mode 100644 index 00000000..5ae9fde3 --- /dev/null +++ b/lib/radiator/outline/server.ex @@ -0,0 +1,28 @@ +defmodule Radiator.Outline.Server do + alias Radiator.Outline.Event + + def insert_node(attributes, user_id, event_id) do + "insert_node" + |> Event.build(attributes, user_id, event_id) + |> Event.enqueue() + + # generate event + # send to Eventserver + # validate + # true-> + # database action: insert node() + # create && persist event (event contains all attributes, user, event_id, timestamps) + # broadcast event (topic: episode_id) + # broadcast node (topic: episode_id) + # false-> + # log error and return error (audit log) + :ok + end + + # TODO + # update_node + # delete_node + # move_node + + # list_node different case, sync call +end diff --git a/mix.exs b/mix.exs index fc294aca..2015fc2e 100644 --- a/mix.exs +++ b/mix.exs @@ -40,6 +40,7 @@ defmodule Radiator.MixProject do {:finch, "~> 0.13"}, {:floki, ">= 0.30.0", only: :test}, {:gen_smtp, "~> 1.1"}, + {:gen_stage, "~> 1.2"}, {:gettext, "~> 0.20"}, {:jason, "~> 1.2"}, {:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index 74e51b0b..581957ed 100644 --- a/mix.lock +++ b/mix.lock @@ -19,6 +19,7 @@ "finch": {:hex, :finch, "0.16.0", "40733f02c89f94a112518071c0a91fe86069560f5dbdb39f9150042f44dcfb1a", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f660174c4d519e5fec629016054d60edd822cdfe2b7270836739ac2f97735ec5"}, "floki": {:hex, :floki, "0.35.2", "87f8c75ed8654b9635b311774308b2760b47e9a579dabf2e4d5f1e1d42c39e0b", [:mix], [], "hexpm", "6b05289a8e9eac475f644f09c2e4ba7e19201fd002b89c28c1293e7bd16773d9"}, "gen_smtp": {:hex, :gen_smtp, "1.2.0", "9cfc75c72a8821588b9b9fe947ae5ab2aed95a052b81237e0928633a13276fd3", [:rebar3], [{:ranch, ">= 1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "5ee0375680bca8f20c4d85f58c2894441443a743355430ff33a783fe03296779"}, + "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, "gettext": {:hex, :gettext, "0.23.1", "821e619a240e6000db2fc16a574ef68b3bd7fe0167ccc264a81563cc93e67a31", [:mix], [{:expo, "~> 0.4.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "19d744a36b809d810d610b57c27b934425859d158ebd56561bc41f7eeb8795db"}, "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},