diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractCompareCommand.java b/plugins/riot/src/main/java/com/redis/riot/AbstractCompareCommand.java index 4ddbefea0..3610349f6 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractCompareCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractCompareCommand.java @@ -3,9 +3,18 @@ import java.time.Duration; import java.util.Collection; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.function.FunctionItemProcessor; +import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.util.Assert; + import com.redis.riot.CompareStatusItemWriter.StatusCount; +import com.redis.riot.core.RiotUtils; import com.redis.riot.core.Step; +import com.redis.riot.function.StringKeyValue; +import com.redis.riot.function.ToStringKeyValue; import com.redis.spring.batch.item.redis.RedisItemReader; +import com.redis.spring.batch.item.redis.common.KeyValue; import com.redis.spring.batch.item.redis.reader.DefaultKeyComparator; import com.redis.spring.batch.item.redis.reader.KeyComparator; import com.redis.spring.batch.item.redis.reader.KeyComparison; @@ -13,6 +22,7 @@ import com.redis.spring.batch.item.redis.reader.RedisScanSizeEstimator; import io.lettuce.core.codec.ByteArrayCodec; +import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Option; public abstract class AbstractCompareCommand extends AbstractReplicateCommand { @@ -33,6 +43,41 @@ public abstract class AbstractCompareCommand extends AbstractReplicateCommand { @Option(names = "--ttl-tolerance", description = "Max TTL offset in millis to consider keys equal (default: ${DEFAULT-VALUE}).", paramLabel = "<ms>") private long ttlToleranceMillis = DEFAULT_TTL_TOLERANCE.toMillis(); + @ArgGroup(exclusive = false) + private EvaluationContextArgs evaluationContextArgs = new EvaluationContextArgs(); + + @ArgGroup(exclusive = false, heading = "Processor options%n") + private KeyValueProcessorArgs processorArgs = new KeyValueProcessorArgs(); + + protected ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> processor() { + return RiotUtils.processor(new KeyValueFilter<>(ByteArrayCodec.INSTANCE, log), keyValueProcessor()); + } + + protected abstract boolean isStruct(); + + private ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> keyValueProcessor() { + if (isIgnoreStreamMessageId()) { + Assert.isTrue(isStruct(), "--no-stream-id can only be used with --struct"); + } + StandardEvaluationContext evaluationContext = evaluationContext(); + log.info("Creating processor with {}", processorArgs); + ItemProcessor<KeyValue<String, Object>, KeyValue<String, Object>> processor = processorArgs + .processor(evaluationContext); + if (processor == null) { + return null; + } + ToStringKeyValue<byte[]> code = new ToStringKeyValue<>(ByteArrayCodec.INSTANCE); + StringKeyValue<byte[]> decode = new StringKeyValue<>(ByteArrayCodec.INSTANCE); + return RiotUtils.processor(new FunctionItemProcessor<>(code), processor, new FunctionItemProcessor<>(decode)); + } + + private StandardEvaluationContext evaluationContext() { + log.info("Creating SpEL evaluation context with {}", evaluationContextArgs); + StandardEvaluationContext evaluationContext = evaluationContextArgs.evaluationContext(); + configure(evaluationContext); + return evaluationContext; + } + private String compareMessage(Collection<StatusCount> counts) { StringBuilder builder = new StringBuilder(); counts.stream().map(CompareStepListener::toString).forEach(s -> builder.append(STATUS_DELIMITER).append(s)); @@ -70,6 +115,7 @@ protected KeyComparisonItemReader<byte[], byte[]> compareReader() { RedisItemReader<byte[], byte[], Object> target = compareTargetReader(); KeyComparisonItemReader<byte[], byte[]> reader = new KeyComparisonItemReader<>(source, target); reader.setComparator(keyComparator()); + reader.setProcessor(processor()); return reader; } @@ -83,7 +129,9 @@ private KeyComparator<byte[]> keyComparator() { return comparator; } - protected abstract boolean isIgnoreStreamMessageId(); + protected boolean isIgnoreStreamMessageId() { + return !processorArgs.isPropagateIds(); + } private RedisItemReader<byte[], byte[], Object> compareSourceReader() { RedisItemReader<byte[], byte[], Object> reader = compareRedisReader(); @@ -128,4 +176,12 @@ public void setTtlToleranceMillis(long tolerance) { this.ttlToleranceMillis = tolerance; } + public KeyValueProcessorArgs getProcessorArgs() { + return processorArgs; + } + + public void setProcessorArgs(KeyValueProcessorArgs args) { + this.processorArgs = args; + } + } diff --git a/plugins/riot/src/main/java/com/redis/riot/Compare.java b/plugins/riot/src/main/java/com/redis/riot/Compare.java index 9d92eae45..50b99bede 100644 --- a/plugins/riot/src/main/java/com/redis/riot/Compare.java +++ b/plugins/riot/src/main/java/com/redis/riot/Compare.java @@ -14,6 +14,11 @@ public class Compare extends AbstractCompareCommand { @Option(names = "--quick", description = "Skip value comparison.") private boolean quick; + @Override + protected boolean isStruct() { + return true; + } + @Override protected boolean isQuickCompare() { return quick; diff --git a/plugins/riot/src/main/java/com/redis/riot/Replicate.java b/plugins/riot/src/main/java/com/redis/riot/Replicate.java index 10f166cb3..303b9bd8a 100644 --- a/plugins/riot/src/main/java/com/redis/riot/Replicate.java +++ b/plugins/riot/src/main/java/com/redis/riot/Replicate.java @@ -4,15 +4,8 @@ import java.util.List; import org.springframework.batch.core.Job; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.batch.item.function.FunctionItemProcessor; -import org.springframework.expression.spel.support.StandardEvaluationContext; -import org.springframework.util.Assert; -import com.redis.riot.core.RiotUtils; import com.redis.riot.core.Step; -import com.redis.riot.function.StringKeyValue; -import com.redis.riot.function.ToStringKeyValue; import com.redis.spring.batch.item.redis.RedisItemReader; import com.redis.spring.batch.item.redis.RedisItemReader.ReaderMode; import com.redis.spring.batch.item.redis.RedisItemWriter; @@ -45,12 +38,6 @@ public enum CompareMode { @Option(names = "--struct", description = "Enable data structure-specific replication") private boolean struct; - @ArgGroup(exclusive = false) - private EvaluationContextArgs evaluationContextArgs = new EvaluationContextArgs(); - - @ArgGroup(exclusive = false, heading = "Processor options%n") - private KeyValueProcessorArgs processorArgs = new KeyValueProcessorArgs(); - @ArgGroup(exclusive = false) private RedisWriterArgs targetRedisWriterArgs = new RedisWriterArgs(); @@ -76,38 +63,6 @@ protected Job job() { return job(steps); } - @Override - protected boolean isIgnoreStreamMessageId() { - return !processorArgs.isPropagateIds(); - } - - private ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> processor() { - return RiotUtils.processor(new KeyValueFilter<>(ByteArrayCodec.INSTANCE, log), keyValueProcessor()); - } - - private ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> keyValueProcessor() { - if (isIgnoreStreamMessageId()) { - Assert.isTrue(isStruct(), "--no-stream-id can only be used with --struct"); - } - StandardEvaluationContext evaluationContext = evaluationContext(); - log.info("Creating processor with {}", processorArgs); - ItemProcessor<KeyValue<String, Object>, KeyValue<String, Object>> processor = processorArgs - .processor(evaluationContext); - if (processor == null) { - return null; - } - ToStringKeyValue<byte[]> code = new ToStringKeyValue<>(ByteArrayCodec.INSTANCE); - StringKeyValue<byte[]> decode = new StringKeyValue<>(ByteArrayCodec.INSTANCE); - return RiotUtils.processor(new FunctionItemProcessor<>(code), processor, new FunctionItemProcessor<>(decode)); - } - - private StandardEvaluationContext evaluationContext() { - log.info("Creating SpEL evaluation context with {}", evaluationContextArgs); - StandardEvaluationContext evaluationContext = evaluationContextArgs.evaluationContext(); - configure(evaluationContext); - return evaluationContext; - } - @Override protected void configureTargetRedisWriter(RedisItemWriter<?, ?, ?> writer) { super.configureTargetRedisWriter(writer); @@ -198,6 +153,7 @@ public void setTargetRedisWriterArgs(RedisWriterArgs redisWriterArgs) { this.targetRedisWriterArgs = redisWriterArgs; } + @Override public boolean isStruct() { return struct; } @@ -206,14 +162,6 @@ public void setStruct(boolean type) { this.struct = type; } - public KeyValueProcessorArgs getProcessorArgs() { - return processorArgs; - } - - public void setProcessorArgs(KeyValueProcessorArgs args) { - this.processorArgs = args; - } - public boolean isLogKeys() { return logKeys; } diff --git a/plugins/riot/src/test/java/com/redis/riot/RiotTests.java b/plugins/riot/src/test/java/com/redis/riot/RiotTests.java index 66b3a2da8..ee7518736 100644 --- a/plugins/riot/src/test/java/com/redis/riot/RiotTests.java +++ b/plugins/riot/src/test/java/com/redis/riot/RiotTests.java @@ -60,17 +60,17 @@ protected static double abv(Map<String, String> beer) { return Double.parseDouble(beer.get("abv")); } - protected void execute(Replicate replication, TestInfo info) throws Exception { + protected void execute(Replicate replicate, TestInfo info) throws Exception { System.setProperty(SimpleLogger.LOG_KEY_PREFIX + ReplicateWriteLogger.class.getName(), "error"); - replication.getJobArgs().getProgressArgs().setStyle(ProgressStyle.NONE); - replication.setJobName(name(info)); - replication.setJobRepository(jobRepository); - replication.setSourceRedisUri(redisURI); - replication.getSourceRedisArgs().setCluster(getRedisServer().isRedisCluster()); - replication.setTargetRedisUri(targetRedisURI); - replication.getTargetRedisArgs().setCluster(getTargetRedisServer().isRedisCluster()); - replication.getSourceRedisReaderArgs().setIdleTimeout(DEFAULT_IDLE_TIMEOUT_SECONDS); - replication.call(); + replicate.getJobArgs().getProgressArgs().setStyle(ProgressStyle.NONE); + replicate.setJobName(name(info)); + replicate.setJobRepository(jobRepository); + replicate.setSourceRedisUri(redisURI); + replicate.getSourceRedisArgs().setCluster(getRedisServer().isRedisCluster()); + replicate.setTargetRedisUri(targetRedisURI); + replicate.getTargetRedisArgs().setCluster(getTargetRedisServer().isRedisCluster()); + replicate.getSourceRedisReaderArgs().setIdleTimeout(DEFAULT_IDLE_TIMEOUT_SECONDS); + replicate.call(); } @Test diff --git a/plugins/riot/src/test/java/com/redis/riot/StackRiotTests.java b/plugins/riot/src/test/java/com/redis/riot/StackRiotTests.java index 9629c5a15..67ce1a9b9 100644 --- a/plugins/riot/src/test/java/com/redis/riot/StackRiotTests.java +++ b/plugins/riot/src/test/java/com/redis/riot/StackRiotTests.java @@ -44,7 +44,9 @@ import com.redis.lettucemod.timeseries.MRangeOptions; import com.redis.lettucemod.timeseries.RangeResult; import com.redis.lettucemod.timeseries.TimeRange; +import com.redis.riot.Replicate.CompareMode; import com.redis.riot.core.Expression; +import com.redis.riot.core.ProgressStyle; import com.redis.riot.core.QuietMapAccessor; import com.redis.riot.file.xml.XmlItemReader; import com.redis.riot.file.xml.XmlItemReaderBuilder; @@ -741,6 +743,18 @@ void replicateStruct(TestInfo info) throws Throwable { execute(info, filename); } + @Test + void compareKeyProcessor(TestInfo info) throws Throwable { + GeneratorItemReader gen = generator(1, DataType.HASH); + generate(info, gen); + Long sourceSize = redisCommands.dbsize(); + Assertions.assertTrue(sourceSize > 0); + execute(testInfo(info, "replicate"), "replicate-key-processor-compare-none"); + Assertions.assertEquals(sourceSize, targetRedisCommands.dbsize()); + Assertions.assertEquals(redisCommands.hgetall("gen:1"), targetRedisCommands.hgetall("prefix:gen:1")); + execute(info, "compare-key-processor"); + } + @Test void keyProcessor(TestInfo info) throws Throwable { String key1 = "key1"; diff --git a/plugins/riot/src/test/resources/compare-key-processor b/plugins/riot/src/test/resources/compare-key-processor new file mode 100644 index 000000000..0c3a79009 --- /dev/null +++ b/plugins/riot/src/test/resources/compare-key-processor @@ -0,0 +1 @@ +riot compare --batch 1 --key-proc="prefix:#{key}" redis://source redis://target \ No newline at end of file diff --git a/plugins/riot/src/test/resources/replicate-key-processor-compare-none b/plugins/riot/src/test/resources/replicate-key-processor-compare-none new file mode 100644 index 000000000..c787675f8 --- /dev/null +++ b/plugins/riot/src/test/resources/replicate-key-processor-compare-none @@ -0,0 +1 @@ +riot replicate --batch 1 --key-proc="prefix:#{key}" --compare none redis://source redis://target \ No newline at end of file