diff --git a/.gitignore b/.gitignore index 279b41ef..ffc324db 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,6 @@ doc .elixir_ls cover + +# Dialyzer's Persistent Lookup Table +priv/plts/ \ No newline at end of file diff --git a/README.md b/README.md index c0c84d34..20b3073f 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ KafkaEx [![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](http://hexdocs.pm/kafka_ex/) KafkaEx is an Elixir client for [Apache Kafka](http://kafka.apache.org/) with -support for Kafka versions 0.8.0 and newer. KafkaEx requires Elixir 1.5+ and +support for Kafka versions 0.8.0 and newer. KafkaEx requires Elixir 1.6+ and Erlang OTP 19+. See [http://hexdocs.pm/kafka_ex/](http://hexdocs.pm/kafka_ex/) for @@ -18,7 +18,7 @@ documentation, [https://github.com/kafkaex/kafka_ex/](https://github.com/kafkaex/kafka_ex/) for code. -KakfaEx supports the following Kafka features: +KafkaEx supports the following Kafka features: * Broker and Topic Metadata * Produce Messages diff --git a/lib/kafka_ex/consumer_group.ex b/lib/kafka_ex/consumer_group.ex index 2ab5c5a2..05ce8b39 100644 --- a/lib/kafka_ex/consumer_group.ex +++ b/lib/kafka_ex/consumer_group.ex @@ -297,11 +297,9 @@ defmodule KafkaEx.ConsumerGroup do def active?(supervisor_pid, timeout \\ 5000) do consumer_supervisor = consumer_supervisor_pid(supervisor_pid, timeout) - if consumer_supervisor && Process.alive?(consumer_supervisor) do + consumer_supervisor && + Process.alive?(consumer_supervisor) && GenConsumer.Supervisor.active?(consumer_supervisor) - else - false - end end @doc """ @@ -334,9 +332,17 @@ defmodule KafkaEx.ConsumerGroup do opts ) do child = - supervisor( - KafkaEx.GenConsumer.Supervisor, - [{gen_consumer_module, consumer_module}, group_name, assignments, opts], + Supervisor.child_spec( + { + KafkaEx.GenConsumer.Supervisor, + %{ + gen_consumer_module: gen_consumer_module, + consumer_module: consumer_module, + group_name: group_name, + assignments: assignments, + opts: opts + } + }, id: :consumer ) @@ -363,10 +369,8 @@ defmodule KafkaEx.ConsumerGroup do opts = Keyword.put(opts, :supervisor_pid, self()) children = [ - worker( - KafkaEx.ConsumerGroup.Manager, - [{gen_consumer_module, consumer_module}, group_name, topics, opts] - ) + {KafkaEx.ConsumerGroup.Manager, + {{gen_consumer_module, consumer_module}, group_name, topics, opts}} ] Supervisor.init(children, diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index ec22e041..3cbf4873 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -53,18 +53,18 @@ defmodule KafkaEx.ConsumerGroup.Manager do @doc false # use `KafkaEx.ConsumerGroup.start_link/4` instead - @spec start_link( + @spec start_link({ {module, module}, binary, [binary], KafkaEx.GenConsumer.options() - ) :: GenServer.on_start() - def start_link( + }) :: GenServer.on_start() + def start_link({ {gen_consumer_module, consumer_module}, group_name, topics, - opts \\ [] - ) do + opts + }) do gen_server_opts = Keyword.get(opts, :gen_server_opts, []) consumer_opts = Keyword.drop(opts, [:gen_server_opts]) diff --git a/lib/kafka_ex/gen_consumer/supervisor.ex b/lib/kafka_ex/gen_consumer/supervisor.ex index f69f6ade..e47bf036 100644 --- a/lib/kafka_ex/gen_consumer/supervisor.ex +++ b/lib/kafka_ex/gen_consumer/supervisor.ex @@ -15,15 +15,23 @@ defmodule KafkaEx.GenConsumer.Supervisor do use DynamicSupervisor + if Version.match?(System.version(), ">= 1.7.0") do + @doc since: "0.14.0" + end + @doc """ Starts a `GenConsumer.Supervisor` process linked to the current process. `gen_consumer_module` is a module that implements the `GenServer` behaviour which consumes events from Kafka. + `consumer_module` is a module that implements the `GenConsumer` behaviour. - `group_name` is the name of a consumer group, and `assignments` is a list of - partitions for the `GenConsumer`s to consume. `opts` accepts the same - options as `KafkaEx.GenConsumer.start_link/5`. + + `group_name` is the name of a consumer group. + + `assignments` is a list of partitions for the `GenConsumer`s to consume. + + `opts` accepts the same options as `KafkaEx.GenConsumer.start_link/5`. ### Return Values @@ -32,20 +40,22 @@ defmodule KafkaEx.GenConsumer.Supervisor do If the supervisor and its consumers are successfully created, this function returns `{:ok, pid}`, where `pid` is the PID of the supervisor. """ - @spec start_link( - {gen_consumer_module :: module, consumer_module :: module}, - consumer_group_name :: binary, - assigned_partitions :: [ + @spec start_link(%{ + gen_consumer_module: module, + consumer_module: module, + group_name: binary, + assignments: [ {topic_name :: binary, partition_id :: non_neg_integer} ], - KafkaEx.GenConsumer.options() - ) :: Elixir.Supervisor.on_start() - def start_link( - {gen_consumer_module, consumer_module}, - group_name, - assignments, - opts \\ [] - ) do + opts: KafkaEx.GenConsumer.options() + }) :: Supervisor.on_start() + def start_link(%{ + gen_consumer_module: gen_consumer_module, + consumer_module: consumer_module, + group_name: group_name, + assignments: assignments, + opts: opts + }) do start_link_result = DynamicSupervisor.start_link( __MODULE__, @@ -71,6 +81,39 @@ defmodule KafkaEx.GenConsumer.Supervisor do end end + @deprecated "Use start_link/1 instead" + @doc """ + Starts a `GenConsumer.Supervisor` process linked to the current process. + + Refer to `start_link/1` for documentation of each parameter. + + ### Return Values + + Same as `start_link/1`. + """ + @spec start_link( + {gen_consumer_module :: module, consumer_module :: module}, + consumer_group_name :: binary, + assigned_partitions :: [ + {topic_name :: binary, partition_id :: non_neg_integer} + ], + KafkaEx.GenConsumer.options() + ) :: Elixir.Supervisor.on_start() + def start_link( + {gen_consumer_module, consumer_module}, + group_name, + assignments, + opts \\ [] + ) do + start_link(%{ + gen_consumer_module: gen_consumer_module, + consumer_module: consumer_module, + group_name: group_name, + assignments: assignments, + opts: opts + }) + end + @doc """ Returns a list of child pids