Skip to content

Commit

Permalink
* BUG Fix: when the connection is busy and multiple watches are start…
Browse files Browse the repository at this point in the history
…ed simultaneously, they get inverted and the process ends up receiving duplicated events from one key and no events from another.

* Bump to 1.1.6.
  • Loading branch information
balena committed Aug 25, 2022
1 parent e738a0b commit aa91bb7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 25 deletions.
51 changes: 27 additions & 24 deletions lib/etcd_ex/watch_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,22 @@ defmodule EtcdEx.WatchStream do
events: []
})

first? = :queue.is_empty(pending_reqs)
pending_reqs = :queue.in_r(watch_ref, pending_reqs)
watch_stream = %{watch_stream | pending_reqs: pending_reqs, watches: watches}

if first? do
case EtcdEx.Mint.watch(env, request_ref, key, opts) do
{:ok, env} -> {:ok, env, watch_stream, watch_ref}
error -> error
watch_stream = %{
watch_stream
| pending_reqs: :queue.in(watch_ref, pending_reqs),
watches: watches
}

result =
if :queue.is_empty(pending_reqs) do
EtcdEx.Mint.watch(env, request_ref, key, opts)
else
{:ok, env}
end
else
{:ok, env, watch_stream, watch_ref}

case result do
{:ok, env} -> {:ok, env, watch_stream, watch_ref}
error -> error
end
end

Expand Down Expand Up @@ -88,7 +93,6 @@ defmodule EtcdEx.WatchStream do
end
end

@doc false
defp stream_data(env, request_ref, watch_stream, %{created: true, watch_id: watch_id}) do
%{pending_reqs: pending_reqs, watches: watches, watch_ids: watch_ids} = watch_stream

Expand All @@ -107,22 +111,23 @@ defmodule EtcdEx.WatchStream do
watch_ids: watch_ids
}

case :queue.peek(pending_reqs) do
:empty ->
{:ok, env, watch_stream, {:etcd_watch_created, watch_ref}}
result =
case :queue.peek(pending_reqs) do
:empty ->
{:ok, env}

{:value, next_watch_ref} ->
%{key: key, opts: opts} = Map.fetch!(watches, next_watch_ref)
{:value, next_watch_ref} ->
%{key: key, opts: opts} = Map.fetch!(watches, next_watch_ref)
EtcdEx.Mint.watch(env, request_ref, key, opts)
end

case EtcdEx.Mint.watch(env, request_ref, key, opts) do
{:ok, env} -> {:ok, env, watch_stream, {:etcd_watch_created, watch_ref}}
error -> error
end
case result do
{:ok, env} -> {:ok, env, watch_stream, {:etcd_watch_created, watch_ref}}
error -> error
end
end
end

@doc false
defp stream_data(
env,
request_ref,
Expand Down Expand Up @@ -165,7 +170,6 @@ defmodule EtcdEx.WatchStream do
end
end

@doc false
defp stream_data(env, _request_ref, watch_stream, %{
fragment: true,
events: events,
Expand All @@ -183,7 +187,6 @@ defmodule EtcdEx.WatchStream do
end
end

@doc false
defp stream_data(
env,
request_ref,
Expand Down Expand Up @@ -228,7 +231,7 @@ defmodule EtcdEx.WatchStream do
|> Enum.reduce({:queue.new(), watches}, fn
watch_ref, {pending_reqs, watches} ->
watches = Map.update!(watches, watch_ref, &%{&1 | events: [], watch_id: nil})
pending_reqs = :queue.in_r(watch_ref, pending_reqs)
pending_reqs = :queue.in(watch_ref, pending_reqs)
{pending_reqs, watches}
end)

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule EtcdEx.MixProject do
def project do
[
app: :etcdex,
version: "1.1.5",
version: "1.1.6",
elixir: "~> 1.13",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down

0 comments on commit aa91bb7

Please sign in to comment.