Skip to content

Commit

Permalink
Merge pull request mongoid#351 from wandenberg/fix_retries_operations
Browse files Browse the repository at this point in the history
fix retries operations
  • Loading branch information
durran committed May 26, 2015
2 parents 10abbf3 + 0205bcc commit 68923e0
Show file tree
Hide file tree
Showing 6 changed files with 397 additions and 62 deletions.
10 changes: 7 additions & 3 deletions lib/moped/collection.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# encoding: utf-8
require "moped/query"
require "moped/retryable"

module Moped

Expand All @@ -8,6 +9,7 @@ module Moped
# @since 1.0.0
class Collection
include Readable
include Retryable

# @!attribute database
# @return [ Database ] The database for the collection.
Expand Down Expand Up @@ -120,9 +122,11 @@ def initialize(database, name)
#
# @since 1.0.0
def insert(documents, flags = nil)
docs = documents.is_a?(Array) ? documents : [ documents ]
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
with_retry(cluster) do
docs = documents.is_a?(Array) ? documents : [ documents ]
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
end
end
end

Expand Down
55 changes: 31 additions & 24 deletions lib/moped/query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module Moped
# people.find.count # => 1
class Query
include Enumerable
include Retryable

# @attribute [r] collection The collection to execute the query on.
# @attribute [r] operation The query operation.
Expand Down Expand Up @@ -321,14 +322,16 @@ def modify(change, options = {})
#
# @since 1.0.0
def remove
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern,
flags: [ :remove_first ]
)
with_retry(cluster) do
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern,
flags: [ :remove_first ]
)
end
end
end

Expand All @@ -341,13 +344,15 @@ def remove
#
# @since 1.0.0
def remove_all
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern
)
with_retry(cluster) do
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern
)
end
end
end

Expand Down Expand Up @@ -423,15 +428,17 @@ def tailable
#
# @since 1.0.0
def update(change, flags = nil)
cluster.with_primary do |node|
node.update(
operation.database,
operation.collection,
operation.selector["$query"] || operation.selector,
change,
write_concern,
flags: flags
)
with_retry(cluster) do
cluster.with_primary do |node|
node.update(
operation.database,
operation.collection,
operation.selector["$query"] || operation.selector,
change,
write_concern,
flags: flags
)
end
end
end

Expand Down
37 changes: 2 additions & 35 deletions lib/moped/read_preference/selectable.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# encoding: utf-8
require "moped/retryable"
module Moped
module ReadPreference

Expand All @@ -7,6 +8,7 @@ module ReadPreference
#
# @since 2.0.0
module Selectable
include Retryable

# @!attribute tags
# @return [ Array<Hash> ] The tag sets.
Expand Down Expand Up @@ -39,41 +41,6 @@ def query_options(options)
options[:flags] |= [ :slave_ok ]
options
end

private

# Execute the provided block on the cluster and retry if the execution
# fails.
#
# @api private
#
# @example Execute with retry.
# preference.with_retry(cluster) do
# cluster.with_primary do |node|
# node.refresh
# end
# end
#
# @param [ Cluster ] cluster The cluster.
# @param [ Integer ] retries The number of times to retry.
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure => e
if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(cluster.retry_interval)
cluster.refresh
with_retry(cluster, retries - 1, &block)
else
raise e
end
end
end
end
end
end
46 changes: 46 additions & 0 deletions lib/moped/retryable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# encoding: utf-8
module Moped
# Provides the shared behaviour for retry failed operations.
#
# @since 2.0.0
module Retryable

private

# Execute the provided block on the cluster and retry if the execution
# fails.
#
# @api private
#
# @example Execute with retry.
# preference.with_retry(cluster) do
# cluster.with_primary do |node|
# node.refresh
# end
# end
#
# @param [ Cluster ] cluster The cluster.
# @param [ Integer ] retries The number of times to retry.
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e
raise e if e.is_a?(Errors::PotentialReconfiguration) &&
! (e.message.include?("not master") || e.message.include?("Not primary"))

if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(cluster.retry_interval)
cluster.refresh
with_retry(cluster, retries - 1, &block)
else
raise e
end
end
end
end
end
Loading

0 comments on commit 68923e0

Please sign in to comment.