From 0205bcc69db82abacc5aa37475c33bd9950593c9 Mon Sep 17 00:00:00 2001 From: Wandenberg Date: Tue, 21 Oct 2014 18:02:23 -0200 Subject: [PATCH] add retry to other operations which need to be executed on a primary node --- lib/moped/query.rb | 55 ++++++++++++++++++--------------- lib/moped/retryable.rb | 3 +- spec/moped/cluster_spec.rb | 63 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 96 insertions(+), 25 deletions(-) diff --git a/lib/moped/query.rb b/lib/moped/query.rb index f308010..de6af6e 100644 --- a/lib/moped/query.rb +++ b/lib/moped/query.rb @@ -21,6 +21,7 @@ module Moped # people.find.count # => 1 class Query include Enumerable + include Retryable # @attribute [r] collection The collection to execute the query on. # @attribute [r] operation The query operation. @@ -321,14 +322,16 @@ def modify(change, options = {}) # # @since 1.0.0 def remove - cluster.with_primary do |node| - node.remove( - operation.database, - operation.collection, - operation.basic_selector, - write_concern, - flags: [ :remove_first ] - ) + with_retry(cluster) do + cluster.with_primary do |node| + node.remove( + operation.database, + operation.collection, + operation.basic_selector, + write_concern, + flags: [ :remove_first ] + ) + end end end @@ -341,13 +344,15 @@ def remove # # @since 1.0.0 def remove_all - cluster.with_primary do |node| - node.remove( - operation.database, - operation.collection, - operation.basic_selector, - write_concern - ) + with_retry(cluster) do + cluster.with_primary do |node| + node.remove( + operation.database, + operation.collection, + operation.basic_selector, + write_concern + ) + end end end @@ -423,15 +428,17 @@ def tailable # # @since 1.0.0 def update(change, flags = nil) - cluster.with_primary do |node| - node.update( - operation.database, - operation.collection, - operation.selector["$query"] || operation.selector, - change, - write_concern, - flags: flags - ) + with_retry(cluster) do + cluster.with_primary do |node| + node.update( + operation.database, + operation.collection, + operation.selector["$query"] || operation.selector, + change, + write_concern, + flags: flags + ) + end end end diff --git a/lib/moped/retryable.rb b/lib/moped/retryable.rb index 0fb04e4..906b094 100644 --- a/lib/moped/retryable.rb +++ b/lib/moped/retryable.rb @@ -29,7 +29,8 @@ def with_retry(cluster, retries = cluster.max_retries, &block) begin block.call rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e - raise e if e.is_a?(Errors::PotentialReconfiguration) && ! e.message.include?("not master") + raise e if e.is_a?(Errors::PotentialReconfiguration) && + ! (e.message.include?("not master") || e.message.include?("Not primary")) if retries > 0 Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a") diff --git a/spec/moped/cluster_spec.rb b/spec/moped/cluster_spec.rb index a524820..2d4c160 100644 --- a/spec/moped/cluster_spec.rb +++ b/spec/moped/cluster_spec.rb @@ -657,6 +657,69 @@ def step_down_servers session[:foo].insert({ name: "bar 2" }) session[:foo].find().to_a.count.should eql(2) end + + it "should recover and execute an update" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1" }) + cursor = session[:foo].find({ name: "bar 1" }) + step_down_servers + time = Benchmark.realtime do + cursor.update({ name: "bar 2" }) + end + time.should be > 5 + time.should be < 29 + + session[:foo].find().to_a.count.should eql(1) + session[:foo].find({ name: "bar 1" }).to_a.count.should eql(0) + session[:foo].find({ name: "bar 2" }).to_a.count.should eql(1) + end + + it "should recover and execute a remove" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1", type: "some" }) + session[:foo].insert({ name: "bar 2", type: "some" }) + cursor = session[:foo].find({ type: "some" }) + step_down_servers + time = Benchmark.realtime do + cursor.remove() + end + time.should be > 5 + time.should be < 29 + + session[:foo].find().to_a.count.should eql(1) + session[:foo].find({ name: "bar 1" }).to_a.count.should eql(0) + session[:foo].find({ name: "bar 2" }).to_a.count.should eql(1) + end + + it "should recover and execute a remove_all" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1", type: "some" }) + session[:foo].insert({ name: "bar 2", type: "some" }) + cursor = session[:foo].find({ type: "some" }) + step_down_servers + time = Benchmark.realtime do + cursor.remove_all() + end + time.should be > 5 + time.should be < 29 + + session[:foo].find().to_a.count.should eql(0) + session[:foo].find({ name: "bar 1" }).to_a.count.should eql(0) + session[:foo].find({ name: "bar 2" }).to_a.count.should eql(0) + end + + it "should recover and execute an operation using the basic command method" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1", type: "some" }) + session[:foo].insert({ name: "bar 2", type: "some" }) + cursor = session[:foo].find({ type: "some" }) + step_down_servers + time = Benchmark.realtime do + cursor.count.should eql(2) + end + time.should be > 5 + time.should be < 29 + end end describe "with authentication off" do