Skip to content

Commit

Permalink
Merge pull request mongoid#380 from jonhyman/feature/better-failover-…
Browse files Browse the repository at this point in the history
…squash

Improves Moped failover.
  • Loading branch information
durran committed Jun 16, 2015
2 parents c7378dc + 2774bae commit 803bbcd
Show file tree
Hide file tree
Showing 12 changed files with 155 additions and 28 deletions.
24 changes: 19 additions & 5 deletions lib/moped/address.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,13 @@ def initialize(address, timeout)
#
# @since 2.0.0
def resolve(node)
return @resolved if @resolved
start = Time.now
retries = 0
begin
return @resolved if @resolved
Timeout::timeout(@timeout) do
# This timeout should be very large since Timeout::timeout plays very badly with multithreaded code
# TODO: Remove this Timeout entirely
Timeout::timeout(@timeout * 10) do
Resolv.each_address(host) do |ip|
if ip =~ Resolv::IPv4::Regex
@ip ||= ip
Expand All @@ -57,9 +61,19 @@ def resolve(node)
raise Resolv::ResolvError unless @ip
end
@resolved = "#{ip}:#{port}"
rescue Timeout::Error, Resolv::ResolvError, SocketError
Loggable.warn(" MOPED:", "Could not resolve IP for: #{original}", "n/a")
node.down! and false
rescue Timeout::Error, Resolv::ResolvError, SocketError => e
msg = [" MOPED:", "Could not resolve IP for: #{original}, delta is #{Time.now - start}, error class is #{e.inspect}, retries is #{retries}. Node is #{node.inspect}", "n/a"]
if retries == 0
Loggable.info(*msg)
else
Loggable.warn(*msg)
end
if retries < 2
retries += 1
retry
else
node.down! and false
end
end
end
end
Expand Down
11 changes: 9 additions & 2 deletions lib/moped/connection/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,17 @@ def pool(node)
#
# @since 2.0.3
def shutdown(node)
pool = nil
MUTEX.synchronize do
pool = pools.delete(node.address.resolved)
pool.shutdown{ |conn| conn.disconnect } if pool
nil
end
pool.shutdown{ |conn| conn.disconnect } if pool
nil
end

def delete_pool(node)
MUTEX.synchronize do
pools.delete(node.address.resolved)
end
end

Expand Down
11 changes: 7 additions & 4 deletions lib/moped/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Moped
class Cursor
include Readable
include Enumerable
include Retryable

# @attribute [r] get_more_op The get more message.
# @attribute [r] kill_cursor_op The kill cursor message.
Expand Down Expand Up @@ -43,10 +44,12 @@ def each
#
# @since 1.0.0
def get_more
reply = @node.get_more @database, @collection, @cursor_id, request_limit
@limit -= reply.count if limited?
@cursor_id = reply.cursor_id
reply.documents
with_retry(session.cluster) do
reply = @node.get_more @database, @collection, @cursor_id, request_limit
@limit -= reply.count if limited?
@cursor_id = reply.cursor_id
reply.documents
end
end

# Determine the request limit for the query
Expand Down
7 changes: 4 additions & 3 deletions lib/moped/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ class DoNotDisconnect < MongoError; end
class PotentialReconfiguration < MongoError

# Not master error codes.
NOT_MASTER = [ 13435, 13436, 10009, 15986 ]
NOT_MASTER = [ 13435, 13436, 10009, 15986, 83 ]

# Error codes received around reconfiguration
CONNECTION_ERRORS_RECONFIGURATION = [ 15988, 10276, 11600, 9001, 13639, 10009, 11002 ]
CONNECTION_ERRORS_RECONFIGURATION = [ 15988, 10276, 11600, 9001, 13639, 10009, 11002, 7 ]

# Replica set reconfigurations can be either in the form of an operation
# error with code 13435, or with an error message stating the server is
Expand All @@ -126,7 +126,8 @@ def reconfiguring_replica_set?
end

def connection_failure?
CONNECTION_ERRORS_RECONFIGURATION.include?(details["code"])
err = details["err"] || details["errmsg"] || details["$err"] || ""
CONNECTION_ERRORS_RECONFIGURATION.include?(details["code"]) || err.include?("could not get last error") || err.include?("connection attempt failed")
end

# Is the error due to a namespace not being found?
Expand Down
3 changes: 2 additions & 1 deletion lib/moped/failover.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ module Failover
Errors::ConnectionFailure => Retry,
Errors::CursorNotFound => Ignore,
Errors::OperationFailure => Reconfigure,
Errors::QueryFailure => Reconfigure
Errors::QueryFailure => Reconfigure,
Errors::PoolTimeout => Retry
}.freeze

# Get the appropriate failover handler given the provided exception.
Expand Down
6 changes: 4 additions & 2 deletions lib/moped/failover/retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Failover
module Retry
extend self

# Executes the failover strategy. In the case of retyr, we disconnect and
# Executes the failover strategy. In the case of retry, we disconnect and
# reconnect, then try the operation one more time.
#
# @example Execute the retry strategy.
Expand All @@ -24,11 +24,13 @@ module Retry
#
# @since 2.0.0
def execute(exception, node)
node.disconnect
node.disconnect unless exception.is_a?(Errors::PoolTimeout)
begin
node.connection do |conn|
yield(conn) if block_given?
end
rescue Errors::PoolTimeout => e
raise Errors::ConnectionFailure.new e
rescue Exception => e
node.down!
raise(e)
Expand Down
4 changes: 4 additions & 0 deletions lib/moped/loggable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ def self.debug(prefix, payload, runtime)
Moped.logger.debug([ prefix, payload, "runtime: #{runtime}" ].join(' '))
end

