From 0e2a7e5fa158a8ca6445362837e719fc38e2362a Mon Sep 17 00:00:00 2001 From: Stas Date: Wed, 25 Oct 2023 14:50:57 +0200 Subject: [PATCH 1/3] fix: prevent client <-> db locking --- VERSION | 2 +- lib/supavisor/client_handler.ex | 16 ++++++++-------- lib/supavisor/db_handler.ex | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/VERSION b/VERSION index ea3f0d7a..68a59457 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.22 +0.9.23 diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index 311df525..52b67ad3 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -26,8 +26,8 @@ defmodule Supavisor.ClientHandler do @impl true def callback_mode, do: [:handle_event_function] - def client_call(pid, bin, ready?) do - :gen_statem.call(pid, {:client_call, bin, ready?}, 5000) + def client_cast(pid, bin, ready?) do + :gen_statem.cast(pid, {:client_cast, bin, ready?}) end @impl true @@ -366,11 +366,11 @@ defmodule Supavisor.ClientHandler do end end - # emulate handle_call - def handle_event({:call, from}, {:client_call, bin, ready?}, _, data) do + # emulate handle_cast + def handle_event(:cast, {:client_cast, bin, ready?}, _, data) do Logger.debug("--> --> bin #{inspect(byte_size(bin))} bytes") - reply = {:reply, from, HH.sock_send(data.sock, bin)} + :ok = HH.sock_send(data.sock, bin) if ready? do Logger.debug("Client is ready") @@ -382,15 +382,15 @@ defmodule Supavisor.ClientHandler do actions = if data.idle_timeout > 0 do - [reply, idle_check(data.idle_timeout)] + idle_check(data.idle_timeout) else - reply + [] end {:next_state, :idle, %{data | db_pid: db_pid, stats: stats}, actions} else Logger.debug("Client is not ready") - {:keep_state_and_data, reply} + :keep_state_and_data end end diff --git a/lib/supavisor/db_handler.ex b/lib/supavisor/db_handler.ex index efee8abe..b1c94145 100644 --- a/lib/supavisor/db_handler.ex +++ b/lib/supavisor/db_handler.ex @@ -21,7 +21,7 @@ defmodule Supavisor.DbHandler do @spec call(pid(), binary()) :: :ok | {:error, any()} | {:buffering, non_neg_integer()} def call(pid, msg) do - :gen_statem.call(pid, {:db_call, msg}) + :gen_statem.call(pid, {:db_call, msg}, 15_000) end @impl true @@ -243,7 +243,7 @@ defmodule Supavisor.DbHandler do def handle_event(:info, {_proto, _, bin}, _, data) do # check if the response ends with "ready for query" ready = String.ends_with?(bin, Server.ready_for_query()) - :ok = Client.client_call(data.caller, bin, ready) + :ok = Client.client_cast(data.caller, bin, ready) if ready do {_, stats} = Telem.network_usage(:db, data.sock, data.id, data.stats) From 8cec02209dac9b5b266797a1ceb154e488e08c51 Mon Sep 17 00:00:00 2001 From: Stas Date: Wed, 25 Oct 2023 16:01:03 +0200 Subject: [PATCH 2/3] pass a client pid --- lib/supavisor/client_handler.ex | 2 +- lib/supavisor/db_handler.ex | 23 ++++++++++++----------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index 52b67ad3..f2da2f26 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -303,7 +303,7 @@ defmodule Supavisor.ClientHandler do end def handle_event(_, {proto, _, bin}, :busy, data) when proto in [:tcp, :ssl] do - case Db.call(data.db_pid, bin) do + case Db.call(data.db_pid, bin, self()) do :ok -> Logger.debug("DB call success") :keep_state_and_data diff --git a/lib/supavisor/db_handler.ex b/lib/supavisor/db_handler.ex index b1c94145..20570365 100644 --- a/lib/supavisor/db_handler.ex +++ b/lib/supavisor/db_handler.ex @@ -19,9 +19,9 @@ defmodule Supavisor.DbHandler do :gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000) end - @spec call(pid(), binary()) :: :ok | {:error, any()} | {:buffering, non_neg_integer()} - def call(pid, msg) do - :gen_statem.call(pid, {:db_call, msg}, 15_000) + @spec call(pid(), binary(), pid()) :: :ok | {:error, any()} | {:buffering, non_neg_integer()} + def call(pid, msg, caller) do + :gen_statem.call(pid, {:db_call, msg, caller}, 15_000) end @impl true @@ -240,32 +240,33 @@ defmodule Supavisor.DbHandler do {:keep_state, %{data | buffer: []}} end - def handle_event(:info, {_proto, _, bin}, _, data) do + def handle_event(:info, {_proto, _, bin}, _, %{caller: caller} = data) when is_pid(caller) do # check if the response ends with "ready for query" ready = String.ends_with?(bin, Server.ready_for_query()) - :ok = Client.client_cast(data.caller, bin, ready) + Logger.debug("Db ready #{inspect(ready)}") + :ok = Client.client_cast(caller, bin, ready) if ready do {_, stats} = Telem.network_usage(:db, data.sock, data.id, data.stats) - {:keep_state, %{data | stats: stats}} + {:keep_state, %{data | stats: stats, caller: nil}} else :keep_state_and_data end end - def handle_event({:call, {pid, _} = from}, {:db_call, bin}, :idle, %{sock: sock} = data) do + def handle_event({:call, from}, {:db_call, bin, caller}, :idle, %{sock: sock} = data) do reply = {:reply, from, sock_send(sock, bin)} - {:keep_state, %{data | caller: pid}, reply} + {:keep_state, %{data | caller: caller}, reply} end - def handle_event({:call, {pid, _} = from}, {:db_call, bin}, state, %{buffer: buff} = data) do + def handle_event({:call, from}, {:db_call, bin, caller}, state, %{buffer: buff} = data) do Logger.debug( - "state #{state} <-- <-- bin #{inspect(byte_size(bin))} bytes, caller: #{inspect(pid)}" + "state #{state} <-- <-- bin #{inspect(byte_size(bin))} bytes, caller: #{inspect(caller)}" ) new_buff = [bin | buff] reply = {:reply, from, {:buffering, IO.iodata_length(new_buff)}} - {:keep_state, %{data | caller: pid, buffer: new_buff}, reply} + {:keep_state, %{data | caller: caller, buffer: new_buff}, reply} end def handle_event(:info, {:tcp_closed, sock}, state, %{sock: sock} = data) do From 0663fa06086e8449a9a94ab520913482e24ffe52 Mon Sep 17 00:00:00 2001 From: Stas Date: Wed, 25 Oct 2023 16:31:36 +0200 Subject: [PATCH 3/3] update db_handler test --- lib/supavisor/client_handler.ex | 2 +- lib/supavisor/db_handler.ex | 10 +++++----- test/supavisor/db_handler_test.exs | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index f2da2f26..50fa5209 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -303,7 +303,7 @@ defmodule Supavisor.ClientHandler do end def handle_event(_, {proto, _, bin}, :busy, data) when proto in [:tcp, :ssl] do - case Db.call(data.db_pid, bin, self()) do + case Db.call(data.db_pid, self(), bin) do :ok -> Logger.debug("DB call success") :keep_state_and_data diff --git a/lib/supavisor/db_handler.ex b/lib/supavisor/db_handler.ex index 20570365..1a932a4e 100644 --- a/lib/supavisor/db_handler.ex +++ b/lib/supavisor/db_handler.ex @@ -19,9 +19,9 @@ defmodule Supavisor.DbHandler do :gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000) end - @spec call(pid(), binary(), pid()) :: :ok | {:error, any()} | {:buffering, non_neg_integer()} - def call(pid, msg, caller) do - :gen_statem.call(pid, {:db_call, msg, caller}, 15_000) + @spec call(pid(), pid(), binary()) :: :ok | {:error, any()} | {:buffering, non_neg_integer()} + def call(pid, caller, msg) do + :gen_statem.call(pid, {:db_call, caller, msg}, 15_000) end @impl true @@ -254,12 +254,12 @@ defmodule Supavisor.DbHandler do end end - def handle_event({:call, from}, {:db_call, bin, caller}, :idle, %{sock: sock} = data) do + def handle_event({:call, from}, {:db_call, caller, bin}, :idle, %{sock: sock} = data) do reply = {:reply, from, sock_send(sock, bin)} {:keep_state, %{data | caller: caller}, reply} end - def handle_event({:call, from}, {:db_call, bin, caller}, state, %{buffer: buff} = data) do + def handle_event({:call, from}, {:db_call, caller, bin}, state, %{buffer: buff} = data) do Logger.debug( "state #{state} <-- <-- bin #{inspect(byte_size(bin))} bytes, caller: #{inspect(caller)}" ) diff --git a/test/supavisor/db_handler_test.exs b/test/supavisor/db_handler_test.exs index a75a573f..4b2ed693 100644 --- a/test/supavisor/db_handler_test.exs +++ b/test/supavisor/db_handler_test.exs @@ -97,7 +97,7 @@ defmodule Supavisor.DbHandlerTest do data = %{sock: {:gen_tcp, sock}, caller: nil, buffer: []} from = {self(), :test_ref} event = {:call, from} - payload = {:db_call, "test_data"} + payload = {:db_call, self(), "test_data"} {:keep_state, new_data, reply} = Db.handle_event(event, payload, :idle, data) @@ -107,10 +107,10 @@ defmodule Supavisor.DbHandlerTest do end test "handle_event/4 with non-idle state" do - data = %{sock: nil, caller: nil, buffer: []} + data = %{sock: nil, caller: self(), buffer: []} from = {self(), :test_ref} event = {:call, from} - payload = {:db_call, "test_data"} + payload = {:db_call, self(), "test_data"} state = :non_idle {:keep_state, new_data, reply} = Db.handle_event(event, payload, state, data)