Skip to content

Commit

Permalink
async streams, max streams from settings http/2 frame
Browse files Browse the repository at this point in the history
  • Loading branch information
sedinin committed Apr 17, 2024
1 parent bf2d115 commit 8ca1c1b
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 36 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

{deps, [
{gun, "1.3.3"},
{jsx, "3.0.0"},
{jsx, "3.1.0"},
{base64url, "1.0.1"}
]}.

Expand Down
6 changes: 3 additions & 3 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
[{<<"base64url">>,{pkg,<<"base64url">>,<<"1.0.1">>},0},
{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.7.3">>},1},
{<<"gun">>,{pkg,<<"gun">>,<<"1.3.3">>},0},
{<<"jsx">>,{pkg,<<"jsx">>,<<"3.0.0">>},0}]}.
{<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},0}]}.
[
{pkg_hash,[
{<<"base64url">>, <<"F8C7F2DA04CA9A5D0F5F50258F055E1D699F0E8BF4CFDB30B750865368403CF6">>},
{<<"cowlib">>, <<"A7FFCD0917E6D50B4D5FB28E9E2085A0CEB3C97DEA310505F7460FF5ED764CE9">>},
{<<"gun">>, <<"CF8B51BEB36C22B9C8DF1921E3F2BC4D2B1F68B49AD4FBC64E91875AA14E16B4">>},
{<<"jsx">>, <<"20A170ABD4335FC6DB24D5FAD1E5D677C55DADF83D1B20A8A33B5FE159892A39">>}]},
{<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}]},
{pkg_hash_ext,[
{<<"base64url">>, <<"F9B3ADD4731A02A9B0410398B475B33E7566A695365237A6BDEE1BB447719F5C">>},
{<<"cowlib">>, <<"1E1A3D176D52DAEBBECBBCDFD27C27726076567905C2A9D7398C54DA9D225761">>},
{<<"gun">>, <<"3106CE167F9C9723F849E4FB54EA4A4D814E3996AE243A1C828B256E749041E0">>},
{<<"jsx">>, <<"37BECA0435F5CA8A2F45F76A46211E76418FBEF80C36F0361C249FC75059DC6D">>}]}
{<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}]}
].
243 changes: 211 additions & 32 deletions src/apns_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
, code_change/4
]).

%% for spawn/3
-export([ reply_errors_and_cancel_timers/2 ]).

-export_type([ name/0
, host/0
, port/0
Expand Down Expand Up @@ -89,8 +92,18 @@
, proxy_info => proxy_info()
}.

-type stream_data() :: #{ from := {pid(), term()}
, stream := gun:stream_ref()
, timer := reference()
, status := non_neg_integer()
, headers := gun:headers()
, body := binary()
}.

-type state() :: #{ connection := connection()
, gun_pid => pid()
, gun_streams => #{gun:stream_ref() => stream_data()}
, max_gun_streams := non_neg_integer()
, gun_monitor => reference()
, gun_connect_ref => reference()
, client := pid()
Expand Down Expand Up @@ -207,6 +220,8 @@ callback_mode() -> state_functions.
init({Connection, Client}) ->
StateData = #{ connection => Connection
, client => Client
, gun_streams => #{}
, max_gun_streams => default_max_gun_streams(Connection)
, backoff => 1
, backoff_ceiling => application:get_env(apns, backoff_ceiling, 10)
},
Expand All @@ -231,7 +246,7 @@ open_origin(internal, _, #{connection := Connection} = StateData) ->
{next_event, internal, { Host
, Port
, #{ protocols => [http2]
, transport_opts => TransportOpts
, http2_opts => TransportOpts
, retry => 0
}}}}.

