Skip to content

Commit

Permalink
add support for findUsingQuery instead of findByQualifier for reactiv…
Browse files Browse the repository at this point in the history
…e flow
  • Loading branch information
agrgr committed Nov 6, 2023
1 parent 1ec57f1 commit 479b1bb
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.aerospike.client.AerospikeException;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.IndexCollectionType;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.reactor.IAerospikeReactorClient;
Expand All @@ -30,7 +29,6 @@
import org.springframework.data.mapping.context.MappingContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -767,46 +765,6 @@ <T, S> Flux<?> findByIdsUsingQualifiers(Collection<?> ids, Class<T> entityClass,
*/
<T> Flux<T> find(Query query, Class<T> targetClass, String setName);

/**
* Find all documents in the given entityClass's set using provided {@link Qualifier}.
*
* @param entityClass The class to extract the Aerospike set from. Must not be {@literal null}.
* @param filter Secondary index filter.
* @param qualifier Qualifier to build filter expressions from. Must not be {@literal null}. If filter param is
* null and qualifier has {@link Qualifier#getExcludeFilter()} == false, secondary index filter
* is built based on the first processed qualifier.
* @return Flux of entities.
*/
<T> Flux<T> findUsingQualifier(Class<T> entityClass, @Nullable Filter filter, Qualifier qualifier);

/**
* Find all documents in the given entityClass's set using provided {@link Qualifier} and map them to the given
* target class type.
*
* @param entityClass The class to extract the Aerospike set from. Must not be {@literal null}.
* @param targetClass The class to map the document to. Must not be {@literal null}.
* @param filter Secondary index filter.
* @param qualifier Qualifier to build filter expressions from. Must not be {@literal null}. If filter param is
* null and qualifier has {@link Qualifier#getExcludeFilter()} == false, secondary index filter
* is built based on the first processed qualifier.
* @return Flux of entities.
*/
<T, S> Flux<?> findUsingQualifier(Class<T> entityClass, Class<S> targetClass, Filter filter, Qualifier qualifier);

/**
* Find all documents in the given set using provided {@link Qualifier} and map them to the given target class
* type.
*
* @param targetClass The class to map the document to. Must not be {@literal null}.
* @param filter Secondary index filter.
* @param qualifier Qualifier to build filter expressions from. Must not be {@literal null}. If filter param is
* null and qualifier has {@link Qualifier#getExcludeFilter()} == false, secondary index filter
* is built based on the first processed qualifier.
* @return Flux of entities.
*/
<T> Flux<T> findUsingQualifier(Class<T> targetClass, String setName, Filter filter,
Qualifier qualifier);

/**
* Reactively find all documents in the given entityClass's set and map them to the given class type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,26 +812,6 @@ public <T> Flux<T> find(Query query, Class<T> targetClass, String setName) {
return findUsingQueryWithPostProcessing(setName, targetClass, query);
}

@Override
public <T> Flux<T> findUsingQualifier(Class<T> entityClass, Filter filter,
Qualifier qualifier) {
return findUsingQualifier(entityClass, getSetName(entityClass), filter, qualifier);
}

@Override
public <T, S> Flux<?> findUsingQualifier(Class<T> entityClass, Class<S> targetClass, Filter filter,
Qualifier qualifier) {
return findRecordsUsingQualifiers(getSetName(entityClass), targetClass, filter, qualifier)
.map(keyRecord -> mapToEntity(keyRecord.key, targetClass, keyRecord.record));
}

@Override
public <T> Flux<T> findUsingQualifier(Class<T> targetClass, String setName, Filter filter,
Qualifier qualifier) {
return findRecordsUsingQualifiers(setName, targetClass, filter, qualifier)
.map(keyRecord -> mapToEntity(keyRecord.key, targetClass, keyRecord.record));
}

@Override
public <T> Flux<T> findAll(Class<T> entityClass) {
Assert.notNull(entityClass, "Entity class must not be null!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* @author peter
* @deprecated Since 4.6.0. Use {@link SimpleAerospikeRepository#findById(Object)} or
* {@link AerospikeRepository#findUsingQuery(org.springframework.data.aerospike.repository.query.Query)} with
* {@link Qualifier#forId(String)}
* {@link Qualifier#idEquals(String)}
*/
@Deprecated(since = "4.6.0", forRemoval = true)
public class KeyQualifier extends Qualifier {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.springframework.data.aerospike.query.FilterOperation;
import org.springframework.data.aerospike.query.Qualifier;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.repository.Repository;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
Expand All @@ -29,13 +30,13 @@
public interface ReactiveAerospikeRepository<T, ID> extends ReactiveCrudRepository<T, ID> {

/**
* Run a query to find entities by providing {@link Qualifier}.
* Run a query to find entities.
* <p>
* A qualifier may contain other qualifiers and combine them using either {@link FilterOperation#AND} or
* {@link FilterOperation#OR}.
* A {@link Query} can be created using a qualifier. A {@link Qualifier} may contain other qualifiers and combine
* them using either {@link FilterOperation#AND} or {@link FilterOperation#OR}.
*
* @param qualifier A qualifiers representing expressions. Must not be {@literal null}.
* @param query A qualifiers representing expressions. Must not be {@literal null}.
* @return Flux of entities.
*/
Flux<T> findByQualifier(Qualifier qualifier);
Flux<T> findUsingQuery(Query query);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import lombok.RequiredArgsConstructor;
import org.reactivestreams.Publisher;
import org.springframework.data.aerospike.core.ReactiveAerospikeOperations;
import org.springframework.data.aerospike.query.Qualifier;
import org.springframework.data.aerospike.repository.ReactiveAerospikeRepository;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.repository.core.EntityInformation;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -154,8 +154,7 @@ public void deleteIndex(Class<T> domainType, String indexName) {
}

@Override
public Flux<T> findByQualifier(Qualifier qualifier) {
Assert.notNull(qualifier, "Qualifiers must not be null");
return operations.findUsingQualifier(entityInformation.getJavaType(), null, qualifier);
public Flux<T> findUsingQuery(Query query) {
return operations.find(query, entityInformation.getJavaType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.springframework.data.aerospike.query.FilterOperation;
import org.springframework.data.aerospike.query.Qualifier;
import org.springframework.data.aerospike.repository.query.CriteriaDefinition;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.aerospike.sample.Address;
import org.springframework.data.aerospike.sample.IndexedPerson;
import org.springframework.data.aerospike.sample.ReactiveIndexedPersonRepository;
Expand Down Expand Up @@ -300,7 +301,7 @@ public void findPersonsByMetadata() {
.setFilterOperation(FilterOperation.LT)
.setValue1AsObj(50000L)
.build();
assertThat(reactiveRepository.findByQualifier(sinceUpdateTimeLt10Seconds).collectList().block())
assertThat(reactiveRepository.findUsingQuery(new Query(sinceUpdateTimeLt10Seconds)).collectList().block())
.containsAll(allIndexedPersons);

// creating a condition "since_update_time metadata value is between 1 millisecond and 50 seconds"
Expand All @@ -310,8 +311,9 @@ public void findPersonsByMetadata() {
.setValue1AsObj(1L)
.setValue2AsObj(50000L)
.build();
assertThat(reactiveRepository.findByQualifier(sinceUpdateTimeBetween1And50000).collectList().block())
.containsAll(reactiveRepository.findByQualifier(sinceUpdateTimeLt10Seconds).collectList().block());
assertThat(reactiveRepository.findUsingQuery(new Query(sinceUpdateTimeBetween1And50000)).collectList().block())
.containsAll(reactiveRepository.findUsingQuery(new Query(sinceUpdateTimeLt10Seconds)).collectList()
.block());
}

@Test
Expand All @@ -331,7 +333,7 @@ public void findPersonsByQualifiers() {
.setFilterOperation(FilterOperation.LT)
.setValue1AsObj(50000L)
.build();
assertThat(reactiveRepository.findByQualifier(sinceUpdateTimeLt50Seconds).collectList().block())
assertThat(reactiveRepository.findUsingQuery(new Query(sinceUpdateTimeLt50Seconds)).collectList().block())
.containsAll(allIndexedPersons);

// creating a condition "since_update_time metadata value is between 1 and 50 seconds"
Expand All @@ -355,7 +357,7 @@ public void findPersonsByQualifiers() {
.setFilterOperation(FilterOperation.EQ)
.setValue1(Value.get(34))
.build();
result = reactiveRepository.findByQualifier(ageEq34).collectList().block();
result = reactiveRepository.findUsingQuery(new Query(ageEq34)).collectList().block();
assertThat(result).containsOnly(petra);

// creating a condition "age is greater than 34"
Expand All @@ -364,48 +366,49 @@ public void findPersonsByQualifiers() {
.setField("age")
.setValue1(Value.get(34))
.build();
result = reactiveRepository.findByQualifier(ageGt34).collectList().block();
result = reactiveRepository.findUsingQuery(new Query(ageGt34)).collectList().block();
assertThat(result).doesNotContain(petra);

result = reactiveRepository.findByQualifier(Qualifier.and(sinceUpdateTimeGt1, sinceUpdateTimeLt50Seconds,
result = reactiveRepository.findUsingQuery(new Query(Qualifier.and(sinceUpdateTimeGt1,
sinceUpdateTimeLt50Seconds,
ageEq34,
firstNameEqPetra, sinceUpdateTimeBetween1And50000)).collectList().block();
firstNameEqPetra, sinceUpdateTimeBetween1And50000))).collectList().block();
assertThat(result).containsOnly(petra);

// conditions "age == 34", "firstName is Petra" and "since_update_time metadata value is less than 50 seconds"
// are combined with OR
Qualifier orWide = Qualifier.or(ageEq34, firstNameEqPetra, sinceUpdateTimeLt50Seconds);
result = reactiveRepository.findByQualifier(orWide).collectList().block();
result = reactiveRepository.findUsingQuery(new Query(orWide)).collectList().block();
assertThat(result).containsAll(allIndexedPersons);

// conditions "age == 34" and "firstName is Petra" are combined with OR
Qualifier orNarrow = Qualifier.or(ageEq34, firstNameEqPetra);
result = reactiveRepository.findByQualifier(orNarrow).collectList().block();
result = reactiveRepository.findUsingQuery(new Query(orNarrow)).collectList().block();
assertThat(result).containsOnly(petra);

result = reactiveRepository.findByQualifier(Qualifier.and(ageEq34, ageGt34)).collectList().block();
result = reactiveRepository.findUsingQuery(new Query(Qualifier.and(ageEq34, ageGt34))).collectList().block();
assertThat(result).isEmpty();

// conditions "age == 34" and "age > 34" are not overlapping
result = reactiveRepository.findByQualifier(Qualifier.and(ageEq34, ageGt34)).collectList().block();
result = reactiveRepository.findUsingQuery(new Query(Qualifier.and(ageEq34, ageGt34))).collectList().block();
assertThat(result).isEmpty();

// conditions "age == 34" and "age > 34" are combined with OR
Qualifier ageEqOrGt34 = Qualifier.or(ageEq34, ageGt34);

result = reactiveRepository.findByQualifier(ageEqOrGt34).collectList().block();
result = reactiveRepository.findUsingQuery(new Query(ageEqOrGt34)).collectList().block();
List<IndexedPerson> personsWithAgeEqOrGt34 = allIndexedPersons.stream().filter(person -> person.getAge() >= 34)
.toList();
assertThat(result).containsAll(personsWithAgeEqOrGt34);

// a condition that returns all entities and a condition that returns one entity are combined using AND
result = reactiveRepository.findByQualifier(Qualifier.and(orWide, orNarrow)).collectList().block();
result = reactiveRepository.findUsingQuery(new Query(Qualifier.and(orWide, orNarrow))).collectList().block();
assertThat(result).containsOnly(petra);

// a condition that returns all entities and a condition that returns one entity are combined using AND
// another way of running the same query
Qualifier orCombinedWithAnd = Qualifier.and(orWide, orNarrow);
result = reactiveRepository.findByQualifier(orCombinedWithAnd).collectList().block();
result = reactiveRepository.findUsingQuery(new Query(orCombinedWithAnd)).collectList().block();
assertThat(result).containsOnly(petra);
}
}

0 comments on commit 479b1bb

Please sign in to comment.