From 57088b5d0c20ea47dceaafb246b9ee67b7a6aa9c Mon Sep 17 00:00:00 2001 From: Stas Date: Wed, 31 Jan 2024 19:05:00 +0100 Subject: [PATCH] chore: improve logging (#292) --- VERSION | 2 +- lib/supavisor/client_handler.ex | 169 +++++++++++++++++--------------- lib/supavisor/db_handler.ex | 77 +++++++++------ 3 files changed, 139 insertions(+), 109 deletions(-) diff --git a/VERSION b/VERSION index 0bfff3dc..5166d134 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.1.25 +1.1.26 diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index ad1f9c73..b6d14ee7 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -71,22 +71,22 @@ defmodule Supavisor.ClientHandler do @impl true def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :exchange, data) do - Logger.debug("Client is trying to request HTTP") + Logger.debug("ClientHandler: Client is trying to request HTTP") HH.sock_send(data.sock, "HTTP/1.1 204 OK\r\n\r\n") - {:stop, :normal} + {:stop, {:shutdown, :http_request}} end # cancel request def handle_event(:info, {_, _, <<16::32, 1234::16, 5678::16, pid::32, key::32>>}, _, _) do - Logger.debug("Got cancel query for #{inspect({pid, key})}") + Logger.debug("ClientHandler: Got cancel query for #{inspect({pid, key})}") :ok = HH.send_cancel_query(pid, key) - {:stop, :normal} + {:stop, {:shutdown, :cancel_query}} end # send cancel request to db def handle_event(:info, :cancel_query, :busy, data) do key = {data.tenant, data.db_pid} - Logger.debug("Cancel query for #{inspect(key)}") + Logger.debug("ClientHandler: Cancel query for #{inspect(key)}") {_pool, db_pid} = data.db_pid case db_pid_meta(key) do @@ -94,14 +94,16 @@ defmodule Supavisor.ClientHandler do :ok = HH.cancel_query(meta.host, meta.port, meta.ip_ver, meta.pid, meta.key) error -> - Logger.error("Received cancel but no proc was found #{inspect(key)} #{inspect(error)}") + Logger.error( + "ClientHandler: Received cancel but no proc was found #{inspect(key)} #{inspect(error)}" + ) end :keep_state_and_data end def handle_event(:info, {:tcp, _, <<_::64>>}, :exchange, %{sock: sock} = data) do - Logger.debug("Client is trying to connect with SSL") + Logger.debug("ClientHandler: Client is trying to connect with SSL") downstream_cert = H.downstream_cert() downstream_key = H.downstream_key() @@ -123,12 +125,15 @@ defmodule Supavisor.ClientHandler do {:keep_state, %{data | sock: socket, ssl: true}} error -> - Logger.error("SSL handshake error: #{inspect(error)}") + Logger.error("ClientHandler: SSL handshake error: #{inspect(error)}") Telem.client_join(:fail, data.id) - {:stop, :normal} + {:stop, {:shutdown, :ssl_handshake_error}} end else - Logger.error("User requested SSL connection but no downstream cert/key found") + Logger.error( + "ClientHandler: User requested SSL connection but no downstream cert/key found" + ) + :ok = HH.sock_send(data.sock, "N") :keep_state_and_data end @@ -137,7 +142,7 @@ defmodule Supavisor.ClientHandler do def handle_event(:info, {_, _, bin}, :exchange, data) do case Server.decode_startup_packet(bin) do {:ok, hello} -> - Logger.debug("Client startup message: #{inspect(hello)}") + Logger.debug("ClientHandler: Client startup message: #{inspect(hello)}") {type, {user, tenant_or_alias, db_name}} = HH.parse_user_info(hello.payload) log_level = @@ -152,9 +157,9 @@ defmodule Supavisor.ClientHandler do {:next_event, :internal, {:hello, {type, {user, tenant_or_alias, db_name}}}}} {:error, error} -> - Logger.error("Client startup message error: #{inspect(error)}") + Logger.error("ClientHandler: Client startup message error: #{inspect(error)}") Telem.client_join(:fail, data.id) - {:stop, :normal} + {:stop, {:shutdown, :startup_packet_error}} end end @@ -195,46 +200,50 @@ defmodule Supavisor.ClientHandler do cond do info.tenant.enforce_ssl and !data.ssl -> - Logger.error("Tenant is not allowed to connect without SSL, user #{user}") + Logger.error( + "ClientHandler: Tenant is not allowed to connect without SSL, user #{user}" + ) :ok = HH.send_error(sock, "XX000", "SSL connection is required") Telem.client_join(:fail, id) - {:stop, :normal} + {:stop, {:shutdown, :ssl_required}} HH.filter_cidrs(info.tenant.allow_list, addr) == [] -> message = "Address not in tenant allow_list: " <> inspect(addr) - Logger.error(message) + Logger.error("ClientHandler: #{message}") :ok = HH.send_error(sock, "XX000", message) Telem.client_join(:fail, id) - {:stop, :normal} + {:stop, {:shutdown, :address_not_allowed}} true -> new_data = update_user_data(data, info, user, id, db_name, mode) case auth_secrets(info, user) do {:ok, auth_secrets} -> - Logger.debug("Authentication method: #{inspect(auth_secrets)}") + Logger.debug("ClientHandler: Authentication method: #{inspect(auth_secrets)}") {:keep_state, new_data, {:next_event, :internal, {:handle, auth_secrets}}} {:error, reason} -> - Logger.error("Authentication auth_secrets error: #{inspect(reason)}") + Logger.error( + "ClientHandler: Authentication auth_secrets error: #{inspect(reason)}" + ) :ok = HH.send_error(sock, "XX000", "Authentication error") Telem.client_join(:fail, id) - {:stop, :normal} + {:stop, {:shutdown, :auth_secrets_error}} end end {:error, reason} -> Logger.error( - "User not found: #{inspect(reason)} #{inspect({type, user, tenant_or_alias})}" + "ClientHandler: User not found: #{inspect(reason)} #{inspect({type, user, tenant_or_alias})}" ) :ok = HH.send_error(sock, "XX000", "Tenant or user not found") Telem.client_join(:fail, data.id) - {:stop, :normal} + {:stop, {:shutdown, :user_not_found}} end end @@ -244,11 +253,13 @@ defmodule Supavisor.ClientHandler do _, %{sock: sock} = data ) do - Logger.debug("Handle exchange, auth method: #{inspect(method)}") + Logger.debug("ClientHandler: Handle exchange, auth method: #{inspect(method)}") case handle_exchange(sock, {method, secrets}) do {:error, reason} -> - Logger.error("Exchange error: #{inspect(reason)} when method #{inspect(method)}") + Logger.error( + "ClientHandler: Exchange error: #{inspect(reason)} when method #{inspect(method)}" + ) msg = if method == :auth_query_md5 do @@ -259,7 +270,7 @@ defmodule Supavisor.ClientHandler do HH.sock_send(sock, msg) Telem.client_join(:fail, data.id) - {:stop, :normal} + {:stop, {:shutdown, :exchange_error}} {:ok, client_key} -> secrets = @@ -271,7 +282,7 @@ defmodule Supavisor.ClientHandler do secrets end - Logger.debug("Exchange success") + Logger.debug("ClientHandler: Exchange success") :ok = HH.sock_send(sock, Server.authentication_ok()) Telem.client_join(:ok, data.id) @@ -281,7 +292,7 @@ defmodule Supavisor.ClientHandler do end def handle_event(:internal, :subscribe, _, data) do - Logger.debug("Subscribe to tenant #{inspect(data.id)}") + Logger.debug("ClientHandler: Subscribe to tenant #{inspect(data.id)}") with {:ok, sup} <- Supavisor.start_dist(data.id, data.auth_secrets, log_level: data.log_level), @@ -302,13 +313,13 @@ defmodule Supavisor.ClientHandler do else {:error, :max_clients_reached} -> msg = "Max client connections reached" - Logger.error(msg) + Logger.error("ClientHandler: #{msg}") :ok = HH.send_error(data.sock, "XX000", msg) Telem.client_join(:fail, data.id) - {:stop, :normal} + {:stop, {:shutdown, :max_clients_reached}} error -> - Logger.error("Subscribe error: #{inspect(error)}") + Logger.error("ClientHandler: Subscribe error: #{inspect(error)}") {:keep_state_and_data, {:timeout, 1000, :subscribe}} end end @@ -327,19 +338,21 @@ defmodule Supavisor.ClientHandler do end def handle_event(:timeout, :wait_ps, _, data) do - Logger.error("Wait parameter status timeout, send default #{inspect(data.ps)}}") + Logger.error( + "ClientHandler: Wait parameter status timeout, send default #{inspect(data.ps)}}" + ) ps = Server.encode_parameter_status(data.ps) {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}} end def handle_event(:timeout, :idle_terminate, _, data) do - Logger.warning("Terminate an idle connection by #{data.idle_timeout} timeout") - {:stop, :normal, data} + Logger.warning("ClientHandler: Terminate an idle connection by #{data.idle_timeout} timeout") + {:stop, {:shutdown, :idle_terminate}} end def handle_event(:timeout, :heartbeat_check, _, data) do - Logger.debug("Send heartbeat to client") + Logger.debug("ClientHandler: Send heartbeat to client") HH.sock_send(data.sock, Server.application_name()) {:keep_state_and_data, {:timeout, data.heartbeat_interval, :heartbeat_check}} end @@ -347,14 +360,14 @@ defmodule Supavisor.ClientHandler do # handle Terminate message def handle_event(:info, {proto, _, <>}, :idle, _) when proto in [:tcp, :ssl] do - Logger.debug("Receive termination") - {:stop, :normal} + Logger.debug("ClientHandler: Terminate received from client") + {:stop, {:shutdown, :terminate_received}} end # handle Sync message def handle_event(:info, {proto, _, <>}, :idle, data) when proto in [:tcp, :ssl] do - Logger.debug("Receive sync") + Logger.debug("ClientHandler: Receive sync") :ok = HH.sock_send(data.sock, Server.ready_for_query()) {:keep_state_and_data, handle_actions(data)} end @@ -376,7 +389,7 @@ defmodule Supavisor.ClientHandler do with {:ok, payload} <- Client.get_payload(bin), {:ok, statements} <- Supavisor.PgParser.statements(payload) do Logger.debug( - "Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}" + "ClientHandler: Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}" ) case statements do @@ -386,7 +399,7 @@ defmodule Supavisor.ClientHandler do end else other -> - Logger.error("Receive query error: #{inspect(other)}") + Logger.error("ClientHandler: Receive query error: #{inspect(other)}") :write end @@ -404,33 +417,33 @@ defmodule Supavisor.ClientHandler do case Db.call(db_pid, self(), bin) do :ok -> - Logger.debug("DbHandler call success") + Logger.debug("ClientHandler: DbHandler call success") :keep_state_and_data {:buffering, size} -> - Logger.debug("DbHandler call buffering #{size}") + Logger.debug("ClientHandler: DbHandler call buffering #{size}") if size > 1_000_000 do msg = "DbHandler buffer size is too big: #{size}" - Logger.error(msg) + Logger.error("ClientHandler: #{msg}") HH.sock_send(data.sock, Server.error_message("XX000", msg)) - {:stop, :normal} + {:stop, {:shutdown, :buffer_size}} else - Logger.debug("DbHandler call buffering") + Logger.debug("ClientHandler: DbHandler call buffering") :keep_state_and_data end {:error, reason} -> msg = "DbHandler error: #{inspect(reason)}" - Logger.error(msg) + Logger.error("ClientHandler: #{msg}") HH.sock_send(data.sock, Server.error_message("XX000", msg)) - {:stop, :normal} + {:stop, {:shutdown, :db_handler_error}} end end def handle_event(:info, {:parameter_status, :updated}, _, _) do - Logger.warning("Parameter status is updated") - {:stop, :normal} + Logger.warning("ClientHandler: Parameter status is updated") + {:stop, {:shutdown, :parameter_status_updated}} end def handle_event(:info, {:parameter_status, ps}, :exchange, _) do @@ -438,27 +451,22 @@ defmodule Supavisor.ClientHandler do end # client closed connection - def handle_event(_, {:tcp_closed, _}, _, data) do - Logger.debug("tcp soket closed for #{inspect(data.tenant)}") - {:stop, :normal} - end - def handle_event(_, {closed, _}, _, data) when closed in [:tcp_closed, :ssl_closed] do - Logger.debug("#{closed} soket closed for #{inspect(data.tenant)}") - {:stop, :normal} + Logger.debug("ClientHandler: #{closed} socket closed for #{inspect(data.tenant)}") + {:stop, {:shutdown, :socket_closed}} end # linked DbHandler went down def handle_event(:info, {:EXIT, db_pid, reason}, _, _) do - Logger.error("DB handler #{inspect(db_pid)} exited #{inspect(reason)}") - {:stop, :normal} + Logger.error("ClientHandler: DbHandler #{inspect(db_pid)} exited #{inspect(reason)}") + {:stop, {:shutdown, :db_handler_exit}} end # pool's manager went down def handle_event(:info, {:DOWN, _, _, _, reason}, state, data) do Logger.error( - "Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}" + "ClientHandler: Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}" ) case state do @@ -466,17 +474,17 @@ defmodule Supavisor.ClientHandler do {:keep_state_and_data, {:next_event, :internal, :subscribe}} :busy -> - {:stop, :normal} + {:stop, {:shutdown, :manager_down}} end end # emulate handle_cast def handle_event(:cast, {:client_cast, bin, status}, _, data) do - Logger.debug("--> --> bin #{inspect(byte_size(bin))} bytes") + Logger.debug("ClientHandler: --> --> bin #{inspect(byte_size(bin))} bytes") case status do :ready_for_query -> - Logger.debug("Client is ready") + Logger.debug("ClientHandler: Client is ready") db_pid = handle_db_pid(data.mode, data.pool, data.db_pid) @@ -488,12 +496,12 @@ defmodule Supavisor.ClientHandler do {:next_state, :idle, %{data | db_pid: db_pid, stats: stats}, actions} :continue -> - Logger.debug("Client is not ready") + Logger.debug("ClientHandler: Client is not ready") :ok = HH.sock_send(data.sock, bin) :keep_state_and_data :read_sql_error -> - Logger.error("read only sql transaction, reruning the query to write pool") + Logger.error("ClientHandler: read only sql transaction, reruning the query to write pool") # release the read pool _ = handle_db_pid(data.mode, data.pool, data.db_pid) @@ -514,7 +522,7 @@ defmodule Supavisor.ClientHandler do {"data", data} ] - Logger.debug("Undefined msg: #{inspect(msg, pretty: true)}") + Logger.debug("ClientHandler: Undefined msg: #{inspect(msg, pretty: true)}") :keep_state_and_data end @@ -528,30 +536,35 @@ defmodule Supavisor.ClientHandler do msg = case data.mode do :session -> - "Too many clients already" + "Max client connections reached" :transaction -> "Unable to check out process from the pool due to timeout" end - Logger.error(msg) + Logger.error("ClientHandler: #{msg}") HH.sock_send(data.sock, Server.error_message("XX000", msg)) :ok end def terminate(reason, _state, %{db_pid: {_, pid}}) do - if Db.get_state(pid) == :busy do - Logger.warning("Kill DbHandler #{inspect(pid)}, reason #{inspect(reason)}") - Db.stop(pid) - else - Logger.warning("ClientHanlder termination with reason #{inspect(reason)}") + case Db.get_state(pid) do + {:ok, :busy} -> + Logger.warning( + "ClientHandler: socket closed and kill DbHandler #{inspect(pid)}, reason #{inspect(reason)}" + ) + + Db.stop(pid) + + _ -> + Logger.warning("ClientHandler: socket closed with reason #{inspect(reason)}") end :ok end def terminate(reason, _state, _data) do - Logger.warning("ClientHanlder termination with reason #{inspect(reason)}") + Logger.warning("ClientHandler: socket closed with reason #{inspect(reason)}") :ok end @@ -854,24 +867,24 @@ defmodule Supavisor.ClientHandler do with {:ok, payload} <- Client.get_payload(bin), {:ok, statamets} <- Supavisor.PgParser.statements(payload), true <- Enum.member?([["PrepareStmt"], ["DeallocateStmt"]], statamets) do - Logger.info("Handle prepared statement #{inspect(payload)}") + Logger.info("ClientHandler: Handle prepared statement #{inspect(payload)}") GenServer.call(data.pool, :get_all_workers) |> Enum.each(fn {_, ^pid, _, [Supavisor.DbHandler]} -> - Logger.debug("Linked DbHandler #{inspect(pid)}") + Logger.debug("ClientHandler: Linked DbHandler #{inspect(pid)}") nil {_, pool_proc, _, [Supavisor.DbHandler]} -> Logger.debug( - "Sending prepared statement change #{inspect(payload)} to #{inspect(pool_proc)}" + "ClientHandler: Sending prepared statement change #{inspect(payload)} to #{inspect(pool_proc)}" ) send(pool_proc, {:handle_ps, payload, bin}) end) else error -> - Logger.debug("Skip prepared statement #{inspect(error)}") + Logger.debug("ClientHandler: Skip prepared statement #{inspect(error)}") end end @@ -881,11 +894,11 @@ defmodule Supavisor.ClientHandler do defp handle_actions(data) do Enum.flat_map(data, fn {:heartbeat_interval, v} = t when v > 0 -> - Logger.debug("Call timeout #{inspect(t)}") + Logger.debug("ClientHandler: Call timeout #{inspect(t)}") [timeout_check(:heartbeat_check, v)] {:idle_timeout, v} = t when v > 0 -> - Logger.debug("Call timeout #{inspect(t)}") + Logger.debug("ClientHandler: Call timeout #{inspect(t)}") [timeout_check(:idle_terminate, v)] _ -> diff --git a/lib/supavisor/db_handler.ex b/lib/supavisor/db_handler.ex index f09faa83..d8f57850 100644 --- a/lib/supavisor/db_handler.ex +++ b/lib/supavisor/db_handler.ex @@ -26,8 +26,14 @@ defmodule Supavisor.DbHandler do @spec call(pid(), pid(), binary()) :: :ok | {:error, any()} | {:buffering, non_neg_integer()} def call(pid, caller, msg), do: :gen_statem.call(pid, {:db_call, caller, msg}, 15_000) - @spec get_state(pid()) :: state - def get_state(pid), do: :gen_statem.call(pid, :get_state, 5_000) + @spec get_state(pid()) :: {:ok, state} | {:error, term()} + def get_state(pid) do + try do + {:ok, :gen_statem.call(pid, :get_state, 5_000)} + catch + error, reason -> {:error, {error, reason}} + end + end @spec stop(pid()) :: :ok def stop(pid), do: :gen_statem.stop(pid, :client_termination, 5_000) @@ -70,7 +76,7 @@ defmodule Supavisor.DbHandler do @impl true def handle_event(:internal, _, :connect, %{auth: auth} = data) do - Logger.debug("Try to connect to DB") + Logger.debug("DbHandler: Try to connect to DB") sock_opts = [ :binary, @@ -85,7 +91,7 @@ defmodule Supavisor.DbHandler do case :gen_tcp.connect(auth.host, auth.port, sock_opts) do {:ok, sock} -> - Logger.debug("auth #{inspect(auth, pretty: true)}") + Logger.debug("DbHandler: auth #{inspect(auth, pretty: true)}") case try_ssl_handshake({:gen_tcp, sock}, auth) do {:ok, sock} -> @@ -95,18 +101,18 @@ defmodule Supavisor.DbHandler do {:next_state, :authentication, %{data | sock: sock}} {:error, reason} -> - Logger.error("Send startup error #{inspect(reason)}") + Logger.error("DbHandler: Send startup error #{inspect(reason)}") reconnect_callback end {:error, error} -> - Logger.error("Handshake error #{inspect(error)}") + Logger.error("DbHandler: Handshake error #{inspect(error)}") reconnect_callback end other -> Logger.error( - "Connection failed #{inspect(other)} to #{inspect(auth.host)}:#{inspect(auth.port)}" + "DbHandler: Connection failed #{inspect(other)} to #{inspect(auth.host)}:#{inspect(auth.port)}" ) reconnect_callback @@ -114,13 +120,13 @@ defmodule Supavisor.DbHandler do end def handle_event(:state_timeout, :connect, _state, _) do - Logger.warning("Reconnect") + Logger.warning("DbHandler: Reconnect") {:keep_state_and_data, {:next_event, :internal, :connect}} end def handle_event(:info, {proto, _, bin}, :authentication, data) when proto in @proto do dec_pkt = Server.decode(bin) - Logger.debug("dec_pkt, #{inspect(dec_pkt, pretty: true)}") + Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}") resp = Enum.reduce(dec_pkt, {%{}, nil}, fn @@ -134,14 +140,14 @@ defmodule Supavisor.DbHandler do key = self() conn = %{host: data.auth.host, port: data.auth.port, ip_ver: data.auth.ip_version} Registry.register(Supavisor.Registry.PoolPids, key, Map.merge(payload, conn)) - Logger.debug("Backend #{inspect(key)} data: #{inspect(payload)}") + Logger.debug("DbHandler: Backend #{inspect(key)} data: #{inspect(payload)}") acc %{payload: {:authentication_sasl_password, methods_b}}, {ps, _} -> nonce = case Server.decode_string(methods_b) do {:ok, req_method, _} -> - Logger.debug("SASL method #{inspect(req_method)}") + Logger.debug("DbHandler: SASL method #{inspect(req_method)}") nonce = :pgo_scram.get_nonce(16) user = get_user(data.auth) client_first = :pgo_scram.get_client_first(user, nonce) @@ -159,7 +165,7 @@ defmodule Supavisor.DbHandler do nonce other -> - Logger.error("Undefined sasl method #{inspect(other)}") + Logger.error("DbHandler: Undefined sasl method #{inspect(other)}") nil end @@ -206,7 +212,7 @@ defmodule Supavisor.DbHandler do acc %{payload: {:authentication_md5_password, salt}}, {ps, _} -> - Logger.debug("dec_pkt, #{inspect(dec_pkt, pretty: true)}") + Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}") digest = if data.auth.method == :password do @@ -238,18 +244,21 @@ defmodule Supavisor.DbHandler do {:keep_state, data} {:error_response, error} -> - Logger.error("Error auth response #{inspect(error)}") + Logger.error("DbHandler: Error auth response #{inspect(error)}") {:keep_state, data} {:ready_for_query, ps, db_state} -> - Logger.debug("DB ready_for_query: #{inspect(db_state)} #{inspect(ps, pretty: true)}") + Logger.debug( + "DbHandler: DB ready_for_query: #{inspect(db_state)} #{inspect(ps, pretty: true)}" + ) + Supavisor.set_parameter_status(data.id, ps) {:next_state, :idle, %{data | parameter_status: ps}, {:next_event, :internal, :check_buffer}} other -> - Logger.error("Undefined auth response #{inspect(other)}") + Logger.error("DbHandler: Undefined auth response #{inspect(other)}") {:stop, :auth_error, data} end end @@ -257,7 +266,7 @@ defmodule Supavisor.DbHandler do def handle_event(:internal, :check_buffer, :idle, %{buffer: buff, caller: caller} = data) when is_pid(caller) do if buff != [] do - Logger.debug("Buffer is not empty, try to send #{IO.iodata_length(buff)} bytes") + Logger.debug("DbHandler: Buffer is not empty, try to send #{IO.iodata_length(buff)} bytes") buff = Enum.reverse(buff) :ok = sock_send(data.sock, buff) end @@ -268,7 +277,10 @@ defmodule Supavisor.DbHandler do # check if it needs to apply queries from the anon buffer def handle_event(:internal, :check_anon_buffer, _, %{anon_buffer: buff, caller: nil} = data) do if buff != [] do - Logger.debug("Anon buffer is not empty, try to send #{IO.iodata_length(buff)} bytes") + Logger.debug( + "DbHandler: Anon buffer is not empty, try to send #{IO.iodata_length(buff)} bytes" + ) + buff = Enum.reverse(buff) :ok = sock_send(data.sock, buff) end @@ -278,19 +290,19 @@ defmodule Supavisor.DbHandler do # the process received message from db without linked caller def handle_event(:info, {proto, _, bin}, _, %{caller: nil}) when proto in @proto do - Logger.warning("Got db response #{inspect(bin)} when caller was nil") + Logger.warning("DbHandler: Got db response #{inspect(bin)} when caller was nil") :keep_state_and_data end def handle_event(:info, {proto, _, bin}, _, %{replica_type: :read} = data) when proto in @proto do - Logger.debug("Got read replica message #{inspect(bin)}") + Logger.debug("DbHandler: Got read replica message #{inspect(bin)}") pkts = Server.decode(bin) resp = cond do Server.has_read_only_error?(pkts) -> - Logger.error("read only error") + Logger.error("DbHandler: read only error") with [_] <- pkts do # need to flush ready_for_query if it's not in same packet @@ -318,7 +330,7 @@ defmodule Supavisor.DbHandler do def handle_event(:info, {proto, _, bin}, _, %{caller: caller} = data) when is_pid(caller) and proto in @proto do - Logger.debug("Got write replica message #{inspect(bin)}") + Logger.debug("DbHandler: Got write replica message #{inspect(bin)}") # check if the response ends with "ready for query" ready = if String.ends_with?(bin, Server.ready_for_query()) do @@ -342,7 +354,7 @@ defmodule Supavisor.DbHandler do end def handle_event(:info, {:handle_ps, payload, bin}, _state, data) do - Logger.notice("Apply prepare statement change #{inspect(payload)}") + Logger.notice("DbHandler: Apply prepare statement change #{inspect(payload)}") {:keep_state, %{data | anon_buffer: [bin | data.anon_buffer]}, {:next_event, :internal, :check_anon_buffer}} @@ -360,7 +372,7 @@ defmodule Supavisor.DbHandler do def handle_event({:call, from}, {:db_call, caller, bin}, state, %{buffer: buff} = data) do Logger.debug( - "state #{state} <-- <-- bin #{inspect(byte_size(bin))} bytes, caller: #{inspect(caller)}" + "DbHandler: state #{state} <-- <-- bin #{inspect(byte_size(bin))} bytes, caller: #{inspect(caller)}" ) new_buff = [bin | buff] @@ -373,7 +385,7 @@ defmodule Supavisor.DbHandler do end def handle_event(_, {closed, _}, state, data) when closed in @sock_closed do - Logger.error("Connection closed when state was #{state}") + Logger.error("DbHandler: Connection closed when state was #{state}") {:next_state, :connect, data, {:state_timeout, 2_500, :connect}} end @@ -384,13 +396,15 @@ defmodule Supavisor.DbHandler do # linked client_handler went down def handle_event(_, {:EXIT, pid, reason}, state, data) do if reason != :normal do - Logger.error("Client handler #{inspect(pid)} went down with reason #{inspect(reason)}") + Logger.error( + "DbHandler: ClientHandler #{inspect(pid)} went down with reason #{inspect(reason)}" + ) end if state == :busy || data.mode == :session do :ok = sock_send(data.sock, <>) - :ok = :gen_tcp.close(elem(data.sock, 1)) - {:stop, :normal, data} + :gen_tcp.close(elem(data.sock, 1)) + {:stop, {:client_handler_down, data.mode}} else {:keep_state, %{data | caller: nil, buffer: []}} end @@ -404,7 +418,7 @@ defmodule Supavisor.DbHandler do {"data", data} ] - Logger.debug("Undefined msg: #{inspect(msg, pretty: true)}") + Logger.debug("DbHandler: Undefined msg: #{inspect(msg, pretty: true)}") :keep_state_and_data end @@ -417,7 +431,10 @@ defmodule Supavisor.DbHandler do def terminate(reason, state, data) do Telem.handler_action(:db_handler, :stopped, data.id) - Logger.error("Terminating with reason #{inspect(reason)} when state was #{inspect(state)}") + + Logger.error( + "DbHandler: Terminating with reason #{inspect(reason)} when state was #{inspect(state)}" + ) end @spec try_ssl_handshake(S.tcp_sock(), map) :: {:ok, S.sock()} | {:error, term()}