Skip to content

Commit

Permalink
feat: add support for read replicas (#162)
Browse files Browse the repository at this point in the history
* call pg_query.statement_types() via rustler

* handle READ ONLY SQL TRANSACTION error

* clusters crud

* update migrations and pool starting

* safe statement_types

* configs validation

* feat: handling of named prepared statements (#207)

* fix caller unlinking
  • Loading branch information
abc3 authored Dec 8, 2023
1 parent 216bb3d commit 02bf93e
Show file tree
Hide file tree
Showing 53 changed files with 2,386 additions and 220 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish_docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Release on Dockerhub
on:
push:
branches:
- main
- main_v0.9
paths:
- ".github/workflows/publish_docker.yml"
- "VERSION"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/stage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Publish upgrade artifacts to staging
on:
push:
branches:
- main
- main_v0.9
env:
INCLUDE_ERTS: false
MIX_ENV: prod
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ npm-debug.log
burrito_out/*

supavisor-*.tar.gz

priv/native/*
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9.32
1.0.0
7 changes: 7 additions & 0 deletions bench/pg_parser.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
alias Supavisor.PgParser, as: Parser

Benchee.run(%{
"statement_types/1" => fn ->
Parser.statement_types("insert into table1 values ('a', 'b')")
end
})
9 changes: 9 additions & 0 deletions bench/protocol.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
alias Supavisor.Protocol.Client

bin_select_1 = <<81, 0, 0, 0, 14, 115, 101, 108, 101, 99, 116, 32, 49, 59, 0>>

Benchee.run(%{
"Client.decode_pkt/1" => fn ->
Client.decode_pkt(bin_select_1)
end
})
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ config :supavisor, SupavisorWeb.Endpoint,
# Configures Elixir's Logger
config :logger, :console,
format: "$time $metadata[$level] $message\n",
metadata: [:request_id, :project, :user, :region, :instance_id, :mode]
metadata: [:request_id, :project, :user, :region, :instance_id, :mode, :type]

# Use Jason for JSON parsing in Phoenix
config :phoenix, :json_library, Jason
Expand Down
2 changes: 1 addition & 1 deletion config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ config :logger, :console,
format: "$time [$level] $message $metadata\n",
level: :debug,
# level: :error,
metadata: [:error_code, :file, :line, :pid, :project, :user, :mode]
metadata: [:error_code, :file, :line, :pid, :project, :user, :mode, :type]

# Set a higher stacktrace during development. Avoid configuring such
# in production as building large stacktraces may be expensive.
Expand Down
172 changes: 103 additions & 69 deletions lib/supavisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ defmodule Supavisor do
@moduledoc false
require Logger
alias Supavisor.Helpers, as: H
alias Supavisor.{Tenants, Tenants.Tenant, Manager}
alias Supavisor.Tenants, as: T
alias Supavisor.Manager

@type sock :: tcp_sock() | ssl_sock()
@type ssl_sock :: {:ssl, :ssl.sslsocket()}
@type tcp_sock :: {:gen_tcp, :gen_tcp.socket()}
@type workers :: %{manager: pid, pool: pid}
@type secrets :: {:password | :auth_query, fun()}
@type mode :: :transaction | :session | :native
@type id :: {String.t(), String.t(), mode}
@type id :: {{:single | :cluster, String.t()}, String.t(), mode}
@type subscribe_opts :: %{workers: workers, ps: list, idle_timeout: integer}

@registry Supavisor.Registry.Tenants
Expand Down Expand Up @@ -125,11 +126,22 @@ defmodule Supavisor do
}
end

@spec get_local_pool(id) :: pid | nil
@spec get_local_pool(id) :: map | pid | nil
def get_local_pool(id) do
case Registry.lookup(@registry, {:pool, id}) do
[{pid, _}] -> pid
_ -> nil
match = {{:pool, :_, :_, id}, :"$2", :"$3"}
body = [{{:"$2", :"$3"}}]

case Registry.select(@registry, [{match, [], body}]) do
[{pool, _}] ->
pool

[_ | _] = pools ->
# transform [{pid1, :read}, {pid2, :read}, {pid3, :write}]
# to %{read: [pid1, pid2], write: [pid3]}
Enum.group_by(pools, &elem(&1, 1), &elem(&1, 0))

_ ->
nil
end
end

Expand All @@ -141,7 +153,7 @@ defmodule Supavisor do
end
end

@spec id(String.t(), String.t(), mode, mode) :: id
@spec id({:single | :cluster, String.t()}, String.t(), mode, mode) :: id
def id(tenant, user, port_mode, user_mode) do
# temporary hack
mode =
Expand All @@ -157,84 +169,106 @@ defmodule Supavisor do
## Internal functions

@spec start_local_pool(id, secrets, String.t() | nil) :: {:ok, pid} | {:error, any}
defp start_local_pool({tenant, user, mode} = id, {method, secrets}, db_name) do
Logger.debug("Starting pool for #{inspect(id)}")
defp start_local_pool({{type, tenant}, _user, _mode} = id, secrets, db_name) do
Logger.debug("Starting pool(s) for #{inspect(id)}")

case Tenants.get_pool_config(tenant, secrets.().alias) do
%Tenant{} = tenant_record ->
%{
db_host: db_host,
db_port: db_port,
db_database: db_database,
default_parameter_status: ps,
ip_version: ip_ver,
default_pool_size: def_pool_size,
default_max_clients: def_max_clients,
client_idle_timeout: client_idle_timeout,
default_pool_strategy: default_pool_strategy,
users: [
%{
db_user: db_user,
db_password: db_pass,
pool_size: pool_size,
# mode_type: mode_type,
max_clients: max_clients
}
]
} = tenant_record

{pool_size, max_clients} =
if method == :auth_query do
{def_pool_size, def_max_clients}
else
{pool_size, max_clients}
end

auth = %{
host: String.to_charlist(db_host),
port: db_port,
user: db_user,
database: if(db_name != nil, do: db_name, else: db_database),
password: fn -> db_pass end,
application_name: "Supavisor",
ip_version: H.ip_version(ip_ver, db_host),
upstream_ssl: tenant_record.upstream_ssl,
upstream_verify: tenant_record.upstream_verify,
upstream_tls_ca: H.upstream_cert(tenant_record.upstream_tls_ca),
require_user: tenant_record.require_user,
method: method,
secrets: secrets
}
user = elem(secrets, 1).().alias

args = %{
id: id,
tenant: tenant,
user: user,
auth: auth,
pool_size: pool_size,
mode: mode,
default_parameter_status: ps,
max_clients: max_clients,
client_idle_timeout: client_idle_timeout,
default_pool_strategy: default_pool_strategy
}
case type do
:single -> T.get_pool_config(tenant, user)
:cluster -> T.get_cluster_config(tenant, user)
end
|> case do
[_ | _] = replicas ->
opts =
Enum.map(replicas, fn replica ->
case replica do
%T.ClusterTenants{tenant: tenant, type: type} ->
Map.put(tenant, :replica_type, type)

%T.Tenant{} = tenant ->
Map.put(tenant, :replica_type, :write)
end
|> supervisor_args(id, secrets, db_name)
end)

DynamicSupervisor.start_child(
{:via, PartitionSupervisor, {Supavisor.DynamicSupervisor, id}},
{Supavisor.TenantSupervisor, args}
{Supavisor.TenantSupervisor, %{id: id, replicas: opts}}
)
|> case do
{:error, {:already_started, pid}} -> {:ok, pid}
resp -> resp
end

_ ->
Logger.error("Can't find tenant with external_id #{inspect(id)}")
error ->
Logger.error("Can't find tenant with external_id #{inspect(id)} #{inspect(error)}")

{:error, :tenant_not_found}
end
end

defp supervisor_args(tenant_record, {tenant, user, mode} = id, {method, secrets}, db_name) do
%{
db_host: db_host,
db_port: db_port,
db_database: db_database,
default_parameter_status: ps,
ip_version: ip_ver,
default_pool_size: def_pool_size,
default_max_clients: def_max_clients,
client_idle_timeout: client_idle_timeout,
replica_type: replica_type,
default_pool_strategy: default_pool_strategy,
users: [
%{
db_user: db_user,
db_password: db_pass,
pool_size: pool_size,
# mode_type: mode_type,
max_clients: max_clients
}
]
} = tenant_record

{pool_size, max_clients} =
if method == :auth_query do
{def_pool_size, def_max_clients}
else
{pool_size, max_clients}
end

auth = %{
host: String.to_charlist(db_host),
port: db_port,
user: db_user,
database: if(db_name != nil, do: db_name, else: db_database),
password: fn -> db_pass end,
application_name: "Supavisor",
ip_version: H.ip_version(ip_ver, db_host),
upstream_ssl: tenant_record.upstream_ssl,
upstream_verify: tenant_record.upstream_verify,
upstream_tls_ca: H.upstream_cert(tenant_record.upstream_tls_ca),
require_user: tenant_record.require_user,
method: method,
secrets: secrets
}

%{
id: id,
tenant: tenant,
replica_type: replica_type,
user: user,
auth: auth,
pool_size: pool_size,
mode: mode,
default_parameter_status: ps,
max_clients: max_clients,
client_idle_timeout: client_idle_timeout,
default_pool_strategy: default_pool_strategy
}
end

@spec set_parameter_status(id, [{binary, binary}]) ::
:ok | {:error, :not_found}
def set_parameter_status(id, ps) do
Expand Down
Loading

0 comments on commit 02bf93e

Please sign in to comment.