Skip to content

Commit

Permalink
feat: limit client connections (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
abc3 authored Aug 2, 2023
1 parent 32d72ed commit 2f36696
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 18 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.2.22
0.2.23
13 changes: 8 additions & 5 deletions lib/supavisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,23 @@ defmodule Supavisor do
default_parameter_status: ps,
ip_version: ip_ver,
default_pool_size: def_pool_size,
default_max_clients: def_max_clients,
users: [
%{
db_user: db_user,
db_password: db_pass,
pool_size: pool_size,
mode_type: mode_type
mode_type: mode_type,
max_clients: max_clients
}
]
} = tenant_record

{id, mode, pool_size} =
{id, mode, pool_size, max_clients} =
if method == :auth_query do
{{tenant, secrets.().user}, def_mode_type, def_pool_size}
{{tenant, secrets.().user}, def_mode_type, def_pool_size, def_max_clients}
else
{{tenant, user_alias}, mode_type, pool_size}
{{tenant, user_alias}, mode_type, pool_size, max_clients}
end

auth = %{
Expand All @@ -152,7 +154,8 @@ defmodule Supavisor do
auth: auth,
pool_size: pool_size,
mode: mode,
default_parameter_status: ps
default_parameter_status: ps,
max_clients: max_clients
}

DynamicSupervisor.start_child(
Expand Down
5 changes: 5 additions & 0 deletions lib/supavisor/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ defmodule Supavisor.ClientHandler do
{:keep_state, data, {:next_event, :internal, {:greetings, ps}}}
end
else
{:error, :max_clients_reached} ->
Logger.error("Max client connections reached")
:ok = send_error(data.sock, "XX000", "Max client connections reached")
{:stop, :normal, data}

error ->
Logger.error("Subscribe error: #{inspect(error)}")
{:keep_state_and_data, {:timeout, 1000, :subscribe}}
Expand Down
28 changes: 19 additions & 9 deletions lib/supavisor/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Supavisor.Manager do
GenServer.start_link(__MODULE__, args, name: name)
end

@spec subscribe(pid, pid) :: {:ok, iodata() | []}
@spec subscribe(pid, pid) :: {:ok, iodata() | []} | {:error, :max_clients_reached}
def subscribe(manager, pid) do
GenServer.call(manager, {:subscribe, pid})
end
Expand Down Expand Up @@ -42,7 +42,8 @@ defmodule Supavisor.Manager do
user_alias: args.user_alias,
parameter_status: [],
wait_ps: [],
default_parameter_status: args.default_parameter_status
default_parameter_status: args.default_parameter_status,
max_clients: args.max_clients
}

Logger.metadata(project: args.tenant, user: args.user_alias)
Expand All @@ -54,15 +55,24 @@ defmodule Supavisor.Manager do
@impl true
def handle_call({:subscribe, pid}, _, %{tenant: tenant, user_alias: user_alias} = state) do
Logger.info("Subscribing #{inspect(pid)} to tenant #{inspect({tenant, user_alias})}")
:ets.insert(state.tid, {Process.monitor(pid), pid, now()})

case state.parameter_status do
[] ->
{:reply, {:ok, []}, update_in(state.wait_ps, &[pid | &1])}
# don't limit if max_clients is null
{reply, new_state} =
if :ets.info(state.tid, :size) < state.max_clients do
:ets.insert(state.tid, {Process.monitor(pid), pid, now()})

ps ->
{:reply, {:ok, ps}, state}
end
case state.parameter_status do
[] ->
{{:ok, []}, update_in(state.wait_ps, &[pid | &1])}

ps ->
{{:ok, ps}, state}
end
else
{{:error, :max_clients_reached}, state}
end

{:reply, reply, new_state}
end

def handle_call({:set_parameter_status, ps}, _, %{parameter_status: []} = state) do
Expand Down
4 changes: 3 additions & 1 deletion lib/supavisor/tenants/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Supavisor.Tenants.Tenant do
field(:auth_query, :string)
field(:default_pool_size, :integer, default: 15)
field(:sni_hostname, :string)
field(:default_max_clients, :integer, default: 1000)

