Skip to content

Commit

Permalink
[client] move request timeout handling to socket process
Browse files Browse the repository at this point in the history
There was no logic in place to remove entries from the pending
registry once the timeout hits. The client side timeout would only
handle re-transmission, but not the cleanup.

Move the timeout logic to the sender and let it also cleanup the
pending registry.
  • Loading branch information
RoadRunnr committed Apr 16, 2024
1 parent 345f11b commit d25b13a
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/eradius_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ send_request_loop(Socket, Peer = {_ServerName, {IP, Port}}, ReqId, Authenticator
end,

case Result of
{response, ReqId, Response} ->
{ok, Response} ->
{ok, Response, Secret, Authenticator};
{error, close} ->
{error, socket_down};
Expand Down
94 changes: 61 additions & 33 deletions src/eradius_client_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-record(state, {socket, active_n, pending, mode, counter}).
-record(state, {family, socket, active_n, pending, mode, counter}).

%%%=========================================================================
%%% API
Expand All @@ -27,10 +27,8 @@ start_link(Config) ->

send_request(Socket, Peer, ReqId, Request, Timeout) ->
try
gen_server:call(Socket, {send_request, Peer, ReqId, Request}, Timeout)
gen_server:call(Socket, {send_request, Peer, ReqId, Request, Timeout}, infinity)
catch
exit:{timeout, _} ->
{error, timeout};
exit:{noproc, _} ->
{error, closed};
{nodedown, _} ->
Expand Down Expand Up @@ -58,22 +56,27 @@ init([#{family := Family, ip := IP, active_n := ActiveN,
{ok, Socket} = gen_udp:open(0, Opts),

State = #state{
family = Family,
socket = Socket,
active_n = ActiveN,
pending = #{},
mode = active
},
{ok, State}.

handle_call({send_request, {IP, Port}, ReqId, Request}, From,
#state{socket = Socket, pending = Pending} = State) ->
case gen_udp:send(Socket, IP, Port, Request) of
ok ->
ReqKey = {IP, Port, ReqId},
NPending = Pending#{ReqKey => From},
{noreply, State#state{pending = NPending}};
{error, Reason} ->
{reply, {error, Reason}, State}
handle_call({send_request, {IP, Port}, ReqId, Request, Timeout}, From,
#state{family = Family, socket = Socket} = State) ->
case send_ip(Family, IP) of
{ok, SendIP} ->
case gen_udp:send(Socket, SendIP, Port, Request) of
ok ->
ReqKey = {SendIP, Port, ReqId},
{noreply, pending_request(ReqKey, From, Timeout, State)};
{error, _} = Error ->
{reply, Error, State}
end;
{error, _} = Error ->
{reply, Error, State}
end;

handle_call(_Request, _From, State) ->
Expand All @@ -92,34 +95,35 @@ handle_info({udp_passive, _Socket}, #state{socket = Socket, active_n = ActiveN}
inet:setopts(Socket, [{active, ActiveN}]),
{noreply, State};

handle_info({udp, Socket, FromIP, FromPort, Request},
State = #state{socket = Socket, pending = Pending, mode = Mode}) ->
case eradius_lib:decode_request_id(Request) of
{ReqId, Request} ->
case Pending of
#{{FromIP, FromPort, ReqId} := From} ->
gen_server:reply(From, {response, ReqId, Request}),

flow_control(State),
NPending = maps:remove({FromIP, FromPort, ReqId}, Pending),
NState = State#state{pending = NPending},
case Mode of
inactive when map_size(NPending) =:= 0 ->
{stop, normal, NState};
_ ->
{noreply, NState}
end;
handle_info({udp, Socket, FromIP, FromPort, Response},
State = #state{socket = Socket, mode = Mode}) ->
case eradius_lib:decode_request_id(Response) of
{ReqId, Response} ->
NState = request_done({FromIP, FromPort, ReqId}, {ok, Response}, State),
case Mode of
inactive when map_size(State#state.pending) =:= 0 ->
{stop, normal, NState};
_ ->
%% discard reply because we didn't expect it
flow_control(State),
{noreply, State}
flow_control(NState),
{noreply, NState}
end;
{bad_pdu, _} ->
%% discard reply because it was malformed
flow_control(State),
{noreply, State}
end;

handle_info({timeout, TRef, ReqKey}, #state{pending = Pending} = State) ->
NState =
case Pending of
#{ReqKey := {From, TRef}} ->
gen_server:reply(From, {error, timeout}),
State#state{pending = maps:remove(ReqKey, Pending)};
_ ->
State
end,
{noreply, NState};

handle_info(_Info, State) ->
{noreply, State}.

Expand All @@ -137,3 +141,27 @@ flow_control(#state{socket = Socket, active_n = once}) ->
inet:setopts(Socket, [{active, once}]);
flow_control(_) ->
ok.

pending_request(ReqKey, From, Timeout,
#state{pending = Pending} = State) ->
TRef = erlang:start_timer(Timeout, self(), ReqKey),
State#state{pending = Pending#{ReqKey => {From, TRef}}}.

request_done(ReqKey, Reply, #state{pending = Pending} = State) ->
case Pending of
#{ReqKey := {From, TRef}} ->
gen_server:reply(From, Reply),
erlang:cancel_timer(TRef),
State#state{pending = maps:remove(ReqKey, Pending)};
_ ->
State
end.

send_ip(inet, {_, _, _, _} = IP) ->
{ok, IP};
send_ip(inet6, {_, _, _, _} = IP) ->
{ok, inet:ipv4_mapped_ipv6_address(IP)};
send_ip(inet6, {_, _, _, _,_, _, _, _} = IP) ->
{ok, IP};
send_ip(_, _) ->
{error, eafnosupport}.

0 comments on commit d25b13a

Please sign in to comment.