From 5be11d28856744d0c423e81d7d42b30f83089261 Mon Sep 17 00:00:00 2001 From: agrgr Date: Thu, 5 Oct 2023 17:51:57 +0300 Subject: [PATCH] refactoring for reactive flow --- .../aerospike/core/AerospikeTemplate.java | 10 ++--- .../core/ReactiveAerospikeTemplate.java | 37 ++++++------------- 2 files changed, 17 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java b/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java index fe0ff0a3a..aedba3109 100644 --- a/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java +++ b/src/main/java/org/springframework/data/aerospike/core/AerospikeTemplate.java @@ -244,11 +244,11 @@ public void saveAll(Iterable documents) { re = translateError(e); } - failIfErrorsFound(batchWriteRecords, batchWriteDataList, re, "save"); + checkForErrorsAndUpdateVersion(batchWriteRecords, batchWriteDataList, re, "save"); } - private void failIfErrorsFound(List batchRecords, List> batchWriteDataList, - RuntimeException re, String commandName) { + private void checkForErrorsAndUpdateVersion(List batchRecords, List> batchWriteDataList, + RuntimeException re, String commandName) { boolean errorsFound = false; for (BatchWriteData data : batchWriteDataList) { if (!errorsFound && re == null) { @@ -313,7 +313,7 @@ public void insertAll(Iterable documents) { re = translateError(e); } - failIfErrorsFound(batchWriteRecords, batchWriteDataList, re, "insert"); + checkForErrorsAndUpdateVersion(batchWriteRecords, batchWriteDataList, re, "insert"); } @Override @@ -370,7 +370,7 @@ public void updateAll(Iterable documents) { re = translateError(e); } - failIfErrorsFound(batchWriteRecords, batchWriteDataList, re, "update"); + checkForErrorsAndUpdateVersion(batchWriteRecords, batchWriteDataList, re, "update"); } @Override diff --git a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java index 10769dde1..99dca9897 100644 --- a/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java +++ b/src/main/java/org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.java @@ -47,7 +47,6 @@ import org.springframework.util.Assert; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Signal; import java.util.ArrayList; import java.util.Arrays; @@ -56,7 +55,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -135,50 +133,39 @@ public Flux saveAll(Iterable documents) { private Flux batchWriteAndCheckForErrors(List batchWriteRecords, List> batchWriteDataList, String commandName) { - Function, Mono>> checkForThrowable = signal -> { - if (signal.isOnError()) { - Throwable throwable = signal.getThrowable() == null - ? new IllegalStateException("Got onError signal without Throwable") : signal.getThrowable(); - return Mono.just(Optional.of(translateError(throwable))); - } - return Mono.just(Optional.empty()); - }; - // requires server ver. >= 6.0.0 return reactorClient.operate(null, batchWriteRecords) - .materialize() - .flatMap(checkForThrowable) // returning Throwable if there was an error, otherwise an empty Optional - .flatMap(throwableOptional -> checkForErrorsAndUpdateVersion(throwableOptional, batchWriteDataList, - commandName)) + .onErrorMap(this::translateError) + .flatMap(ignore -> checkForErrorsAndUpdateVersion(batchWriteDataList, commandName)) .flux() .onErrorMap(throwable -> new AerospikeException.BatchRecordArray(batchWriteRecords.toArray(BatchRecord[]::new), throwable)) .flatMapIterable(list -> list.stream().map(BatchWriteData::document).toList()); } - private Mono>> checkForErrorsAndUpdateVersion(Optional throwableOptional, - List> batchWriteDataList, + private Mono>> checkForErrorsAndUpdateVersion(List> batchWriteDataList, String commandName) { boolean errorsFound = false; for (AerospikeTemplate.BatchWriteData data : batchWriteDataList) { - if (!errorsFound && throwableOptional.isEmpty()) { - if (data.batchRecord().resultCode != ResultCode.OK || data.batchRecord().record == null) { - errorsFound = true; - } + if (!errorsFound && batchRecordFailed(data.batchRecord())) { + errorsFound = true; } - if (data.hasVersionProperty() && data.batchRecord().record != null) { + if (data.hasVersionProperty() && !batchRecordFailed(data.batchRecord())) { updateVersion(data.document(), data.batchRecord().record); } } - if (errorsFound || throwableOptional.isPresent()) { - return Mono.error(throwableOptional.isEmpty() ? new AerospikeException("Errors during batch " + commandName) - : throwableOptional.get()); + if (errorsFound) { + return Mono.error(new AerospikeException("Errors during batch " + commandName)); } return Mono.just(batchWriteDataList); } + private boolean batchRecordFailed(BatchRecord batchRecord) { + return batchRecord.resultCode != ResultCode.OK || batchRecord.record == null; + } + @Override public Mono insert(T document) { Assert.notNull(document, "Document must not be null!");