has_many(:users, User,
foreign_key: :tenant_external_id,
Expand Down Expand Up @@ -53,7 +54,8 @@ defmodule Supavisor.Tenants.Tenant do
:require_user,
:auth_query,
:default_pool_size,
:sni_hostname
:sni_hostname,
:default_max_clients
])
|> check_constraint(:upstream_ssl, name: :upstream_constraints, prefix: "_supavisor")
|> check_constraint(:upstream_verify, name: :upstream_constraints, prefix: "_supavisor")
Expand Down
4 changes: 3 additions & 1 deletion lib/supavisor/tenants/user.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule Supavisor.Tenants.User do
field(:mode_type, Ecto.Enum, values: [:transaction, :session])
field(:pool_size, :integer)
field(:pool_checkout_timeout, :integer, default: 60_000)
field(:max_clients, :integer)
belongs_to(:tenant, Supavisor.Tenants.Tenant, foreign_key: :tenant_external_id, type: :string)
timestamps()
end
Expand All @@ -37,7 +38,8 @@ defmodule Supavisor.Tenants.User do
:pool_size,
:mode_type,
:is_manager,
:pool_checkout_timeout
:pool_checkout_timeout,
:max_clients
])
|> validate_required([
:db_user_alias,
Expand Down
1 change: 1 addition & 0 deletions lib/supavisor_web/views/tenant_view.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule SupavisorWeb.TenantView do
require_user: tenant.require_user,
auth_query: tenant.auth_query,
sni_hostname: tenant.sni_hostname,
default_max_clients: tenant.default_max_clients,
users: render_many(tenant.users, UserView, "user.json")
}
end
Expand Down
3 changes: 2 additions & 1 deletion lib/supavisor_web/views/user_view.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ defmodule SupavisorWeb.UserView do
pool_size: user.pool_size,
is_manager: user.is_manager,
mode_type: user.mode_type,
pool_checkout_timeout: user.pool_checkout_timeout
pool_checkout_timeout: user.pool_checkout_timeout,
max_clients: user.max_clients
}
end
end
9 changes: 9 additions & 0 deletions priv/repo/migrations/20230801090256_add_user_max_clients.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Supavisor.Repo.Migrations.AddUserMaxClients do
use Ecto.Migration

def change do
alter table("users", prefix: "_supavisor") do
add(:max_clients, :integer, null: true)
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Supavisor.Repo.Migrations.AddTenantDefMaxClients do
use Ecto.Migration

def change do
alter table("tenants", prefix: "_supavisor") do
add(:default_max_clients, :integer, null: false, default: 1000)
end
end
end
9 changes: 9 additions & 0 deletions priv/repo/seeds_after_migration.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ end
"pool_size" => 1,
"mode_type" => "session",
"pool_checkout_timeout" => 500
},
%{
"db_user_alias" => "max_clients",
"db_user" => db_conf[:username],
"db_password" => db_conf[:password],
"pool_size" => 2,
"max_clients" => -1,
"mode_type" => "transaction",
"pool_checkout_timeout" => 500
}
]
}
Expand Down
10 changes: 10 additions & 0 deletions test/integration/proxy_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,14 @@ defmodule Supavisor.Integration.ProxyTest do
assert {:idle, nil} = {state, db_pid}
Process.exit(psql_pid, :kill)
end

test "limit client connections" do
db_conf = Application.get_env(:supavisor, Repo)

url =
"postgresql://max_clients.proxy_tenant:#{db_conf[:password]}@#{db_conf[:hostname]}:#{Application.get_env(:supavisor, :proxy_port_transaction)}/postgres?sslmode=disable"

{result, _} = System.cmd("psql", [url], stderr_to_stdout: true)
assert result =~ "FATAL: Max client connections reached"
end
end

0 comments on commit 2f36696

Please sign in to comment.