Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-199 Refactor implementation of paginated queries #658

Merged
merged 18 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public QueryEngine queryEngine(IAerospikeClient aerospikeClient,
boolean scansEnabled = aerospikeDataSettings().isScansEnabled();
log.debug("AerospikeDataSettings.scansEnabled: {}", scansEnabled);
queryEngine.setScansEnabled(scansEnabled);
int queryMaxRecords = aerospikeDataSettings().getQueryMaxRecords();
log.debug("AerospikeDataSettings.queryMaxRecords: {}", queryMaxRecords);
queryEngine.setQueryMaxRecords(queryMaxRecords);
return queryEngine;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ public ReactorQueryEngine reactorQueryEngine(IAerospikeReactorClient aerospikeRe
boolean scansEnabled = aerospikeDataSettings().isScansEnabled();
queryEngine.setScansEnabled(scansEnabled);
log.debug("AerospikeDataSettings.scansEnabled: {}", scansEnabled);
int queryMaxRecords = aerospikeDataSettings().getQueryMaxRecords();
log.debug("AerospikeDataSettings.queryMaxRecords: {}", queryMaxRecords);
queryEngine.setQueryMaxRecords(queryMaxRecords);
return queryEngine;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class AerospikeDataSettings {
boolean createIndexesOnStartup = true;
@Builder.Default
int indexCacheRefreshFrequencySeconds = 3600;
@Builder.Default
int queryMaxRecords = 100000;

/*
* (non-Javadoc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,8 @@ <T, S> List<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClass, Clas
*/
<T> Stream<T> findInRange(long offset, long limit, Sort sort, Class<T> targetClass, String setName);

<T> Stream<T> findUsingQueryWithoutPostProcessing(Class<?> entityClass, Class<T> targetClass, Query query);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadoc is required

Copy link
Member

@roimenashe roimenashe Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does entityClass is wrapped with Class<?>, usually when we have entity and target we use Class<T> for entityClass and Class<S> for targetClass, please align with rest of the methods


/**
* Check if a document exists by providing document id and entityClass (set name will be determined by the given
* entityClass).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ public <T, S> Object findByIdUsingQuery(Object id, Class<T> entityClass, Class<S
Assert.notNull(entityClass, "Entity class must not be null!");
Assert.notNull(setName, "Set name must not be null!");

Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteria().getCriteriaObject() : null;
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
try {
AerospikePersistentEntity<?> entity = mappingContext.getRequiredPersistentEntity(entityClass);
Key key = getKey(id, setName);
Expand Down Expand Up @@ -944,6 +944,12 @@ private <T> Stream<T> findUsingQueryWithPostProcessing(String setName, Class<T>
return applyPostProcessingOnResults(results, query);
}

public <T> Stream<T> findUsingQueryWithoutPostProcessing(Class<?> entityClass, Class<T> targetClass, Query query) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add @Override

verifyUnsortedWithOffset(query.getSort(), query.getOffset());
return findUsingQueryWithDistinctPredicate(getSetName(entityClass), targetClass,
getDistinctPredicate(query), query);
}

private <T> Stream<T> findUsingQueryWithDistinctPredicate(String setName, Class<T> targetClass,
Predicate<KeyRecord> distinctPredicate,
Query query) {
Expand Down Expand Up @@ -1273,7 +1279,7 @@ private <T> Stream<T> applyPostProcessingOnResults(Stream<T> results, Sort sort,
}

private <T> Stream<KeyRecord> findRecordsUsingQuery(String setName, Class<T> targetClass, Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteria().getCriteriaObject() : null;
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null) {
Qualifier idQualifier = getOneIdQualifier(qualifier);
if (idQualifier != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ static void verifyUnsortedWithOffset(Sort sort, long offset) {
static Predicate<KeyRecord> getDistinctPredicate(Query query) {
Predicate<KeyRecord> distinctPredicate;
if (query != null && query.isDistinct()) {
String dotPathString = query.getCriteria().getCriteriaObject().getDotPath();
String dotPathString = query.getQualifier().getDotPath();
if (StringUtils.hasLength(dotPathString)) {
throw new UnsupportedOperationException("DISTINCT queries are currently supported only for the first " +
"level objects, got a query for " + dotPathString);
}

final Set<Object> distinctValues = ConcurrentHashMap.newKeySet();
distinctPredicate = kr -> {
final String distinctField = query.getCriteria().getCriteriaObject().getField();
final String distinctField = query.getQualifier().getField();
if (kr.record == null || kr.record.bins == null) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,8 @@ <T, S> Flux<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClass, Clas
*/
<T, S> Flux<S> findInRange(long offset, long limit, Sort sort, Class<T> entityClass, Class<S> targetClass);

<T> Flux<T> findUsingQueryWithoutPostProcessing(Class<?> entityClass, Class<T> targetClass, Query query);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadoc is required

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here Class<?>.


/**
* Reactively check if document exists by providing document id and entityClass (set name will be determined by the
* given entityClass).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,12 @@ private <T> Flux<T> findUsingQueryWithPostProcessing(String setName, Class<T> ta
return results;
}

public <T> Flux<T> findUsingQueryWithoutPostProcessing(Class<?> entityClass, Class<T> targetClass, Query query) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add @Override

verifyUnsortedWithOffset(query.getSort(), query.getOffset());
return findUsingQueryWithDistinctPredicate(getSetName(entityClass), targetClass,
getDistinctPredicate(query), query);
}

private void verifyUnsortedWithOffset(Sort sort, long offset) {
if ((sort == null || sort.isUnsorted())
&& offset > 0) {
Expand Down Expand Up @@ -1229,7 +1235,7 @@ private <T> Flux<T> findUsingQueryWithDistinctPredicate(String setName, Class<T>
}

private <T> Flux<KeyRecord> findRecordsUsingQuery(String setName, Class<T> targetClass, Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteria().getCriteriaObject() : null;
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null) {
Qualifier idQualifier = getOneIdQualifier(qualifier);
if (idQualifier != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public class FilterExpressionsBuilder {

public Expression build(Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteria().getCriteriaObject() : null;
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null && excludeIrrelevantFilters(qualifier)) {
return Exp.build(qualifier.toFilterExp());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class QueryEngine {
* scans, set this property to true.
*/
private boolean scansEnabled = false;
private int queryMaxRecords = 100000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be taken from AerospikeDataSettings?


public QueryEngine(IAerospikeClient client, StatementBuilder statementBuilder,
FilterExpressionsBuilder filterExpressionsBuilder, QueryPolicy queryPolicy) {
Expand Down Expand Up @@ -85,7 +86,7 @@ public KeyRecordIterator select(String namespace, String set, @Nullable Query qu
* @return A KeyRecordIterator to iterate over the results
*/
public KeyRecordIterator select(String namespace, String set, String[] binNames, @Nullable Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteria().getCriteriaObject() : null;
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
/*
* singleton using primary key
*/
Expand All @@ -105,6 +106,7 @@ public KeyRecordIterator select(String namespace, String set, String[] binNames,
* query with filters
*/
Statement statement = statementBuilder.build(namespace, set, query, binNames);
statement.setMaxRecords(queryMaxRecords);
QueryPolicy localQueryPolicy = new QueryPolicy(queryPolicy);
localQueryPolicy.filterExp = filterExpressionsBuilder.build(query);

Expand All @@ -128,6 +130,11 @@ public void setScansEnabled(boolean scansEnabled) {
this.scansEnabled = scansEnabled;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Setter?

}

public void setQueryMaxRecords(int queryMaxRecords) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Setter?

this.queryMaxRecords = queryMaxRecords;
}

@Deprecated(since = "4.6.0", forRemoval = true)
public enum Meta {
KEY,
TTL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ReactorQueryEngine {
* scans, set this property to true.
*/
private boolean scansEnabled = false;
private int queryMaxRecords = 100000;

public ReactorQueryEngine(IAerospikeReactorClient client, StatementBuilder statementBuilder,
FilterExpressionsBuilder filterExpressionsBuilder, QueryPolicy queryPolicy) {
Expand Down Expand Up @@ -81,7 +82,7 @@ public Flux<KeyRecord> select(String namespace, String set, @Nullable Query quer
* @return A Flux<KeyRecord> to iterate over the results
*/
public Flux<KeyRecord> select(String namespace, String set, String[] binNames, @Nullable Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteria().getCriteriaObject() : null;
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
/*
* singleton using primary key
*/
Expand All @@ -96,6 +97,7 @@ public Flux<KeyRecord> select(String namespace, String set, String[] binNames, @
* query with filters
*/
Statement statement = statementBuilder.build(namespace, set, query, binNames);
statement.setMaxRecords(queryMaxRecords);
QueryPolicy localQueryPolicy = new QueryPolicy(queryPolicy);
localQueryPolicy.filterExp = filterExpressionsBuilder.build(query);
if (!scansEnabled && statement.getFilter() == null) {
Expand All @@ -115,4 +117,8 @@ private Mono<KeyRecord> getRecord(Policy policy, Key key, String[] binNames) {
public void setScansEnabled(boolean scansEnabled) {
this.scansEnabled = scansEnabled;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Setter?

}

public void setQueryMaxRecords(int queryMaxRecords) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Setter?

this.queryMaxRecords = queryMaxRecords;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public Statement build(String namespace, String set, @Nullable Query query, Stri
}
if (queryCriteriaIsNotNull(query)) {
// statement's filter is set based on the first processed qualifier's filter
setStatementFilterFromQualifiers(stmt, query.getCriteria().getCriteriaObject());
setStatementFilterFromQualifiers(stmt, query.getQualifier());
}
return stmt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.springframework.data.aerospike.repository.query;

import org.springframework.data.aerospike.core.AerospikeOperations;
import org.springframework.data.aerospike.core.AerospikeTemplate;
import org.springframework.data.aerospike.query.Qualifier;
import org.springframework.data.domain.PageImpl;
Expand All @@ -40,15 +39,15 @@
*/
public class AerospikePartTreeQuery extends BaseAerospikePartTreeQuery {

private final AerospikeOperations operations;
private final AerospikeTemplate template;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why template instead of operations?


public AerospikePartTreeQuery(QueryMethod queryMethod,
QueryMethodEvaluationContextProvider evalContextProvider,
AerospikeTemplate aerospikeTemplate,
Class<? extends AbstractQueryCreator<?, ?>> queryCreator) {
super(queryMethod, evalContextProvider, queryCreator);

this.operations = aerospikeTemplate;
this.template = aerospikeTemplate;
}

@Override
Expand All @@ -60,32 +59,36 @@ public Object execute(Object[] parameters) {

// queries that include id have their own processing flow
if (parameters != null && parameters.length > 0) {
Qualifier criteria = query.getCriteria().getCriteriaObject();
Qualifier criteria = query.getQualifier();
List<Object> ids;
if (criteria.hasSingleId()) {
ids = getIdValue(criteria);
return operations.findByIdsUsingQuery(ids, entityClass, targetClass, null);
return template.findByIdsUsingQuery(ids, entityClass, targetClass, null);
} else {
Qualifier idQualifier;
if ((idQualifier = getIdQualifier(criteria)) != null) {
ids = getIdValue(idQualifier);
return operations.findByIdsUsingQuery(ids, entityClass, targetClass,
return template.findByIdsUsingQuery(ids, entityClass, targetClass,
new Query(excludeIdQualifier(criteria)));
}
}
}

if (queryMethod.isPageQuery() || queryMethod.isSliceQuery()) {
Stream<?> result = findByQuery(query, targetClass);
List<?> results = result.toList();
Pageable pageable = accessor.getPageable();
long numberOfAllResults = operations.count(query, entityClass);
Stream<?> unprocessedResultsStream =
template.findUsingQueryWithoutPostProcessing(entityClass, targetClass, query);
// Assuming there is enough memory
// and configuration parameter AerospikeDataSettings.queryMaxRecords is less than Integer.MAX_VALUE
List<?> unprocessedResults = unprocessedResultsStream.toList();
long numberOfAllResults = unprocessedResults.size();
List<?> resultsPaginated = applyPostProcessingOnResults(unprocessedResults.stream(), query).toList();

Pageable pageable = accessor.getPageable();
if (queryMethod.isSliceQuery()) {
boolean hasNext = numberOfAllResults > pageable.getPageSize() * (pageable.getOffset() + 1);
return new SliceImpl(results, pageable, hasNext);
return new SliceImpl(resultsPaginated, pageable, hasNext);
} else {
return new PageImpl(results, pageable, numberOfAllResults);
return new PageImpl(resultsPaginated, pageable, numberOfAllResults);
}
} else if (queryMethod.isStreamQuery()) {
return findByQuery(query, targetClass);
Expand All @@ -102,9 +105,9 @@ public Object execute(Object[] parameters) {
private Stream<?> findByQuery(Query query, Class<?> targetClass) {
// Run query and map to different target class.
if (targetClass != null && targetClass != entityClass) {
return operations.find(query, entityClass, targetClass);
return template.find(query, entityClass, targetClass);
}
// Run query and map to entity class type.
return operations.find(query, entityClass);
return template.find(query, entityClass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package org.springframework.data.aerospike.repository.query;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.support.PropertyComparator;
import org.springframework.data.aerospike.query.Qualifier;
import org.springframework.data.domain.Sort;
import org.springframework.data.repository.query.ParameterAccessor;
import org.springframework.data.repository.query.ParametersParameterAccessor;
import org.springframework.data.repository.query.QueryMethod;
Expand All @@ -29,6 +31,8 @@
import org.springframework.util.ClassUtils;

import java.lang.reflect.Constructor;
import java.util.Comparator;
import java.util.stream.Stream;

/**
* @author Peter Milne
Expand Down Expand Up @@ -60,7 +64,7 @@ protected Query prepareQuery(Object[] parameters, ParametersParameterAccessor ac
PartTree tree = new PartTree(queryMethod.getName(), entityClass);
Query baseQuery = createQuery(accessor, tree);

Qualifier criteria = baseQuery.getCriteria().getCriteriaObject();
Qualifier criteria = baseQuery.getQualifier();
Query query = new Query(criteria);

if (accessor.getPageable().isPaged()) {
Expand Down Expand Up @@ -110,4 +114,32 @@ public Query createQuery(ParametersParameterAccessor accessor, PartTree tree) {
.getConstructorIfAvailable(queryCreator, PartTree.class, ParameterAccessor.class);
return (Query) BeanUtils.instantiateClass(constructor, tree, accessor).createQuery();
}

protected <T> Stream<T> applyPostProcessingOnResults(Stream<T> results, Query query) {
if (query.getSort() != null && query.getSort().isSorted()) {
Comparator<T> comparator = getComparator(query);
results = results.sorted(comparator);
}
if (query.hasOffset()) {
results = results.skip(query.getOffset());
}
if (query.hasRows()) {
results = results.limit(query.getRows());
}

return results;
}

protected <T> Comparator<T> getComparator(Query query) {
return query.getSort().stream()
.map(this::<T>getPropertyComparator)
.reduce(Comparator::thenComparing)
.orElseThrow(() -> new IllegalStateException("Comparator can not be created if sort orders are empty"));
}

private <T> Comparator<T> getPropertyComparator(Sort.Order order) {
boolean ignoreCase = true;
boolean ascending = order.getDirection().isAscending();
return new PropertyComparator<>(order.getProperty(), ignoreCase, ascending);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import lombok.Getter;
import lombok.Setter;
import org.springframework.data.aerospike.query.Qualifier;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Order;
import org.springframework.data.keyvalue.core.query.KeyValueQuery;
Expand Down Expand Up @@ -71,6 +72,13 @@ public AerospikeCriteria getAerospikeCriteria() {
return (AerospikeCriteria) criteria;
}

/**
* Get the {@link Qualifier} object.
*/
public Qualifier getQualifier() {
return criteria.getCriteriaObject();
}

/**
* Get {@link Sort}.
*/
Expand Down
Loading
Loading