Skip to content

Commit

Permalink
FMWK-199 Refactor implementation of paginated queries (#658)
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr authored Nov 21, 2023
1 parent 070b2a9 commit 0f8c0ae
Show file tree
Hide file tree
Showing 26 changed files with 494 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ public AerospikeTemplate aerospikeTemplate(IAerospikeClient aerospikeClient,
public QueryEngine queryEngine(IAerospikeClient aerospikeClient,
StatementBuilder statementBuilder,
FilterExpressionsBuilder filterExpressionsBuilder) {
QueryEngine queryEngine = new QueryEngine(aerospikeClient, statementBuilder, filterExpressionsBuilder,
aerospikeClient.getQueryPolicyDefault());
QueryEngine queryEngine = new QueryEngine(aerospikeClient, statementBuilder, filterExpressionsBuilder);
boolean scansEnabled = aerospikeDataSettings().isScansEnabled();
log.debug("AerospikeDataSettings.scansEnabled: {}", scansEnabled);
queryEngine.setScansEnabled(scansEnabled);
long queryMaxRecords = aerospikeDataSettings().getQueryMaxRecords();
log.debug("AerospikeDataSettings.queryMaxRecords: {}", queryMaxRecords);
queryEngine.setQueryMaxRecords(queryMaxRecords);
return queryEngine;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,13 @@ public ReactorQueryEngine reactorQueryEngine(IAerospikeReactorClient aerospikeRe
StatementBuilder statementBuilder,
FilterExpressionsBuilder filterExpressionsBuilder) {
ReactorQueryEngine queryEngine = new ReactorQueryEngine(aerospikeReactorClient, statementBuilder,
filterExpressionsBuilder, aerospikeReactorClient.getQueryPolicyDefault());
filterExpressionsBuilder);
boolean scansEnabled = aerospikeDataSettings().isScansEnabled();
queryEngine.setScansEnabled(scansEnabled);
log.debug("AerospikeDataSettings.scansEnabled: {}", scansEnabled);
long queryMaxRecords = aerospikeDataSettings().getQueryMaxRecords();
log.debug("AerospikeDataSettings.queryMaxRecords: {}", queryMaxRecords);
queryEngine.setQueryMaxRecords(queryMaxRecords);
return queryEngine;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class AerospikeDataSettings {
boolean createIndexesOnStartup = true;
@Builder.Default
int indexCacheRefreshFrequencySeconds = 3600;
@Builder.Default
long queryMaxRecords = 10_000L;

/*
* (non-Javadoc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.aerospike.client.query.IndexCollectionType;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.query.ResultSet;
import org.springframework.data.aerospike.config.AerospikeDataSettings;
import org.springframework.data.aerospike.core.model.GroupedEntities;
import org.springframework.data.aerospike.core.model.GroupedKeys;
import org.springframework.data.aerospike.repository.query.Query;
Expand Down Expand Up @@ -74,6 +75,11 @@ public interface AerospikeOperations {
*/
IAerospikeClient getAerospikeClient();

/**
* @return value of configuration parameter {@link AerospikeDataSettings#getQueryMaxRecords()}.
*/
long getQueryMaxRecords();

/**
* Save a document.
* <p>
Expand Down Expand Up @@ -741,8 +747,8 @@ <T, S> Object findByIdUsingQuery(Object id, Class<T> entityClass, Class<S> targe
* @param entityClass The class to extract the Aerospike set from. Must not be {@literal null}.
* @param targetClass The class to map the document to.
* @param query The {@link Query} to filter results. Optional argument (null if no filtering required).
* @return The documents from Aerospike, returned documents will be mapped to targetClass's type, if no document
* exists, an empty list is returned.
* @return The documents from Aerospike, returned documents will be mapped to targetClass's type if provided
* (otherwise to entityClass's type), if no document exists, an empty list is returned.
*/
<T, S> List<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClass, Class<S> targetClass,
@Nullable Query query);
Expand All @@ -758,8 +764,8 @@ <T, S> List<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClass, Clas
* @param targetClass The class to map the document to.
* @param setName Set name to find the document from.
* @param query The {@link Query} to filter results. Optional argument (null if no filtering required).
* @return The documents from Aerospike, returned documents will be mapped to targetClass's type, if no document
* exists, an empty list is returned.
* @return The documents from Aerospike, returned documents will be mapped to targetClass's type if provided
* (otherwise to entityClass's type), if no document exists, an empty list is returned.
*/
<T, S> List<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClass, Class<S> targetClass, String setName,
@Nullable Query query);
Expand Down Expand Up @@ -896,6 +902,18 @@ <T, S> List<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClass, Clas
*/
<T> Stream<T> findInRange(long offset, long limit, Sort sort, Class<T> targetClass, String setName);

/**
* Find documents in the given entityClass's set using a query and map them to the given target class type. If the
* query has pagination and/or sorting, post-processing must be applied separately.
*
* @param entityClass The class to extract the Aerospike set from. Must not be {@literal null}.
* @param targetClass The class to map the document to.
* @param query The {@link Query} to filter results.
* @return A Stream of all matching documents regardless of pagination/sorting, returned documents will be mapped to
* targetClass's type.
*/
<T, S> Stream<S> findUsingQueryWithoutPostProcessing(Class<T> entityClass, Class<S> targetClass, Query query);

/**
* Check if a document exists by providing document id and entityClass (set name will be determined by the given
* entityClass).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
import static org.springframework.data.aerospike.core.CoreUtils.verifyUnsortedWithOffset;
import static org.springframework.data.aerospike.core.TemplateUtils.excludeIdQualifier;
import static org.springframework.data.aerospike.core.TemplateUtils.getIdValue;
import static org.springframework.data.aerospike.query.QualifierUtils.getOneIdQualifier;
import static org.springframework.data.aerospike.query.QualifierUtils.getIdQualifier;
import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;

/**
Expand Down Expand Up @@ -110,6 +110,11 @@ public IAerospikeClient getAerospikeClient() {
return client;
}

@Override
public long getQueryMaxRecords() {
return queryEngine.getQueryMaxRecords();
}

@Override
public void refreshIndexesCache() {
indexRefresher.refreshIndexes();
Expand Down Expand Up @@ -814,7 +819,6 @@ public <T, S> Object findByIdUsingQuery(Object id, Class<T> entityClass, Class<S
Assert.notNull(entityClass, "Entity class must not be null!");
Assert.notNull(setName, "Set name must not be null!");

Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteria().getCriteriaObject() : null;
try {
AerospikePersistentEntity<?> entity = mappingContext.getRequiredPersistentEntity(entityClass);
Key key = getKey(id, setName);
Expand Down Expand Up @@ -888,7 +892,7 @@ public <T> Stream<T> find(Query query, Class<T> targetClass, String setName) {
Assert.notNull(targetClass, "Target class must not be null!");
Assert.notNull(setName, "Set name must not be null!");

return findUsingQueryWithPostProcessing(setName, targetClass, query);
return findWithPostProcessing(setName, targetClass, query);
}

private <T> Stream<T> find(Class<T> targetClass, String setName) {
Expand Down Expand Up @@ -934,16 +938,24 @@ public <T> Stream<T> findAll(Sort sort, long offset, long limit, Class<T> target
Assert.notNull(setName, "Set name must not be null!");
Assert.notNull(targetClass, "Target class must not be null!");

return findUsingQualifierWithPostProcessing(setName, targetClass, sort, offset, limit, null);
return findWithPostProcessing(setName, targetClass, sort, offset, limit);
}

private <T> Stream<T> findUsingQueryWithPostProcessing(String setName, Class<T> targetClass, Query query) {
private <T> Stream<T> findWithPostProcessing(String setName, Class<T> targetClass, Query query) {
verifyUnsortedWithOffset(query.getSort(), query.getOffset());
Stream<T> results = findUsingQueryWithDistinctPredicate(setName, targetClass,
getDistinctPredicate(query), query);
return applyPostProcessingOnResults(results, query);
}

@Override
public <T, S> Stream<S> findUsingQueryWithoutPostProcessing(Class<T> entityClass, Class<S> targetClass,
Query query) {
verifyUnsortedWithOffset(query.getSort(), query.getOffset());
return findUsingQueryWithDistinctPredicate(getSetName(entityClass), targetClass,
getDistinctPredicate(query), query);
}

private <T> Stream<T> findUsingQueryWithDistinctPredicate(String setName, Class<T> targetClass,
Predicate<KeyRecord> distinctPredicate,
Query query) {
Expand All @@ -970,7 +982,7 @@ public <T> Stream<T> findInRange(long offset, long limit, Sort sort,
Assert.notNull(targetClass, "Target class must not be null!");
Assert.notNull(setName, "Set name must not be null!");

return findUsingQualifierWithPostProcessing(setName, targetClass, sort, offset, limit, null);
return findWithPostProcessing(setName, targetClass, sort, offset, limit);
}

@Override
Expand Down Expand Up @@ -1044,15 +1056,33 @@ public <T> long count(Query query, Class<T> entityClass) {

@Override
public long count(Query query, String setName) {
Stream<KeyRecord> results = findRecordsUsingQuery(setName, query);
Stream<KeyRecord> results = countRecordsUsingQuery(setName, query);
return results.count();
}

private Stream<KeyRecord> findRecordsUsingQuery(String setName, Query query) {
Assert.notNull(query, "Query must not be null!");
private Stream<KeyRecord> countRecordsUsingQuery(String setName, Query query) {
Assert.notNull(setName, "Set name must not be null!");

return findRecordsUsingQuery(setName, null, query);
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null) {
Qualifier idQualifier = getIdQualifier(qualifier);
if (idQualifier != null) {
// a separate flow for a query with id
return findByIdsWithoutMapping(getIdValue(idQualifier), setName, null,
new Query(excludeIdQualifier(qualifier))).stream();
}
}

KeyRecordIterator recIterator = queryEngine.selectForCount(namespace, setName, query);

return StreamUtils.createStreamFromIterator(recIterator)
.onClose(() -> {
try {
recIterator.close();
} catch (Exception e) {
log.error("Caught exception while closing query", e);
}
});
}

@Override
Expand Down Expand Up @@ -1234,8 +1264,8 @@ private Record putAndGetHeader(AerospikeWriteData data, WritePolicy policy, bool
}

@SuppressWarnings("SameParameterValue")
private <T> Stream<T> findUsingQualifierWithPostProcessing(String setName, Class<T> targetClass, Sort sort,
long offset, long limit, Qualifier qualifier) {
private <T> Stream<T> findWithPostProcessing(String setName, Class<T> targetClass, Sort sort, long offset,
long limit) {
verifyUnsortedWithOffset(sort, offset);
Stream<T> results = find(targetClass, setName);
return applyPostProcessingOnResults(results, sort, offset, limit);
Expand Down Expand Up @@ -1273,11 +1303,11 @@ private <T> Stream<T> applyPostProcessingOnResults(Stream<T> results, Sort sort,
}

private <T> Stream<KeyRecord> findRecordsUsingQuery(String setName, Class<T> targetClass, Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteria().getCriteriaObject() : null;
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null) {
Qualifier idQualifier = getOneIdQualifier(qualifier);
Qualifier idQualifier = getIdQualifier(qualifier);
if (idQualifier != null) {
// a special flow if there is id given
// a separate flow for a query with id
return findByIdsWithoutMapping(getIdValue(idQualifier), setName, targetClass,
new Query(excludeIdQualifier(qualifier))).stream();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ static void verifyUnsortedWithOffset(Sort sort, long offset) {
static Predicate<KeyRecord> getDistinctPredicate(Query query) {
Predicate<KeyRecord> distinctPredicate;
if (query != null && query.isDistinct()) {
String dotPathString = query.getCriteria().getCriteriaObject().getDotPath();
String dotPathString = query.getQualifier().getDotPath();
if (StringUtils.hasLength(dotPathString)) {
throw new UnsupportedOperationException("DISTINCT queries are currently supported only for the first " +
"level objects, got a query for " + dotPathString);
}

final Set<Object> distinctValues = ConcurrentHashMap.newKeySet();
distinctPredicate = kr -> {
final String distinctField = query.getCriteria().getCriteriaObject().getField();
final String distinctField = query.getQualifier().getField();
if (kr.record == null || kr.record.bins == null) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.aerospike.client.query.IndexCollectionType;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.reactor.IAerospikeReactorClient;
import org.springframework.data.aerospike.config.AerospikeDataSettings;
import org.springframework.data.aerospike.core.model.GroupedEntities;
import org.springframework.data.aerospike.core.model.GroupedKeys;
import org.springframework.data.aerospike.repository.query.Query;
Expand Down Expand Up @@ -51,6 +52,11 @@ public interface ReactiveAerospikeOperations {
*/
IAerospikeReactorClient getAerospikeReactorClient();

/**
* @return value of configuration parameter {@link AerospikeDataSettings#getQueryMaxRecords()}.
*/
long getQueryMaxRecords();

/**
* Reactively save document.
* <p>
Expand Down Expand Up @@ -710,8 +716,8 @@ <T, S> Mono<?> findByIdUsingQuery(Object id, Class<T> entityClass, Class<S> targ
* @param entityClass The class to extract the Aerospike set from. Must not be {@literal null}.
* @param targetClass The class to map the document to.
* @param query The {@link Query} to filter results. Optional argument (null if no filtering required).
* @return The documents from Aerospike, returned documents will be mapped to targetClass's type, if no document
* exists, an empty list is returned.
* @return The documents from Aerospike, returned documents will be mapped to targetClass's type if provided
* (otherwise to entityClass's type), if no document exists, an empty list is returned.
*/
<T, S> Flux<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClass, Class<S> targetClass,
@Nullable Query query);
Expand All @@ -727,8 +733,8 @@ <T, S> Flux<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClass, Clas
* @param targetClass The class to map the document to.
* @param setName Set name to find the document from.
* @param query The {@link Query} to filter results. Optional argument (null if no filtering required).
* @return The documents from Aerospike, returned documents will be mapped to targetClass's type, if no document
* exists, an empty list is returned.
* @return The documents from Aerospike, returned documents will be mapped to targetClass's type if provided
* (otherwise to entityClass's type), if no document exists, an empty list is returned.
*/
<T, S> Flux<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClass, Class<S> targetClass, String setName,
@Nullable Query query);
Expand Down Expand Up @@ -868,6 +874,18 @@ <T, S> Flux<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClass, Clas
*/
<T, S> Flux<S> findInRange(long offset, long limit, Sort sort, Class<T> entityClass, Class<S> targetClass);

/**
* Reactively find documents in the given entityClass's set using a query and map them to the given target class
* type. If the query has pagination and/or sorting, post-processing must be applied separately.
*
* @param entityClass The class to extract the Aerospike set from. Must not be {@literal null}.
* @param targetClass The class to map the document to.
* @param query The {@link Query} to filter results.
* @return A Flux of all matching documents regardless of pagination/sorting, returned documents will be mapped to
* targetClass's type.
*/
<T, S> Flux<S> findUsingQueryWithoutPostProcessing(Class<T> entityClass, Class<S> targetClass, Query query);

/**
* Reactively check if document exists by providing document id and entityClass (set name will be determined by the
* given entityClass).
Expand Down
Loading

0 comments on commit 0f8c0ae

Please sign in to comment.