Skip to content

Commit

Permalink
Merge pull request #15 from haimshalev/dev
Browse files Browse the repository at this point in the history
[rxrepo-core][rxrepo-mem][rxrepo-sql-core] Handled concurrent delete …
  • Loading branch information
denis-itskovich authored Nov 25, 2019
2 parents 4f9bb74 + 3cf8642 commit 47ece78
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ public Maybe<S> update(K key, Function<Maybe<S>, Maybe<S>> updater) {
private static boolean isConcurrencyException(Throwable exception) {
log.debug("Checking exception: {}", exception.getMessage(), exception);
return exception instanceof ConcurrentModificationException ||
exception instanceof NoSuchElementException ||
(exception instanceof CompositeException && ((CompositeException)exception)
.getExceptions()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class MemoryEntityQueryProvider<K, S> implements EntityQueryProvider<K, S>, AutoCloseable {
Expand Down Expand Up @@ -63,12 +64,12 @@ public MetaClassWithKey<K, S> metaClass() {
@Override
public Maybe<S> insertOrUpdate(K key, Function<Maybe<S>, Maybe<S>> entityUpdater) {
return Maybe.defer(() -> {
AtomicReference<S> reference = objects.computeIfAbsent(key, k -> new AtomicReference<>());
AtomicReference<S> oldValue = new AtomicReference<>(reference.get());
Supplier<AtomicReference<S>> referenceResolver = () -> objects.computeIfAbsent(key, k -> new AtomicReference<>());
AtomicReference<S> oldValue = new AtomicReference<>(referenceResolver.get().get());
return entityUpdater
.apply(Optional.ofNullable(reference.get()).map(Maybe::just).orElseGet(Maybe::empty))
.flatMap(e -> reference.compareAndSet(oldValue.get(), e)
? Maybe.just(e)
.apply(Optional.ofNullable(referenceResolver.get().get()).map(Maybe::just).orElseGet(Maybe::empty))
.flatMap(e -> referenceResolver.get().compareAndSet(oldValue.get(), e)
? (e != null ? Maybe.just(e): Maybe.empty())
: Maybe.error(new ConcurrentModificationException("Concurrent modification of " + metaClass.simpleName() + " detected")))
.doOnSuccess(e -> {
if (!Objects.equals(oldValue.get(), e)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,32 @@ public <K, S> SqlStatement forDelete(DeleteInfo<K, S> deleteInfo) {
limitClause(deleteInfo)));
}

@Override
public <K, S> SqlStatement forUpdate(MetaClassWithKey<K, S> metaClass, PropertyResolver propertyResolver, ReferenceResolver resolver) {
return forInsertOrUpdate(metaClass, propertyResolver, resolver, false);
}

@Override
public <K, S> SqlStatement forInsertOrUpdate(MetaClassWithKey<K, S> metaClass, PropertyResolver propertyResolver, ReferenceResolver resolver) {
return forInsertOrUpdate(metaClass, propertyResolver, resolver, true);
}

private <K, S> SqlStatement forInsertOrUpdate(MetaClassWithKey<K, S> metaClass, PropertyResolver propertyResolver, ReferenceResolver resolver, boolean forced) {
PropertyMeta<S, K> keyProperty = metaClass.keyProperty();

return statement(() -> of(
"update",
schemaProvider.tableName(metaClass),
"set",
Streams
.fromIterable(propertyResolver.propertyNames())
.flatMap(sqlAssignmentGenerator.toAssignment(metaClass, propertyResolver, resolver))
.collect(Collectors.joining(", ")),
"upsert",
"return after",
"where",
toConditionClause(PropertyExpression.ofObject(keyProperty).eq(propertyResolver.getProperty(keyProperty)))
));
"update",
schemaProvider.tableName(metaClass),
"set",
Streams
.fromIterable(propertyResolver.propertyNames())
.flatMap(sqlAssignmentGenerator.toAssignment(metaClass, propertyResolver, resolver))
.collect(Collectors.joining(", ")),
forced ? "upsert" : "",
"return after",
"where",
toConditionClause(PropertyExpression.ofObject(keyProperty).eq(propertyResolver.getProperty(keyProperty)))
));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,17 @@ public <K, S> Maybe<S> insertOrUpdate(MetaClassWithKey<K, S> metaClass, K key, F
.apply(Maybe.just(oldObj))
.map(newObj -> pr.mergeWith(PropertyResolver.fromObject(metaClass, newObj)))
.filter(newPr -> !pr.equals(newPr))
.flatMap(newPr -> insertOrUpdate(metaClass, newPr).toMaybe());
.flatMap(newPr -> update(metaClass, newPr).toMaybe());
})
.switchIfEmpty(Maybe.defer(() -> entityUpdater
.apply(Maybe.empty())
.flatMap(e -> insert(metaClass, e).toMaybe()))));
}

private <K, S> Single<S> update(MetaClassWithKey<K, S> metaClass, PropertyResolver propertyResolver) {
SqlStatement statement = statementProvider.forUpdate(metaClass, propertyResolver, referenceResolver);
return insertOrUpdate(metaClass, statement);
}

private <K, S> Single<S> insertOrUpdate(MetaClassWithKey<K, S> metaClass, PropertyResolver propertyResolver) {
SqlStatement statement = statementProvider.forInsertOrUpdate(metaClass, propertyResolver, referenceResolver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,29 @@

public interface SqlStatementProvider {
<K, S, T> SqlStatement forQuery(QueryInfo<K, S, T> queryInfo);

<K, S, T, R> SqlStatement forAggregation(QueryInfo<K, S, T> queryInfo,
ObjectExpression<T, R> aggregation,
String projectedName);
ObjectExpression<T, R> aggregation,
String projectedName);

<K, S> SqlStatement forUpdate(UpdateInfo<K, S> updateInfo);

<K, S> SqlStatement forDelete(DeleteInfo<K, S> deleteInfo);

<K, S> SqlStatement forInsert(MetaClassWithKey<K, S> metaClass,
PropertyResolver propertyResolver,
ReferenceResolver referenceResolver);
PropertyResolver propertyResolver,
ReferenceResolver referenceResolver);

<K, S> SqlStatement forInsertOrUpdate(MetaClassWithKey<K, S> metaClass,
PropertyResolver propertyResolver,
ReferenceResolver referenceResolver);
PropertyResolver propertyResolver,
ReferenceResolver referenceResolver);

<K, S> SqlStatement forUpdate(MetaClassWithKey<K, S> metaClass,
PropertyResolver propertyResolver,
ReferenceResolver referenceResolver);

<K, S> SqlStatement forDrop(MetaClassWithKey<K, S> metaClass);

SqlStatement forDrop();

default <K, S> SqlStatement forInsertOrUpdate(MetaClassWithKey<K, S> metaClass, S entity, ReferenceResolver referenceResolver) {
Expand Down
Loading

0 comments on commit 47ece78

Please sign in to comment.