Skip to content

Commit

Permalink
feat: recheck auth secrets on connection error (#304)
Browse files Browse the repository at this point in the history
  • Loading branch information
abc3 authored Feb 15, 2024
1 parent 4a81b5e commit eb92f01
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 24 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dev:
FLY_ALLOC_ID=111e4567-e89b-12d3-a456-426614174000 \
SECRET_KEY_BASE="dev" \
CLUSTER_POSTGRES="true" \
DB_POOL_SIZE="5" \
ERL_AFLAGS="-kernel shell_history enabled" \
iex --name [email protected] --cookie cookie -S mix run --no-halt

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.33
1.1.35
2 changes: 1 addition & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ if config_env() != :test do

config :supavisor, Supavisor.Repo,
url: System.get_env("DATABASE_URL", "ecto://postgres:postgres@localhost:6432/postgres"),
pool_size: System.get_env("DB_POOL_SIZE", "5") |> String.to_integer(),
pool_size: System.get_env("DB_POOL_SIZE", "50") |> String.to_integer(),
parameters: [
application_name: "supavisor_meta"
]
Expand Down
21 changes: 16 additions & 5 deletions lib/supavisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,23 @@ defmodule Supavisor do
|> :erpc.multicall(Supavisor, :dirty_terminate, [tenant], 60_000)
end

@spec del_all_cache(String.t(), String.t()) :: map()
@spec del_all_cache(String.t(), String.t()) :: [map()]
def del_all_cache(tenant, user) do
%{
secrets: Cachex.del(Supavisor.Cache, {:secrets, tenant, user}),
metrics: Cachex.del(Supavisor.Cache, {:metrics, tenant})
}
Logger.info("Deleting all cache for tenant #{tenant} and user #{user}")
{:ok, keys} = Cachex.keys(Supavisor.Cache)

del = fn key, acc ->
result = Cachex.del(Supavisor.Cache, key)
[%{inspect(key) => inspect(result)} | acc]
end

Enum.reduce(keys, [], fn
{:metrics, ^tenant} = key, acc -> del.(key, acc)
{:secrets, ^tenant, ^user} = key, acc -> del.(key, acc)
{:user_cache, _, ^user, ^tenant, _} = key, acc -> del.(key, acc)
{:tenant_cache, ^tenant, _} = key, acc -> del.(key, acc)
_, acc -> acc
end)
end

@spec get_local_pool(id) :: map | pid | nil
Expand Down
1 change: 1 addition & 0 deletions lib/supavisor/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ defmodule Supavisor.Application do
topologies = Application.get_env(:libcluster, :topologies) || []

children = [
Supavisor.ErlSysMon,
PromEx,
{Registry, keys: :unique, name: Supavisor.Registry.Tenants},
{Registry, keys: :unique, name: Supavisor.Registry.ManagerTables},
Expand Down
61 changes: 47 additions & 14 deletions lib/supavisor/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,22 @@ defmodule Supavisor.ClientHandler do
true ->
new_data = update_user_data(data, info, user, id, db_name, mode)

case auth_secrets(info, user) do
key = {:secrets, tenant_or_alias, user}

case auth_secrets(info, user, key, :timer.hours(24)) do
{:ok, auth_secrets} ->
Logger.debug("ClientHandler: Authentication method: #{inspect(auth_secrets)}")

{:keep_state, new_data, {:next_event, :internal, {:handle, auth_secrets}}}
{:keep_state, new_data, {:next_event, :internal, {:handle, auth_secrets, info}}}

{:error, reason} ->
Logger.error(
"ClientHandler: Authentication auth_secrets error: #{inspect(reason)}"
)

:ok = HH.send_error(sock, "XX000", "Authentication error")
:ok =
HH.send_error(sock, "XX000", "Authentication error, reason: #{inspect(reason)}")

Telem.client_join(:fail, id)
{:stop, {:shutdown, :auth_secrets_error}}
end
Expand All @@ -249,7 +253,7 @@ defmodule Supavisor.ClientHandler do

def handle_event(
:internal,
{:handle, {method, secrets}},
{:handle, {method, secrets}, info},
_,
%{sock: sock} = data
) do
Expand All @@ -268,6 +272,33 @@ defmodule Supavisor.ClientHandler do
Server.exchange_message(:final, "e=#{reason}")
end

key = {:secrets_check, data.tenant, data.user}

if method != :password and reason == "Wrong password" and
Cachex.get(Supavisor.Cache, key) == {:ok, nil} do
case auth_secrets(info, data.user, key, 15_000) do
{:ok, {method2, secrets2}} = value ->
if method != method2 || Map.delete(secrets.(), :client_key) != secrets2.() do
Logger.warning("ClientHandler: Update secrets and terminate pool")

Cachex.update(
Supavisor.Cache,
{:secrets, data.tenant, data.user},
{:cached, value}
)

Supavisor.stop(data.id)
else
Logger.debug("ClientHandler: Cache the same #{inspect(key)}")
end

other ->
Logger.error("ClientHandler: Auth secrets check error: #{inspect(other)}")
end
else
Logger.debug("ClientHandler: Cache hit for #{inspect(key)}")
end

HH.sock_send(sock, msg)
Telem.client_join(:fail, data.id)
{:stop, {:shutdown, :exchange_error}}
Expand Down Expand Up @@ -469,11 +500,14 @@ defmodule Supavisor.ClientHandler do
"ClientHandler: Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}"
)

case state do
:idle ->
case {state, reason} do
{_, :shutdown} ->
{:stop, {:shutdown, :manager_shutdown}}

{:idle, _} ->
{:keep_state_and_data, {:next_event, :internal, :subscribe}}

:busy ->
{:busy, _} ->
{:stop, {:shutdown, :manager_down}}
end
end
Expand Down Expand Up @@ -730,26 +764,25 @@ defmodule Supavisor.ClientHandler do
}
end

