Skip to content

Commit

Permalink
Merge pull request #307 from supabase/main
Browse files Browse the repository at this point in the history
Prod deploy
  • Loading branch information
abc3 authored Feb 18, 2024
2 parents fbb8636 + d155590 commit 007707c
Show file tree
Hide file tree
Showing 15 changed files with 119 additions and 40 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dev:
FLY_ALLOC_ID=111e4567-e89b-12d3-a456-426614174000 \
SECRET_KEY_BASE="dev" \
CLUSTER_POSTGRES="true" \
DB_POOL_SIZE="5" \
ERL_AFLAGS="-kernel shell_history enabled" \
iex --name [email protected] --cookie cookie -S mix run --no-halt

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.33
1.1.38
2 changes: 1 addition & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ if config_env() != :test do

config :supavisor, Supavisor.Repo,
url: System.get_env("DATABASE_URL", "ecto://postgres:postgres@localhost:6432/postgres"),
pool_size: System.get_env("DB_POOL_SIZE", "5") |> String.to_integer(),
pool_size: System.get_env("DB_POOL_SIZE", "25") |> String.to_integer(),
parameters: [
application_name: "supavisor_meta"
]
Expand Down
2 changes: 0 additions & 2 deletions docs/configuration/tenants.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,4 @@ All `tenant` fields and their types are defined in the `Supavisor.Tenants.Tenant

`client_idle_timeout` - the maximum duration of an idle client connection

`default_pool_strategy` - the default strategy when pulling available connections from the pool queue

`allow_list` - a list of CIDR ranges which are allowed to connect
23 changes: 16 additions & 7 deletions lib/supavisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,23 @@ defmodule Supavisor do
|> :erpc.multicall(Supavisor, :dirty_terminate, [tenant], 60_000)
end

@spec del_all_cache(String.t(), String.t()) :: map()
@spec del_all_cache(String.t(), String.t()) :: [map()]
def del_all_cache(tenant, user) do
%{
secrets: Cachex.del(Supavisor.Cache, {:secrets, tenant, user}),
metrics: Cachex.del(Supavisor.Cache, {:metrics, tenant})
}
Logger.info("Deleting all cache for tenant #{tenant} and user #{user}")
{:ok, keys} = Cachex.keys(Supavisor.Cache)

del = fn key, acc ->
result = Cachex.del(Supavisor.Cache, key)
[%{inspect(key) => inspect(result)} | acc]
end

Enum.reduce(keys, [], fn
{:metrics, ^tenant} = key, acc -> del.(key, acc)
{:secrets, ^tenant, ^user} = key, acc -> del.(key, acc)
{:user_cache, _, ^user, ^tenant, _} = key, acc -> del.(key, acc)
{:tenant_cache, ^tenant, _} = key, acc -> del.(key, acc)
_, acc -> acc
end)
end

@spec get_local_pool(id) :: map | pid | nil
Expand Down Expand Up @@ -261,7 +272,6 @@ defmodule Supavisor do
default_max_clients: def_max_clients,
client_idle_timeout: client_idle_timeout,
replica_type: replica_type,
default_pool_strategy: default_pool_strategy,
users: [
%{
db_user: db_user,
Expand Down Expand Up @@ -307,7 +317,6 @@ defmodule Supavisor do
default_parameter_status: ps,
max_clients: max_clients,
client_idle_timeout: client_idle_timeout,
default_pool_strategy: default_pool_strategy,
log_level: log_level
}
end
Expand Down
1 change: 1 addition & 0 deletions lib/supavisor/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ defmodule Supavisor.Application do
topologies = Application.get_env(:libcluster, :topologies) || []

children = [
Supavisor.ErlSysMon,
PromEx,
{Registry, keys: :unique, name: Supavisor.Registry.Tenants},
{Registry, keys: :unique, name: Supavisor.Registry.ManagerTables},
Expand Down
63 changes: 49 additions & 14 deletions lib/supavisor/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,22 @@ defmodule Supavisor.ClientHandler do
true ->
new_data = update_user_data(data, info, user, id, db_name, mode)

case auth_secrets(info, user) do
key = {:secrets, tenant_or_alias, user}

case auth_secrets(info, user, key, :timer.hours(24)) do
{:ok, auth_secrets} ->
Logger.debug("ClientHandler: Authentication method: #{inspect(auth_secrets)}")

{:keep_state, new_data, {:next_event, :internal, {:handle, auth_secrets}}}
{:keep_state, new_data, {:next_event, :internal, {:handle, auth_secrets, info}}}

{:error, reason} ->
Logger.error(
"ClientHandler: Authentication auth_secrets error: #{inspect(reason)}"
)

:ok = HH.send_error(sock, "XX000", "Authentication error")
:ok =
HH.send_error(sock, "XX000", "Authentication error, reason: #{inspect(reason)}")

Telem.client_join(:fail, id)
{:stop, {:shutdown, :auth_secrets_error}}
end
Expand All @@ -249,7 +253,7 @@ defmodule Supavisor.ClientHandler do

def handle_event(
:internal,
{:handle, {method, secrets}},
{:handle, {method, secrets}, info},
_,
%{sock: sock} = data
) do
Expand All @@ -268,6 +272,33 @@ defmodule Supavisor.ClientHandler do
Server.exchange_message(:final, "e=#{reason}")
end

key = {:secrets_check, data.tenant, data.user}

if method != :password and reason == "Wrong password" and
Cachex.get(Supavisor.Cache, key) == {:ok, nil} do
case auth_secrets(info, data.user, key, 15_000) do
{:ok, {method2, secrets2}} = value ->
if method != method2 || Map.delete(secrets.(), :client_key) != secrets2.() do
Logger.warning("ClientHandler: Update secrets and terminate pool")

Cachex.update(
Supavisor.Cache,
{:secrets, data.tenant, data.user},
{:cached, value}
)

Supavisor.stop(data.id)
else
Logger.debug("ClientHandler: Cache the same #{inspect(key)}")
end

other ->
Logger.error("ClientHandler: Auth secrets check error: #{inspect(other)}")
end
else
Logger.debug("ClientHandler: Cache hit for #{inspect(key)}")
end

HH.sock_send(sock, msg)
Telem.client_join(:fail, data.id)
{:stop, {:shutdown, :exchange_error}}
Expand Down Expand Up @@ -469,11 +500,14 @@ defmodule Supavisor.ClientHandler do
"ClientHandler: Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}"
)

case state do
:idle ->
case {state, reason} do
{_, :shutdown} ->
{:stop, {:shutdown, :manager_shutdown}}

{:idle, _} ->
{:keep_state_and_data, {:next_event, :internal, :subscribe}}

:busy ->
{:busy, _} ->
{:stop, {:shutdown, :manager_down}}
end
end
Expand Down Expand Up @@ -730,26 +764,25 @@ defmodule Supavisor.ClientHandler do
}
end

@spec auth_secrets(map, String.t()) :: {:ok, S.secrets()} | {:error, term()}
@spec auth_secrets(map, String.t(), term(), non_neg_integer()) ::
{:ok, S.secrets()} | {:error, term()}
## password secrets
def auth_secrets(%{user: user, tenant: %{require_user: true}}, _) do
def auth_secrets(%{user: user, tenant: %{require_user: true}}, _, _, _) do
secrets = %{db_user: user.db_user, password: user.db_password, alias: user.db_user_alias}

{:ok, {:password, fn -> secrets end}}
end

## auth_query secrets
def auth_secrets(%{tenant: tenant} = info, db_user) do
cache_key = {:secrets, tenant.external_id, db_user}

def auth_secrets(info, db_user, key, ttl) do
fetch = fn _key ->
case get_secrets(info, db_user) do
{:ok, _} = resp -> {:commit, {:cached, resp}, ttl: 600_000}
{:ok, _} = resp -> {:commit, {:cached, resp}, ttl: ttl}
{:error, _} = resp -> {:ignore, resp}
end
end

case Cachex.fetch(Supavisor.Cache, cache_key, fetch) do
case Cachex.fetch(Supavisor.Cache, key, fetch) do
{:ok, {:cached, value}} -> value
{:commit, {:cached, value}, _opts} -> value
{:ignore, resp} -> resp
Expand Down Expand Up @@ -780,6 +813,8 @@ defmodule Supavisor.ClientHandler do
socket_options: [
H.ip_version(tenant.ip_version, tenant.db_host)
],
queue_target: 1_000,
queue_interval: 5_000,
ssl_opts: ssl_opts || []
)

Expand Down
2 changes: 1 addition & 1 deletion lib/supavisor/db_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ defmodule Supavisor.DbHandler do

# the process received message from db without linked caller
def handle_event(:info, {proto, _, bin}, _, %{caller: nil}) when proto in @proto do
Logger.warning("DbHandler: Got db response #{inspect(bin)} when caller was nil")
Logger.debug("DbHandler: Got db response #{inspect(bin)} when caller was nil")
:keep_state_and_data
end

Expand Down
30 changes: 30 additions & 0 deletions lib/supavisor/erl_sys_mon.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule Supavisor.ErlSysMon do
@moduledoc """
Logs Erlang System Monitor events.
"""

use GenServer

require Logger

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end

def init(_args) do
:erlang.system_monitor(self(), [
:busy_dist_port,
:busy_port,
{:long_gc, 250},
{:long_schedule, 100}
])

{:ok, []}
end

def handle_info(msg, state) do
Logger.warning("#{__MODULE__} message: " <> inspect(msg))

{:noreply, state}
end
end
2 changes: 2 additions & 0 deletions lib/supavisor/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ defmodule Supavisor.Helpers do
socket_options: [
ip_version(params["ip_version"], params["db_host"])
],
queue_target: 1_000,
queue_interval: 5_000,
ssl_opts: ssl_opts || []
)

Expand Down
19 changes: 11 additions & 8 deletions lib/supavisor/tenant_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,24 @@ defmodule Supavisor.TenantSupervisor do

@spec pool_spec(tuple, map) :: Keyword.t()
defp pool_spec(id, args) do
{size, overflow} =
case args.mode do
:session ->
{1, args.pool_size}
# {size, overflow} =
# case args.mode do
# :session ->
# {1, args.pool_size}

:transaction ->
if args.pool_size < 10, do: {args.pool_size, 0}, else: {10, args.pool_size - 10}
end
# :transaction ->
# if args.pool_size < 10, do: {args.pool_size, 0}, else: {10, args.pool_size - 10}
# end

{size, overflow} = {1, args.pool_size}

[
name: {:via, Registry, {Supavisor.Registry.Tenants, id, args.replica_type}},
worker_module: Supavisor.DbHandler,
size: size,
max_overflow: overflow,
strategy: :fifo
strategy: :lifo,
idle_timeout: :timer.minutes(5)
]
end
end
5 changes: 3 additions & 2 deletions lib/supavisor/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ defmodule Supavisor.Tenants do
cache_key = {:tenant_cache, external_id, sni_hostname}

case Cachex.fetch(Supavisor.Cache, cache_key, fn _key ->
{:commit, {:cached, get_tenant(external_id, sni_hostname)}, ttl: 5_000}
{:commit, {:cached, get_tenant(external_id, sni_hostname)}, ttl: :timer.hours(24)}
end) do
{_, {:cached, value}} -> value
{_, {:cached, value}, _} -> value
Expand All @@ -79,7 +79,8 @@ defmodule Supavisor.Tenants do
cache_key = {:user_cache, type, user, external_id, sni_hostname}

case Cachex.fetch(Supavisor.Cache, cache_key, fn _key ->
{:commit, {:cached, get_user(type, user, external_id, sni_hostname)}, ttl: 5_000}
{:commit, {:cached, get_user(type, user, external_id, sni_hostname)},
ttl: :timer.hours(24)}
end) do
{_, {:cached, value}} -> value
{_, {:cached, value}, _} -> value
Expand Down
2 changes: 0 additions & 2 deletions lib/supavisor/tenants/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ defmodule Supavisor.Tenants.Tenant do
field(:default_max_clients, :integer, default: 1000)
field(:client_idle_timeout, :integer, default: 0)
field(:client_heartbeat_interval, :integer, default: 60)
field(:default_pool_strategy, Ecto.Enum, values: [:fifo, :lifo], default: :fifo)
field(:allow_list, {:array, :string}, default: ["0.0.0.0/0", "::/0"])

has_many(:users, User,
Expand Down Expand Up @@ -64,7 +63,6 @@ defmodule Supavisor.Tenants.Tenant do
:default_max_clients,
:client_idle_timeout,
:client_heartbeat_interval,
:default_pool_strategy,
:allow_list
])
|> check_constraint(:upstream_ssl, name: :upstream_constraints, prefix: "_supavisor")
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ defmodule Supavisor.MixProject do
{:observer_cli, "~> 1.7"},

# pooller
{:poolboy, "~> 1.5.2"},
# {:poolboy, "~> 1.5.2"},
{:poolboy, git: "https://github.com/abc3/poolboy.git", tag: "v0.0.2"},
{:syn, "~> 3.3"},
{:pgo, "~> 0.13"},
{:rustler, "~> 0.29.1"}
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"plug": {:hex, :plug, "1.15.1", "b7efd81c1a1286f13efb3f769de343236bd8b7d23b4a9f40d3002fc39ad8f74c", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "459497bd94d041d98d948054ec6c0b76feacd28eec38b219ca04c0de13c79d30"},
"plug_cowboy": {:hex, :plug_cowboy, "2.6.1", "9a3bbfceeb65eff5f39dab529e5cd79137ac36e913c02067dba3963a26efe9b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "de36e1a21f451a18b790f37765db198075c25875c64834bcc82d90b309eb6613"},
"plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"poolboy": {:git, "https://github.com/abc3/poolboy.git", "999ec7f5c7282d515020bb058b4832029d6d07bc", [tag: "v0.0.2"]},
"postgrex": {:hex, :postgrex, "0.17.3", "c92cda8de2033a7585dae8c61b1d420a1a1322421df84da9a82a6764580c503d", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "946cf46935a4fdca7a81448be76ba3503cff082df42c6ec1ff16a4bdfbfb098d"},
"prom_ex": {:hex, :prom_ex, "1.8.0", "662615e1d2f2ab3e0dc13a51c92ad0ccfcab24336a90cb9b114ee1bce9ef88aa", [:mix], [{:absinthe, ">= 1.6.0", [hex: :absinthe, repo: "hexpm", optional: true]}, {:broadway, ">= 1.0.2", [hex: :broadway, repo: "hexpm", optional: true]}, {:ecto, ">= 3.5.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:finch, "~> 0.15", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:oban, ">= 2.4.0", [hex: :oban, repo: "hexpm", optional: true]}, {:octo_fetch, "~> 0.3", [hex: :octo_fetch, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.5.0", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, ">= 0.14.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, ">= 1.12.1", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, "~> 2.5", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:telemetry, ">= 1.0.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.0", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "3eea763dfa941e25de50decbf17a6a94dbd2270e7b32f88279aa6e9bbb8e23e7"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
Expand Down

0 comments on commit 007707c

Please sign in to comment.