Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metrics for connection success/failure and improve error handling #200

Merged
merged 4 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9.24
0.9.25
2 changes: 1 addition & 1 deletion lib/supavisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ defmodule Supavisor do
user: db_user,
database: if(db_name != nil, do: db_name, else: db_database),
password: fn -> db_pass end,
application_name: "supavisor",
application_name: "Supavisor",
ip_version: H.ip_version(ip_ver, db_host),
upstream_ssl: tenant_record.upstream_ssl,
upstream_verify: tenant_record.upstream_verify,
Expand Down
58 changes: 36 additions & 22 deletions lib/supavisor/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ defmodule Supavisor.ClientHandler do
def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :exchange, data) do
Logger.debug("Client is trying to request HTTP")
HH.sock_send(data.sock, "HTTP/1.1 204 OK\r\n\r\n")
{:stop, :normal, data}
{:stop, :normal}
end

# cancel request
def handle_event(:info, {_, _, <<16::32, 1234::16, 5678::16, pid::32, key::32>>}, _, data) do
def handle_event(:info, {_, _, <<16::32, 1234::16, 5678::16, pid::32, key::32>>}, _, _) do
Logger.debug("Got cancel query for #{inspect({pid, key})}")
:ok = HH.send_cancel_query(pid, key)
{:stop, :normal, data}
{:stop, :normal}
end

# send cancel request to db
Expand Down Expand Up @@ -120,7 +120,8 @@ defmodule Supavisor.ClientHandler do

error ->
Logger.error("SSL handshake error: #{inspect(error)}")
{:stop, :normal, data}
Telem.client_join(:fail, data.id)
{:stop, :normal}
end
else
Logger.error("User requested SSL connection but no downstream cert/key found")
Expand All @@ -140,7 +141,8 @@ defmodule Supavisor.ClientHandler do

{:error, error} ->
Logger.error("Client startup message error: #{inspect(error)}")
{:stop, :normal, data}
Telem.client_join(:fail, data.id)
{:stop, :normal}
end
end

Expand All @@ -155,7 +157,8 @@ defmodule Supavisor.ClientHandler do
if info.tenant.enforce_ssl and !data.ssl do
Logger.error("Tenant is not allowed to connect without SSL, user #{user}")
:ok = HH.send_error(sock, "XX000", "SSL connection is required")
{:stop, :normal, data}
Telem.client_join(:fail, data.id)
{:stop, :normal}
else
new_data = update_user_data(data, info, user, id)

Expand All @@ -168,15 +171,17 @@ defmodule Supavisor.ClientHandler do
Logger.error("Authentication auth_secrets error: #{inspect(reason)}")

:ok = HH.send_error(sock, "XX000", "Authentication error")
{:stop, :normal, data}
Telem.client_join(:fail, data.id)
{:stop, :normal}
end
end

{:error, reason} ->
Logger.error("User not found: #{inspect(reason)} #{inspect({user, external_id})}")

:ok = HH.send_error(sock, "XX000", "Tenant or user not found")
{:stop, :normal, data}
Telem.client_join(:fail, data.id)
{:stop, :normal}
end
end

Expand All @@ -195,8 +200,8 @@ defmodule Supavisor.ClientHandler do
end

HH.sock_send(sock, msg)

{:stop, :normal, data}
Telem.client_join(:fail, data.id)
{:stop, :normal}

{:ok, client_key} ->
secrets =
Expand All @@ -210,6 +215,7 @@ defmodule Supavisor.ClientHandler do

Logger.debug("Exchange success")
:ok = HH.sock_send(sock, Server.authentication_ok())
Telem.client_join(:ok, data.id)

{:keep_state, %{data | auth_secrets: {method, secrets}},
{:next_event, :internal, :subscribe}}
Expand Down Expand Up @@ -239,7 +245,8 @@ defmodule Supavisor.ClientHandler do
msg = "Max client connections reached"
Logger.error(msg)
:ok = HH.send_error(data.sock, "XX000", msg)
{:stop, :normal, data}
Telem.client_join(:fail, data.id)
{:stop, :normal}

error ->
Logger.error("Subscribe error: #{inspect(error)}")
Expand Down Expand Up @@ -277,9 +284,9 @@ defmodule Supavisor.ClientHandler do
end

# handle Terminate message
def handle_event(:info, {proto, _, <<?X, 4::32>>}, :idle, data) when proto in [:tcp, :ssl] do
def handle_event(:info, {proto, _, <<?X, 4::32>>}, :idle, _) when proto in [:tcp, :ssl] do
Logger.debug("Receive termination")
{:stop, :normal, data}
{:stop, :normal}
end

