From 84b058aacfdb32a15f67e2a125dfe30bc56d6df2 Mon Sep 17 00:00:00 2001 From: Wandenberg Date: Mon, 20 Oct 2014 23:45:07 -0200 Subject: [PATCH 1/3] fix retry on find operations after replicaset reconfiguration --- lib/moped/read_preference/selectable.rb | 2 +- spec/moped/cluster_spec.rb | 97 ++++++++++++++++++++ spec/spec_helper.rb | 116 ++++++++++++++++++++++++ 3 files changed, 214 insertions(+), 1 deletion(-) diff --git a/lib/moped/read_preference/selectable.rb b/lib/moped/read_preference/selectable.rb index 73686de..0cdea3c 100644 --- a/lib/moped/read_preference/selectable.rb +++ b/lib/moped/read_preference/selectable.rb @@ -63,7 +63,7 @@ def query_options(options) def with_retry(cluster, retries = cluster.max_retries, &block) begin block.call - rescue Errors::ConnectionFailure => e + rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e if retries > 0 Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a") sleep(cluster.retry_interval) diff --git a/spec/moped/cluster_spec.rb b/spec/moped/cluster_spec.rb index baa4d30..eaa3268 100644 --- a/spec/moped/cluster_spec.rb +++ b/spec/moped/cluster_spec.rb @@ -548,3 +548,100 @@ end end end + +describe Moped::Cluster, "after a reconfiguration" do + let(:options) do + { + max_retries: 30, + retry_interval: 1, + timeout: 5, + database: 'test_db', + read: :primary, + write: {w: 'majority'} + } + end + + let(:replica_set_name) { 'dev' } + + let(:session) do + Moped::Session.new([ "127.0.0.1:31100", "127.0.0.1:31101", "127.0.0.1:31102" ], options) + end + + def step_down_servers + step_down_file = File.join(Dir.tmpdir, with_authentication? ? "step_down_with_authentication.js" : "step_down_without_authentication.js") + unless File.exists?(step_down_file) + File.open(step_down_file, "w") do |file| + user_data = with_authentication? ? ", 'admin', 'admin_pwd'" : "" + file.puts %{ + function stepDown(dbs) { + for (i in dbs) { + dbs[i].adminCommand({replSetFreeze:5}); + try { dbs[i].adminCommand({replSetStepDown:5}); } catch(e) { print(e) }; + } + }; + + var db1 = connect('localhost:31100/admin'#{user_data}); + var db2 = connect('localhost:31101/admin'#{user_data}); + var db3 = connect('localhost:31102/admin'#{user_data}); + + var dbs = [db1, db2, db3]; + stepDown(dbs); + + while (db1.adminCommand({ismaster:1}).ismaster || db2.adminCommand({ismaster:1}).ismaster || db2.adminCommand({ismaster:1}).ismaster) { + stepDown(dbs); + } + } + end + end + system "mongo --nodb #{step_down_file} 2>&1 > /dev/null" + end + + shared_examples_for "recover the session" do + it "should execute commands normally before the stepDown" do + time = Benchmark.realtime do + session[:foo].find().remove_all() + session[:foo].find().to_a.count.should eql(0) + session[:foo].insert({ name: "bar 1" }) + session[:foo].find().to_a.count.should eql(1) + expect { + session[:foo].insert({ name: "bar 1" }) + }.to raise_exception(Moped::Errors::OperationFailure, /duplicate/) + end + time.should be < 2 + end + + it "should recover and execute a find" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1" }) + step_down_servers + time = Benchmark.realtime do + session[:foo].find().to_a.count.should eql(1) + end + session.cluster.nodes.map(&:refresh) + expect{ session.cluster.nodes.any?{ |node| node.primary? } }.to be_true + time.should be > 5 + time.should be < 29 + end + end + + describe "with authentication off" do + before do + setup_replicaset_environment(with_authentication?) + end + + let(:with_authentication?) { false } + + it_should_behave_like "recover the session" + end + + describe "with authentication on" do + before do + setup_replicaset_environment(with_authentication?) + session.login('common', 'common_pwd') + end + + let(:with_authentication?) { true } + + it_should_behave_like "recover the session" + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 341a488..f7e3106 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -12,6 +12,11 @@ $:.unshift((Pathname(__FILE__).dirname.parent + "lib").to_s) +require "timeout" +require "benchmark" +require "fileutils" +require "tmpdir" +require "tempfile" require "moped" require "support/examples" require "support/mongohq" @@ -36,7 +41,118 @@ Moped::Connection::Manager.instance_variable_set(:@pools, {}) end + config.after(:suite) do + stop_mongo_server(31100) + stop_mongo_server(31101) + stop_mongo_server(31102) + end + unless Support::MongoHQ.replica_set_configured? || Support::MongoHQ.auth_node_configured? $stderr.puts Support::MongoHQ.message end end + +def start_mongo_server(port, extra_options=nil, clean_database_files=true) + dbpath = File.join(Dir.tmpdir, "mongod-db", port.to_s) + FileUtils.mkdir_p(dbpath) + + Timeout::timeout(10) do + loop do + `mongod --oplogSize 40 --noprealloc --smallfiles --port #{port} --dbpath #{dbpath} --logpath #{dbpath}/log --pidfilepath #{dbpath}/pid --fork #{extra_options}` + sleep 1 + break if `echo 'db.runCommand({ping:1}).ok' | mongo --quiet --port #{port} 2>/dev/null`.chomp == "1" + end + end +end + +def stop_mongo_server(port, clean_database_files=true) + dbpath = File.join(Dir.tmpdir, "mongod-db", port.to_s) + pidfile = File.join(dbpath, "pid") + if File.exists?(pidfile) + Timeout::timeout(10) do + loop do + `kill #{File.read(pidfile).chomp}` + sleep 1 + break if `echo 'db.runCommand({ping:1}).ok' | mongo --quiet --port #{port} 2>/dev/null`.chomp != "1" + end + end + end + + FileUtils.rm_rf(dbpath) if clean_database_files +end + +def keyfile + file = File.join(Dir.tmpdir, "mongod-db", "keyfile") + return file if File.exists?(file) + + FileUtils.mkdir_p(File.dirname(file)) + File.open(file, "w", 0600) do |f| + f.puts <<-EOF.gsub /^\s+/, "" + SyrfEmAevWPEbgRZoZx9qZcZtJAAfd269da+kzi0H/7OuowGLxM3yGGUHhD379qP + nw4X8TT2T6ecx6aqJgxG+biJYVOpNK3HHU9Dp5q6Jd0bWGHGGbgFHV32/z2FFiti + EFLimW/vfn2DcJwTW29nQWhz2wN+xfMuwA6hVxFczlQlz5hIY0+a+bQChKw8wDZk + rW1OjTQ//csqPbVA8fwB49ghLGp+o84VujhRxLJ+0sbs8dKoIgmVlX2kLeHGQSf0 + KmF9b8kAWRLwLneOR3ESovXpEoK0qpQb2ym6BNqP32JKyPA6Svb/smVONhjUI71f + /zQ2ETX7ylpxIzw2SMv/zOWcVHBqIbdP9Llrxb3X0EsB6J8PeI8qLjpS94FyEddw + ACMcAxbP+6BaLjXyJ2WsrEeqThAyUC3uF5YN/oQ9XiATqP7pDOTrmfn8LvryyzcB + ByrLRTPOicBaG7y13ATcCbBdrYH3BE4EeLkTUZOg7VzvRnATvDpt0wOkSnbqXow8 + GQ6iMUgd2XvUCuknQLD6gWyoUyHiPADKrLsgnd3Qo9BPxYJ9VWSKB4phK3N7Bic+ + BwxlcpDFzGI285GR4IjcJbRRjjywHq5XHOxrJfN+QrZ/6wy6yu2+4NTPj+BPC5iX + /dNllTEyn7V+pr6FiRv8rv8RcxJgf3nfn/Xz0t2zW2olcalEFxwKKmR20pZxPnSv + Kr6sVHEzh0mtA21LoK5G8bztXsgFgWU7hh9z8UUo7KQQnDfyPb6k4xroeeQtWBNo + TZF1pI5joLytNSEtT+BYA5wQSYm4WCbhG+j7ipcPIJw6Un4ZtAZs0aixDfVE0zo0 + w2FWrYH2dmmCMbz7cEXeqvQiHh9IU/hkTrKGY95STszGGFFjhtS2TbHAn2rRoFI0 + VwNxMJCC+9ZijTWBeGyQOuEupuI4C9IzA5Gz72048tpZ0qMJ9mOiH3lZFtNTg/5P + 28Td2xzaujtXjRnP3aZ9z2lKytlr + EOF + end + file +end + +def setup_replicaset_environment(with_authentication=false, replica_set_name='dev') + status = servers_status(with_authentication).select{|st| st == "PRIMARY" || st == "SECONDARY"} + has_admin = has_user_admin?(with_authentication) + unless status.count == 3 && status.all?{|st| st == "PRIMARY" || st == "SECONDARY"} && (with_authentication ? has_admin : !has_admin) + stop_mongo_server(31101) + stop_mongo_server(31100) + stop_mongo_server(31102) + + options = with_authentication ? "--replSet #{replica_set_name} --keyFile #{keyfile} --auth" : "--replSet #{replica_set_name}" + start_mongo_server(31100, options) + start_mongo_server(31101, options) + start_mongo_server(31102, options) + + Timeout::timeout(90) do + sleep 5 while `echo "rs.initiate({_id : '#{replica_set_name}', 'members' : [{_id:0, host:'127.0.0.1:31100'},{_id:1, host:'127.0.0.1:31101'},{_id:2, host:'127.0.0.1:31102'}]}).ok" | mongo --quiet --port 31100 2>/dev/null`.chomp != "1" + sleep 1 while !servers_status(false).all?{|st| st == "PRIMARY" || st == "SECONDARY"} + end + + master = `echo 'db.isMaster().primary' | mongo --quiet --port 31100 2>/dev/null`.chomp + + auth_credentials = "" + if with_authentication + `echo " + use admin; + db.addUser('admin', 'admin_pwd'); + " | mongo #{master} 2>/dev/null` + + auth_credentials = "-u admin -p admin_pwd --authenticationDatabase admin" + end + + `echo " + use test_db; + db.addUser('common', 'common_pwd'); + db.foo.ensureIndex({name:1}, {unique:1}); + " | mongo #{master} #{auth_credentials} 2>/dev/null` + end +end + +def servers_status(with_authentication) + auth = has_user_admin?(with_authentication) ? "-u admin -p admin_pwd --authenticationDatabase admin" : "" + `echo 'rs.status().members[0].stateStr + "|" + rs.status().members[1].stateStr + "|" + rs.status().members[2].stateStr' | mongo --quiet --port 31100 #{auth} 2>/dev/null`.chomp.split("|") +end + +def has_user_admin?(with_authentication) + auth = with_authentication ? "-u admin -p admin_pwd --authenticationDatabase admin" : "" + `echo 'db.getSisterDB("admin").getUser("admin").user' | mongo --quiet --port 31100 #{auth} 2>/dev/null`.chomp == "admin" +end From 5466e4d1ffef6f379398d1b3a7caae2d785a7675 Mon Sep 17 00:00:00 2001 From: Wandenberg Date: Tue, 21 Oct 2014 12:48:49 -0200 Subject: [PATCH 2/3] add retry on insert operations --- lib/moped/collection.rb | 10 ++++-- lib/moped/read_preference/selectable.rb | 37 ++------------------ lib/moped/retryable.rb | 45 +++++++++++++++++++++++++ spec/moped/cluster_spec.rb | 35 +++++++++++++++++++ 4 files changed, 89 insertions(+), 38 deletions(-) create mode 100644 lib/moped/retryable.rb diff --git a/lib/moped/collection.rb b/lib/moped/collection.rb index d68f8da..5aa8f43 100644 --- a/lib/moped/collection.rb +++ b/lib/moped/collection.rb @@ -1,5 +1,6 @@ # encoding: utf-8 require "moped/query" +require "moped/retryable" module Moped @@ -8,6 +9,7 @@ module Moped # @since 1.0.0 class Collection include Readable + include Retryable # @!attribute database # @return [ Database ] The database for the collection. @@ -120,9 +122,11 @@ def initialize(database, name) # # @since 1.0.0 def insert(documents, flags = nil) - docs = documents.is_a?(Array) ? documents : [ documents ] - cluster.with_primary do |node| - node.insert(database.name, name, docs, write_concern, flags: flags || []) + with_retry(cluster) do + docs = documents.is_a?(Array) ? documents : [ documents ] + cluster.with_primary do |node| + node.insert(database.name, name, docs, write_concern, flags: flags || []) + end end end diff --git a/lib/moped/read_preference/selectable.rb b/lib/moped/read_preference/selectable.rb index 0cdea3c..148f58d 100644 --- a/lib/moped/read_preference/selectable.rb +++ b/lib/moped/read_preference/selectable.rb @@ -1,4 +1,5 @@ # encoding: utf-8 +require "moped/retryable" module Moped module ReadPreference @@ -7,6 +8,7 @@ module ReadPreference # # @since 2.0.0 module Selectable + include Retryable # @!attribute tags # @return [ Array ] The tag sets. @@ -39,41 +41,6 @@ def query_options(options) options[:flags] |= [ :slave_ok ] options end - - private - - # Execute the provided block on the cluster and retry if the execution - # fails. - # - # @api private - # - # @example Execute with retry. - # preference.with_retry(cluster) do - # cluster.with_primary do |node| - # node.refresh - # end - # end - # - # @param [ Cluster ] cluster The cluster. - # @param [ Integer ] retries The number of times to retry. - # - # @return [ Object ] The result of the block. - # - # @since 2.0.0 - def with_retry(cluster, retries = cluster.max_retries, &block) - begin - block.call - rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e - if retries > 0 - Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a") - sleep(cluster.retry_interval) - cluster.refresh - with_retry(cluster, retries - 1, &block) - else - raise e - end - end - end end end end diff --git a/lib/moped/retryable.rb b/lib/moped/retryable.rb new file mode 100644 index 0000000..0fb04e4 --- /dev/null +++ b/lib/moped/retryable.rb @@ -0,0 +1,45 @@ +# encoding: utf-8 +module Moped + # Provides the shared behaviour for retry failed operations. + # + # @since 2.0.0 + module Retryable + + private + + # Execute the provided block on the cluster and retry if the execution + # fails. + # + # @api private + # + # @example Execute with retry. + # preference.with_retry(cluster) do + # cluster.with_primary do |node| + # node.refresh + # end + # end + # + # @param [ Cluster ] cluster The cluster. + # @param [ Integer ] retries The number of times to retry. + # + # @return [ Object ] The result of the block. + # + # @since 2.0.0 + def with_retry(cluster, retries = cluster.max_retries, &block) + begin + block.call + rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e + raise e if e.is_a?(Errors::PotentialReconfiguration) && ! e.message.include?("not master") + + if retries > 0 + Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a") + sleep(cluster.retry_interval) + cluster.refresh + with_retry(cluster, retries - 1, &block) + else + raise e + end + end + end + end +end diff --git a/spec/moped/cluster_spec.rb b/spec/moped/cluster_spec.rb index eaa3268..a524820 100644 --- a/spec/moped/cluster_spec.rb +++ b/spec/moped/cluster_spec.rb @@ -622,6 +622,41 @@ def step_down_servers time.should be > 5 time.should be < 29 end + + it "should recover and execute an insert" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1" }) + step_down_servers + time = Benchmark.realtime do + session[:foo].insert({ name: "bar 2" }) + session[:foo].find().to_a.count.should eql(2) + end + time.should be > 5 + time.should be < 29 + + session[:foo].insert({ name: "bar 3" }) + session[:foo].find().to_a.count.should eql(3) + end + + it "should recover and try an insert which hit a constraint" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1" }) + step_down_servers + time = Benchmark.realtime do + expect { + session[:foo].insert({ name: "bar 1" }) + }.to raise_exception(Moped::Errors::OperationFailure, /duplicate/) + end + session.cluster.nodes.map(&:refresh) + expect{ session.cluster.nodes.any?{ |node| node.primary? } }.to be_true + time.should be > 5 + time.should be < 29 + + session[:foo].find().to_a.count.should eql(1) + + session[:foo].insert({ name: "bar 2" }) + session[:foo].find().to_a.count.should eql(2) + end end describe "with authentication off" do From 0205bcc69db82abacc5aa37475c33bd9950593c9 Mon Sep 17 00:00:00 2001 From: Wandenberg Date: Tue, 21 Oct 2014 18:02:23 -0200 Subject: [PATCH 3/3] add retry to other operations which need to be executed on a primary node --- lib/moped/query.rb | 55 ++++++++++++++++++--------------- lib/moped/retryable.rb | 3 +- spec/moped/cluster_spec.rb | 63 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 96 insertions(+), 25 deletions(-) diff --git a/lib/moped/query.rb b/lib/moped/query.rb index f308010..de6af6e 100644 --- a/lib/moped/query.rb +++ b/lib/moped/query.rb @@ -21,6 +21,7 @@ module Moped # people.find.count # => 1 class Query include Enumerable + include Retryable # @attribute [r] collection The collection to execute the query on. # @attribute [r] operation The query operation. @@ -321,14 +322,16 @@ def modify(change, options = {}) # # @since 1.0.0 def remove - cluster.with_primary do |node| - node.remove( - operation.database, - operation.collection, - operation.basic_selector, - write_concern, - flags: [ :remove_first ] - ) + with_retry(cluster) do + cluster.with_primary do |node| + node.remove( + operation.database, + operation.collection, + operation.basic_selector, + write_concern, + flags: [ :remove_first ] + ) + end end end @@ -341,13 +344,15 @@ def remove # # @since 1.0.0 def remove_all - cluster.with_primary do |node| - node.remove( - operation.database, - operation.collection, - operation.basic_selector, - write_concern - ) + with_retry(cluster) do + cluster.with_primary do |node| + node.remove( + operation.database, + operation.collection, + operation.basic_selector, + write_concern + ) + end end end @@ -423,15 +428,17 @@ def tailable # # @since 1.0.0 def update(change, flags = nil) - cluster.with_primary do |node| - node.update( - operation.database, - operation.collection, - operation.selector["$query"] || operation.selector, - change, - write_concern, - flags: flags - ) + with_retry(cluster) do + cluster.with_primary do |node| + node.update( + operation.database, + operation.collection, + operation.selector["$query"] || operation.selector, + change, + write_concern, + flags: flags + ) + end end end diff --git a/lib/moped/retryable.rb b/lib/moped/retryable.rb index 0fb04e4..906b094 100644 --- a/lib/moped/retryable.rb +++ b/lib/moped/retryable.rb @@ -29,7 +29,8 @@ def with_retry(cluster, retries = cluster.max_retries, &block) begin block.call rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e - raise e if e.is_a?(Errors::PotentialReconfiguration) && ! e.message.include?("not master") + raise e if e.is_a?(Errors::PotentialReconfiguration) && + ! (e.message.include?("not master") || e.message.include?("Not primary")) if retries > 0 Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a") diff --git a/spec/moped/cluster_spec.rb b/spec/moped/cluster_spec.rb index a524820..2d4c160 100644 --- a/spec/moped/cluster_spec.rb +++ b/spec/moped/cluster_spec.rb @@ -657,6 +657,69 @@ def step_down_servers session[:foo].insert({ name: "bar 2" }) session[:foo].find().to_a.count.should eql(2) end + + it "should recover and execute an update" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1" }) + cursor = session[:foo].find({ name: "bar 1" }) + step_down_servers + time = Benchmark.realtime do + cursor.update({ name: "bar 2" }) + end + time.should be > 5 + time.should be < 29 + + session[:foo].find().to_a.count.should eql(1) + session[:foo].find({ name: "bar 1" }).to_a.count.should eql(0) + session[:foo].find({ name: "bar 2" }).to_a.count.should eql(1) + end + + it "should recover and execute a remove" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1", type: "some" }) + session[:foo].insert({ name: "bar 2", type: "some" }) + cursor = session[:foo].find({ type: "some" }) + step_down_servers + time = Benchmark.realtime do + cursor.remove() + end + time.should be > 5 + time.should be < 29 + + session[:foo].find().to_a.count.should eql(1) + session[:foo].find({ name: "bar 1" }).to_a.count.should eql(0) + session[:foo].find({ name: "bar 2" }).to_a.count.should eql(1) + end + + it "should recover and execute a remove_all" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1", type: "some" }) + session[:foo].insert({ name: "bar 2", type: "some" }) + cursor = session[:foo].find({ type: "some" }) + step_down_servers + time = Benchmark.realtime do + cursor.remove_all() + end + time.should be > 5 + time.should be < 29 + + session[:foo].find().to_a.count.should eql(0) + session[:foo].find({ name: "bar 1" }).to_a.count.should eql(0) + session[:foo].find({ name: "bar 2" }).to_a.count.should eql(0) + end + + it "should recover and execute an operation using the basic command method" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1", type: "some" }) + session[:foo].insert({ name: "bar 2", type: "some" }) + cursor = session[:foo].find({ type: "some" }) + step_down_servers + time = Benchmark.realtime do + cursor.count.should eql(2) + end + time.should be > 5 + time.should be < 29 + end end describe "with authentication off" do