def self.info(prefix, payload, runtime)
Moped.logger.info([ prefix, payload, "runtime: #{runtime}" ].join(' '))
end

# Log the payload to warn.
#
# @example Log to warn.
Expand Down
29 changes: 22 additions & 7 deletions lib/moped/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,19 @@ def connected?
#
# @since 2.0.0
def connection
pool.with do |conn|
yield(conn)
connection_acquired = false
begin
pool.with do |conn|
connection_acquired = true
yield(conn)
end
rescue Timeout::Error, ConnectionPool::PoolShuttingDownError => e
if e.kind_of?(ConnectionPool::PoolShuttingDownError)
@pool = nil
Connection::Manager.delete_pool(self)
raise Errors::PoolTimeout.new(e)
end
raise connection_acquired ? e : Errors::PoolTimeout.new(e)
end
end

Expand Down Expand Up @@ -182,6 +193,10 @@ def ensure_connected(&block)
yield(conn)
end
rescue Exception => e
if e.kind_of?(ConnectionPool::PoolShuttingDownError)
@pool = nil
Connection::Manager.delete_pool(self)
end
Failover.get(e).execute(e, self, &block)
ensure
end_execution(:connection)
Expand All @@ -198,7 +213,7 @@ def ensure_connected(&block)
#
# @return [ nil ] nil.
#
# @since 1.0.0
# @since 1.0.0s
def ensure_primary
execute(:ensure_primary) do
yield(self)
Expand Down Expand Up @@ -585,14 +600,14 @@ def discover(*nodes)
def flush(ops = queue)
operations, callbacks = ops.transpose
logging(operations) do
replies = nil
ensure_connected do |conn|
conn.write(operations)
replies = conn.receive_replies(operations)

replies.zip(callbacks).map do |reply, callback|
callback ? callback[reply] : reply
end.last
end
replies.zip(callbacks).map do |reply, callback|
callback ? callback[reply] : reply
end.last
end
ensure
ops.clear
Expand Down
23 changes: 20 additions & 3 deletions lib/moped/operation/read.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,27 @@ def initialize(operation)
# @since 2.0.0
def execute(node)
node.process(operation) do |reply|
if operation.failure?(reply)
raise operation.failure_exception(reply)
# Avoid LocalJumpError
ret = nil
if reply.unauthorized? && node.credentials.key?(@database)
node.connection do |conn|
username, password = node.credentials[@database]
if username && password
conn.login(operation.database, username, password)
ret = execute(node)
end
end
end
operation.results(reply)

if ret.nil?
if operation.failure?(reply)
raise operation.failure_exception(reply)
end

ret = operation.results(reply)
end

ret
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/moped/retryable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def with_retry(cluster, retries = cluster.max_retries, &block)
! (e.message.include?("not master") || e.message.include?("Not primary"))

if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s), nodes is #{cluster.nodes.inspect}, seeds are #{cluster.seeds.inspect}, cluster is #{cluster.inspect}. Error backtrace is #{e.backtrace}.", "n/a")
sleep(cluster.retry_interval)
cluster.refresh
with_retry(cluster, retries - 1, &block)
Expand Down
43 changes: 43 additions & 0 deletions spec/moped/node_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -486,4 +486,47 @@
end
end
end

describe "#connection" do
let(:node) do
described_class.new("127.0.0.1:27017", pool_size: 1, pool_timeout: 0.1)
end

context "when take a long time to get a connection from pool" do
it "raise a Errors::PoolTimeout error" do
expect {

exception = nil
100.times.map do |i|
Thread.new do
begin
node.connection do |conn|
conn.apply_credentials({})
node.update("test", "test_collection", { name: "test_counter" }, {'$inc' => {'cnt' => 1}}, Moped::WriteConcern.get({ w: 1 }), flags: {safe: true, upsert: true})
end
rescue => e
exception = e if exception.nil?
end
end
end.each {|t| t.join }
raise exception unless exception.nil?

}.to raise_error(Moped::Errors::PoolTimeout)
end
end

context "when the timeout happens after get a connection from pool" do
it "raise a Timeout::Error" do
expect {
node.connection do |conn|
Timeout::timeout(0.01) do
conn.apply_credentials({})
node.update("test", "test_collection", { name: "test_counter" }, {'$inc' => {'cnt' => 1}}, Moped::WriteConcern.get({ w: 1 }), flags: {safe: true, upsert: true})
sleep(0.1) # just to simulate a long block which raise a timeout
end
end
}.to raise_error(Timeout::Error)
end
end
end
end
20 changes: 20 additions & 0 deletions spec/moped/session_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,24 @@
nodes.last.should be_down
end
end

context "when connections on pool are busy" do
let(:session) do
Moped::Session.new([ "127.0.0.1:27017" ], database: "moped_test", pool_size: 1, pool_timeout: 0.2, max_retries: 30, retry_interval: 1)
end

it "should retry the operation" do
session[:test].find({ name: "test_counter" }).update({'$set' => {'cnt' => 1}}, {upsert: true})

results = []

300.times.map do |i|
Thread.new do
results.push session[:test].find({ name: "test_counter" }).first["cnt"]
end
end.each {|t| t.join }

expect(results.count).to eql(300)
end
end
end

0 comments on commit 803bbcd

Please sign in to comment.