# handle Sync message
Expand Down Expand Up @@ -315,7 +322,7 @@ defmodule Supavisor.ClientHandler do
msg = "Db buffer size is too big: #{size}"
Logger.error(msg)
HH.sock_send(data.sock, Server.error_message("XX000", msg))
{:stop, :normal, data}
{:stop, :normal}
else
Logger.debug("DB call buffering")
:keep_state_and_data
Expand All @@ -325,13 +332,13 @@ defmodule Supavisor.ClientHandler do
msg = "DB call error: #{inspect(reason)}"
Logger.error(msg)
HH.sock_send(data.sock, Server.error_message("XX000", msg))
{:stop, :normal, data}
{:stop, :normal}
end
end

def handle_event(:info, {:parameter_status, :updated}, _, data) do
def handle_event(:info, {:parameter_status, :updated}, _, _) do
Logger.warning("Parameter status is updated")
{:stop, :normal, data}
{:stop, :normal}
end

def handle_event(:info, {:parameter_status, ps}, :exchange, _) do
Expand Down Expand Up @@ -568,11 +575,17 @@ defmodule Supavisor.ClientHandler do
def auth_secrets(%{tenant: tenant} = info, db_user) do
cache_key = {:secrets, tenant.external_id, db_user}

case Cachex.fetch(Supavisor.Cache, cache_key, fn _key ->
{:commit, {:cached, get_secrets(info, db_user)}, ttl: 15_000}
end) do
{_, {:cached, value}} -> value
{_, {:cached, value}, _} -> value
fetch = fn _key ->
case get_secrets(info, db_user) do
{:ok, _} = resp -> {:commit, {:cached, resp}, ttl: 30_000}
{:error, _} = resp -> {:ignore, resp}
end
end

case Cachex.fetch(Supavisor.Cache, cache_key, fetch) do
{:ok, {:cached, value}} -> value
{:commit, {:cached, value}, _opts} -> value
{:ignore, resp} -> resp
end
end

Expand All @@ -595,6 +608,7 @@ defmodule Supavisor.ClientHandler do
database: tenant.db_database,
password: user.db_password,
username: user.db_user,
parameters: [application_name: "Supavisor auth_query"],
ssl: tenant.upstream_ssl,
socket_options: [
H.ip_version(tenant.ip_version, tenant.db_host)
Expand Down
7 changes: 1 addition & 6 deletions lib/supavisor/db_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@

case try_ssl_handshake({:gen_tcp, sock}, auth) do
{:ok, sock} ->
case send_startup(sock, auth) do

Check warning on line 75 in lib/supavisor/db_handler.ex

View workflow job for this annotation

GitHub Actions / Formatting Checks

Function body is nested too deep (max depth is 2, was 3).
:ok ->
:ok = activate(sock)
{:next_state, :authentication, %{data | sock: sock}}
Expand Down Expand Up @@ -269,12 +269,7 @@
{:keep_state, %{data | caller: caller, buffer: new_buff}, reply}
end

def handle_event(:info, {:tcp_closed, sock}, state, %{sock: sock} = data) do
Logger.error("Connection closed when state was #{state}")
{:next_state, :connect, data, {:state_timeout, 2_500, :connect}}
end

def handle_event(:info, {:ssl_closed, sock}, state, %{sock: sock} = data) do
def handle_event(_, {closed, _}, state, data) when closed in [:tcp_closed, :ssl_closed] do
Logger.error("Connection closed when state was #{state}")
{:next_state, :connect, data, {:state_timeout, 2_500, :connect}}
end
Expand Down
9 changes: 9 additions & 0 deletions lib/supavisor/monitoring/telem.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,13 @@ defmodule Supavisor.Monitoring.Telem do
%{tenant: tenant, user: user, mode: mode}
)
end

@spec client_join(:ok | :fail, S.id()) :: :ok
def client_join(status, {tenant, user, mode}) do
:telemetry.execute(
[:supavisor, :client, :joins, status],
%{},
%{tenant: tenant, user: user, mode: mode}
)
end
end
12 changes: 12 additions & 0 deletions lib/supavisor/monitoring/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ defmodule Supavisor.PromEx.Plugins.Tenant do
event_name: [:supavisor, :client, :query, :stop],
description: "The total number of queries received by clients.",
tags: @tags
),
counter(
[:supavisor, :client, :joins, :ok],
event_name: [:supavisor, :client, :joins, :ok],
description: "The total number of successful joins.",
tags: @tags
),
counter(
[:supavisor, :client, :joins, :fail],
event_name: [:supavisor, :client, :joins, :fail],
description: "The total number of failed joins.",
tags: @tags
)
]
)
Expand Down
1 change: 1 addition & 0 deletions lib/supavisor_web/views/tenant_view.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule SupavisorWeb.TenantView do
require_user: tenant.require_user,
auth_query: tenant.auth_query,
sni_hostname: tenant.sni_hostname,
default_pool_size: tenant.default_pool_size,
default_max_clients: tenant.default_max_clients,
client_idle_timeout: tenant.client_idle_timeout,
default_pool_strategy: tenant.default_pool_strategy,
Expand Down
Loading