From 3cf8642b6e38a64d74cd0c64673f6151311ef28d Mon Sep 17 00:00:00 2001 From: shaleh Date: Mon, 25 Nov 2019 16:19:38 +0200 Subject: [PATCH] [rxrepo-core][rxrepo-mem][rxrepo-sql-core] Handled concurrent delete during conditioned update, invoking write test again to avoid inconsistency --- .../rxrepo/query/DefaultEntitySet.java | 1 + .../rxrepo/mem/MemoryEntityQueryProvider.java | 11 +- .../sql/DefaultSqlStatementProvider.java | 33 ++- .../rxrepo/sql/SqlQueryProvider.java | 6 +- .../rxrepo/sql/SqlStatementProvider.java | 22 +- .../rxrepo/test/AbstractRepositoryTest.java | 227 +++++++++++------- 6 files changed, 187 insertions(+), 113 deletions(-) diff --git a/rxrepo-core/src/main/java/com/slimgears/rxrepo/query/DefaultEntitySet.java b/rxrepo-core/src/main/java/com/slimgears/rxrepo/query/DefaultEntitySet.java index 49949b26..9c8dd1b2 100644 --- a/rxrepo-core/src/main/java/com/slimgears/rxrepo/query/DefaultEntitySet.java +++ b/rxrepo-core/src/main/java/com/slimgears/rxrepo/query/DefaultEntitySet.java @@ -411,6 +411,7 @@ public Maybe update(K key, Function, Maybe> updater) { private static boolean isConcurrencyException(Throwable exception) { log.debug("Checking exception: {}", exception.getMessage(), exception); return exception instanceof ConcurrentModificationException || + exception instanceof NoSuchElementException || (exception instanceof CompositeException && ((CompositeException)exception) .getExceptions() .stream() diff --git a/rxrepo-mem/src/main/java/com/slimgears/rxrepo/mem/MemoryEntityQueryProvider.java b/rxrepo-mem/src/main/java/com/slimgears/rxrepo/mem/MemoryEntityQueryProvider.java index 8eb149ec..ec013d2a 100644 --- a/rxrepo-mem/src/main/java/com/slimgears/rxrepo/mem/MemoryEntityQueryProvider.java +++ b/rxrepo-mem/src/main/java/com/slimgears/rxrepo/mem/MemoryEntityQueryProvider.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.stream.Collectors; public class MemoryEntityQueryProvider implements EntityQueryProvider, AutoCloseable { @@ -63,12 +64,12 @@ public MetaClassWithKey metaClass() { @Override public Maybe insertOrUpdate(K key, Function, Maybe> entityUpdater) { return Maybe.defer(() -> { - AtomicReference reference = objects.computeIfAbsent(key, k -> new AtomicReference<>()); - AtomicReference oldValue = new AtomicReference<>(reference.get()); + Supplier> referenceResolver = () -> objects.computeIfAbsent(key, k -> new AtomicReference<>()); + AtomicReference oldValue = new AtomicReference<>(referenceResolver.get().get()); return entityUpdater - .apply(Optional.ofNullable(reference.get()).map(Maybe::just).orElseGet(Maybe::empty)) - .flatMap(e -> reference.compareAndSet(oldValue.get(), e) - ? Maybe.just(e) + .apply(Optional.ofNullable(referenceResolver.get().get()).map(Maybe::just).orElseGet(Maybe::empty)) + .flatMap(e -> referenceResolver.get().compareAndSet(oldValue.get(), e) + ? (e != null ? Maybe.just(e): Maybe.empty()) : Maybe.error(new ConcurrentModificationException("Concurrent modification of " + metaClass.simpleName() + " detected"))) .doOnSuccess(e -> { if (!Objects.equals(oldValue.get(), e)) { diff --git a/rxrepo-sql-core/src/main/java/com/slimgears/rxrepo/sql/DefaultSqlStatementProvider.java b/rxrepo-sql-core/src/main/java/com/slimgears/rxrepo/sql/DefaultSqlStatementProvider.java index e416b5c8..eff9e3d0 100644 --- a/rxrepo-sql-core/src/main/java/com/slimgears/rxrepo/sql/DefaultSqlStatementProvider.java +++ b/rxrepo-sql-core/src/main/java/com/slimgears/rxrepo/sql/DefaultSqlStatementProvider.java @@ -79,23 +79,32 @@ public SqlStatement forDelete(DeleteInfo deleteInfo) { limitClause(deleteInfo))); } + @Override + public SqlStatement forUpdate(MetaClassWithKey metaClass, PropertyResolver propertyResolver, ReferenceResolver resolver) { + return forInsertOrUpdate(metaClass, propertyResolver, resolver, false); + } + @Override public SqlStatement forInsertOrUpdate(MetaClassWithKey metaClass, PropertyResolver propertyResolver, ReferenceResolver resolver) { + return forInsertOrUpdate(metaClass, propertyResolver, resolver, true); + } + + private SqlStatement forInsertOrUpdate(MetaClassWithKey metaClass, PropertyResolver propertyResolver, ReferenceResolver resolver, boolean forced) { PropertyMeta keyProperty = metaClass.keyProperty(); return statement(() -> of( - "update", - schemaProvider.tableName(metaClass), - "set", - Streams - .fromIterable(propertyResolver.propertyNames()) - .flatMap(sqlAssignmentGenerator.toAssignment(metaClass, propertyResolver, resolver)) - .collect(Collectors.joining(", ")), - "upsert", - "return after", - "where", - toConditionClause(PropertyExpression.ofObject(keyProperty).eq(propertyResolver.getProperty(keyProperty))) - )); + "update", + schemaProvider.tableName(metaClass), + "set", + Streams + .fromIterable(propertyResolver.propertyNames()) + .flatMap(sqlAssignmentGenerator.toAssignment(metaClass, propertyResolver, resolver)) + .collect(Collectors.joining(", ")), + forced ? "upsert" : "", + "return after", + "where", + toConditionClause(PropertyExpression.ofObject(keyProperty).eq(propertyResolver.getProperty(keyProperty))) + )); } @Override diff --git a/rxrepo-sql-core/src/main/java/com/slimgears/rxrepo/sql/SqlQueryProvider.java b/rxrepo-sql-core/src/main/java/com/slimgears/rxrepo/sql/SqlQueryProvider.java index c85f3cfd..8c6c14d1 100644 --- a/rxrepo-sql-core/src/main/java/com/slimgears/rxrepo/sql/SqlQueryProvider.java +++ b/rxrepo-sql-core/src/main/java/com/slimgears/rxrepo/sql/SqlQueryProvider.java @@ -83,13 +83,17 @@ public Maybe insertOrUpdate(MetaClassWithKey metaClass, K key, F .apply(Maybe.just(oldObj)) .map(newObj -> pr.mergeWith(PropertyResolver.fromObject(metaClass, newObj))) .filter(newPr -> !pr.equals(newPr)) - .flatMap(newPr -> insertOrUpdate(metaClass, newPr).toMaybe()); + .flatMap(newPr -> update(metaClass, newPr).toMaybe()); }) .switchIfEmpty(Maybe.defer(() -> entityUpdater .apply(Maybe.empty()) .flatMap(e -> insert(metaClass, e).toMaybe())))); } + private Single update(MetaClassWithKey metaClass, PropertyResolver propertyResolver) { + SqlStatement statement = statementProvider.forUpdate(metaClass, propertyResolver, referenceResolver); + return insertOrUpdate(metaClass, statement); + } private Single insertOrUpdate(MetaClassWithKey metaClass, PropertyResolver propertyResolver) { SqlStatement statement = statementProvider.forInsertOrUpdate(metaClass, propertyResolver, referenceResolver); diff --git a/rxrepo-sql-core/src/main/java/com/slimgears/rxrepo/sql/SqlStatementProvider.java b/rxrepo-sql-core/src/main/java/com/slimgears/rxrepo/sql/SqlStatementProvider.java index 9e761ef8..157ac1f8 100644 --- a/rxrepo-sql-core/src/main/java/com/slimgears/rxrepo/sql/SqlStatementProvider.java +++ b/rxrepo-sql-core/src/main/java/com/slimgears/rxrepo/sql/SqlStatementProvider.java @@ -9,19 +9,29 @@ public interface SqlStatementProvider { SqlStatement forQuery(QueryInfo queryInfo); + SqlStatement forAggregation(QueryInfo queryInfo, - ObjectExpression aggregation, - String projectedName); + ObjectExpression aggregation, + String projectedName); + SqlStatement forUpdate(UpdateInfo updateInfo); + SqlStatement forDelete(DeleteInfo deleteInfo); + SqlStatement forInsert(MetaClassWithKey metaClass, - PropertyResolver propertyResolver, - ReferenceResolver referenceResolver); + PropertyResolver propertyResolver, + ReferenceResolver referenceResolver); + SqlStatement forInsertOrUpdate(MetaClassWithKey metaClass, - PropertyResolver propertyResolver, - ReferenceResolver referenceResolver); + PropertyResolver propertyResolver, + ReferenceResolver referenceResolver); + + SqlStatement forUpdate(MetaClassWithKey metaClass, + PropertyResolver propertyResolver, + ReferenceResolver referenceResolver); SqlStatement forDrop(MetaClassWithKey metaClass); + SqlStatement forDrop(); default SqlStatement forInsertOrUpdate(MetaClassWithKey metaClass, S entity, ReferenceResolver referenceResolver) { diff --git a/rxrepo-test/src/main/java/com/slimgears/rxrepo/test/AbstractRepositoryTest.java b/rxrepo-test/src/main/java/com/slimgears/rxrepo/test/AbstractRepositoryTest.java index d5ec1f0a..aad58378 100644 --- a/rxrepo-test/src/main/java/com/slimgears/rxrepo/test/AbstractRepositoryTest.java +++ b/rxrepo-test/src/main/java/com/slimgears/rxrepo/test/AbstractRepositoryTest.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -30,9 +31,12 @@ import static java.util.Objects.requireNonNull; public abstract class AbstractRepositoryTest { - @Rule public final TestName testNameRule = new TestName(); - @Rule public final MethodRule annotationRules = AnnotationRulesJUnit.rule(); - @Rule public final Timeout timeout = new Timeout(60, TimeUnit.SECONDS); + @Rule + public final TestName testNameRule = new TestName(); + @Rule + public final MethodRule annotationRules = AnnotationRulesJUnit.rule(); + @Rule + public final Timeout timeout = new Timeout(60, TimeUnit.SECONDS); private Repository repository; @@ -50,7 +54,8 @@ public void tearDown() { protected abstract Repository createRepository(); - @Test @Ignore + @Test + @Ignore public void testLiveSelectThenInsert() throws InterruptedException { EntitySet productSet = repository.entities(Product.metaClass); @@ -357,7 +362,8 @@ public void testInsertThenRetrieve() throws InterruptedException { .assertValueCount(113); } - @Test @UseLogLevel(LogLevel.TRACE) + @Test + @UseLogLevel(LogLevel.TRACE) public void testInsertThenSearch() throws InterruptedException { EntitySet productSet = repository.entities(Product.metaClass); Iterable products = Products.createMany(100); @@ -404,7 +410,8 @@ public void testQueryWithEmptyMapping() { .assertValue(l -> l.size() == 1); } - @Test @Ignore + @Test + @Ignore public void testInsertThenUpdate() throws InterruptedException { EntitySet productSet = repository.entities(Product.metaClass); Iterable products = Products.createMany(200); @@ -447,22 +454,23 @@ public void testInsertThenUpdate() throws InterruptedException { @Test public void testObserveReferencedObjectProperties() { repository.entities(Product.metaClass) - .update(Products.createOne().toBuilder().inventory(null).build()) - .ignoreElement() - .blockingAwait(); + .update(Products.createOne().toBuilder().inventory(null).build()) + .ignoreElement() + .blockingAwait(); repository.entities(Product.metaClass) - .query() - .liveSelect() - .properties(Product.$.inventory.name) - .observeAs(Notifications.toList()) - .test() - .awaitCount(1) - .assertValue(l -> l.size() == 1) - .assertValue(l -> l.get(0).inventory() == null); + .query() + .liveSelect() + .properties(Product.$.inventory.name) + .observeAs(Notifications.toList()) + .test() + .awaitCount(1) + .assertValue(l -> l.size() == 1) + .assertValue(l -> l.get(0).inventory() == null); } - @Test @UseLogLevel(LogLevel.TRACE) + @Test + @UseLogLevel(LogLevel.TRACE) public void testPartialRetrieve() throws InterruptedException { EntitySet productSet = repository.entities(Product.metaClass); Iterable products = Products.createMany(10); @@ -784,7 +792,8 @@ public void testQueryByNestedEmbeddedObject() throws InterruptedException { .assertValueAt(0, p -> requireNonNull(p.vendor()).id().equals(vendorId)); } - @Test @UseLogLevel(LogLevel.TRACE) + @Test + @UseLogLevel(LogLevel.TRACE) public void testObserveAsList() { EntitySet products = repository.entities(Product.metaClass); products.update(Products.createMany(10)).ignoreElement().blockingAwait(); @@ -989,120 +998,122 @@ public void testLiveQueryWithProjection() throws InterruptedException { .assertValueAt(10, n -> ProductPrototype.Type.ComputerSoftware.equals(requireNonNull(n.newValue()).type())); } - @Test @UseLogLevel(LogLevel.TRACE) + @Test + @UseLogLevel(LogLevel.TRACE) public void testLiveSelectWithMapping() throws InterruptedException { repository.entities(Product.metaClass) - .update(Products.createMany(10)) - .test() - .await() - .assertNoErrors(); + .update(Products.createMany(10)) + .test() + .await() + .assertNoErrors(); TestObserver> inventoriesObserver = repository.entities(Product.metaClass) - .query() - .liveSelect(Product.$.inventory) - .observeAs(Notifications.toList()) - .doOnNext(n -> n.forEach(System.out::println)) - .debounce(500, TimeUnit.MILLISECONDS) - .test() - .assertSubscribed(); + .query() + .liveSelect(Product.$.inventory) + .observeAs(Notifications.toList()) + .doOnNext(n -> n.forEach(System.out::println)) + .debounce(500, TimeUnit.MILLISECONDS) + .test() + .assertSubscribed(); inventoriesObserver - .assertOf(countAtLeast(1)) - .assertNoTimeout() - .assertNoErrors() - .assertValueAt(0, l -> l.size() == 10) - .assertValueAt(0, l -> l.get(0) != null); + .assertOf(countAtLeast(1)) + .assertNoTimeout() + .assertNoErrors() + .assertValueAt(0, l -> l.size() == 10) + .assertValueAt(0, l -> l.get(0) != null); repository.entities(Product.metaClass) - .update(Products.createMany(11)) - .ignoreElement() - .blockingAwait(); + .update(Products.createMany(11)) + .ignoreElement() + .blockingAwait(); inventoriesObserver - .assertOf(countAtLeast(2)) - .assertNoTimeout() - .assertNoErrors() - .assertValueAt(1, l -> l.size() == 11) - .assertValueAt(1, l -> l.get(10) != null); + .assertOf(countAtLeast(2)) + .assertNoTimeout() + .assertNoErrors() + .assertValueAt(1, l -> l.size() == 11) + .assertValueAt(1, l -> l.get(10) != null); } @Test public void testLiveAggregateWithMapping() { TestObserver inventoriesObserver = repository.entities(Product.metaClass) - .query() - .map(Product.$.inventory) - .observeCount() - .filter(c -> c > 0) - .test() - .assertSubscribed(); + .query() + .map(Product.$.inventory) + .observeCount() + .filter(c -> c > 0) + .test() + .assertSubscribed(); repository.entities(Product.metaClass) - .update(Products.createMany(10)) - .test() - .awaitCount(10) - .assertNoErrors(); + .update(Products.createMany(10)) + .test() + .awaitCount(10) + .assertNoErrors(); inventoriesObserver - .assertOf(countAtLeast(1)) - .assertNoTimeout() - .assertNoErrors(); + .assertOf(countAtLeast(1)) + .assertNoTimeout() + .assertNoErrors(); } @SuppressWarnings("ConstantConditions") @Test public void testLiveQueryThenUpdate() { repository.entities(Product.metaClass) - .update(Products.createMany(10)) - .ignoreElement() - .blockingAwait(); + .update(Products.createMany(10)) + .ignoreElement() + .blockingAwait(); TestObserver> productObserver = repository.entities(Product.metaClass) - .query() - .where(Product.$.name.lessThan("Product 5")) - .liveSelect() - .observe(Product.$.name, Product.$.price) - .test() - .assertSubscribed(); + .query() + .where(Product.$.name.lessThan("Product 5")) + .liveSelect() + .observe(Product.$.name, Product.$.price) + .test() + .assertSubscribed(); Product product1 = repository.entities(Product.metaClass) - .find(UniqueId.productId(1)) - .blockingGet(); + .find(UniqueId.productId(1)) + .blockingGet(); Product product8 = repository.entities(Product.metaClass) - .find(UniqueId.productId(8)) - .blockingGet(); + .find(UniqueId.productId(8)) + .blockingGet(); repository.entities(Product.metaClass) - .update(product8.toBuilder().name(product8.name() + " - Updated").build()) - .ignoreElement() - .blockingAwait(); + .update(product8.toBuilder().name(product8.name() + " - Updated").build()) + .ignoreElement() + .blockingAwait(); repository.entities(Product.metaClass) - .update(product1.toBuilder().name(product1.name() + " - Updated").build()) - .ignoreElement() - .blockingAwait(); + .update(product1.toBuilder().name(product1.name() + " - Updated").build()) + .ignoreElement() + .blockingAwait(); productObserver.awaitCount(1) - .assertValueAt(0, NotificationPrototype::isModify) - .assertValueAt(0, p -> p.oldValue().name().equals("Product 1")) - .assertValueAt(0, p -> p.newValue().name().equals("Product 1 - Updated")); + .assertValueAt(0, NotificationPrototype::isModify) + .assertValueAt(0, p -> p.oldValue().name().equals("Product 1")) + .assertValueAt(0, p -> p.newValue().name().equals("Product 1 - Updated")); repository.entities(Product.metaClass) - .update(product1.toBuilder().productionDate(new Date(product1.productionDate().getTime() + 1)).build()) - .ignoreElement() - .blockingAwait(); + .update(product1.toBuilder().productionDate(new Date(product1.productionDate().getTime() + 1)).build()) + .ignoreElement() + .blockingAwait(); repository.entities(Product.metaClass) - .update(product1.toBuilder().price(product1.price() + 1).build()) - .ignoreElement() - .blockingAwait(); + .update(product1.toBuilder().price(product1.price() + 1).build()) + .ignoreElement() + .blockingAwait(); productObserver.awaitCount(2) - .assertValueAt(1, NotificationPrototype::isModify) - .assertValueAt(1, p -> p.newValue().productionDate().getTime() - p.oldValue().productionDate().getTime() == 1); + .assertValueAt(1, NotificationPrototype::isModify) + .assertValueAt(1, p -> p.newValue().productionDate().getTime() - p.oldValue().productionDate().getTime() == 1); } - @Test @UseLogLevel(LogLevel.TRACE) + @Test + @UseLogLevel(LogLevel.TRACE) public void testRetrieveWithReferenceProperty() { TestObserver productTestObserver = repository.entities(Product.metaClass) .query() @@ -1138,7 +1149,8 @@ public void testRetrieveWithReferenceProperty() { .assertValue(p -> p.inventory() != null); } - @Test @UseLogLevel(LogLevel.DEBUG) + @Test + @UseLogLevel(LogLevel.DEBUG) public void testLargeUpdate() throws InterruptedException { Observable.fromIterable(Products.createMany(2000)) .flatMapSingle(repository.entities(Product.metaClass)::update) @@ -1168,4 +1180,41 @@ public void testSearchWithSpecialSymbols() throws InterruptedException { .assertNoErrors() .assertValueCount(1); } + + @Test + @UseLogLevel(LogLevel.DEBUG) + public void testUpdatingDeletedObjectShouldNotAddObjectIntoRepo() throws InterruptedException { + + EntitySet entities = repository.entities(Product.metaClass); + Product product = Products + .createOne() + .toBuilder() + .name("TestProduct") + .build(); + + // add stub product to the repo + entities.update(product) + .ignoreElement() + .blockingAwait(); + + AtomicBoolean deleted = new AtomicBoolean(false); + entities.update(product.key(), productMaybe -> productMaybe.map(p -> { + // deleting the object from the repo just before update + if (deleted.compareAndSet(false, true)) + entities.delete(p.key()).blockingAwait(); + + return p.toBuilder().name("updatedName").build(); + })) + .test() + .await() + .assertValueCount(0) + .assertComplete(); + + //verify that there is nothing in the collection + entities.query() + .count() + .test() + .await() + .assertValue(0L); + } }