diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index 72c570bd..c13d5c97 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -506,103 +506,18 @@ defmodule Supavisor.ClientHandler do {:keep_state_and_data, {:timeout, data.heartbeat_interval, :heartbeat_check}} end - # handle Terminate message - def handle_event(:info, {proto, _, <>}, :idle, _) - when proto in @proto do - Logger.info("ClientHandler: Terminate received from client") - {:stop, {:shutdown, :terminate_received}} - end - - # handle Sync message - def handle_event(:info, {proto, _, <> = msg}, :idle, data) - when proto in @proto do - Logger.debug("ClientHandler: Receive sync") - - # db_pid can be nil in transaction mode, so we will send ready_for_query - # without checking out a direct connection. If there is a linked db_pid, - # we will forward the message to it - if data.db_pid != nil, - do: :ok = sock_send_maybe_active_once(msg, data), - else: :ok = HandlerHelpers.sock_send(data.sock, Server.ready_for_query()) - - {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)} - end - - def handle_event(:info, {proto, _, <> = msg}, _, data) - when proto in @proto do - Logger.debug("ClientHandler: Receive sync while not idle") - :ok = sock_send_maybe_active_once(msg, data) - {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)} - end - - # handle Flush message - def handle_event(:info, {proto, _, <> = msg}, _, data) - when proto in @proto do - Logger.debug("ClientHandler: Receive flush while not idle") - :ok = sock_send_maybe_active_once(msg, data) - {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)} - end - - # incoming query with a single pool - def handle_event(:info, {proto, _, bin}, :idle, %{pool: pid} = data) - when is_binary(bin) and is_pid(pid) and proto in @proto do - Logger.debug("ClientHandler: Receive query #{inspect(bin)}") - db_pid = db_checkout(:both, :on_query, data) - handle_prepared_statements(db_pid, bin, data) - - {:next_state, :busy, %{data | db_pid: db_pid, query_start: System.monotonic_time()}, - {:next_event, :internal, {proto, nil, bin}}} - end - - def handle_event(:info, {proto, _, bin}, _, %{mode: :proxy} = data) when proto in @proto do - {:next_state, :busy, %{data | query_start: System.monotonic_time()}, - {:next_event, :internal, {proto, nil, bin}}} - end - - # incoming query with read/write pools - def handle_event(:info, {proto, _, bin}, :idle, data) when proto in @proto do - query_type = - with {:ok, payload} <- Client.get_payload(bin), - {:ok, statements} <- Supavisor.PgParser.statements(payload) do - Logger.debug( - "ClientHandler: Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}" - ) - - case statements do - # naive check for read only queries - ["SelectStmt"] -> :read - _ -> :write - end - else - other -> - Logger.error("ClientHandler: Receive query error: #{inspect(other)}") - :write - end - - ts = System.monotonic_time() - db_pid = db_checkout(query_type, :on_query, data) - - {:next_state, :busy, %{data | db_pid: db_pid, query_start: ts, last_query: bin}, - {:next_event, :internal, {proto, nil, bin}}} - end - - # forward query to db - def handle_event(_, {proto, _, bin}, :busy, data) when proto in @proto do - Logger.debug("ClientHandler: Forward query to db #{inspect(bin)} #{inspect(data.db_pid)}") - - case sock_send_maybe_active_once(bin, data) do - :ok -> - {:keep_state, %{data | active_count: data.active_count + 1}} - - error -> - Logger.error("ClientHandler: error while sending query: #{inspect(error)}") - - HandlerHelpers.sock_send( - data.sock, - Server.error_message("XX000", "Error while sending query") - ) - - {:stop, {:shutdown, :send_query_error}} + def handle_event(kind, {proto, socket, msg}, state, data) + when proto in @proto and is_binary(msg) do + with {:next_state, next_state, new_data, actions} <- handle_data(kind, msg, state, data) do + new_actions = + actions + |> List.wrap() + |> Enum.map(fn + {:next_event, type, bin} when is_binary(bin) -> {:next_event, type, {proto, socket, bin}} + other -> other + end) + + {:next_state, next_state, new_data, new_actions} end end @@ -1048,6 +963,104 @@ defmodule Supavisor.ClientHandler do end end + @spec handle_data(kind :: atom(), data :: binary(), state, data) :: :gen_statem.event_handler_result(data) + when state: atom() | term(), + data: term() + + # handle Terminate message + defp handle_data(:info, <>, :idle, _data) do + Logger.info("ClientHandler: Terminate received from client") + {:stop, {:shutdown, :terminate_received}} + end + + defp handle_data(:info, <> <> _ = msg, :idle, data) do + Logger.debug("ClientHandler: Receive sync") + + # db_pid can be nil in transaction mode, so we will send ready_for_query + # without checking out a direct connection. If there is a linked db_pid, + # we will forward the message to it + if data.db_pid != nil, + do: :ok = sock_send_maybe_active_once(msg, data), + else: :ok = HandlerHelpers.sock_send(data.sock, Server.ready_for_query()) + + {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)} + end + + defp handle_data(:info, <> = msg, _, data) do + Logger.debug("ClientHandler: Receive sync while not idle") + :ok = sock_send_maybe_active_once(msg, data) + {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)} + end + + # handle Flush message + defp handle_data(:info, <> = msg, _, data) do + Logger.debug("ClientHandler: Receive flush while not idle") + :ok = sock_send_maybe_active_once(msg, data) + {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)} + end + + # incoming query with a single pool + defp handle_data(:info, bin, :idle, %{pool: pid} = data) when is_pid(pid) do + Logger.debug("ClientHandler: Receive query #{inspect(bin)}") + db_pid = db_checkout(:both, :on_query, data) + handle_prepared_statements(db_pid, bin, data) + + {:next_state, :busy, %{data | db_pid: db_pid, query_start: System.monotonic_time()}, + {:next_event, :internal, bin}} + end + + defp handle_data(:info, bin, _, %{mode: :proxy} = data) do + {:next_state, :busy, %{data | query_start: System.monotonic_time()}, + {:next_event, :internal, bin}} + end + + # incoming query with read/write pools + defp handle_data(:info, bin, :idle, data) do + query_type = + with {:ok, payload} <- Client.get_payload(bin), + {:ok, statements} <- Supavisor.PgParser.statements(payload) do + Logger.debug( + "ClientHandler: Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}" + ) + + case statements do + # naive check for read only queries + ["SelectStmt"] -> :read + _ -> :write + end + else + other -> + Logger.error("ClientHandler: Receive query error: #{inspect(other)}") + :write + end + + ts = System.monotonic_time() + db_pid = db_checkout(query_type, :on_query, data) + + {:next_state, :busy, %{data | db_pid: db_pid, query_start: ts, last_query: bin}, + {:next_event, :internal, bin}} + end + + # forward query to db + defp handle_data(_, bin, :busy, data) do + Logger.debug("ClientHandler: Forward query to db #{inspect(bin)} #{inspect(data.db_pid)}") + + case sock_send_maybe_active_once(bin, data) do + :ok -> + {:keep_state, %{data | active_count: data.active_count + 1}} + + error -> + Logger.error("ClientHandler: error while sending query: #{inspect(error)}") + + HandlerHelpers.sock_send( + data.sock, + Server.error_message("XX000", "Error while sending query") + ) + + {:stop, {:shutdown, :send_query_error}} + end + end + @spec handle_prepared_statements({pid, pid, Supavisor.sock()}, binary, map) :: :ok | nil defp handle_prepared_statements({_, pid, _}, bin, %{mode: :transaction} = data) do with {:ok, payload} <- Client.get_payload(bin),