@spec auth_secrets(map, String.t()) :: {:ok, S.secrets()} | {:error, term()}
@spec auth_secrets(map, String.t(), term(), non_neg_integer()) ::
{:ok, S.secrets()} | {:error, term()}
## password secrets
def auth_secrets(%{user: user, tenant: %{require_user: true}}, _) do
def auth_secrets(%{user: user, tenant: %{require_user: true}}, _, _, _) do
secrets = %{db_user: user.db_user, password: user.db_password, alias: user.db_user_alias}

{:ok, {:password, fn -> secrets end}}
end

## auth_query secrets
def auth_secrets(%{tenant: tenant} = info, db_user) do
cache_key = {:secrets, tenant.external_id, db_user}

def auth_secrets(info, db_user, key, ttl) do
fetch = fn _key ->
case get_secrets(info, db_user) do
{:ok, _} = resp -> {:commit, {:cached, resp}, ttl: 600_000}
{:ok, _} = resp -> {:commit, {:cached, resp}, ttl: ttl}
{:error, _} = resp -> {:ignore, resp}
end
end

case Cachex.fetch(Supavisor.Cache, cache_key, fetch) do
case Cachex.fetch(Supavisor.Cache, key, fetch) do
{:ok, {:cached, value}} -> value
{:commit, {:cached, value}, _opts} -> value
{:ignore, resp} -> resp
Expand Down
2 changes: 1 addition & 1 deletion lib/supavisor/db_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ defmodule Supavisor.DbHandler do

# the process received message from db without linked caller
def handle_event(:info, {proto, _, bin}, _, %{caller: nil}) when proto in @proto do
Logger.warning("DbHandler: Got db response #{inspect(bin)} when caller was nil")
Logger.debug("DbHandler: Got db response #{inspect(bin)} when caller was nil")
:keep_state_and_data
end

Expand Down
30 changes: 30 additions & 0 deletions lib/supavisor/erl_sys_mon.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule Supavisor.ErlSysMon do
@moduledoc """
Logs Erlang System Monitor events.
"""

use GenServer

require Logger

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end

def init(_args) do
:erlang.system_monitor(self(), [
:busy_dist_port,
:busy_port,
{:long_gc, 250},
{:long_schedule, 100}
])

{:ok, []}
end

def handle_info(msg, state) do
Logger.warning("#{__MODULE__} message: " <> inspect(msg))

{:noreply, state}
end
end
5 changes: 3 additions & 2 deletions lib/supavisor/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ defmodule Supavisor.Tenants do
cache_key = {:tenant_cache, external_id, sni_hostname}

case Cachex.fetch(Supavisor.Cache, cache_key, fn _key ->
{:commit, {:cached, get_tenant(external_id, sni_hostname)}, ttl: 5_000}
{:commit, {:cached, get_tenant(external_id, sni_hostname)}, ttl: :timer.hours(24)}
end) do
{_, {:cached, value}} -> value
{_, {:cached, value}, _} -> value
Expand All @@ -79,7 +79,8 @@ defmodule Supavisor.Tenants do
cache_key = {:user_cache, type, user, external_id, sni_hostname}

case Cachex.fetch(Supavisor.Cache, cache_key, fn _key ->
{:commit, {:cached, get_user(type, user, external_id, sni_hostname)}, ttl: 5_000}
{:commit, {:cached, get_user(type, user, external_id, sni_hostname)},
ttl: :timer.hours(24)}
end) do
{_, {:cached, value}} -> value
{_, {:cached, value}, _} -> value
Expand Down

0 comments on commit eb92f01

Please sign in to comment.