Skip to content

Commit

Permalink
Merge pull request #382 from deepfryed/feature/handle-elasticache-red…
Browse files Browse the repository at this point in the history
…is-failover

Handle AWS Elasticache Redis DNS failover.
  • Loading branch information
akira authored Jun 17, 2019
2 parents 67e50c6 + 7211d11 commit 471047d
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 3 deletions.
60 changes: 57 additions & 3 deletions lib/exq/redis/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,68 @@ defmodule Exq.Redis.Connection do
end

def q(redis, command) do
Redix.command(redis, command, timeout: Config.get(:redis_timeout))
redis
|> Redix.command(command, timeout: Config.get(:redis_timeout))
|> handle_response(redis)
end

def qp(redis, command) do
Redix.pipeline(redis, command, timeout: Config.get(:redis_timeout))
redis
|> Redix.pipeline(command, timeout: Config.get(:redis_timeout))
|> handle_responses(redis)
end

def qp!(redis, command) do
Redix.pipeline!(redis, command, timeout: Config.get(:redis_timeout))
redis
|> Redix.pipeline!(command, timeout: Config.get(:redis_timeout))
|> handle_responses(redis)
end

defp handle_response({:error, %{message: "READONLY" <> _rest}} = error, redis) do
disconnect(redis)
error
end

defp handle_response({:error, message} = error, _) do
Logger.error(inspect(message))
error
end

defp handle_response(response, _) do
response
end

defp handle_responses({:ok, responses} = result, redis) do
# Disconnect once for multiple readonly redis node errors.
if Enum.any?(responses, &readonly_error?/1) do
disconnect(redis)
end
result
end

defp handle_responses(responses, redis) when is_list(responses) do
# Disconnect once for multiple readonly redis node errors.
if Enum.any?(responses, &readonly_error?/1) do
disconnect(redis)
end
responses
end

defp handle_responses(responses, _) do
responses
end

defp readonly_error?(%{message: "READONLY" <> _rest}), do: true
defp readonly_error?(_), do: false

defp disconnect(redis) do
pid = Process.whereis(redis)
if !is_nil(pid) && Process.alive?(pid) do
# Let the supervisor restart the process with a new connection.
Logger.error("Redis failover - forcing a reconnect")
Process.exit(pid, :kill)
# Give the process some time to be restarted.
:timer.sleep(100)
end
end
end
40 changes: 40 additions & 0 deletions test/readonly_reconnect_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule ReadonlyReconnectTest do
use ExUnit.Case
import ExqTestUtil

setup do
Process.flag(:trap_exit, true)
{:ok, redis} = Redix.start_link(host: "127.0.0.1", port: 6556)
Process.register(redis, :testredis)

on_exit(fn ->
wait()
TestRedis.teardown()
end)

{:ok, redis: redis}
end

test "disconnect on read-only errors with single command", %{redis: redis} do
Exq.Redis.Connection.q(:testredis, ["SET", "key", "value"])
assert_received({:EXIT, pid, :killed})
assert redis == pid
end

test "disconnect on read-only errors with command pipeline", %{redis: redis} do
Exq.Redis.Connection.qp(:testredis, [["GET", "key"], ["SET", "key", "value"]])
assert_received({:EXIT, pid, :killed})
assert redis == pid
end

test "disconnect on read-only errors with command pipeline returning values", %{redis: redis} do
Exq.Redis.Connection.qp!(:testredis, [["GET", "key"], ["SET", "key", "value"]])
assert_received({:EXIT, pid, :killed})
assert redis == pid
end

test "pass through other errors" do
assert {:error, %Redix.Error{}} = Exq.Redis.Connection.q(:testredis, ["GETS", "key"])
assert {:ok, [%Redix.Error{}]} = Exq.Redis.Connection.qp(:testredis, [["GETS", "key"]])
end
end
5 changes: 5 additions & 0 deletions test/test-redis-replica.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
port 6556
daemonize yes
logfile stdout
pidfile /tmp/resquex-redis-replica.pid
slaveof 127.0.0.1 6555
2 changes: 2 additions & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ defmodule TestRedis do
def start do
unless Config.get(:test_with_local_redis) == false do
[] = :os.cmd('redis-server test/test-redis.conf')
[] = :os.cmd('redis-server test/test-redis-replica.conf')
[] = :os.cmd('redis-server test/test-sentinel.conf --sentinel')
:timer.sleep(500)
end
Expand All @@ -117,6 +118,7 @@ defmodule TestRedis do
def stop do
unless Config.get(:test_with_local_redis) == false do
[] = :os.cmd('redis-cli -p 6555 shutdown')
[] = :os.cmd('redis-cli -p 6556 shutdown')
[] = :os.cmd('redis-cli -p 6666 shutdown')
end
end
Expand Down

0 comments on commit 471047d

Please sign in to comment.