Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr committed Oct 5, 2023
1 parent a6c4377 commit faaffb6
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ public <T> Flux<T> saveAll(Iterable<T> documents) {

private <T> Flux<T> batchWriteAndCheckForErrors(List<BatchRecord> batchWriteRecords,
List<BatchWriteData<T>> batchWriteDataList, String commandName) {
// requires server ver. >= 6.0.0
Function<Signal<Boolean>, Mono<Optional<Throwable>>> checkForThrowable = signal -> {
if (signal.isOnError()) {
Throwable throwable = signal.getThrowable() == null
Expand All @@ -145,19 +144,21 @@ private <T> Flux<T> batchWriteAndCheckForErrors(List<BatchRecord> batchWriteReco
return Mono.just(Optional.empty());
};

// requires server ver. >= 6.0.0
return reactorClient.operate(null, batchWriteRecords)
.materialize()
.flatMap(checkForThrowable) // returning Throwable if there was an error, otherwise an empty Optional
.flatMap(throwableOptional -> checkForErrorsAndUpdateVersion(throwableOptional, batchWriteDataList, commandName))
.flatMap(throwableOptional -> checkForErrorsAndUpdateVersion(throwableOptional, batchWriteDataList,
commandName))
.flux()
.onErrorMap(throwable ->
new AerospikeException.BatchRecordArray(batchWriteRecords.toArray(BatchRecord[]::new), throwable))
.flatMapIterable(list -> list.stream().map(BatchWriteData::document).toList());
}

private <T> Mono<List<BatchWriteData<T>>> checkForErrorsAndUpdateVersion(Optional<Throwable> throwableOptional,
List<BatchWriteData<T>> batchWriteDataList,
String commandName) {
List<BatchWriteData<T>> batchWriteDataList,
String commandName) {
boolean errorsFound = false;
for (AerospikeTemplate.BatchWriteData<T> data : batchWriteDataList) {
if (!errorsFound && throwableOptional.isEmpty()) {
Expand Down

0 comments on commit faaffb6

Please sign in to comment.