diff --git a/lib/active_graph/node/query/query_proxy_methods_of_mass_updating.rb b/lib/active_graph/node/query/query_proxy_methods_of_mass_updating.rb index e5c5f7e8f..14065a73f 100644 --- a/lib/active_graph/node/query/query_proxy_methods_of_mass_updating.rb +++ b/lib/active_graph/node/query/query_proxy_methods_of_mass_updating.rb @@ -50,11 +50,8 @@ def delete_all_rels # Executed in the database, callbacks will not be run. def replace_with(node_or_nodes) node_or_nodes = Array(node_or_nodes).map { |arg| arg.is_a?(ActiveGraph::Node) ? arg : @model.find(arg) } + ActiveGraph::Base.lock_node(start_object) unless start_object.new_record? original_ids = self.pluck(:id) - $concurrency_queue << 'ready' if $concurrency_test # completed read above so send signal in the queue - while $concurrency_test && $concurrency_test_wait # wait till other thread has also completed its read - sleep 1 - end delete_rels_for_nodes(original_ids, node_or_nodes.collect(&:id)) add_rels(node_or_nodes, original_ids) end @@ -66,12 +63,13 @@ def add_rels(node_or_nodes, original_ids) end def delete_rels_for_nodes(original_ids, new_ids) - ids = original_ids.select { |id| !new_ids.include?(id) } - return unless ids.present? + ids_to_be_removed = original_ids - new_ids + return unless ids_to_be_removed.present? + if association.dependent - start_object.public_send("dependent_#{association.dependent}_callback", association, ids) + start_object.public_send("dependent_#{association.dependent}_callback", association, ids_to_be_removed) else - self.where(id: ids).delete_all_rels + self.where(id: ids_to_be_removed).delete_all_rels end end diff --git a/lib/active_graph/transactions.rb b/lib/active_graph/transactions.rb index 44f01cf2e..5ad72f384 100644 --- a/lib/active_graph/transactions.rb +++ b/lib/active_graph/transactions.rb @@ -28,6 +28,10 @@ def read_transaction(**config, &block) alias transaction write_transaction + def lock_node(node) + node.as(:n).query.set('n._LOCK_ = null').exec if tx&.open? || explicit_session&.open? + end + private def send_transaction(method, **config, &block) diff --git a/spec/e2e/relationship/persistence/query_factory_spec.rb b/spec/e2e/relationship/persistence/query_factory_spec.rb index 49de7cb92..6b696e360 100644 --- a/spec/e2e/relationship/persistence/query_factory_spec.rb +++ b/spec/e2e/relationship/persistence/query_factory_spec.rb @@ -103,56 +103,55 @@ def self.count expect(from_node.reload.to_classes).to be_empty end - it 'does not create duplicate has_one relationship' do - from_node.save - to_node.save - begin - $concurrency_test = true - $concurrency_queue = Queue.new - $concurrency_test_wait = true + context 'concurrent update' do + before do + allow(ActiveGraph::Base).to receive(:lock_node).and_wrap_original do |original, *args| + $concurrency_queue << 'ready' + Thread.stop + original.call(*args) + $concurrency_queue << Thread.current + Thread.stop + end + end + after { $concurrency_queue = nil } + let!(:from_node) { FromClass.create(name: 'foo') } + let!(:to_node) { ToClass.create(name: 'bar') } + let(:from_node_two) { FromClass.create(name: 'foo-2') } + + it 'does not create duplicate has_one relationship' do + $concurrency_queue = Thread::Queue.new t1 = Thread.new { to_node.update(from_class: from_node) } t2 = Thread.new { to_node.update(from_class: from_node) } - while $concurrency_queue.size < 2 - # wait till both thread have complted read query - sleep 1 - end - $concurrency_test_wait = false # make threads resume their work - [t1, t2].join # wait for the threads to finish + sleep(0.1) until $concurrency_queue.size == 2 + $concurrency_queue.clear + [t1, t2].each(&:run) + sleep(0.1) until $concurrency_queue.size == 1 && t1.status == 'sleep' && t2.status == 'sleep' + $concurrency_queue.pop.run + sleep(0.1) until !(t1.status && t2.status) - # to_node.from_class only returns 1 result but in db there are two rels associated with this node - # this assertion fails with with result 2 proving the presence of bug expect(ActiveGraph::Base.query("MATCH (node2:`ToClass`)<-[rel1:`HAS_REL`]-(from_class:`FromClass`) return from_class").to_a.size).to eq(1) - ensure - $concurrency_test = nil - $concurrency_test_wait = nil - $concurrency_queue = nil + (t1.status == 'sleep' ? t1.run : t2.run).join + expect(ActiveGraph::Base.query("MATCH (node2:`ToClass`)<-[rel1:`HAS_REL`]-(from_class:`FromClass`) return from_class").to_a.size).to eq(1) end - end - it 'does not create two rels with different nodes in has_one relationship' do - from_node.save - to_node.save - from_node_two = FromClass.create(name: 'foo-2') - begin - $concurrency_test = true - $concurrency_queue = Queue.new - $concurrency_test_wait = true + it 'does not create two rels with different nodes in has_one relationship' do + $concurrency_queue = Thread::Queue.new t1 = Thread.new { to_node.update(from_class: from_node) } t2 = Thread.new { to_node.update(from_class: from_node_two) } - while $concurrency_queue.size < 2 - # wait till both thread have complted read query - sleep 1 - end - $concurrency_test_wait = false # make threads resume their work - [t1, t2].join # wait for the threads to finish + sleep(0.1) until $concurrency_queue.size == 2 + $concurrency_queue.clear + [t1, t2].each(&:run) + sleep(0.1) until $concurrency_queue.size == 1 && t1.status == 'sleep' && t2.status == 'sleep' + $concurrency_queue.pop.run + sleep(0.1) until !(t1.status && t2.status) + + first_assigned_from_class, second_assigned_from_class = t1.status == 'sleep' ? [from_node_two, from_node] : [from_node, from_node_two] + + expect(ToClass.find(to_node.id).from_class.id).to eq(first_assigned_from_class.id) + (t1.status == 'sleep' ? t1.run : t2.run).join - # to_node.from_class only returns 1 result but in db there are two rels associated with this node - # this assertion fails with with result 2 proving the presence of bug + expect(to_node.reload.from_class.id).to eq(second_assigned_from_class.id) expect(ActiveGraph::Base.query("MATCH (node2:`ToClass`)<-[rel1:`HAS_REL`]-(from_class:`FromClass`) return from_class").to_a.size).to eq(1) - ensure - $concurrency_test = nil - $concurrency_test_wait = nil - $concurrency_queue = nil end end