Skip to content

Commit

Permalink
chore: extract handling network data to separate function
Browse files Browse the repository at this point in the history
This approach make data handling more concise and less prone to errors.
This is also will make extraction of different working modes easier in
the future refactoring.
  • Loading branch information
hauleth committed Dec 4, 2024
1 parent 8de5284 commit 36722ca
Showing 1 changed file with 112 additions and 95 deletions.
207 changes: 112 additions & 95 deletions lib/supavisor/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -506,103 +506,21 @@ defmodule Supavisor.ClientHandler do
{:keep_state_and_data, {:timeout, data.heartbeat_interval, :heartbeat_check}}
end

# handle Terminate message
def handle_event(:info, {proto, _, <<?X, 4::32>>}, :idle, _)
when proto in @proto do
Logger.info("ClientHandler: Terminate received from client")
{:stop, {:shutdown, :terminate_received}}
end

# handle Sync message
def handle_event(:info, {proto, _, <<?S, 4::32, _::binary>> = msg}, :idle, data)
when proto in @proto do
Logger.debug("ClientHandler: Receive sync")

# db_pid can be nil in transaction mode, so we will send ready_for_query
# without checking out a direct connection. If there is a linked db_pid,
# we will forward the message to it
if data.db_pid != nil,
do: :ok = sock_send_maybe_active_once(msg, data),
else: :ok = HandlerHelpers.sock_send(data.sock, Server.ready_for_query())

{:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
end

def handle_event(:info, {proto, _, <<?S, 4::32, _::binary>> = msg}, _, data)
when proto in @proto do
Logger.debug("ClientHandler: Receive sync while not idle")
:ok = sock_send_maybe_active_once(msg, data)
{:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
end

# handle Flush message
def handle_event(:info, {proto, _, <<?H, 4::32, _::binary>> = msg}, _, data)
when proto in @proto do
Logger.debug("ClientHandler: Receive flush while not idle")
:ok = sock_send_maybe_active_once(msg, data)
{:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
end

# incoming query with a single pool
def handle_event(:info, {proto, _, bin}, :idle, %{pool: pid} = data)
when is_binary(bin) and is_pid(pid) and proto in @proto do
Logger.debug("ClientHandler: Receive query #{inspect(bin)}")
db_pid = db_checkout(:both, :on_query, data)
handle_prepared_statements(db_pid, bin, data)

{:next_state, :busy, %{data | db_pid: db_pid, query_start: System.monotonic_time()},
{:next_event, :internal, {proto, nil, bin}}}
end

def handle_event(:info, {proto, _, bin}, _, %{mode: :proxy} = data) when proto in @proto do
{:next_state, :busy, %{data | query_start: System.monotonic_time()},
{:next_event, :internal, {proto, nil, bin}}}
end

# incoming query with read/write pools
def handle_event(:info, {proto, _, bin}, :idle, data) when proto in @proto do
query_type =
with {:ok, payload} <- Client.get_payload(bin),
{:ok, statements} <- Supavisor.PgParser.statements(payload) do
Logger.debug(
"ClientHandler: Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}"
)
def handle_event(kind, {proto, socket, msg}, state, data)
when proto in @proto and is_binary(msg) do
with {:next_state, next_state, new_data, actions} <- handle_data(kind, msg, state, data) do
new_actions =
actions
|> List.wrap()
|> Enum.map(fn
{:next_event, type, bin} when is_binary(bin) ->
{:next_event, type, {proto, socket, bin}}

case statements do
# naive check for read only queries
["SelectStmt"] -> :read
_ -> :write
end
else
other ->
Logger.error("ClientHandler: Receive query error: #{inspect(other)}")
:write
end

ts = System.monotonic_time()
db_pid = db_checkout(query_type, :on_query, data)

{:next_state, :busy, %{data | db_pid: db_pid, query_start: ts, last_query: bin},
{:next_event, :internal, {proto, nil, bin}}}
end

# forward query to db
def handle_event(_, {proto, _, bin}, :busy, data) when proto in @proto do
Logger.debug("ClientHandler: Forward query to db #{inspect(bin)} #{inspect(data.db_pid)}")

case sock_send_maybe_active_once(bin, data) do
:ok ->
{:keep_state, %{data | active_count: data.active_count + 1}}

error ->
Logger.error("ClientHandler: error while sending query: #{inspect(error)}")

HandlerHelpers.sock_send(
data.sock,
Server.error_message("XX000", "Error while sending query")
)
other ->
other
end)

{:stop, {:shutdown, :send_query_error}}
{:next_state, next_state, new_data, new_actions}
end
end

Expand Down Expand Up @@ -1048,6 +966,105 @@ defmodule Supavisor.ClientHandler do
end
end

@spec handle_data(kind :: atom(), data :: binary(), state, data) ::
:gen_statem.event_handler_result(data)
when state: atom() | term(),
data: term()

# handle Terminate message
defp handle_data(:info, <<?X, 4::32>>, :idle, _data) do
Logger.info("ClientHandler: Terminate received from client")
{:stop, {:shutdown, :terminate_received}}
end

defp handle_data(:info, <<?S, 4::32>> <> _ = msg, :idle, data) do
Logger.debug("ClientHandler: Receive sync")

# db_pid can be nil in transaction mode, so we will send ready_for_query
# without checking out a direct connection. If there is a linked db_pid,
# we will forward the message to it
if data.db_pid != nil,
do: :ok = sock_send_maybe_active_once(msg, data),
else: :ok = HandlerHelpers.sock_send(data.sock, Server.ready_for_query())

{:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
end

defp handle_data(:info, <<?S, 4::32, _::binary>> = msg, _, data) do
Logger.debug("ClientHandler: Receive sync while not idle")
:ok = sock_send_maybe_active_once(msg, data)
{:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
end

# handle Flush message
defp handle_data(:info, <<?H, 4::32, _::binary>> = msg, _, data) do
Logger.debug("ClientHandler: Receive flush while not idle")
:ok = sock_send_maybe_active_once(msg, data)
{:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
end

# incoming query with a single pool
defp handle_data(:info, bin, :idle, %{pool: pid} = data) when is_pid(pid) do
Logger.debug("ClientHandler: Receive query #{inspect(bin)}")
db_pid = db_checkout(:both, :on_query, data)
handle_prepared_statements(db_pid, bin, data)

{:next_state, :busy, %{data | db_pid: db_pid, query_start: System.monotonic_time()},
{:next_event, :internal, bin}}
end

defp handle_data(:info, bin, _, %{mode: :proxy} = data) do
{:next_state, :busy, %{data | query_start: System.monotonic_time()},
{:next_event, :internal, bin}}
end

# incoming query with read/write pools
defp handle_data(:info, bin, :idle, data) do
query_type =
with {:ok, payload} <- Client.get_payload(bin),
{:ok, statements} <- Supavisor.PgParser.statements(payload) do
Logger.debug(
"ClientHandler: Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}"
)

case statements do
# naive check for read only queries
["SelectStmt"] -> :read
_ -> :write
end
else
other ->
Logger.error("ClientHandler: Receive query error: #{inspect(other)}")
:write
end

ts = System.monotonic_time()
db_pid = db_checkout(query_type, :on_query, data)

{:next_state, :busy, %{data | db_pid: db_pid, query_start: ts, last_query: bin},
{:next_event, :internal, bin}}
end

# forward query to db
defp handle_data(_, bin, :busy, data) do
Logger.debug("ClientHandler: Forward query to db #{inspect(bin)} #{inspect(data.db_pid)}")

case sock_send_maybe_active_once(bin, data) do
:ok ->
{:keep_state, %{data | active_count: data.active_count + 1}}

error ->
Logger.error("ClientHandler: error while sending query: #{inspect(error)}")

HandlerHelpers.sock_send(
data.sock,
Server.error_message("XX000", "Error while sending query")
)

{:stop, {:shutdown, :send_query_error}}
end
end

@spec handle_prepared_statements({pid, pid, Supavisor.sock()}, binary, map) :: :ok | nil
defp handle_prepared_statements({_, pid, _}, bin, %{mode: :transaction} = data) do
with {:ok, payload} <- Client.get_payload(bin),
Expand Down

0 comments on commit 36722ca

Please sign in to comment.