Skip to content

Commit

Permalink
add fallback to filter Exp for incompatible secondary index
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr committed Jan 5, 2025
1 parent b57a3cb commit 2174c25
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1548,11 +1548,10 @@ public Filter sIndexFilter(Map<QualifierKey, Object> qualifierMap) {
* FilterOperations that require both sIndexFilter and FilterExpression
*/
protected static final List<FilterOperation> dualFilterOperations =
// Arrays.asList(
// MAP_VAL_EQ_BY_KEY, MAP_VAL_GT_BY_KEY, MAP_VAL_GTEQ_BY_KEY, MAP_VAL_LT_BY_KEY, MAP_VAL_LTEQ_BY_KEY,
// MAP_VAL_BETWEEN_BY_KEY, MAP_KEYS_BETWEEN, MAP_VAL_BETWEEN
// );
Arrays.stream(FilterOperation.values()).toList();
Arrays.asList(
MAP_VAL_EQ_BY_KEY, MAP_VAL_GT_BY_KEY, MAP_VAL_GTEQ_BY_KEY, MAP_VAL_LT_BY_KEY, MAP_VAL_LTEQ_BY_KEY,
MAP_VAL_BETWEEN_BY_KEY, MAP_KEYS_BETWEEN, MAP_VAL_BETWEEN
);

@SuppressWarnings("unchecked")
private static Exp processMetadataFieldInOrNot(Map<QualifierKey, Object> qualifierMap, boolean notIn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.aerospike.client.query.Statement;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.aerospike.config.AerospikeDataSettings;
Expand All @@ -49,6 +50,7 @@
* @author peter
* @author Anastasiia Smirnova
*/
@Slf4j
public class QueryEngine {

private static final Logger logger = LoggerFactory.getLogger(QueryEngine.class);
Expand Down Expand Up @@ -126,15 +128,24 @@ public KeyRecordIterator select(String namespace, String set, String[] binNames,
return new KeyRecordIterator(namespace, rs);
} catch (AerospikeException e) {
if (statement.getFilter() != null && SEC_INDEX_ERROR_RESULT_CODES.contains(e.getResultCode())) {
// retry without sIndex filter
statement.setFilter(null);
RecordSet rs = client.query(localQueryPolicy, statement);
return new KeyRecordIterator(namespace, rs);
return retryWithoutSIndexFilter(namespace, qualifier, statement, e);
}
throw e;
}
}

private KeyRecordIterator retryWithoutSIndexFilter(String namespace, Qualifier qualifier, Statement statement,
AerospikeException e) {
// retry without sIndex filter
log.warn("Got secondary index related exception (resultCode: {}), retrying with filter expression only",
e.getResultCode());
qualifier.setHasSecIndexFilter(false);
QueryPolicy localQueryPolicyFallback = getQueryPolicy(qualifier, true);
statement.setFilter(null);
RecordSet rs = client.query(localQueryPolicyFallback, statement);
return new KeyRecordIterator(namespace, rs);
}

/**
* Select records filtered by a query to be counted
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.aerospike.client.reactor.IAerospikeReactorClient;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.data.aerospike.config.AerospikeDataSettings;
import org.springframework.data.aerospike.query.qualifier.Qualifier;
import org.springframework.data.aerospike.repository.query.Query;
Expand All @@ -41,6 +43,7 @@
* @author Sergii Karpenko
* @author Anastasiia Smirnova
*/
@Slf4j
public class ReactorQueryEngine {

private final IAerospikeReactorClient client;
Expand Down Expand Up @@ -111,15 +114,25 @@ public Flux<KeyRecord> select(String namespace, String set, String[] binNames, @
&& statement.getFilter() != null
&& SEC_INDEX_ERROR_RESULT_CODES.contains(ae.getResultCode()))
{
// retry without sIndex filter
statement.setFilter(null);
return client.query(localQueryPolicy, statement);
return retryWithoutSIndexFilter(namespace, qualifier, statement, ae);
}
// for other exceptions
return Mono.error(throwable);
});
}

private Publisher<? extends KeyRecord> retryWithoutSIndexFilter(String namespace, Qualifier qualifier,
Statement statement, AerospikeException ae) {
// retry without sIndex filter
log.warn(
"Got secondary index related exception (resultCode: {}), retrying with filter expression only",
ae.getResultCode());
qualifier.setHasSecIndexFilter(false);
QueryPolicy localQueryPolicyFallback = getQueryPolicy(qualifier, true);
statement.setFilter(null);
return client.query(localQueryPolicyFallback, statement);
}

/**
* Select records filtered by a query to be counted
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ private boolean isIndexedBin(Statement stmt, Qualifier qualifier) {
}

if (log.isDebugEnabled() && hasField) {
log.debug("Qualifier #{}, bin {}.{}.{} has {} secondary index(es)",
qualifier.hashCode(), stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName(), indexesForField.size());
log.debug("Qualifier #{}, bin {}.{}.{} has {} secondary index(es)", qualifier.hashCode(),
stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName(), indexesForField.size());
}
return !indexesForField.isEmpty();
}
Expand Down

0 comments on commit 2174c25

Please sign in to comment.