Skip to content

Commit

Permalink
FMWK-338 Limit lower Server version to 6.1 (#793)
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr authored Nov 28, 2024
1 parent 855134c commit a0beac6
Show file tree
Hide file tree
Showing 23 changed files with 768 additions and 982 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface AerospikeConverter extends AerospikeReader<Object>, AerospikeWr
/**
* Key that identifies POJO's class.
*/
String CLASS_KEY = "@_class";
String CLASS_KEY_DEFAULT = "@_class";

/**
* Access Aerospike-specific conversion service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.util.Map;

import static org.springframework.data.aerospike.convert.AerospikeConverter.CLASS_KEY;
import static org.springframework.data.aerospike.convert.AerospikeConverter.CLASS_KEY_DEFAULT;

public class AerospikeTypeAliasAccessor implements TypeAliasAccessor<Map<String, Object>> {

Expand All @@ -31,7 +31,7 @@ public AerospikeTypeAliasAccessor(String classKey) {
}

public AerospikeTypeAliasAccessor() {
this.classKey = CLASS_KEY;
this.classKey = CLASS_KEY_DEFAULT;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,7 @@ public <T> void delete(Query query, Class<T> entityClass, String setName) {
List<T> findQueryResults = find(query, entityClass, setName).filter(Objects::nonNull).toList();

if (!findQueryResults.isEmpty()) {
if (serverVersionSupport.isBatchWriteSupported()) {
deleteAll(findQueryResults);
} else {
findQueryResults.forEach(this::delete);
}
deleteAll(findQueryResults);
}
}

Expand All @@ -492,11 +488,7 @@ public <T> void deleteByIdsUsingQuery(Collection<?> ids, Class<T> entityClass, S
.collect(Collectors.toUnmodifiableList());

if (!findQueryResults.isEmpty()) {
if (serverVersionSupport.isBatchWriteSupported()) {
deleteAll(findQueryResults);
} else {
findQueryResults.forEach(this::delete);
}
deleteAll(findQueryResults);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ protected Operation[] getPutAndGetHeaderOperations(AerospikeWriteData data, bool

if (bins.length == 0) {
throw new AerospikeException(
"Cannot put and get header on a document with no bins and \"@_class\" bin disabled.");
"Cannot put and get header on a document with no bins and class bin disabled.");
}

return operations(bins, Operation::put, firstlyDeleteBins ? Operation.array(Operation.delete()) : null,
Expand Down Expand Up @@ -524,8 +524,6 @@ protected void validateGroupedKeys(GroupedKeys groupedKeys) {

protected void validateForBatchWrite(Object object, String objectName) {
Assert.notNull(object, objectName + " must not be null!");
Assert.isTrue(batchWriteSupported(), "Batch write operations are supported starting with " +
"server version " + TemplateUtils.SERVER_VERSION_6);
}

protected boolean batchWriteSizeMatch(int batchSize, int currentSize) {
Expand All @@ -536,10 +534,6 @@ protected boolean batchRecordFailed(BatchRecord batchRecord) {
return batchRecord.resultCode != ResultCode.OK || batchRecord.record == null;
}

protected boolean batchWriteSupported() {
return serverVersionSupport.isBatchWriteSupported();
}

protected enum OperationType {
SAVE_OPERATION("save"),
INSERT_OPERATION("insert"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,7 @@ public <T> Mono<Void> delete(Query query, Class<T> entityClass, String setName)

return findQueryResults.flatMap(list -> {
if (!list.isEmpty()) {
if (serverVersionSupport.isBatchWriteSupported()) {
return deleteAll(list);
} else {
list.forEach(this::delete);
return Mono.empty();
}
return deleteAll(list);
}
return Mono.empty();
}
Expand Down Expand Up @@ -484,12 +479,7 @@ public <T> Mono<Void> deleteByIdsUsingQuery(Collection<?> ids, Class<T> entityCl

return findQueryResults.flatMap(list -> {
if (!list.isEmpty()) {
if (serverVersionSupport.isBatchWriteSupported()) {
return deleteAll(list);
} else {
list.forEach(this::delete);
return Mono.empty();
}
return deleteAll(list);
}
return Mono.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.stream.Stream;

import static java.util.function.Predicate.not;
import static org.springframework.data.aerospike.convert.AerospikeConverter.CLASS_KEY;
import static org.springframework.data.aerospike.convert.AerospikeConverter.CLASS_KEY_DEFAULT;
import static org.springframework.data.aerospike.repository.query.CriteriaDefinition.AerospikeNullQueryCriterion;
import static org.springframework.data.aerospike.repository.query.CriteriaDefinition.AerospikeNullQueryCriterion.NULL_PARAM;
import static org.springframework.util.ClassUtils.isAssignable;
Expand Down Expand Up @@ -283,7 +283,7 @@ protected static boolean isAssignableValueOrConverted(Class<?> propertyType, Obj
*/
protected static boolean isPojoMap(Object object, Class<?> propertyType) {
if (object instanceof TreeMap<?, ?> treeMap) {
Object classKey = treeMap.get(CLASS_KEY);
Object classKey = treeMap.get(CLASS_KEY_DEFAULT);
return classKey != null && classKey.equals(propertyType.getName());
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
@Slf4j
public class ServerVersionSupport {

private static final ModuleDescriptor.Version SERVER_VERSION_5_7_0_0 = ModuleDescriptor.Version.parse("5.7.0.0");
private static final ModuleDescriptor.Version SERVER_VERSION_6_0_0_0 = ModuleDescriptor.Version.parse("6.0.0.0");
private static final ModuleDescriptor.Version SERVER_VERSION_6_1_0_0 = ModuleDescriptor.Version.parse("6.1.0.0");
private static final ModuleDescriptor.Version SERVER_VERSION_6_1_0_1 = ModuleDescriptor.Version.parse("6.1.0.1");
private static final ModuleDescriptor.Version SERVER_VERSION_6_3_0_0 = ModuleDescriptor.Version.parse("6.3.0.0");
Expand Down Expand Up @@ -42,22 +40,18 @@ public void scheduleServerVersionRefresh(long intervalSeconds) {
private String findServerVersion() {
String fullVersionString = InfoCommandUtils.request(client, client.getCluster().getRandomNode(),
"version");

String versionString = fullVersionString.substring(fullVersionString.lastIndexOf(' ') + 1);

if (ModuleDescriptor.Version.parse(versionString).compareTo(SERVER_VERSION_6_1_0_0) < 0) {
throw new UnsupportedOperationException("Minimal supported Aerospike Server version is 6.1");
}
log.debug("Found server version {}", versionString);
return versionString;
}

public boolean isQueryShowSupported() {
return ModuleDescriptor.Version.parse(getServerVersion())
.compareTo(SERVER_VERSION_5_7_0_0) >= 0;
}

public boolean isBatchWriteSupported() {
return ModuleDescriptor.Version.parse(getServerVersion())
.compareTo(SERVER_VERSION_6_0_0_0) >= 0;
}

/**
* @return true if Server version is 6.1 or greater
*/
public boolean isSIndexCardinalitySupported() {
return ModuleDescriptor.Version.parse(getServerVersion())
.compareTo(SERVER_VERSION_6_1_0_0) >= 0;
Expand All @@ -66,6 +60,8 @@ public boolean isSIndexCardinalitySupported() {
/**
* Since Aerospike Server ver. 6.1.0.1 attempting to create a secondary index which already exists or to drop a
* non-existing secondary index returns success/OK instead of an error.
*
* @return true if Server version is 6.1.0.1 or greater
*/
public boolean isDropCreateBehaviorUpdated() {
return ModuleDescriptor.Version.parse(getServerVersion())
Expand All @@ -74,12 +70,17 @@ public boolean isDropCreateBehaviorUpdated() {

/**
* Since Aerospike Server ver. 6.3.0.0 find by Collection Data Types (Collection / Map / POJO) is supported.
*
* @return true if Server version is 6.3 or greater
*/
public boolean isFindByCDTSupported() {
return ModuleDescriptor.Version.parse(getServerVersion())
.compareTo(SERVER_VERSION_6_3_0_0) >= 0;
}

/**
* @return true if Server version is 7.0 or greater
*/
public boolean isServerVersionGtOrEq7() {
return ModuleDescriptor.Version.parse(getServerVersion())
.compareTo(SERVER_VERSION_7_0_0_0) >= 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,24 +83,14 @@ public List<Person> saveGeneratedPersons(int count) {
}

public <T> void deleteAll(ReactiveAerospikeRepository<T, ?> repository, Collection<T> entities) {
// batch write operations are supported starting with Server version 6.0+
if (serverVersionSupport.isBatchWriteSupported()) {
try {
repository.deleteAll(entities).block();
} catch (AerospikeException.BatchRecordArray ignored) {
// KEY_NOT_FOUND ResultCode causes exception if there are no entities
}
} else {
entities.forEach(entity -> repository.delete(entity).block());
try {
repository.deleteAll(entities).block();
} catch (AerospikeException.BatchRecordArray ignored) {
// KEY_NOT_FOUND ResultCode causes exception if there are no entities
}
}

public <T> void saveAll(ReactiveAerospikeRepository<T, ?> repository, Collection<T> entities) {
// batch write operations are supported starting with Server version 6.0+
if (serverVersionSupport.isBatchWriteSupported()) {
repository.saveAll(entities).blockLast();
} else {
entities.forEach(entity -> repository.save(entity).block());
}
repository.saveAll(entities).blockLast();
}
}
Loading

0 comments on commit a0beac6

Please sign in to comment.