Skip to content

Commit

Permalink
add retry to other operations which need to be executed on a primary …
Browse files Browse the repository at this point in the history
…node
  • Loading branch information
wandenberg committed Feb 23, 2015
1 parent 5466e4d commit 0205bcc
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 25 deletions.
55 changes: 31 additions & 24 deletions lib/moped/query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module Moped
# people.find.count # => 1
class Query
include Enumerable
include Retryable

# @attribute [r] collection The collection to execute the query on.
# @attribute [r] operation The query operation.
Expand Down Expand Up @@ -321,14 +322,16 @@ def modify(change, options = {})
#
# @since 1.0.0
def remove
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern,
flags: [ :remove_first ]
)
with_retry(cluster) do
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern,
flags: [ :remove_first ]
)
end
end
end

Expand All @@ -341,13 +344,15 @@ def remove
#
# @since 1.0.0
def remove_all
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern
)
with_retry(cluster) do
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern
)
end
end
end

Expand Down Expand Up @@ -423,15 +428,17 @@ def tailable
#
# @since 1.0.0
def update(change, flags = nil)
cluster.with_primary do |node|
node.update(
operation.database,
operation.collection,
operation.selector["$query"] || operation.selector,
change,
write_concern,
flags: flags
)
with_retry(cluster) do
cluster.with_primary do |node|
node.update(
operation.database,
operation.collection,
operation.selector["$query"] || operation.selector,
change,
write_concern,
flags: flags
)
end
end
end

Expand Down
3 changes: 2 additions & 1 deletion lib/moped/retryable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e
raise e if e.is_a?(Errors::PotentialReconfiguration) && ! e.message.include?("not master")
raise e if e.is_a?(Errors::PotentialReconfiguration) &&
! (e.message.include?("not master") || e.message.include?("Not primary"))

if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
Expand Down
63 changes: 63 additions & 0 deletions spec/moped/cluster_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,69 @@ def step_down_servers
session[:foo].insert({ name: "bar 2" })
session[:foo].find().to_a.count.should eql(2)
end

it "should recover and execute an update" do
session[:foo].find().remove_all()
session[:foo].insert({ name: "bar 1" })
cursor = session[:foo].find({ name: "bar 1" })
step_down_servers
time = Benchmark.realtime do
cursor.update({ name: "bar 2" })
end
time.should be > 5
time.should be < 29

session[:foo].find().to_a.count.should eql(1)
session[:foo].find({ name: "bar 1" }).to_a.count.should eql(0)
session[:foo].find({ name: "bar 2" }).to_a.count.should eql(1)
end

it "should recover and execute a remove" do
session[:foo].find().remove_all()
session[:foo].insert({ name: "bar 1", type: "some" })
session[:foo].insert({ name: "bar 2", type: "some" })
cursor = session[:foo].find({ type: "some" })
step_down_servers
time = Benchmark.realtime do
cursor.remove()
end
time.should be > 5
time.should be < 29

session[:foo].find().to_a.count.should eql(1)
session[:foo].find({ name: "bar 1" }).to_a.count.should eql(0)
session[:foo].find({ name: "bar 2" }).to_a.count.should eql(1)
end

it "should recover and execute a remove_all" do
session[:foo].find().remove_all()
session[:foo].insert({ name: "bar 1", type: "some" })
session[:foo].insert({ name: "bar 2", type: "some" })
cursor = session[:foo].find({ type: "some" })
step_down_servers
time = Benchmark.realtime do
cursor.remove_all()
end
time.should be > 5
time.should be < 29

session[:foo].find().to_a.count.should eql(0)
session[:foo].find({ name: "bar 1" }).to_a.count.should eql(0)
session[:foo].find({ name: "bar 2" }).to_a.count.should eql(0)
end

it "should recover and execute an operation using the basic command method" do
session[:foo].find().remove_all()
session[:foo].insert({ name: "bar 1", type: "some" })
session[:foo].insert({ name: "bar 2", type: "some" })
cursor = session[:foo].find({ type: "some" })
step_down_servers
time = Benchmark.realtime do
cursor.count.should eql(2)
end
time.should be > 5
time.should be < 29
end
end

describe "with authentication off" do
Expand Down

0 comments on commit 0205bcc

Please sign in to comment.