Skip to content

Commit

Permalink
fix: on_response callbacks now spawned as tasks (#102)
Browse files Browse the repository at this point in the history
* fix: on_response callbacks now spawned as tasks

Expensive callbacks can actually block the worker (as they are processed
sequentially). Also has the benefit of not crashing the worker if the
callback causes trouble. Unsure why this wasn't implemented properly in
the first place.

Also:
- Removed unused ADM nil `on_response` cast
- `process_on_response/2` refactored to `Pigeon.Tasks` module and 
imported by the workers
- Bumped version to v1.1.4
- Minor styling for CHANGELOG
- Updated dev/test dependencies
  • Loading branch information
hpopp authored Jan 3, 2018
1 parent 985621a commit 4a732e4
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 91 deletions.
43 changes: 30 additions & 13 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## v1.1.4
* Fix: `:on_response` callbacks spawned as supervised task instead of running
in the `Worker` process

## v1.1.3
* More robust FCM/APNS backpressure
* Bumped minimum Kadabra version to `v0.3.6`
Expand All @@ -13,9 +17,10 @@
* Bumped minimum Kadabra version to `v0.3.4`

## v1.1.0
* Minimum requirements now Elixir v1.4 and OTP 19.2 (Kadabra bumped to `v0.3.0`)
* Minimum requirements now Elixir v1.4 and OTP 19.2 (Kadabra bumped
to `v0.3.0`)
* Runtime worker configs. Create a functions that return config
structs and specify them your `config.exs` with
structs and specify them your `config.exs` with

```elixir
config :pigeon, workers: [
Expand All @@ -36,8 +41,10 @@ config :pigeon, workers: [
* `:reconnect` now false by default

**FCM**
* `NotificationResponse` done away with in favor of a `:response` key on `Notification`
* Override push server endpoint with `:uri` and `:port` options in `FCM.Config.new/1`
* `NotificationResponse` done away with in favor of a `:response` key
on `Notification`
* Override push server endpoint with `:uri` and `:port` options
in `FCM.Config.new/1`
* `:uri` and `:port` config options for overriding push server endpoint

**ADM**
Expand All @@ -47,7 +54,8 @@ config :pigeon, workers: [
* `ADM.start_connection/1` and `ADM.stop_connection/1` added

## v1.0.4
* Fix: removed connection pinging from FCM.Worker (`:ping_period` option left in FCM config to not break API)
* Fix: removed connection pinging from FCM.Worker (`:ping_period` option
left in FCM config to not break API)

## v1.0.3
* Fixed proper handling of large FCM push batches
Expand All @@ -73,7 +81,8 @@ config :pigeon, workers: [
* Various `chatterbox` client adapter fixes

## V0.12.0
* Configurable `Pigeon.Http2.Client`. Currently supports `kadabra` and `chatterbox`
* Configurable `Pigeon.Http2.Client`. Currently supports `kadabra`
and `chatterbox`
* `kadabra` bumped to `v0.2.0`

## v0.11.0
Expand All @@ -93,7 +102,8 @@ config :pigeon, workers: [
## v0.10.0
* Migrated HTTP/2 client from `chatterbox` to `kadabra`
* Support for ADM (Amazon Android) push
* APNS pushes are now synchronous by default. For async pushes use the new `on_response` option. GCM and ADM will have it in the next major release.
* APNS pushes are now synchronous by default. For async pushes use the
new `on_response` option. GCM and ADM will have it in the next major release.
* Bulk APNS pushing
* Handling of multiple APNS worker connections with different configs
* Re-implemented APNS socket pings to keep connections open
Expand All @@ -103,7 +113,8 @@ config :pigeon, workers: [
* Fixed GCM error response atom conversion

## v0.9.1
* Fixed :eaddrinuse error when restarting Pigeon too quickly with :apns_2197 enabled
* Fixed :eaddrinuse error when restarting Pigeon too quickly with
:apns_2197 enabled

## v0.9.0
* APNS topic made optional
Expand All @@ -114,21 +125,27 @@ config :pigeon, workers: [
## v0.8.0
* Implemented Chatterbox as APNS HTTP2 client
* APNS server responses now caught asynchronously
* GCM support for `notification` and `data` payload keys (`Pigeon.GCM.Notification.new` API changes)
* GCM support for `notification` and `data` payload keys
(`Pigeon.GCM.Notification.new` API changes)

## v0.7.0
* APNS cert/key configs can now either be a file path, full-text string, or `{:your_app, "path/to/file.pem"}` (which looks in the `/priv` directory of your app)
* APNS cert/key configs can now either be a file path, full-text string,
or `{:your_app, "path/to/file.pem"}` (which looks in the `/priv` directory
of your app)
* Fixed APNSWorker crash on `:ssl.send/2` timeout
* Better error-handling for invalid APNS configs

## v0.6.0
* `Pigeon.APNS.Notification.new/3` returns `%Pigeon.APNS.Notification{}` struct
* Configure APNS to use SSL port 2197 with `apns_2197: true` in your `config.exs`
* Configure APNS to use SSL port 2197 with `apns_2197: true` in
your `config.exs`
* Error feedback symbols converted from `CamelCase` to `snake_case`
* APNS expiration values supported with `expiration` key in `%Pigeon.APNS.Notification{}`
* APNS expiration values supported with `expiration` key in
`%Pigeon.APNS.Notification{}`

## v0.5.2
* Fixed bug where APNSWorker would hang up if SSL connection failed. Now retries the connection twice more before gracefully shutting down.
* Fixed bug where APNSWorker would hang up if SSL connection failed. Now
retries the connection twice more before gracefully shutting down.

## v0.5.1
* GCM error responses return proper chunk of regstration IDs
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Add pigeon and kadabra as `mix.exs` dependencies:
```elixir
def deps do
[
{:pigeon, "~> 1.1.3"},
{:pigeon, "~> 1.1.4"},
{:kadabra, "~> 0.3.6"}
]
end
Expand Down
4 changes: 2 additions & 2 deletions docs/Getting Started.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ Add pigeon and kadabra as `mix.exs` dependencies:
```elixir
def deps do
[
{:pigeon, "~> 1.1.2"},
{:kadabra, "~> 0.3.5"}
{:pigeon, "~> 1.1.4"},
{:kadabra, "~> 0.3.6"}
]
end
```
Expand Down
3 changes: 0 additions & 3 deletions lib/pigeon/adm.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,6 @@ defmodule Pigeon.ADM do
defp cast_push(worker_name, notifications, on_response) when is_list(notifications) do
for n <- notifications, do: cast_push(worker_name, n, on_response)
end
defp cast_push(worker_name, notification, nil) do
GenServer.cast(worker_name, {:push, :adm, notification})
end
defp cast_push(worker_name, notification, on_response) do
GenServer.cast(worker_name, {:push, :adm, notification, on_response})
end
Expand Down
20 changes: 5 additions & 15 deletions lib/pigeon/adm/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule Pigeon.ADM.Worker do
use GenServer
require Logger

import Pigeon.Tasks, only: [process_on_response: 2]

alias Pigeon.ADM.ResultParser

@token_refresh_uri "https://api.amazon.com/auth/O2/token"
Expand Down Expand Up @@ -35,24 +37,14 @@ defmodule Pigeon.ADM.Worker do
{:stop, :normal, state}
end

def handle_cast({:push, :adm, notification}, state) do
case refresh_access_token_if_needed(state) do
{:ok, state} ->
:ok = do_push(notification, state, nil)
{:noreply, state}
{:error, _reason} ->
{:noreply, state}
end
end

def handle_cast({:push, :adm, notification, on_response}, state) do
case refresh_access_token_if_needed(state) do
{:ok, state} ->
:ok = do_push(notification, state, on_response)
{:noreply, state}
{:error, reason} ->
notification = %{notification | response: reason}
on_response.(notification)
process_on_response(on_response, notification)
{:noreply, state}
end
end
Expand Down Expand Up @@ -207,10 +199,8 @@ defmodule Pigeon.ADM.Worker do
{:ok, %{"reason" => _reason} = result_json} ->
parse_result(notification, result_json, on_response)
{:error, _} ->
unless on_response == nil do
n = %{notification | response: generic_error_reason(status)}
on_response.(n)
end
n = %{notification | response: generic_error_reason(status)}
process_on_response(on_response, n)
end
end

Expand Down
10 changes: 5 additions & 5 deletions lib/pigeon/apns/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ end
defimpl Pigeon.Configurable, for: Pigeon.APNS.Config do
@moduledoc false

import Pigeon.Tasks, only: [process_on_response: 2]

alias Pigeon.APNS.{Config, Error}

@type sock :: {:sslsocket, any, pid | {any, any}}
Expand Down Expand Up @@ -220,14 +222,12 @@ defimpl Pigeon.Configurable, for: Pigeon.APNS.Config do
case status do
200 ->
n = %{notification | id: get_apns_id(headers), response: :success}
unless on_response == nil, do: on_response.(n)
process_on_response(on_response, n)
_error ->
reason = Error.parse(body)
Error.log(reason, notification)
unless on_response == nil do
notification = %{notification | response: reason}
on_response.(notification)
end
notification = %{notification | response: reason}
process_on_response(on_response, notification)
end
end

Expand Down
12 changes: 7 additions & 5 deletions lib/pigeon/fcm/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ defimpl Pigeon.Configurable, for: Pigeon.FCM.Config do

require Logger

import Pigeon.Tasks, only: [process_on_response: 2]

alias Pigeon.Encodable
alias Pigeon.FCM.{Config, ResultParser}

Expand Down Expand Up @@ -109,7 +111,7 @@ defimpl Pigeon.Configurable, for: Pigeon.FCM.Config do
def handle_end_stream(_config, %{error: _error}, _notif, nil), do: :ok
def handle_end_stream(_config, %{error: _error}, {_regids, notif}, on_response) do
notif = %{notif | status: :unavailable}
on_response.(notif)
process_on_response(on_response, notif)
end

defp do_handle_end_stream(200, body, notif, on_response) do
Expand All @@ -120,23 +122,23 @@ defimpl Pigeon.Configurable, for: Pigeon.FCM.Config do
defp do_handle_end_stream(400, _body, notif, on_response) do
log_error("400", "Malformed JSON")
notif = %{notif | status: :malformed_json}
unless on_response == nil do on_response.(notif) end
process_on_response(on_response, notif)
end
defp do_handle_end_stream(401, _body, notif, on_response) do
log_error("401", "Unauthorized")
notif = %{notif | status: :unauthorized}
unless on_response == nil do on_response.(notif) end
process_on_response(on_response, notif)
end
defp do_handle_end_stream(500, _body, notif, on_response) do
log_error("500", "Internal server error")
notif = %{notif | status: :internal_server_error}
unless on_response == nil do on_response.(notif) end
process_on_response(on_response, notif)
end
defp do_handle_end_stream(code, body, notif, on_response) do
reason = parse_error(body)
log_error(code, reason)
notif = %{notif | response: reason}
unless on_response == nil do on_response.(notif) end
process_on_response(on_response, notif)
end

def schedule_ping(_config), do: :ok
Expand Down
6 changes: 4 additions & 2 deletions lib/pigeon/fcm/result_parser.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
defmodule Pigeon.FCM.ResultParser do
@moduledoc false

import Pigeon.Tasks, only: [process_on_response: 2]

alias Pigeon.FCM.Notification

def parse([], [], on_response, notification) do
on_response.(notification)
def parse([], [], on_response, notif) do
process_on_response(on_response, notif)
end

def parse(regid, results, on_response, notif) when is_binary(regid) do
Expand Down
8 changes: 8 additions & 0 deletions lib/pigeon/tasks.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
defmodule Pigeon.Tasks do
@moduledoc false

def process_on_response(nil, _notif), do: :ok
def process_on_response(on_response, notif) do
Task.Supervisor.start_child(Pigeon.Tasks, fn -> on_response.(notif) end)
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Pigeon.Mixfile do
use Mix.Project

@version "1.1.3"
@version "1.1.4"

def project do
[
Expand Down
12 changes: 6 additions & 6 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
%{"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [], [], "hexpm"},
"certifi": {:hex, :certifi, "2.0.0", "a0c0e475107135f76b8c1d5bc7efb33cd3815cb3cf3dea7aefdd174dabead064", [:rebar3], []},
"credo": {:hex, :credo, "0.8.8", "990e7844a8d06ebacd88744a55853a83b74270b8a8461c55a4d0334b8e1736c9", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}], "hexpm"},
"credo": {:hex, :credo, "0.8.10", "261862bb7363247762e1063713bb85df2bbd84af8d8610d1272cd9c1943bba63", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}], "hexpm"},
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], []},
"dogma": {:hex, :dogma, "0.1.15", "5bceba9054b2b97a4adcb2ab4948ca9245e5258b883946e82d32f785340fd411", [:mix], [{:poison, ">= 2.0.0", [hex: :poison, optional: false]}]},
"earmark": {:hex, :earmark, "1.2.3", "206eb2e2ac1a794aa5256f3982de7a76bf4579ff91cb28d0e17ea2c9491e46a4", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.16.2", "3b3e210ebcd85a7c76b4e73f85c5640c011d2a0b2f06dcdf5acdb2ae904e5084", [:mix], [{:earmark, "~> 1.1", [repo: "hexpm", hex: :earmark, optional: false]}], "hexpm"},
"excoveralls": {:hex, :excoveralls, "0.7.2", "f69ede8c122ccd3b60afc775348a53fc8c39fe4278aee2f538f0d81cc5e7ff3a", [:mix], [{:exjsx, ">= 3.0.0", [repo: "hexpm", hex: :exjsx, optional: false]}, {:hackney, ">= 0.12.0", [repo: "hexpm", hex: :hackney, optional: false]}], "hexpm"},
"earmark": {:hex, :earmark, "1.2.4", "99b637c62a4d65a20a9fb674b8cffb8baa771c04605a80c911c4418c69b75439", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.18.1", "37c69d2ef62f24928c1f4fdc7c724ea04aecfdf500c4329185f8e3649c915baf", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"excoveralls": {:hex, :excoveralls, "0.8.0", "99d2691d3edf8612f128be3f9869c4d44b91c67cec92186ce49470ae7a7404cf", [:mix], [{:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, optional: false]}]},
"gen_stage": {:hex, :gen_stage, "0.12.2", "e0e347cbb1ceb5f4e68a526aec4d64b54ad721f0a8b30aa9d28e0ad749419cbb", [], [], "hexpm"},
"hackney": {:hex, :hackney, "1.9.0", "51c506afc0a365868469dcfc79a9d0b94d896ec741cfd5bd338f49a5ec515bfe", [:rebar3], [{:certifi, "2.0.0", [repo: "hexpm", hex: :certifi, optional: false]}, {:idna, "5.1.0", [repo: "hexpm", hex: :idna, optional: false]}, {:metrics, "1.0.1", [repo: "hexpm", hex: :metrics, optional: false]}, {:mimerl, "1.0.2", [repo: "hexpm", hex: :mimerl, optional: false]}, {:ssl_verify_fun, "1.1.1", [repo: "hexpm", hex: :ssl_verify_fun, optional: false]}], "hexpm"},
"hackney": {:hex, :hackney, "1.10.1", "c38d0ca52ea80254936a32c45bb7eb414e7a96a521b4ce76d00a69753b157f21", [:rebar3], [{:certifi, "2.0.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
"hpack": {:hex, :hpack_erl, "0.2.3", "17670f83ff984ae6cd74b1c456edde906d27ff013740ee4d9efaa4f1bf999633", [:rebar3], []},
"httpoison": {:hex, :httpoison, "0.13.0", "bfaf44d9f133a6599886720f3937a7699466d23bb0cd7a88b6ba011f53c6f562", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, optional: false]}]},
"idna": {:hex, :idna, "5.1.0", "d72b4effeb324ad5da3cab1767cb16b17939004e789d8c0ad5b70f3cea20c89a", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, optional: false]}]},
"jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], [], "hexpm"},
"jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], [], "hexpm"},
"kadabra": {:hex, :kadabra, "0.3.6", "22deab8b2a7b7aa65692a5b3fc566229b32c6036095bf423ebbeb999df987669", [:mix], [{:hpack, "~> 0.2.3", [hex: :hpack_erl, repo: "hexpm", optional: false]}, {:scribe, "~> 0.4", [hex: :scribe, repo: "hexpm", optional: true]}], "hexpm"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], []},
"mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], []},
Expand Down
Loading

0 comments on commit 4a732e4

Please sign in to comment.