diff --git a/lib/exq/redis/connection.ex b/lib/exq/redis/connection.ex index 5831c458..cfbd86ec 100644 --- a/lib/exq/redis/connection.ex +++ b/lib/exq/redis/connection.ex @@ -155,14 +155,68 @@ defmodule Exq.Redis.Connection do end def q(redis, command) do - Redix.command(redis, command, timeout: Config.get(:redis_timeout)) + redis + |> Redix.command(command, timeout: Config.get(:redis_timeout)) + |> handle_response(redis) end def qp(redis, command) do - Redix.pipeline(redis, command, timeout: Config.get(:redis_timeout)) + redis + |> Redix.pipeline(command, timeout: Config.get(:redis_timeout)) + |> handle_responses(redis) end def qp!(redis, command) do - Redix.pipeline!(redis, command, timeout: Config.get(:redis_timeout)) + redis + |> Redix.pipeline!(command, timeout: Config.get(:redis_timeout)) + |> handle_responses(redis) + end + + defp handle_response({:error, %{message: "READONLY" <> _rest}} = error, redis) do + disconnect(redis) + error + end + + defp handle_response({:error, message} = error, _) do + Logger.error(inspect(message)) + error + end + + defp handle_response(response, _) do + response + end + + defp handle_responses({:ok, responses} = result, redis) do + # Disconnect once for multiple readonly redis node errors. + if Enum.any?(responses, &readonly_error?/1) do + disconnect(redis) + end + result + end + + defp handle_responses(responses, redis) when is_list(responses) do + # Disconnect once for multiple readonly redis node errors. + if Enum.any?(responses, &readonly_error?/1) do + disconnect(redis) + end + responses + end + + defp handle_responses(responses, _) do + responses + end + + defp readonly_error?(%{message: "READONLY" <> _rest}), do: true + defp readonly_error?(_), do: false + + defp disconnect(redis) do + pid = Process.whereis(redis) + if !is_nil(pid) && Process.alive?(pid) do + # Let the supervisor restart the process with a new connection. + Logger.error("Redis failover - forcing a reconnect") + Process.exit(pid, :kill) + # Give the process some time to be restarted. + :timer.sleep(100) + end end end diff --git a/test/readonly_reconnect_test.exs b/test/readonly_reconnect_test.exs new file mode 100644 index 00000000..98030279 --- /dev/null +++ b/test/readonly_reconnect_test.exs @@ -0,0 +1,40 @@ +defmodule ReadonlyReconnectTest do + use ExUnit.Case + import ExqTestUtil + + setup do + Process.flag(:trap_exit, true) + {:ok, redis} = Redix.start_link(host: "127.0.0.1", port: 6556) + Process.register(redis, :testredis) + + on_exit(fn -> + wait() + TestRedis.teardown() + end) + + {:ok, redis: redis} + end + + test "disconnect on read-only errors with single command", %{redis: redis} do + Exq.Redis.Connection.q(:testredis, ["SET", "key", "value"]) + assert_received({:EXIT, pid, :killed}) + assert redis == pid + end + + test "disconnect on read-only errors with command pipeline", %{redis: redis} do + Exq.Redis.Connection.qp(:testredis, [["GET", "key"], ["SET", "key", "value"]]) + assert_received({:EXIT, pid, :killed}) + assert redis == pid + end + + test "disconnect on read-only errors with command pipeline returning values", %{redis: redis} do + Exq.Redis.Connection.qp!(:testredis, [["GET", "key"], ["SET", "key", "value"]]) + assert_received({:EXIT, pid, :killed}) + assert redis == pid + end + + test "pass through other errors" do + assert {:error, %Redix.Error{}} = Exq.Redis.Connection.q(:testredis, ["GETS", "key"]) + assert {:ok, [%Redix.Error{}]} = Exq.Redis.Connection.qp(:testredis, [["GETS", "key"]]) + end +end diff --git a/test/test-redis-replica.conf b/test/test-redis-replica.conf new file mode 100644 index 00000000..5a588fcf --- /dev/null +++ b/test/test-redis-replica.conf @@ -0,0 +1,5 @@ +port 6556 +daemonize yes +logfile stdout +pidfile /tmp/resquex-redis-replica.pid +slaveof 127.0.0.1 6555 diff --git a/test/test_helper.exs b/test/test_helper.exs index 2f3c4a56..a606764b 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -109,6 +109,7 @@ defmodule TestRedis do def start do unless Config.get(:test_with_local_redis) == false do [] = :os.cmd('redis-server test/test-redis.conf') + [] = :os.cmd('redis-server test/test-redis-replica.conf') [] = :os.cmd('redis-server test/test-sentinel.conf --sentinel') :timer.sleep(500) end @@ -117,6 +118,7 @@ defmodule TestRedis do def stop do unless Config.get(:test_with_local_redis) == false do [] = :os.cmd('redis-cli -p 6555 shutdown') + [] = :os.cmd('redis-cli -p 6556 shutdown') [] = :os.cmd('redis-cli -p 6666 shutdown') end end