Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix channel close procedure when the peer dies or our handler goes down #9103

Open
wants to merge 2 commits into
base: maint
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions lib/ssh/src/ssh_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -762,17 +762,25 @@ handle_msg(#ssh_msg_channel_open_confirmation{recipient_channel = ChannelId,
maximum_packet_size = PacketSz},
#connection{channel_cache = Cache} = Connection0, _, _SSH) ->

#channel{remote_id = undefined} = Channel =
#channel{remote_id = undefined, user = U} = Channel =
ssh_client_channel:cache_lookup(Cache, ChannelId),

ssh_client_channel:cache_update(Cache, Channel#channel{
remote_id = RemoteId,
recv_packet_size = max(32768, % rfc4254/5.2
min(PacketSz, Channel#channel.recv_packet_size)
),
send_window_size = WindowSz,
send_packet_size = PacketSz}),
reply_msg(Channel, Connection0, {open, ChannelId});
if U /= undefined ->
ssh_client_channel:cache_update(Cache, Channel#channel{
remote_id = RemoteId,
recv_packet_size = max(32768, % rfc4254/5.2
min(PacketSz, Channel#channel.recv_packet_size)
),
send_window_size = WindowSz,
send_packet_size = PacketSz}),
reply_msg(Channel, Connection0, {open, ChannelId});
true ->
%% There is no user process so nobody cares about the channel
%% close it
CloseMsg = channel_close_msg(RemoteId),
ssh_client_channel:cache_update(Cache, Channel#channel{sent_close = true}),
{[{connection_reply, CloseMsg}], Connection0}
end;

handle_msg(#ssh_msg_channel_open_failure{recipient_channel = ChannelId,
reason = Reason,
Expand Down Expand Up @@ -1036,14 +1044,24 @@ handle_msg(#ssh_msg_channel_request{recipient_channel = ChannelId,
?DEC_BIN(Err, _ErrLen),
?DEC_BIN(Lang, _LangLen)>> = Data,
case ssh_client_channel:cache_lookup(Cache, ChannelId) of
#channel{remote_id = RemoteId} = Channel ->
#channel{remote_id = RemoteId, sent_close = SentClose} = Channel ->
{Reply, Connection} = reply_msg(Channel, Connection0,
{exit_signal, ChannelId,
binary_to_list(SigName),
binary_to_list(Err),
binary_to_list(Lang)}),
ChannelCloseMsg = channel_close_msg(RemoteId),
{[{connection_reply, ChannelCloseMsg}|Reply], Connection};
%% Send 'channel-close' only if it has not been sent yet
%% by e.g. our side also closing the channel or going down
%% and(!) update the cache
%% so that the 'channel-close' is not sent twice
if not SentClose ->
CloseMsg = channel_close_msg(RemoteId),
ssh_client_channel:cache_update(Cache,
Channel#channel{sent_close = true}),
{[{connection_reply, CloseMsg}|Reply], Connection};
true ->
{Reply, Connection}
end;
_ ->
%% Channel already closed by peer
{[], Connection0}
Expand Down
42 changes: 33 additions & 9 deletions lib/ssh/src/ssh_connection_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1096,12 +1096,20 @@ handle_event({call,From}, {recv_window, ChannelId}, StateName, D)

handle_event({call,From}, {close, ChannelId}, StateName, D0)
when ?CONNECTED(StateName) ->
%% Send 'channel-close' only if it has not been sent yet
%% e.g. when 'exit-signal' was received from the peer
%% and(!) we update the cache so that we remember what we've done
case ssh_client_channel:cache_lookup(cache(D0), ChannelId) of
#channel{remote_id = Id} = Channel ->
#channel{remote_id = Id, sent_close = false} = Channel ->
D1 = send_msg(ssh_connection:channel_close_msg(Id), D0),
ssh_client_channel:cache_update(cache(D1), Channel#channel{sent_close = true}),
ssh_client_channel:cache_update(cache(D1),
Channel#channel{sent_close = true}),
{keep_state, D1, [cond_set_idle_timer(D1), {reply,From,ok}]};
undefined ->
_ ->
%% Here we match a channel which has already sent 'channel-close'
%% AND possible cases of 'broken cache' i.e. when a channel
%% disappeared from the cache, but has not been properly shut down
%% The latter would be a bug, but hard to chase
{keep_state_and_data, [{reply,From,ok}]}
end;

Expand Down Expand Up @@ -1259,15 +1267,31 @@ handle_event(info, {timeout, {_, From} = Request}, _,
%%% Handle that ssh channels user process goes down
handle_event(info, {'DOWN', _Ref, process, ChannelPid, _Reason}, _, D) ->
Cache = cache(D),
ssh_client_channel:cache_foldl(
fun(#channel{user=U,
local_id=Id}, Acc) when U == ChannelPid ->
ssh_client_channel:cache_delete(Cache, Id),
Acc;
%% Here we first collect the list of channel id's handled by the process
%% Do NOT remove them from the cache - they are not closed yet!
Channels = ssh_client_channel:cache_foldl(
fun(#channel{user=U} = Channel, Acc) when U == ChannelPid ->
[Channel | Acc];
(_,Acc) ->
Acc
end, [], Cache),
{keep_state, D, cond_set_idle_timer(D)};
%% Then for each channel where 'channel-close' has not been sent yet
%% we send 'channel-close' and(!) update the cache so that we remember
%% what we've done.
%% Also set user as 'undefined' as there is no such process anyway
D2 = lists:foldl(
fun(#channel{remote_id = Id, sent_close = false} = Channel, D0) when Id /= undefined ->
D1 = send_msg(ssh_connection:channel_close_msg(Id), D0),
ssh_client_channel:cache_update(cache(D1),
Channel#channel{sent_close = true,
user = undefined}),
D1;
(Channel, D0) ->
ssh_client_channel:cache_update(cache(D0),
Channel#channel{user = undefined}),
D0
end, D, Channels),
{keep_state, D2, cond_set_idle_timer(D2)};

handle_event({timeout,idle_time}, _Data, _StateName, D) ->
case ssh_client_channel:cache_info(num_entries, cache(D)) of
Expand Down
132 changes: 131 additions & 1 deletion lib/ssh/test/ssh_connection_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
stop_listener/1,
trap_exit_connect/1,
trap_exit_daemon/1,
handler_down_before_open/1,
ssh_exec_echo/2 % called as an MFA
]).

Expand Down Expand Up @@ -180,7 +181,8 @@ all() ->
stop_listener,
no_sensitive_leak,
start_subsystem_on_closed_channel,
max_channels_option
max_channels_option,
handler_down_before_open
].
groups() ->
[{openssh, [], payload() ++ ptty() ++ sock()}].
Expand Down Expand Up @@ -1942,6 +1944,134 @@ max_channels_option(Config) when is_list(Config) ->
ssh:close(ConnectionRef),
ssh:stop_daemon(Pid).

handler_down_before_open(Config) ->
%% Start echo subsystem with a delay in init() - until a signal is received
%% One client opens a channel on the connection
%% the other client requests the echo subsystem on the second channel and then immediately goes down
%% the test monitors the client and when receiving 'DOWN' signals 'echo' to proceed
%% a) there should be no crash after 'channel-open-confirmation'
%% b) there should be proper 'channel-close' exchange
%% c) the 'exec' channel should not be affected after the 'echo' channel goes down
PrivDir = proplists:get_value(priv_dir, Config),
UserDir = filename:join(PrivDir, nopubkey), % to make sure we don't use public-key-auth
file:make_dir(UserDir),
SysDir = proplists:get_value(data_dir, Config),
Parent = self(),
EchoSS_spec = {ssh_echo_server, [8, [{dbg, true}, {parent, Parent}]]},
{Pid, Host, Port} = ssh_test_lib:daemon([{system_dir, SysDir},
{user_dir, UserDir},
{password, "morot"},
{exec, fun ssh_exec_echo/1},
{subsystems, [{"echo_n",EchoSS_spec}]}]),
ct:log("~p:~p connect", [?MODULE,?LINE]),
ConnectionRef = ssh_test_lib:connect(Host, Port, [{silently_accept_hosts, true},
{user, "foo"},
{password, "morot"},
{user_interaction, false},
{user_dir, UserDir}]),
ct:log("~p:~p connected", [?MODULE,?LINE]),

ExecChannelPid = spawn(
fun() ->
{ok, ChannelId0} = ssh_connection:session_channel(ConnectionRef, infinity),

%% This is to get peer's connection handler PID ({conn_peer ...} below) and suspend it
{ok, ChannelId1} = ssh_connection:session_channel(ConnectionRef, infinity),
ssh_connection:subsystem(ConnectionRef, ChannelId1, "echo_n", infinity),
ssh_connection:close(ConnectionRef, ChannelId1),
receive
{ssh_cm, ConnectionRef, {closed, 1}} -> ok
end,

Parent ! {self(), channelId, ChannelId0},
Result = receive
cmd ->
ct:log("~p:~p Channel ~p executing", [?MODULE, ?LINE, ChannelId0]),
success = ssh_connection:exec(ConnectionRef, ChannelId0, "testing", infinity),
Expect = <<"echo testing\n">>,
ExpSz = size(Expect),
receive
{ssh_cm, ConnectionRef, {data, ChannelId0, 0,
<<Expect:ExpSz/binary, _/binary>>}} = R ->
ct:log("~p:~p Got expected ~p",[?MODULE,?LINE, R]),
ok;
Other ->
ct:log("~p:~p Got unexpected ~p~nExpect: ~p~n",
[?MODULE,?LINE, Other, {ssh_cm, ConnectionRef,
{data, ChannelId0, 0, Expect}}]),
{fail, "Unexpected data"}
after 5000 ->
{fail, "Exec Timeout"}
end;
stop -> {fail, "Stopped"}
end,
Parent ! {self(), Result}
end),
try
TestResult = receive
{ExecChannelPid, channelId, ExId} ->
ct:log("~p:~p Channel that should stay: ~p pid ~p", [?MODULE, ?LINE, ExId, ExecChannelPid]),
ConnPeer = receive {conn_peer, CM} -> CM end,
%% The sole purpose of this channel is to go down before the opening procedure is complete
DownChannelPid = spawn(
fun() ->
ct:log("~p:~p open channel (incomplete)",[?MODULE,?LINE]),
Parent ! {self(), channelId, ok},
%% This is to prevent the peer from answering our 'channel-open' in time
sys:suspend(ConnPeer),
{ok, _ChannelId} = ssh_connection:session_channel(ConnectionRef, infinity),
ct:log("~p:~p open incomplete channel done - should not have happened",[?MODULE,?LINE]),
Parent ! {self(), {fail, "Unexpected channel success"}}
end),
MonRef = erlang:monitor(process, DownChannelPid),
receive
{DownChannelPid, channelId, ok} ->
ct:log("~p:~p Channel handler that won't continue: pid ~p", [?MODULE, ?LINE, DownChannelPid]),
ensure_channels(ConnectionRef, 2),
channel_down_sequence(DownChannelPid, ExecChannelPid, ExId, MonRef, ConnectionRef, ConnPeer)
end
end,
ensure_channels(ConnectionRef, 0)
after
ssh:close(ConnectionRef),
ssh:stop_daemon(Pid)
end.

ensure_channels(ConnRef, Expected) ->
{ok, ChannelList} = ssh_connection_handler:info(ConnRef),
do_ensure_channels(ConnRef, Expected, length(ChannelList)).

do_ensure_channels(_ConnRef, NumExpected, NumExpected) ->
ok;
do_ensure_channels(ConnRef, NumExpected, _ChannelListLen) ->
receive after 100 -> ok end,
{ok, ChannelList} = ssh_connection_handler:info(ConnRef),
do_ensure_channels(ConnRef, NumExpected, length(ChannelList)).

channel_down_sequence(DownChannelPid, ExecChannelPid, ExecChannelId, MonRef, ConnRef, Peer) ->
ct:log("~p:~p sending order to go down", [?MODULE, ?LINE]),
exit(DownChannelPid, die),
receive {'DOWN', MonRef, _, _, _} -> ok end,
ct:log("~p:~p order executed, sending order to proceed", [?MODULE, ?LINE]),
%% Resume the peer connection to let it clean up among its channels
sys:resume(Peer),
ensure_channels(ConnRef, 1),
ExecChannelPid ! cmd,
try
receive
{ExecChannelPid, ok} ->
ct:log("~p:~p expected exec result: ~p", [?MODULE, ?LINE, ok]),
ok;
{ExecChannelPid, Result} ->
ct:log("~p:~p Unexpected exec result: ~p", [?MODULE, ?LINE, Result]),
{fail, "Unexpected exec result"}
after 5000 ->
{fail, "Exec result timeout"}
end
after
ssh_connection:close(ConnRef, ExecChannelId)
end.

%%--------------------------------------------------------------------
%% Internal functions ------------------------------------------------
%%--------------------------------------------------------------------
Expand Down
11 changes: 9 additions & 2 deletions lib/ssh/test/ssh_echo_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
n,
id,
cm,
dbg = false
dbg = false,
parent
}).
-export([init/1, handle_msg/2, handle_ssh_msg/2, terminate/2]).

Expand All @@ -42,13 +43,19 @@ init([N]) ->
{ok, #state{n = N}};
init([N,Opts]) ->
State = #state{n = N,
dbg = proplists:get_value(dbg,Opts,false)
dbg = proplists:get_value(dbg,Opts,false),
parent = proplists:get_value(parent, Opts)
},
?DBG(State, "init([~p])",[N]),
{ok, State}.

handle_msg({ssh_channel_up, ChannelId, ConnectionManager}, State) ->
?DBG(State, "ssh_channel_up Cid=~p ConnMngr=~p",[ChannelId,ConnectionManager]),
Pid = State#state.parent,
if Pid /= undefined ->
Pid ! {conn_peer, ConnectionManager};
true -> ok
end,
{ok, State#state{id = ChannelId,
cm = ConnectionManager}}.

Expand Down
Loading