Skip to content

Commit

Permalink
fix: sync sending after reaching the threshold (#313)
Browse files Browse the repository at this point in the history
  • Loading branch information
abc3 authored Mar 4, 2024
1 parent 7281330 commit 8722b7f
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 12 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.39
1.1.40
10 changes: 10 additions & 0 deletions lib/supavisor/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ defmodule Supavisor.ClientHandler do
:gen_statem.cast(pid, {:client_cast, bin, status})
end

@spec client_call(pid, iodata(), atom()) :: :ok | {:error, term()}
def client_call(pid, bin, status),
do: :gen_statem.call(pid, {:client_call, bin, status}, 30_000)

@impl true
def init(_), do: :ignore

Expand Down Expand Up @@ -548,6 +552,12 @@ defmodule Supavisor.ClientHandler do
end
end

# emulate handle_call
def handle_event({:call, from}, {:client_call, bin, _}, _, data) do
Logger.debug("ClientHandler: --> --> bin call #{inspect(byte_size(bin))} bytes")
{:keep_state_and_data, {:reply, from, HH.sock_send(data.sock, bin)}}
end

def handle_event(type, content, state, data) do
msg = [
{"type", type},
Expand Down
27 changes: 18 additions & 9 deletions lib/supavisor/db_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ defmodule Supavisor.DbHandler do
@reconnect_timeout 2_500
@sock_closed [:tcp_closed, :ssl_closed]
@proto [:tcp, :ssl]
@async_send_limit 1_000

def start_link(config) do
:gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000)
Expand Down Expand Up @@ -334,26 +335,27 @@ defmodule Supavisor.DbHandler do
Logger.debug("DbHandler: Got write replica message #{inspect(bin)}")
HH.setopts(data.sock, active: :once)
# check if the response ends with "ready for query"
ready =
if String.ends_with?(bin, Server.ready_for_query()) do
:ready_for_query
else
:continue
end
ready = check_ready(bin)
sent = data.sent || 0

send_via =
if ready == :ready_for_query || sent < @async_send_limit,
do: :client_cast,
else: :client_call

:ok = Client.client_cast(data.caller, bin, ready)
:ok = apply(Client, send_via, [data.caller, bin, ready])

case ready do
:ready_for_query ->
{_, stats} = Telem.network_usage(:db, data.sock, data.id, data.stats)

HH.setopts(data.sock, active: true)

{:next_state, :idle, %{data | stats: stats, caller: handler_caller(data)},
{:next_state, :idle, %{data | stats: stats, caller: handler_caller(data), sent: false},
{:next_event, :internal, :check_anon_buffer}}

:continue ->
:keep_state_and_data
{:keep_state, %{data | sent: sent + 1}}
end
end

Expand Down Expand Up @@ -539,4 +541,11 @@ defmodule Supavisor.DbHandler do
@spec handler_caller(map()) :: pid() | nil
defp handler_caller(%{mode: :session} = data), do: data.caller
defp handler_caller(_), do: nil

@spec check_ready(binary()) :: :ready_for_query | :continue
def check_ready(bin) do
if String.ends_with?(bin, Server.ready_for_query()),
do: :ready_for_query,
else: :continue
end
end
6 changes: 4 additions & 2 deletions test/supavisor/db_handler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ defmodule Supavisor.DbHandlerTest do
caller: caller_pid,
sock: {:gen_tcp, nil},
stats: %{},
mode: :session
mode: :session,
sent: false
}

state = :some_state
Expand Down Expand Up @@ -213,7 +214,8 @@ defmodule Supavisor.DbHandlerTest do
caller: caller_pid,
sock: {:gen_tcp, nil},
stats: %{},
mode: :transaction
mode: :transaction,
sent: false
}

state = :some_state
Expand Down

0 comments on commit 8722b7f

Please sign in to comment.