From 2174c25d9e9a8b82ea2fe1042374cf5a461f708e Mon Sep 17 00:00:00 2001 From: agrgr Date: Sun, 5 Jan 2025 17:38:18 +0200 Subject: [PATCH] add fallback to filter Exp for incompatible secondary index --- .../data/aerospike/query/FilterOperation.java | 9 ++++----- .../data/aerospike/query/QueryEngine.java | 19 +++++++++++++++---- .../aerospike/query/ReactorQueryEngine.java | 19 ++++++++++++++++--- .../aerospike/query/StatementBuilder.java | 4 ++-- 4 files changed, 37 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/springframework/data/aerospike/query/FilterOperation.java b/src/main/java/org/springframework/data/aerospike/query/FilterOperation.java index eae7c6ee..08f730e9 100644 --- a/src/main/java/org/springframework/data/aerospike/query/FilterOperation.java +++ b/src/main/java/org/springframework/data/aerospike/query/FilterOperation.java @@ -1548,11 +1548,10 @@ public Filter sIndexFilter(Map qualifierMap) { * FilterOperations that require both sIndexFilter and FilterExpression */ protected static final List dualFilterOperations = -// Arrays.asList( -// MAP_VAL_EQ_BY_KEY, MAP_VAL_GT_BY_KEY, MAP_VAL_GTEQ_BY_KEY, MAP_VAL_LT_BY_KEY, MAP_VAL_LTEQ_BY_KEY, -// MAP_VAL_BETWEEN_BY_KEY, MAP_KEYS_BETWEEN, MAP_VAL_BETWEEN -// ); - Arrays.stream(FilterOperation.values()).toList(); + Arrays.asList( + MAP_VAL_EQ_BY_KEY, MAP_VAL_GT_BY_KEY, MAP_VAL_GTEQ_BY_KEY, MAP_VAL_LT_BY_KEY, MAP_VAL_LTEQ_BY_KEY, + MAP_VAL_BETWEEN_BY_KEY, MAP_KEYS_BETWEEN, MAP_VAL_BETWEEN + ); @SuppressWarnings("unchecked") private static Exp processMetadataFieldInOrNot(Map qualifierMap, boolean notIn) { diff --git a/src/main/java/org/springframework/data/aerospike/query/QueryEngine.java b/src/main/java/org/springframework/data/aerospike/query/QueryEngine.java index de2fa9d4..3fd940a2 100644 --- a/src/main/java/org/springframework/data/aerospike/query/QueryEngine.java +++ b/src/main/java/org/springframework/data/aerospike/query/QueryEngine.java @@ -26,6 +26,7 @@ import com.aerospike.client.query.Statement; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.aerospike.config.AerospikeDataSettings; @@ -49,6 +50,7 @@ * @author peter * @author Anastasiia Smirnova */ +@Slf4j public class QueryEngine { private static final Logger logger = LoggerFactory.getLogger(QueryEngine.class); @@ -126,15 +128,24 @@ public KeyRecordIterator select(String namespace, String set, String[] binNames, return new KeyRecordIterator(namespace, rs); } catch (AerospikeException e) { if (statement.getFilter() != null && SEC_INDEX_ERROR_RESULT_CODES.contains(e.getResultCode())) { - // retry without sIndex filter - statement.setFilter(null); - RecordSet rs = client.query(localQueryPolicy, statement); - return new KeyRecordIterator(namespace, rs); + return retryWithoutSIndexFilter(namespace, qualifier, statement, e); } throw e; } } + private KeyRecordIterator retryWithoutSIndexFilter(String namespace, Qualifier qualifier, Statement statement, + AerospikeException e) { + // retry without sIndex filter + log.warn("Got secondary index related exception (resultCode: {}), retrying with filter expression only", + e.getResultCode()); + qualifier.setHasSecIndexFilter(false); + QueryPolicy localQueryPolicyFallback = getQueryPolicy(qualifier, true); + statement.setFilter(null); + RecordSet rs = client.query(localQueryPolicyFallback, statement); + return new KeyRecordIterator(namespace, rs); + } + /** * Select records filtered by a query to be counted * diff --git a/src/main/java/org/springframework/data/aerospike/query/ReactorQueryEngine.java b/src/main/java/org/springframework/data/aerospike/query/ReactorQueryEngine.java index fd737752..f7a07817 100644 --- a/src/main/java/org/springframework/data/aerospike/query/ReactorQueryEngine.java +++ b/src/main/java/org/springframework/data/aerospike/query/ReactorQueryEngine.java @@ -25,6 +25,8 @@ import com.aerospike.client.reactor.IAerospikeReactorClient; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; import org.springframework.data.aerospike.config.AerospikeDataSettings; import org.springframework.data.aerospike.query.qualifier.Qualifier; import org.springframework.data.aerospike.repository.query.Query; @@ -41,6 +43,7 @@ * @author Sergii Karpenko * @author Anastasiia Smirnova */ +@Slf4j public class ReactorQueryEngine { private final IAerospikeReactorClient client; @@ -111,15 +114,25 @@ public Flux select(String namespace, String set, String[] binNames, @ && statement.getFilter() != null && SEC_INDEX_ERROR_RESULT_CODES.contains(ae.getResultCode())) { - // retry without sIndex filter - statement.setFilter(null); - return client.query(localQueryPolicy, statement); + return retryWithoutSIndexFilter(namespace, qualifier, statement, ae); } // for other exceptions return Mono.error(throwable); }); } + private Publisher retryWithoutSIndexFilter(String namespace, Qualifier qualifier, + Statement statement, AerospikeException ae) { + // retry without sIndex filter + log.warn( + "Got secondary index related exception (resultCode: {}), retrying with filter expression only", + ae.getResultCode()); + qualifier.setHasSecIndexFilter(false); + QueryPolicy localQueryPolicyFallback = getQueryPolicy(qualifier, true); + statement.setFilter(null); + return client.query(localQueryPolicyFallback, statement); + } + /** * Select records filtered by a query to be counted * diff --git a/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java b/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java index 5af284ff..5bd0dd7c 100644 --- a/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java +++ b/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java @@ -142,8 +142,8 @@ private boolean isIndexedBin(Statement stmt, Qualifier qualifier) { } if (log.isDebugEnabled() && hasField) { - log.debug("Qualifier #{}, bin {}.{}.{} has {} secondary index(es)", - qualifier.hashCode(), stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName(), indexesForField.size()); + log.debug("Qualifier #{}, bin {}.{}.{} has {} secondary index(es)", qualifier.hashCode(), + stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName(), indexesForField.size()); } return !indexesForField.isEmpty(); }