From 8e3e8e659096b2cb67047bca30fccc2faa28f7b0 Mon Sep 17 00:00:00 2001 From: Henry Popp Date: Mon, 4 Dec 2017 16:34:03 -0600 Subject: [PATCH] fix: stream_id exhaustion on connections (#97) * fix: stream_id exhaustion on connections * fix: removed now-useless reconnect? functionality APNS/FCM config reconnect key still remains to not break API, though now it doesn't do anything. Also: cleaned up some compiler warnings. * fix: connection asks for more demand on stream completion Previous behavior was to ask for more demand as soon as events arrived, though this leads to unreliable timeouts and such when Kadabra can't process fast enough. Potential issue: The requested streams don't timeout, so theoretically a worker could hang if there was no response. Will also need to add an error response for any :closed messages while there are outstanding streams (though this should never happen for FCM session_timeouts) Also fixed: FCM Poison parse failure hard crashes due to bad pattern match. Now logs an error instead. * fix: connections supervised again Crashed connections will cause a memory leak with hpack/stream workers in Kadabra. Will need to be addressed. * chore: bump version number * chore: update CHANGELOG --- CHANGELOG.md | 5 ++ README.md | 4 +- docs/Getting Started.md | 6 +- lib/pigeon.ex | 7 ++ lib/pigeon/apns.ex | 4 +- lib/pigeon/apns/config.ex | 6 +- lib/pigeon/configurable.ex | 17 +--- lib/pigeon/connection.ex | 174 +++++++++++++++++++++++++++++++++++++ lib/pigeon/fcm.ex | 2 +- lib/pigeon/fcm/config.ex | 16 ++-- lib/pigeon/worker.ex | 136 +++++++---------------------- mix.exs | 5 +- mix.lock | 15 ++-- test/apns_test.exs | 22 ++--- test/fcm/worker_test.exs | 44 ++++++---- test/fcm_test.exs | 6 +- 16 files changed, 297 insertions(+), 172 deletions(-) create mode 100644 lib/pigeon/connection.ex diff --git a/CHANGELOG.md b/CHANGELOG.md index d5bfb5b9..9bb60cd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## v1.1.2 +* Auto-restart connections if max stream ID is reached +* FCM/APNS Workers now use GenStage to queue pending pushes +* Bumped minimum Kadabra version to `v0.3.5` + ## v1.1.1 * Bumped minimum Kadabra version to `v0.3.4` diff --git a/README.md b/README.md index 1b1f9fd1..af886126 100644 --- a/README.md +++ b/README.md @@ -13,8 +13,8 @@ Add pigeon and kadabra as `mix.exs` dependencies: ```elixir def deps do [ - {:pigeon, "~> 1.1.1"}, - {:kadabra, "~> 0.3.4"} + {:pigeon, "~> 1.1.2"}, + {:kadabra, "~> 0.3.5"} ] end ``` diff --git a/docs/Getting Started.md b/docs/Getting Started.md index b08fb53c..62b07e0d 100644 --- a/docs/Getting Started.md +++ b/docs/Getting Started.md @@ -2,7 +2,7 @@ > HTTP2-compliant wrapper for sending iOS and Android push notifications. [![Build Status](https://travis-ci.org/codedge-llc/pigeon.svg?branch=master)](https://travis-ci.org/codedge-llc/pigeon) -[![Coverage Status](https://coveralls.io/repos/github/codedge-llc/pigeon/badge.svg?branch=v1.1.0)](https://coveralls.io/github/codedge-llc/pigeon?branch=v1.1.0) +[![Coverage Status](https://coveralls.io/repos/github/codedge-llc/pigeon/badge.svg?branch=v1.1.0)](https://coveralls.io/github/codedge-llc/pigeon) [![Hex.pm](http://img.shields.io/hexpm/v/pigeon.svg)](https://hex.pm/packages/pigeon) [![Hex.pm](http://img.shields.io/hexpm/dt/pigeon.svg)](https://hex.pm/packages/pigeon) [![Deps Status](https://beta.hexfaktor.org/badge/all/github/codedge-llc/pigeon.svg)](https://beta.hexfaktor.org/github/codedge-llc/pigeon) @@ -12,8 +12,8 @@ Add pigeon and kadabra as `mix.exs` dependencies: ```elixir def deps do [ - {:pigeon, "~> 1.1.1"}, - {:kadabra, "~> 0.3.4"} + {:pigeon, "~> 1.1.2"}, + {:kadabra, "~> 0.3.5"} ] end ``` diff --git a/lib/pigeon.ex b/lib/pigeon.ex index 440962e2..cb663f20 100644 --- a/lib/pigeon.ex +++ b/lib/pigeon.ex @@ -71,5 +71,12 @@ defmodule Pigeon do end end + @doc false + def start_connection(state) do + opts = [restart: :temporary, id: :erlang.make_ref] + spec = worker(Pigeon.Connection, [state], opts) + Supervisor.start_child(:pigeon, spec) + end + def debug_log?, do: Application.get_env(:pigeon, :debug_log, false) end diff --git a/lib/pigeon/apns.ex b/lib/pigeon/apns.ex index 1bef85fb..fbd4b9e9 100644 --- a/lib/pigeon/apns.ex +++ b/lib/pigeon/apns.ex @@ -106,7 +106,7 @@ defmodule Pigeon.APNS do end defp push(notification, on_response, opts) do worker_name = opts[:to] || Config.default_name - Worker.cast_push(worker_name, notification, on_response: on_response) + Worker.send_push(worker_name, notification, on_response: on_response) end @doc ~S""" @@ -155,7 +155,7 @@ defmodule Pigeon.APNS do on_response = fn(x) -> send pid, {ref, x} end worker_name = opts[:to] || Config.default_name - Worker.cast_push(worker_name, notification, on_response: on_response) + Worker.send_push(worker_name, notification, on_response: on_response) receive do {^ref, x} -> x diff --git a/lib/pigeon/apns/config.ex b/lib/pigeon/apns/config.ex index b8e53034..65e2dc10 100644 --- a/lib/pigeon/apns/config.ex +++ b/lib/pigeon/apns/config.ex @@ -170,6 +170,9 @@ defimpl Pigeon.Configurable, for: Pigeon.APNS.Config do @spec worker_name(any) :: atom | nil def worker_name(%Config{name: name}), do: name + @spec max_demand(any) :: non_neg_integer + def max_demand(_config), do: 1_000 + @spec connect(any) :: {:ok, sock} | {:error, String.t} def connect(%Config{uri: uri} = config) do uri = to_charlist(uri) @@ -240,9 +243,6 @@ defimpl Pigeon.Configurable, for: Pigeon.APNS.Config do Process.send_after(self(), :ping, ping) end - @spec reconnect?(any) :: boolean - def reconnect?(%Config{reconnect: reconnect}), do: reconnect - def close(_config) do end diff --git a/lib/pigeon/configurable.ex b/lib/pigeon/configurable.ex index 6fb69aa6..b7c04303 100644 --- a/lib/pigeon/configurable.ex +++ b/lib/pigeon/configurable.ex @@ -24,6 +24,9 @@ defprotocol Pigeon.Configurable do def handle_end_stream(config, stream, notification, on_response) + @spec max_demand(any) :: non_neg_integer + def max_demand(config) + @doc ~S""" Schedules connection ping if necessary. @@ -48,19 +51,5 @@ defprotocol Pigeon.Configurable do @spec schedule_ping(any) :: no_return def schedule_ping(config) - @doc ~S""" - Returns whether connection should reconnect if dropped. - - ## Examples - - iex> reconnect?(%Pigeon.APNS.Config{reconnect: true}) - true - - iex> reconnect?(%Pigeon.FCM.Config{}) # always false - false - """ - @spec reconnect?(any) :: boolean - def reconnect?(config) - def close(config) end diff --git a/lib/pigeon/connection.ex b/lib/pigeon/connection.ex new file mode 100644 index 00000000..2a72961f --- /dev/null +++ b/lib/pigeon/connection.ex @@ -0,0 +1,174 @@ +defmodule Pigeon.Connection do + @moduledoc false + + defstruct config: nil, + from: nil, + socket: nil, + queue: %{}, + stream_id: 1, + requested: 0, + completed: 0 + + use GenStage + require Logger + + alias Pigeon.{Configurable, Connection} + alias Pigeon.Http2.{Client, Stream} + alias Pigeon.Worker.NotificationQueue + + @limit 1_000_000_000 + + def handle_subscribe(:producer, _opts, from, state) do + demand = Configurable.max_demand(state.config) + GenStage.ask(from, demand) + state = + state + |> inc_requested(demand) + |> Map.put(:from, from) + {:manual, state} + end + + def start_link({config, from}) do + GenStage.start_link(__MODULE__, {config, from}) + end + + def init(config), do: initialize_worker(config) + + def initialize_worker({config, from}) do + state = %Connection{config: config, from: from} + case connect_socket(config, 0) do + {:ok, socket} -> + Configurable.schedule_ping(config) + {:consumer, %{state | socket: socket}, subscribe_to: [from]} + {:error, reason} -> {:stop, reason} + end + end + + def connect_socket(_config, 3), do: {:error, :timeout} + def connect_socket(config, tries) do + case Configurable.connect(config) do + {:ok, socket} -> {:ok, socket} + {:error, _reason} -> connect_socket(config, tries + 1) + end + end + + # Handle Cancels + + def handle_cancel({:down, :normal}, _from, state) do + {:stop, :normal, state} + end + + def handle_cancel({:down, :shutdown}, _from, state) do + {:stop, :normal, state} + end + + def handle_cancel({:cancel, :closed}, _from, state) do + {:stop, :normal, state} + end + + def handle_cancel({:cancel, :stream_id_exhausted}, _from, state) do + {:stop, :normal, state} + end + + # Info + + def handle_info(:ping, state) do + Client.default().send_ping(state.socket) + Configurable.schedule_ping(state.config) + + {:noreply, [], state} + end + + def handle_info({:closed, _}, %{from: from} = state) do + GenStage.cancel(from, :closed) + {:noreply, [], %{state | socket: nil}} + end + + def handle_info(msg, state) do + case Client.default().handle_end_stream(msg, state) do + {:ok, %Stream{} = stream} -> process_end_stream(stream, state) + _else -> {:noreply, [], state} + end + end + + def handle_events(events, _from, state) do + state = + Enum.reduce(events, state, fn({:push, notif, opts}, state) -> + send_push(state, notif, opts) + end) + + {:noreply, [], state} + end + + def process_end_stream(%Stream{id: stream_id} = stream, + %{queue: queue, config: config} = state) do + case NotificationQueue.pop(queue, stream_id) do + {nil, new_queue} -> + # Do nothing if no queued item for stream + {:noreply, [], %{state | queue: new_queue}} + {{notif, on_response}, new_queue} -> + Configurable.handle_end_stream(config, stream, notif, on_response) + state = + state + |> inc_completed(1) + |> dec_requested(1) + |> Map.put(:queue, new_queue) + + total_requests = state.completed + state.requested + max_demand = Configurable.max_demand(state.config) + state = + if total_requests < @limit and state.requested < max_demand do + to_ask = min(@limit - total_requests, max_demand - state.requested) + GenStage.ask(state.from, to_ask) + inc_requested(state, to_ask) + else + state + end + + if state.completed >= @limit do + GenStage.cancel(state.from, :stream_id_exhausted) + end + {:noreply, [], state} + end + end + + def send_push(%{config: config, queue: queue} = state, notification, opts) do + headers = Configurable.push_headers(config, notification, opts) + payload = Configurable.push_payload(config, notification, opts) + + Client.default().send_request(state.socket, headers, payload) + + new_q = NotificationQueue.add(queue, + state.stream_id, + notification, + opts[:on_response]) + + state + |> inc_stream_id() + |> Map.put(:queue, new_q) + end + + # Cast + + def handle_cast(_msg, state) do + {:noreply, [], state} + end + + # Helpers + + def inc_requested(state, count) do + %{state | requested: state.requested + count} + end + + def dec_requested(state, count) do + %{state | requested: state.requested - count} + end + + def inc_completed(state, count) do + %{state | completed: state.completed + count} + end + + def inc_stream_id(%{stream_id: stream_id} = state) do + %{state | stream_id: stream_id + 2} + end +end diff --git a/lib/pigeon/fcm.ex b/lib/pigeon/fcm.ex index 14eb260f..77add119 100644 --- a/lib/pigeon/fcm.ex +++ b/lib/pigeon/fcm.ex @@ -124,7 +124,7 @@ defmodule Pigeon.FCM do defp cast_request(worker_name, request, on_response, opts) do opts = Keyword.put(opts, :on_response, on_response) - GenServer.cast(worker_name, {:push, request, opts}) + Worker.send_push(worker_name, request, opts) end defp sync_push(notification, opts) do diff --git a/lib/pigeon/fcm/config.ex b/lib/pigeon/fcm/config.ex index cd37344a..6db6dbf5 100644 --- a/lib/pigeon/fcm/config.ex +++ b/lib/pigeon/fcm/config.ex @@ -58,6 +58,9 @@ defimpl Pigeon.Configurable, for: Pigeon.FCM.Config do @spec worker_name(any) :: atom | nil def worker_name(%Config{name: name}), do: name + @spec max_demand(any) :: non_neg_integer + def max_demand(_config), do: 100 + @spec connect(any) :: {:ok, sock} | {:error, String.t} def connect(%Config{uri: uri} = config) do case connect_socket_options(config) do @@ -138,8 +141,6 @@ defimpl Pigeon.Configurable, for: Pigeon.FCM.Config do def schedule_ping(_config), do: :ok - def reconnect?(_config), do: false - def close(_config) do end @@ -150,9 +151,14 @@ defimpl Pigeon.Configurable, for: Pigeon.FCM.Config do ResultParser.parse(ids, results, on_response, notification) end - defp parse_error(data) do - {:ok, response} = Poison.decode(data) - response["reason"] |> Macro.underscore |> String.to_existing_atom + def parse_error(data) do + case Poison.decode(data) do + {:ok, response} -> + response["reason"] |> Macro.underscore |> String.to_existing_atom + error -> + "Poison parse failed: #{inspect(error)}, body: #{inspect(data)}" + |> Logger.error + end end defp log_error(code, reason) do diff --git a/lib/pigeon/worker.ex b/lib/pigeon/worker.ex index 03655842..c37beea9 100644 --- a/lib/pigeon/worker.ex +++ b/lib/pigeon/worker.ex @@ -1,133 +1,63 @@ defmodule Pigeon.Worker do @moduledoc false - defstruct [:socket, :config, queue: %{}, stream_id: 1] + defstruct config: nil, connections: 0 - use GenServer - require Logger + use GenStage - alias Pigeon.Configurable - alias Pigeon.Http2.{Client, Stream} - alias Pigeon.Worker.NotificationQueue - - def cast_push(pid, notification, opts) do - GenServer.cast(pid, {:push, notification, opts}) - end + alias Pigeon.{Configurable, Worker} def start_link(config) do case Configurable.worker_name(config) do - nil -> GenServer.start_link(__MODULE__, {:ok, config}) - name -> GenServer.start_link(__MODULE__, {:ok, config}, name: name) + nil -> GenStage.start_link(__MODULE__, {:ok, config}) + name -> GenStage.start_link(__MODULE__, {:ok, config}, name: name) end end def stop_connection(pid) do - GenServer.cast(pid, :stop) - end - - def init({:ok, config}), do: initialize_worker(config) - - def initialize_worker(config) do - case connect_socket(config, 0) do - {:ok, socket} -> - Configurable.schedule_ping(config) - {:ok, %{ - socket: socket, - config: config, - stream_id: 1, - queue: %{} - }} - {:error, reason} -> {:stop, reason} - end + GenStage.cast(pid, :stop) end - def connect_socket(_config, 3), do: {:error, :timeout} - def connect_socket(config, tries) do - case Configurable.connect(config) do - {:ok, socket} -> {:ok, socket} - {:error, _reason} -> connect_socket(config, tries + 1) - end + def send_push(name, notification, opts) do + GenStage.call(name, {:push, notification, opts}, 5000) end - # Info - - def handle_info(:ping, state) do - Client.default().send_ping(state.socket) - Configurable.schedule_ping(state.config) - - {:noreply, state} - end - - def handle_info({:closed, _}, %{config: config} = state) do - if Configurable.reconnect?(config) do - {:noreply, reconnect(state)} - else - {:noreply, %{state | socket: nil}} - end - end - - def handle_info(msg, state) do - case Client.default().handle_end_stream(msg, state) do - {:ok, %Stream{} = stream} -> process_end_stream(stream, state) - _else -> {:noreply, state} - end + def init({:ok, config}) do + #Pigeon.start_connection({config, self()}) + {:producer, %Worker{config: config, connections: 0}} end - def process_end_stream(%Stream{id: stream_id} = stream, - %{queue: queue, config: config} = state) do - case NotificationQueue.pop(queue, stream_id) do - {nil, new_queue} -> - # Do nothing if no queued item for stream - {:noreply, %{state | queue: new_queue}} - {{notif, on_response}, new_queue} -> - Configurable.handle_end_stream(config, stream, notif, on_response) - {:noreply, %{state | queue: new_queue}} - end + def handle_call({:push, _notification, _opts} = msg, _from, state) do + state = + if state.connections <= 0 do + Pigeon.start_connection({state.config, self()}) + %{state | connections: state.connections + 1} + else + state + end + {:reply, :ok, [msg], state} # Dispatch immediately end - def send_push(%{config: config, queue: queue} = state, notification, opts) do - state = reconnect_if_needed(state) - - headers = Configurable.push_headers(config, notification, opts) - payload = Configurable.push_payload(config, notification, opts) - - Client.default().send_request(state.socket, headers, payload) - - new_q = NotificationQueue.add(queue, - state.stream_id, - notification, - opts[:on_response]) - - new_stream_id = state.stream_id + 2 - - {:noreply, %{state | stream_id: new_stream_id, queue: new_q}} + def handle_cast(:stop, state) do + {:stop, :normal, state} end - defp reconnect_if_needed(%{socket: nil} = state), do: reconnect(state) - defp reconnect_if_needed(state), do: state - - def reconnect(%{config: config} = state) do - case connect_socket(config, 0) do - {:ok, new_socket} -> - Configurable.schedule_ping(state.config) - %{state | socket: new_socket, queue: %{}, stream_id: 1} - error -> - error |> inspect() |> Logger.error - state - end + def handle_demand(_demand, state) do + {:noreply, [], state} end - # Cast - - def handle_cast(:stop, state) do - {:stop, :normal, state} + def handle_cancel({:cancel, :stream_id_exhausted}, _from, state) do + Pigeon.start_connection({state.config, self()}) + {:noreply, [], state} end - def handle_cast({:push, notification, opts}, state) do - send_push(state, notification, opts) + def handle_cancel({:cancel, :closed}, _from, state) do + state = %{state | connections: state.connections - 1} + {:noreply, [], state} end - def handle_cast(_msg, state) do - {:noreply, state} + def handle_cancel({:down, _error}, _from, state) do + Pigeon.start_connection({state.config, self()}) + {:noreply, [], state} end end diff --git a/mix.exs b/mix.exs index 1cd296c9..d17057e4 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Pigeon.Mixfile do use Mix.Project - @version "1.1.1" + @version "1.1.2" def project do [ @@ -53,7 +53,8 @@ defmodule Pigeon.Mixfile do [ {:poison, "~> 2.0 or ~> 3.0"}, {:httpoison, "~> 0.7"}, - {:kadabra, "~> 0.3.4", optional: true}, + {:gen_stage, "~> 0.12.0"}, + {:kadabra, "~> 0.3.5", optional: true}, {:earmark, "~> 1.0", only: :dev}, {:ex_doc, "~> 0.2", only: :dev}, {:excoveralls, "~> 0.5", only: :test}, diff --git a/mix.lock b/mix.lock index 47de4e67..4f0da362 100644 --- a/mix.lock +++ b/mix.lock @@ -1,18 +1,19 @@ %{"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [], [], "hexpm"}, "certifi": {:hex, :certifi, "2.0.0", "a0c0e475107135f76b8c1d5bc7efb33cd3815cb3cf3dea7aefdd174dabead064", [:rebar3], []}, - "credo": {:hex, :credo, "0.8.6", "335f723772d35da499b5ebfdaf6b426bfb73590b6fcbc8908d476b75f8cbca3f", [], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}], "hexpm"}, + "credo": {:hex, :credo, "0.8.8", "990e7844a8d06ebacd88744a55853a83b74270b8a8461c55a4d0334b8e1736c9", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}], "hexpm"}, "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], []}, "dogma": {:hex, :dogma, "0.1.15", "5bceba9054b2b97a4adcb2ab4948ca9245e5258b883946e82d32f785340fd411", [:mix], [{:poison, ">= 2.0.0", [hex: :poison, optional: false]}]}, - "earmark": {:hex, :earmark, "1.2.3", "206eb2e2ac1a794aa5256f3982de7a76bf4579ff91cb28d0e17ea2c9491e46a4", [:mix], []}, - "ex_doc": {:hex, :ex_doc, "0.16.2", "3b3e210ebcd85a7c76b4e73f85c5640c011d2a0b2f06dcdf5acdb2ae904e5084", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]}, - "excoveralls": {:hex, :excoveralls, "0.7.2", "f69ede8c122ccd3b60afc775348a53fc8c39fe4278aee2f538f0d81cc5e7ff3a", [:mix], [{:exjsx, ">= 3.0.0", [hex: :exjsx, optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, optional: false]}]}, + "earmark": {:hex, :earmark, "1.2.3", "206eb2e2ac1a794aa5256f3982de7a76bf4579ff91cb28d0e17ea2c9491e46a4", [:mix], [], "hexpm"}, + "ex_doc": {:hex, :ex_doc, "0.16.2", "3b3e210ebcd85a7c76b4e73f85c5640c011d2a0b2f06dcdf5acdb2ae904e5084", [:mix], [{:earmark, "~> 1.1", [repo: "hexpm", hex: :earmark, optional: false]}], "hexpm"}, + "excoveralls": {:hex, :excoveralls, "0.7.2", "f69ede8c122ccd3b60afc775348a53fc8c39fe4278aee2f538f0d81cc5e7ff3a", [:mix], [{:exjsx, ">= 3.0.0", [repo: "hexpm", hex: :exjsx, optional: false]}, {:hackney, ">= 0.12.0", [repo: "hexpm", hex: :hackney, optional: false]}], "hexpm"}, "exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, optional: false]}]}, - "hackney": {:hex, :hackney, "1.9.0", "51c506afc0a365868469dcfc79a9d0b94d896ec741cfd5bd338f49a5ec515bfe", [:rebar3], [{:certifi, "2.0.0", [hex: :certifi, optional: false]}, {:idna, "5.1.0", [hex: :idna, optional: false]}, {:metrics, "1.0.1", [hex: :metrics, optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, optional: false]}]}, + "gen_stage": {:hex, :gen_stage, "0.12.2", "e0e347cbb1ceb5f4e68a526aec4d64b54ad721f0a8b30aa9d28e0ad749419cbb", [], [], "hexpm"}, + "hackney": {:hex, :hackney, "1.9.0", "51c506afc0a365868469dcfc79a9d0b94d896ec741cfd5bd338f49a5ec515bfe", [:rebar3], [{:certifi, "2.0.0", [repo: "hexpm", hex: :certifi, optional: false]}, {:idna, "5.1.0", [repo: "hexpm", hex: :idna, optional: false]}, {:metrics, "1.0.1", [repo: "hexpm", hex: :metrics, optional: false]}, {:mimerl, "1.0.2", [repo: "hexpm", hex: :mimerl, optional: false]}, {:ssl_verify_fun, "1.1.1", [repo: "hexpm", hex: :ssl_verify_fun, optional: false]}], "hexpm"}, "hpack": {:hex, :hpack_erl, "0.2.3", "17670f83ff984ae6cd74b1c456edde906d27ff013740ee4d9efaa4f1bf999633", [:rebar3], []}, "httpoison": {:hex, :httpoison, "0.13.0", "bfaf44d9f133a6599886720f3937a7699466d23bb0cd7a88b6ba011f53c6f562", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, optional: false]}]}, "idna": {:hex, :idna, "5.1.0", "d72b4effeb324ad5da3cab1767cb16b17939004e789d8c0ad5b70f3cea20c89a", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, optional: false]}]}, - "jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], []}, - "kadabra": {:hex, :kadabra, "0.3.4", "677d0c5d28a016c7dec167b3f969fe72b4bf18e9568a399cc262efa1ef124925", [:mix], [{:hpack, "~> 0.2.3", [hex: :hpack_erl, repo: "hexpm", optional: false]}, {:scribe, "~> 0.4", [hex: :scribe, repo: "hexpm", optional: true]}], "hexpm"}, + "jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], [], "hexpm"}, + "kadabra": {:hex, :kadabra, "0.3.5", "9b4965acf45d9c7906c15f435ce7647348e271c98393f3d861c5409aa1a2693c", [:mix], [{:hpack, "~> 0.2.3", [hex: :hpack_erl, repo: "hexpm", optional: false]}, {:scribe, "~> 0.4", [hex: :scribe, repo: "hexpm", optional: true]}], "hexpm"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], []}, "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], []}, "pane": {:hex, :pane, "0.1.1", "4a9b46957a02991acbce012169ab7e8ecff74ad24886f94b142680062b10f167", [], [], "hexpm"}, diff --git a/test/apns_test.exs b/test/apns_test.exs index f158aac5..ca110533 100644 --- a/test/apns_test.exs +++ b/test/apns_test.exs @@ -20,9 +20,9 @@ defmodule Pigeon.APNSTest do {:ok, pid} = Pigeon.APNS.start_connection(opts) assert is_pid(pid) - state = :sys.get_state(pid) - assert state.config.uri == "api.development.push.apple.com" - assert state.config.ping_period == 600_000 + worker = :sys.get_state(pid) + assert worker.state.config.uri == "api.development.push.apple.com" + assert worker.state.config.ping_period == 600_000 {:ok, pid} = Pigeon.APNS.start_connection(opts) assert is_pid(pid) @@ -37,8 +37,8 @@ defmodule Pigeon.APNSTest do ] {:ok, pid} = Pigeon.APNS.start_connection(opts) - state = :sys.get_state(pid) - assert state.config.ping_period == 30_000 + worker = :sys.get_state(pid) + assert worker.state.config.ping_period == 30_000 end end @@ -69,7 +69,7 @@ defmodule Pigeon.APNSTest do test_token(), test_topic() ) - Pigeon.APNS.stop_connection(:default) + #Pigeon.APNS.stop_connection(:apns_default) opts = [ cert: Application.get_env(:pigeon, :test)[:apns_cert], key: Application.get_env(:pigeon, :test)[:apns_key], @@ -79,7 +79,7 @@ defmodule Pigeon.APNSTest do assert Pigeon.APNS.push(n, to: worker_pid).response == :success - Pigeon.APNS.start_connection(:apns_default) + #Pigeon.APNS.start_connection(:apns_default) end test "pushes to worker's atom name" do @@ -89,7 +89,7 @@ defmodule Pigeon.APNSTest do test_token(), test_topic() ) - Pigeon.APNS.stop_connection(:default) + #Pigeon.APNS.stop_connection(:default) opts = [ cert: Application.get_env(:pigeon, :test)[:apns_cert], key: Application.get_env(:pigeon, :test)[:apns_key], @@ -100,7 +100,7 @@ defmodule Pigeon.APNSTest do assert Pigeon.APNS.push(n, to: :custom).response == :success - Pigeon.APNS.start_connection(:apns_default) + #Pigeon.APNS.start_connection(:apns_default) end end @@ -193,7 +193,7 @@ defmodule Pigeon.APNSTest do |> test_message() |> Pigeon.APNS.Notification.new(test_token(), test_topic()) - Pigeon.APNS.stop_connection(:default) + #Pigeon.APNS.stop_connection(:default) opts = [ cert: Application.get_env(:pigeon, :test)[:apns_cert], key: Application.get_env(:pigeon, :test)[:apns_key], @@ -206,7 +206,7 @@ defmodule Pigeon.APNSTest do assert_receive(%Pigeon.APNS.Notification{response: :success}, 5_000) - Pigeon.APNS.start_connection(:apns_default) + #Pigeon.APNS.start_connection(:apns_default) end end end diff --git a/test/fcm/worker_test.exs b/test/fcm/worker_test.exs index dceeb48d..4ce587aa 100644 --- a/test/fcm/worker_test.exs +++ b/test/fcm/worker_test.exs @@ -7,23 +7,23 @@ defmodule Pigeon.FCM.WorkerTest do Application.get_env(:pigeon, :test)[:valid_fcm_reg_id] end - test "reconnects on push send after disconnect" do - opts = [ - key: Application.get_env(:pigeon, :test)[:fcm_key] - ] - {:ok, pid} = FCM.start_connection(opts) - send(pid, {:closed, self()}) + # test "starts new connection on push send if none available" do + # opts = [ + # key: Application.get_env(:pigeon, :test)[:fcm_key] + # ] + # {:ok, pid} = FCM.start_connection(opts) + # send(pid, {:closed, self()}) - refute :sys.get_state(pid).socket + # refute :sys.get_state(pid).socket - n = FCM.Notification.new(valid_fcm_reg_id(), %{}, %{"message" => "Test"}) - expected = [success: valid_fcm_reg_id()] - assert Pigeon.FCM.push(n, to: pid).response == expected + # n = FCM.Notification.new(valid_fcm_reg_id(), %{}, %{"message" => "Test"}) + # expected = [success: valid_fcm_reg_id()] + # assert Pigeon.FCM.push(n, to: pid).response == expected - assert :sys.get_state(pid).socket - end + # assert :sys.get_state(pid).state.socket + # end - test "resets stream id after disconnect" do + test "decrements connection count after disconnect" do opts = [ key: Application.get_env(:pigeon, :test)[:fcm_key] ] @@ -34,13 +34,25 @@ defmodule Pigeon.FCM.WorkerTest do assert _notif = Pigeon.FCM.push(n, to: pid) assert _notif = Pigeon.FCM.push(n, to: pid) - send(pid, {:closed, self()}) - assert :sys.get_state(pid).stream_id == 7 + {conn_pid, _ref} = + pid + |> :sys.get_state() + |> Map.get(:consumers) + |> Map.values + |> List.first + + assert :sys.get_state(pid).state.connections == 1 + + send(conn_pid, {:closed, self()}) + + Process.sleep(500) + assert :sys.get_state(pid).state.connections == 0 n = FCM.Notification.new(valid_fcm_reg_id(), %{}, %{"message" => "Test"}) assert _notif = Pigeon.FCM.push(n, to: pid) assert _notif = Pigeon.FCM.push(n, to: pid) - assert :sys.get_state(pid).stream_id == 5 + Process.sleep(500) + assert :sys.get_state(pid).state.connections == 1 end end diff --git a/test/fcm_test.exs b/test/fcm_test.exs index b46a9f7b..a16022bc 100644 --- a/test/fcm_test.exs +++ b/test/fcm_test.exs @@ -21,9 +21,9 @@ defmodule Pigeon.FCMTest do {:ok, pid} = Pigeon.FCM.start_connection(opts) assert is_pid(pid) - state = :sys.get_state(pid) - assert state.config.key == fcm_key - assert is_pid(state.socket) + worker = :sys.get_state(pid) + assert worker.state.config.key == fcm_key + assert worker.state.connections == 0 end end