Galena is a topic producer-consumer library built on top of GenStage for Elixir.
I highly recommend to initiate your producers/consumers under a Supervisor.
If available in Hex, the package can be installed as:
to your list of dependencies inmix.exs
:def deps do [{:galena, "~> 0.1.2"}] end
- Define your Producer(s). Your producer could be connected to external system like RabbitMQ, Kafka, DataBases, ... Code the function handle_produce(data), where data can be whatever, and has to return a tuple where the first value has to be a topic and the second one the message. To guarantee a good performance, try to optimize as much as possible this function.
defmodule MyProducer do
use Galena.Producer
def handle_produce({topic, data}) do
{topic, data}
- Define your Producer-Consumer(s). A Producer-Consumer will have the functionalities of a consumer and a producer. It needs an implementation close to a producer (handle_produce(topic, data)) and the initialization of a consumer.
defmodule MyProducerConsumer do
use Galena.ProducerConsumer
def handle_produce(topic, data) do
result_topic = topic <> Integer.to_string(:rand.uniform(2))
{result_topic, "modified by producer-consumer: " <> data}
- Define and run your Consumer. Code the function handle_consume(topic, message). A consumer could be subscribed to different topics of the same producer or even to different producers. We have to indicate it using a Keyword list as first parameter which has to contain the information about the producers.
# Example
args = [
producers_info: [
{["topic_1", "topic_2", "topic_3"], :producer1},
{["topic_A"], :producer2},
{["topic_a", "topic_b"], :producer3},
{[], :producer4}
defmodule MyConsumer do
use Galena.Consumer
def handle_consume(topic, message) do
IO.puts(topic <> ": " <> message)
- Run and begin to ingest data
# One producer
iex> MyProducer.start_link([], [name: :producer])
# One producer-consumer
iex> MyProducerConsumer.start_link([producers_info: [{["topic"], :producer}]], [name: :prod_cons])
# Two consumers
iex> MyConsumer.start_link([producers_info: [{["topic1"], :prod_cons}]], [name: :consumer1])
iex> MyConsumer.start_link([producers_info: [{["topic2"], :prod_cons}]], [name: :consumer2])
# Data ingestion
iex> for i <- 1..100, do: MyProducer.ingest(:producer, {"topic", "Hola" <> Integer.to_string(:rand.uniform(100))})
- Run the tests.
mix test