Skip to content

Commit

Permalink
GH-623 - Apply optimistic locking logic during merge, too.
Browse files Browse the repository at this point in the history
Nodes with assigned ids (i.e. UUID based IDs or fully external IDs) are treated as new nodes, i.e. when loaded in one session and stored in another. OGM uses a MERGE statement for those. The optimistic locking clause is now added to that MERGE statement as well.

This fixes #623.
  • Loading branch information
michael-simons committed Jun 7, 2019
1 parent 7ed1042 commit 0216397
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 106 deletions.
27 changes: 27 additions & 0 deletions api/src/main/java/org/neo4j/ogm/model/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.neo4j.ogm.model;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -54,4 +56,29 @@ public interface Node extends PropertyContainer {
Property<String, Long> getVersion();

Set<String> getPreviousDynamicLabels();

/**
* Turns this node object into a row. The version field is treated different, it is not included with the standard properties,
* but only as separate value.
*
* @param nodeIdTarget The name under which to store this nodes id. The value is used as a backreference.
* @return A map of values representing this node.
*/
default Map<String, Object> toRow(String nodeIdTarget) {
Map<String, Object> rowMap = new HashMap<>();
rowMap.put(nodeIdTarget, this.getId());
Map<String, Object> props = new HashMap<>();
for (Property property : this.getPropertyList()) {
// Don't include version property into props, it will be incremented by the query
if (!property.equals(this.getVersion())) {
props.put((String) property.getKey(), property.getValue());
}
}
rowMap.put("props", props);
if (this.hasVersionProperty()) {
Property version = this.getVersion();
rowMap.put((String) version.getKey(), version.getValue());
}
return rowMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ private NodeBuilder newNodeBuilder(Compiler compiler, Object entity, int horizon
nodeBuilder = compiler.newNode(id).addLabels(labels).setPrimaryIndex(primaryIndex);
context.registerNewObject(id, entity);
} else {
nodeBuilder = compiler.existingNode(Long.valueOf(id.toString()));
nodeBuilder = compiler.existingNode(id);
nodeBuilder.addLabels(labels).setPrimaryIndex(primaryIndex);

this.mappingContext.getSnapshotOf(entity).ifPresent(snapshot ->
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
/**
* @author Luanne Misquitta
* @author Mark Angrish
* @author Michael J. Simons
*/
public class DeletedRelationshipEntityStatementBuilder extends BaseBuilder implements CypherStatementBuilder {
public class DeletedRelationshipEntityStatementBuilder implements CypherStatementBuilder {

private final StatementFactory statementFactory;

Expand All @@ -58,7 +59,7 @@ public Statement build() {
queryBuilder.append("UNWIND {rows} AS row MATCH ()-[r]-() WHERE ID(r) = row.relId ");

if (firstEdge.hasVersionProperty()) {
appendVersionPropertyCheck(queryBuilder, firstEdge, "r");
queryBuilder.append(OptimisticLockingUtils.getFragmentForExistingNodesAndRelationships(firstEdge, "r"));
}
queryBuilder.append("DELETE r RETURN ID(r) as ref, ID(r) as id, {type} as type");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,25 @@
*/
package org.neo4j.ogm.cypher.compiler.builders.statement;

import java.util.ArrayList;
import static java.util.stream.Collectors.*;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.neo4j.ogm.cypher.compiler.CypherStatementBuilder;
import org.neo4j.ogm.model.Node;
import org.neo4j.ogm.model.Property;
import org.neo4j.ogm.request.OptimisticLockingConfig;
import org.neo4j.ogm.request.Statement;
import org.neo4j.ogm.request.StatementFactory;

/**
* @author Luanne Misquitta
* @author Mark Angrish
* @author Michael J. Simons
*/
public class ExistingNodeStatementBuilder extends BaseBuilder implements CypherStatementBuilder {
public class ExistingNodeStatementBuilder implements CypherStatementBuilder {

private final StatementFactory statementFactory;

Expand All @@ -55,11 +56,11 @@ public Statement build() {
if (existingNodes != null && existingNodes.size() > 0) {
Node firstNode = existingNodes.iterator().next();

queryBuilder.append("UNWIND {rows} as row ")
.append("MATCH (n) WHERE ID(n)=row.nodeId ");
queryBuilder
.append("UNWIND {rows} as row MATCH (n) WHERE ID(n)=row.nodeId ");

if (firstNode.hasVersionProperty()) {
appendVersionPropertyCheck(queryBuilder, firstNode, "n");
queryBuilder.append(OptimisticLockingUtils.getFragmentForExistingNodesAndRelationships(firstNode, "n"));
}

Set<String> previousDynamicLabels = firstNode.getPreviousDynamicLabels();
Expand All @@ -75,24 +76,7 @@ public Statement build() {
}

queryBuilder.append(" SET n += row.props RETURN row.nodeId as ref, ID(n) as id, {type} as type");
List<Map> rows = new ArrayList<>();
for (Node node : existingNodes) {
Map<String, Object> rowMap = new HashMap<>();
rowMap.put("nodeId", node.getId());
Map<String, Object> props = new HashMap<>();
for (Property property : node.getPropertyList()) {
// Don't include version property into props, it will be incremented by the query
if (!property.equals(node.getVersion())) {
props.put((String) property.getKey(), property.getValue());
}
}
rowMap.put("props", props);
if (node.hasVersionProperty()) {
Property version = node.getVersion();
rowMap.put((String) version.getKey(), version.getValue());
}
rows.add(rowMap);
}
List<Map> rows = existingNodes.stream().map(node -> node.toRow("nodeId")).collect(toList());
parameters.put("type", "node");
parameters.put("rows", rows);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* @author Luanne Misquitta
* @author Mark Angrish
*/
public class ExistingRelationshipStatementBuilder extends BaseBuilder implements CypherStatementBuilder {
public class ExistingRelationshipStatementBuilder implements CypherStatementBuilder {

private final StatementFactory statementFactory;

Expand All @@ -56,7 +56,7 @@ public Statement build() {
queryBuilder.append("UNWIND {rows} AS row MATCH ()-[r]->() WHERE ID(r) = row.relId ");

if (firstEdge.hasVersionProperty()) {
appendVersionPropertyCheck(queryBuilder, firstEdge, "r");
queryBuilder.append(OptimisticLockingUtils.getFragmentForExistingNodesAndRelationships(firstEdge, "r"));
}

queryBuilder.append(firstEdge.createPropertyRemovalFragment("r"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,24 @@
*/
package org.neo4j.ogm.cypher.compiler.builders.statement;

import java.util.ArrayList;
import static java.util.stream.Collectors.*;
import static org.neo4j.ogm.cypher.compiler.builders.statement.OptimisticLockingUtils.*;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.neo4j.ogm.cypher.compiler.CypherStatementBuilder;
import org.neo4j.ogm.model.Node;
import org.neo4j.ogm.model.Property;
import org.neo4j.ogm.request.OptimisticLockingConfig;
import org.neo4j.ogm.request.Statement;
import org.neo4j.ogm.request.StatementFactory;

/**
* @author Luanne Misquitta
* @author Mark Angrish
* @author Michael J. Simons
*/
public class NewNodeStatementBuilder implements CypherStatementBuilder {

Expand Down Expand Up @@ -73,23 +76,22 @@ public Statement build() {
.append(firstNode.getPrimaryIndex())
.append("}");
}
queryBuilder.append(") "); // Closing MERGE or CREATE

queryBuilder.append(") SET n=row.props RETURN row.nodeRef as ref, ID(n) as id, {type} as type");
List<Map> rows = new ArrayList<>();
for (Node node : newNodes) {
Map<String, Object> rowMap = new HashMap<>();
rowMap.put("nodeRef", node.getId());
Map<String, Object> props = new HashMap<>();
for (Property property : node.getPropertyList()) {
if (property.getValue() != null) {
props.put((String) property.getKey(), property.getValue());
}
}
rowMap.put("props", props);
rows.add(rowMap);
if (firstNode.hasVersionProperty() && firstNode.getPrimaryIndex() != null) {
queryBuilder.append(getFragmentForNewOrExistingNodes(firstNode, "n"));
}

queryBuilder.append("SET n=row.props RETURN row.nodeRef as ref, ID(n) as id, {type} as type");
List<Map> rows = newNodes.stream().map(node -> node.toRow("nodeRef")).collect(toList());
parameters.put("type", "node");
parameters.put("rows", rows);

if (firstNode.hasVersionProperty()) {
OptimisticLockingConfig olConfig = new OptimisticLockingConfig(rows.size(),
firstNode.getLabels(), firstNode.getVersion().getKey());
return statementFactory.statement(queryBuilder.toString(), parameters, olConfig);
}
}

return statementFactory.statement(queryBuilder.toString(), parameters);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.ogm.cypher.compiler.builders.statement;

import org.neo4j.ogm.model.PropertyContainer;

/**
* Provides fragments that implement optimistic locking logic via Cypher.
* The main logic is as follows:
* <ol>
* <li>{@code AND n.`version` = {version}} Ensure that we only match the current version</li>
* <li>{@code SET n.`version` = n.`version` + 1} Take the lock by incrementing the version of the record in this transaction</li>
* <li>{@code WITH n WHERE n.version = {version} + 1} Add a second version check in case another transaction took the lock between our read and upgrade</li>
* </ol>
* For new nodes we take a merge into account and therefor check whether the record to upgrade has a version or not.
* If it doesn't have a version yet, we start at 0.
*
* @author Frantisek Hartman
* @author Michael J. Simons
*/
final class OptimisticLockingUtils {

private final static String VERSION_PROPERTY_CHECK_FOR_EXISTING_NODES_AND_RELATIONSHIPS = ""
+ "AND %1$s.`%2$s` = row.`%2$s` "
+ "SET %1$s.`%2$s` = %1$s.`%2$s` + 1 "
+ "WITH %1$s, row "
+ "WHERE %1$s.`%2$s` = row.`%2$s` + 1 ";

/**
* In case an entity with an externally assigned ID has also a @Version field, the version check is
* slightly different than in the case of BaseBuilder.appendVersionPropertyCheck:
* When the MERGE statement creates a new node, than the version passed to the statement as input
* parameter is null and has to be treated accordingly (cannot compare null with null in the 1st where)
* We also make sure that the first version will be 0.
*/
private final static String VERSION_PROPERTY_CHECK_FOR_NEW_OR_EXISTING_NODES = ""
+ "WITH %1$s, row "
+ "WHERE (row.`%2$s` IS NULL AND %1$s.`%2$s` IS NULL) OR %1$s.`%2$s` = row.`%2$s` "
+ " SET %1$s.`%2$s` = COALESCE(%1$s.`%2$s`, -1) + 1 "
+ "WITH %1$s, row "
+ "WHERE (row.`%2$s` IS NULL OR %1$s.`%2$s` = row.`%2$s` + 1) ";

/**
* @param container node / relationship to check
* @param variable The variable representing the node or relationship to upgrade
* @return
*/
static String getFragmentForExistingNodesAndRelationships(PropertyContainer container, String variable) {
String key = container.getVersion().getKey();
return String.format(VERSION_PROPERTY_CHECK_FOR_EXISTING_NODES_AND_RELATIONSHIPS, variable, key);
}

/**
* @param container node / relationship to check
* @param variable The variable representing the node or relationship to upgrade
* @return
*/
static String getFragmentForNewOrExistingNodes(PropertyContainer container, String variable) {
String key = container.getVersion().getKey();
return String.format(VERSION_PROPERTY_CHECK_FOR_NEW_OR_EXISTING_NODES, variable, key);
}

private OptimisticLockingUtils() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
*/
public class OptimisticLockingChecker {

private static final Logger logger = LoggerFactory.getLogger(OptimisticLockingChecker.class);

private final Neo4jSession session;

public OptimisticLockingChecker(Neo4jSession session) {
Expand Down
Loading

0 comments on commit 0216397

Please sign in to comment.