Skip to content

Commit

Permalink
refactoring for reactive flow
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr committed Oct 5, 2023
1 parent faaffb6 commit 5be11d2
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,11 @@ public <T> void saveAll(Iterable<T> documents) {
re = translateError(e);
}

failIfErrorsFound(batchWriteRecords, batchWriteDataList, re, "save");
checkForErrorsAndUpdateVersion(batchWriteRecords, batchWriteDataList, re, "save");
}

private <T> void failIfErrorsFound(List<BatchRecord> batchRecords, List<BatchWriteData<T>> batchWriteDataList,
RuntimeException re, String commandName) {
private <T> void checkForErrorsAndUpdateVersion(List<BatchRecord> batchRecords, List<BatchWriteData<T>> batchWriteDataList,
RuntimeException re, String commandName) {
boolean errorsFound = false;
for (BatchWriteData<T> data : batchWriteDataList) {
if (!errorsFound && re == null) {
Expand Down Expand Up @@ -313,7 +313,7 @@ public <T> void insertAll(Iterable<? extends T> documents) {
re = translateError(e);
}

failIfErrorsFound(batchWriteRecords, batchWriteDataList, re, "insert");
checkForErrorsAndUpdateVersion(batchWriteRecords, batchWriteDataList, re, "insert");
}

@Override
Expand Down Expand Up @@ -370,7 +370,7 @@ public <T> void updateAll(Iterable<T> documents) {
re = translateError(e);
}

failIfErrorsFound(batchWriteRecords, batchWriteDataList, re, "update");
checkForErrorsAndUpdateVersion(batchWriteRecords, batchWriteDataList, re, "update");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -56,7 +55,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -135,50 +133,39 @@ public <T> Flux<T> saveAll(Iterable<T> documents) {

private <T> Flux<T> batchWriteAndCheckForErrors(List<BatchRecord> batchWriteRecords,
List<BatchWriteData<T>> batchWriteDataList, String commandName) {
Function<Signal<Boolean>, Mono<Optional<Throwable>>> checkForThrowable = signal -> {
if (signal.isOnError()) {
Throwable throwable = signal.getThrowable() == null
? new IllegalStateException("Got onError signal without Throwable") : signal.getThrowable();
return Mono.just(Optional.of(translateError(throwable)));
}
return Mono.just(Optional.empty());
};

// requires server ver. >= 6.0.0
return reactorClient.operate(null, batchWriteRecords)
.materialize()
.flatMap(checkForThrowable) // returning Throwable if there was an error, otherwise an empty Optional
.flatMap(throwableOptional -> checkForErrorsAndUpdateVersion(throwableOptional, batchWriteDataList,
commandName))
.onErrorMap(this::translateError)
.flatMap(ignore -> checkForErrorsAndUpdateVersion(batchWriteDataList, commandName))
.flux()
.onErrorMap(throwable ->
new AerospikeException.BatchRecordArray(batchWriteRecords.toArray(BatchRecord[]::new), throwable))
.flatMapIterable(list -> list.stream().map(BatchWriteData::document).toList());
}

private <T> Mono<List<BatchWriteData<T>>> checkForErrorsAndUpdateVersion(Optional<Throwable> throwableOptional,
List<BatchWriteData<T>> batchWriteDataList,
private <T> Mono<List<BatchWriteData<T>>> checkForErrorsAndUpdateVersion(List<BatchWriteData<T>> batchWriteDataList,
String commandName) {
boolean errorsFound = false;
for (AerospikeTemplate.BatchWriteData<T> data : batchWriteDataList) {
if (!errorsFound && throwableOptional.isEmpty()) {
if (data.batchRecord().resultCode != ResultCode.OK || data.batchRecord().record == null) {
errorsFound = true;
}
if (!errorsFound && batchRecordFailed(data.batchRecord())) {
errorsFound = true;
}
if (data.hasVersionProperty() && data.batchRecord().record != null) {
if (data.hasVersionProperty() && !batchRecordFailed(data.batchRecord())) {
updateVersion(data.document(), data.batchRecord().record);
}
}

if (errorsFound || throwableOptional.isPresent()) {
return Mono.error(throwableOptional.isEmpty() ? new AerospikeException("Errors during batch " + commandName)
: throwableOptional.get());
if (errorsFound) {
return Mono.error(new AerospikeException("Errors during batch " + commandName));
}

return Mono.just(batchWriteDataList);
}

private boolean batchRecordFailed(BatchRecord batchRecord) {
return batchRecord.resultCode != ResultCode.OK || batchRecord.record == null;
}

@Override
public <T> Mono<T> insert(T document) {
Assert.notNull(document, "Document must not be null!");
Expand Down

0 comments on commit 5be11d2

Please sign in to comment.