Expand Down Expand Up @@ -304,27 +319,153 @@ await_tunnel_up(EventType, EventContent, StateData) ->
connected(internal, on_connect, #{client := Client}) ->
Client ! {connection_up, self()},
keep_state_and_data;
connected( {call, {Client, _} = From}
connected( {call, From}
, {push_notification, DeviceId, Notification, Headers}
, #{client := Client} = StateData) ->
#{connection := Connection, gun_pid := GunPid} = StateData,
#{timeout := Timeout} = Connection,
Response = push(GunPid, DeviceId, Headers, Notification, Timeout),
{keep_state_and_data, {reply, From, Response}};
connected( {call, {Client, _} = From}
, StateData) ->
#{ connection := Connection
, gun_pid := GunPid
, gun_streams := Streams0
, max_gun_streams := MaxStreams} = StateData,
StreamAllowed = stream_allowed(maps:size(Streams0), MaxStreams),
if
not StreamAllowed ->
{keep_state_and_data, {reply, From, {error, {overload, maps:size(Streams0), MaxStreams}}}};
true ->
#{timeout := Timeout} = Connection,
StreamRef = send_push(GunPid, DeviceId, Headers, Notification),
Tmr = erlang:send_after(Timeout, self(), {timeout, GunPid, StreamRef}),
StreamData = #{ from => From
, stream => StreamRef
, timer => Tmr
, status => 200 %% b4 we know real status
, headers => []
, body => <<>> },
Streams1 = Streams0#{StreamRef => StreamData},
{keep_state, StateData#{gun_streams => Streams1}}
end;
connected( {call, From}
, {push_notification, Token, DeviceId, Notification, Headers0}
, #{client := Client} = StateData) ->
#{connection := Connection, gun_pid := GunConn} = StateData,
#{timeout := Timeout} = Connection,
Headers = add_authorization_header(Headers0, Token),
Response = push(GunConn, DeviceId, Headers, Notification, Timeout),
{keep_state_and_data, {reply, From, Response}};
connected({call, From}, Event, _) when element(1, Event) =:= push_notification ->
{keep_state_and_data, {reply, From, {error, not_connection_owner}}};
, StateData0) ->
#{ connection := Connection
, gun_pid := GunPid
, gun_streams := Streams0
, max_gun_streams := MaxStreams} = StateData0,
StreamAllowed = stream_allowed(maps:size(Streams0), MaxStreams),
if
not StreamAllowed ->
{keep_state_and_data, {reply, From, {error, {overload, maps:size(Streams0), MaxStreams}}}};
true ->
#{timeout := Timeout} = Connection,
Headers = add_authorization_header(Headers0, Token),
StreamRef = send_push(GunPid, DeviceId, Headers, Notification),
Tmr = erlang:send_after(Timeout, self(), {timeout, GunPid, StreamRef}),
StreamData = #{ from => From
, stream => StreamRef
, timer => Tmr
, status => 200 %% b4 we know real status
, headers => []
, body => <<>> },
Streams1 = Streams0#{StreamRef => StreamData},
{keep_state, StateData0#{gun_streams => Streams1}}
end;
connected({call, From}, wait_apns_connection_up, _) ->
{keep_state_and_data, {reply, From, ok}};
connected({call, From}, Event, _) when Event =/= gun_pid ->
{keep_state_and_data, {reply, From, {error, bad_call}}};
connected( info
, {gun_response, GunPid, StreamRef, fin, Status, Headers}
, #{gun_pid := GunPid} = StateData0) ->
%% got response without body
#{gun_streams := Streams0} = StateData0,
#{StreamRef := StreamData} = Streams0,
#{from := From} = StreamData,
Streams1 = maps:remove(StreamRef, Streams0),
gun:cancel(GunPid, StreamRef), %% final response, closing stream
gen_statem:reply(From, {Status, Headers, no_body}),
{keep_state, StateData0#{gun_streams => Streams1}};
connected( info
, {gun_response, GunPid, StreamRef, nofin, Status, Headers}
, #{gun_pid := GunPid} = StateData0) ->
%% update status & headers
#{gun_streams := Streams0} = StateData0,
#{StreamRef := StreamState0} = Streams0,
StreamState1 = StreamState0#{status => Status, headers => Headers},
Streams1 = Streams0#{StreamRef => StreamState1},
{keep_state, StateData0#{gun_streams => Streams1}};
connected( info
, {gun_data, GunPid, StreamRef, fin, Data}
, #{gun_pid := GunPid} = StateData0) ->
%% got data, finally
#{gun_streams := Streams0} = StateData0,
#{StreamRef := StreamData} = Streams0,
#{from := From, status := Status, headers := H, body := B0} = StreamData,
Streams1 = maps:remove(StreamRef, Streams0),
gun:cancel(GunPid, StreamRef), %% final, closing stream
gen_statem:reply(From, {Status, H, <<B0/binary, Data/binary>>}),
{keep_state, StateData0#{gun_streams => Streams1}};
connected( info
, {gun_data, GunPid, StreamRef, nofin, Data}
, #{gun_pid := GunPid} = StateData0) ->
%% add data to buffer, still waiting
#{gun_streams := Streams0} = StateData0,
#{StreamRef := StreamState0} = Streams0,
#{body := B0} = StreamState0,
StreamState1 = StreamState0#{body => <<B0/binary, Data/binary>>},
Streams1 = Streams0#{StreamRef => StreamState1},
{keep_state, StateData0#{gun_streams => Streams1}};
connected( info
, {gun_error, GunPid, StreamRef, Reason}
, #{gun_pid := GunPid} = StateData0) ->
%% answering with error, remove entry
#{gun_streams := Streams0} = StateData0,
case maps:get(StreamRef, Streams0, null) of
null ->
%% nothing todo
{keep_state, StateData0};
StreamData ->
#{from := From} = StreamData,
gen_statem:reply(From, {error, Reason}),
Streams1 = maps:remove(StreamRef, Streams0),
gun:cancel(GunPid, StreamRef),
{keep_state, StateData0#{gun_streams => Streams1}}
end;
connected( info
, {gun_error, GunPid, Reason}
, #{gun_pid := GunPid} = StateData0) ->
%% answer with error for all streams, remove all entries, going to reconnect
#{gun_streams := Streams} = StateData0,
spawn(apns_connection, reply_errors_and_cancel_timers, [Streams, Reason]),
{next_state, down, StateData0#{gun_streams => #{}},
{next_event, internal, {down, ?FUNCTION_NAME, Reason}}};
connected( info
, {timeout, GunPid, StreamRef}
, #{gun_pid := GunPid, gun_streams := Streams0} = StateData0) ->
%% gun pid matches, we have to answer {error, timeout}
case maps:find(StreamRef, Streams0) of
{ok, StreamData} ->
#{from := From} = StreamData,
gen_statem:reply(From, {error, timeout}),
Streams1 = maps:remove(StreamRef, Streams0),
gun:cancel(GunPid, StreamRef),
{keep_state, StateData0#{gun_streams => Streams1}};
error ->
%% cant find stream data by stream ref?
%% may be just answered and removed,
%% ignoring
{keep_state, StateData0}
end;
connected(info,
{timeout, _GunPid, _StreamRef},
StateData0) ->
%% timeout from different connection?
%% ignoring
{keep_state, StateData0};
connected( info
, {gun_notify, GunPid, settings_changed, Settings}
, #{gun_pid := GunPid, max_gun_streams := MaxStreams0} = StateData0) ->
%% settings received, if contains max_concurrent_streams, update it
MaxStreams1 = maps:get(max_concurrent_streams, Settings, MaxStreams0),
{keep_state, StateData0#{max_gun_streams => MaxStreams1}};
connected(EventType, EventContent, StateData) ->
handle_common(EventType, EventContent, ?FUNCTION_NAME, StateData, drop).

Expand Down Expand Up @@ -356,9 +497,12 @@ handle_common(cast, stop, _, _, _) ->
handle_common( info
, {'DOWN', GunMon, process, GunPid, Reason}
, StateName
, #{gun_pid := GunPid, gun_monitor := GunMon} = StateData
, #{gun_pid := GunPid, gun_monitor := GunMon} = StateData0
, _) ->
{next_state, down, StateData,
%% gun died, answering with errors, cleanup entries
#{gun_streams := Streams} = StateData0,
spawn(apns_connection, reply_errors_and_cancel_timers, [Streams, Reason]),
{next_state, down, StateData0#{gun_streams => #{}},
{next_event, internal, {down, StateName, Reason}}};
handle_common( state_timeout
, EventContent
Expand Down Expand Up @@ -423,24 +567,43 @@ proxy(#{proxy_info := Proxy}) ->
proxy(_) ->
undefined.

-spec default_max_gun_streams(connection()) -> non_neg_integer() | infinity.
default_max_gun_streams(Setts) ->
case type(Setts) of
token -> 1; %% at start, for token we should set 1
_ -> 100
end.


transport_opts(Connection) ->
case type(Connection) of
certdata ->
Cert = certdata(Connection),
Key = keydata(Connection),
%% XXX: why is proplist here?
[{cert, Cert}, {key, Key}];
cert ->
Certfile = certfile(Connection),
Keyfile = keyfile(Connection),
%% XXX: why is proplist here?
[{certfile, Certfile}, {keyfile, Keyfile}];
token ->
[]
%% we need to know settings, http2 opt
#{notify_settings_changed => true}
end.

%%%===================================================================
%%% Internal Functions
%%%===================================================================

-spec(stream_allowed(StreamsCount :: non_neg_integer(),
MaxStreams :: non_neg_integer() | infinity) ->
boolean()).
stream_allowed(_StreamsCount, infinity) -> true;
stream_allowed(StreamsCount, MaxStreams) ->
StreamsCount < MaxStreams.


-spec get_headers(apns:headers()) -> list().
get_headers(Headers) ->
List = [ {<<"apns-id">>, apns_id}
Expand All @@ -467,21 +630,22 @@ get_device_path(DeviceId) ->
add_authorization_header(Headers, Token) ->
Headers#{apns_auth_token => <<"bearer ", Token/binary>>}.

-spec push(pid(), apns:device_id(), apns:headers(), notification(), integer()) ->
apns:stream_id().
push(GunConn, DeviceId, HeadersMap, Notification, Timeout) ->
-spec send_push(pid(), apns:device_id(), apns:headers(), notification()) ->
gun:stream_ref().
send_push(GunPid, DeviceId, HeadersMap, Notification) ->
Headers = get_headers(HeadersMap),
Path = get_device_path(DeviceId),
StreamRef = gun:post(GunConn, Path, Headers, Notification),
case gun:await(GunConn, StreamRef, Timeout) of
{response, fin, Status, ResponseHeaders} ->
{Status, ResponseHeaders, no_body};
{response, nofin, Status, ResponseHeaders} ->
{ok, Body} = gun:await_body(GunConn, StreamRef, Timeout),
DecodedBody = jsx:decode(Body, [{return_maps, false}]),
{Status, ResponseHeaders, DecodedBody};
{error, timeout} -> timeout
end.
gun:post(GunPid, Path, Headers, Notification).

%% case gun:await(GunPid, StreamRef, Timeout) of
%% {response, fin, Status, ResponseHeaders} ->
%% {Status, ResponseHeaders, no_body};
%% {response, nofin, Status, ResponseHeaders} ->
%% {ok, Body} = gun:await_body(GunPid, StreamRef, Timeout),
%% DecodedBody = jsx:decode(Body, [{return_maps, false}]),
%% {Status, ResponseHeaders, DecodedBody};
%% {error, timeout} -> timeout
%% end.

-spec backoff(non_neg_integer(), non_neg_integer()) -> non_neg_integer().
backoff(N, Ceiling) ->
Expand All @@ -492,3 +656,18 @@ backoff(N, Ceiling) ->
NString = float_to_list(NextN, [{decimals, 0}]),
list_to_integer(NString)
end.

%%%===================================================================
%%% spawn/3 functions
%%%===================================================================
-spec reply_errors_and_cancel_timers([stream_data()], term()) -> ok.
reply_errors_and_cancel_timers(Streams, Reason) ->
[reply_error_and_cancel_timer(From, Reason, Tmr) ||
#{from := From, timer := Tmr} <- maps:values(Streams)],
ok.

-spec reply_error_and_cancel_timer(From :: {pid(), term()}, Reason :: term(),
Tmr :: reference()) -> ok.
reply_error_and_cancel_timer(From, Reason, Tmr) ->
erlang:cancel_timer(Tmr),
gen_statem:reply(From, {error, Reason}).

0 comments on commit 8ca1c1b

Please sign in to comment.