diff --git a/app/build.gradle b/app/build.gradle index b0de08c3..ec97e81b 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -19,6 +19,7 @@ dependencies { runtime project(":kafka-records-storage") runtime project(":dynamodb-positions-storage") + runtime project(":inmemory-positions-storage") compile 'org.lognet:grpc-spring-boot-starter' compile 'io.grpc:grpc-netty' @@ -35,4 +36,5 @@ dependencies { testCompile project(":kafka-records-storage") testCompile project(":dynamodb-positions-storage") + testCompile project(":inmemory-positions-storage") } diff --git a/app/src/test/java/com/github/bsideup/liiklus/test/TestConfiguration.java b/app/src/test/java/com/github/bsideup/liiklus/test/TestConfiguration.java index 473192c3..11bf16c8 100644 --- a/app/src/test/java/com/github/bsideup/liiklus/test/TestConfiguration.java +++ b/app/src/test/java/com/github/bsideup/liiklus/test/TestConfiguration.java @@ -5,6 +5,7 @@ import com.github.bsideup.liiklus.dynamodb.DynamoDBPositionsStorage; import com.github.bsideup.liiklus.dynamodb.config.DynamoDBConfiguration.DynamoDBProperties; import lombok.SneakyThrows; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Profile; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; @@ -16,6 +17,7 @@ @Profile("test") @org.springframework.boot.test.context.TestConfiguration @Order(Ordered.HIGHEST_PRECEDENCE) +@ConditionalOnProperty(value = "storage.positions.type", havingValue = "DYNAMODB") public class TestConfiguration { @SneakyThrows diff --git a/inmemory-positions-storage/build.gradle b/inmemory-positions-storage/build.gradle new file mode 100644 index 00000000..7c51476f --- /dev/null +++ b/inmemory-positions-storage/build.gradle @@ -0,0 +1,14 @@ +plugins { + id "java" +} + +dependencies { + compileOnly 'org.projectlombok:lombok' + compileOnly 'org.springframework.boot:spring-boot-starter' + + compile project(":api") + + compile 'io.projectreactor:reactor-core' + + testCompileOnly 'org.projectlombok:lombok' +} diff --git a/inmemory-positions-storage/src/main/java/com/github/bsideup/liiklus/inmemory/InMemoryPositionsStorage.java b/inmemory-positions-storage/src/main/java/com/github/bsideup/liiklus/inmemory/InMemoryPositionsStorage.java new file mode 100644 index 00000000..137e1370 --- /dev/null +++ b/inmemory-positions-storage/src/main/java/com/github/bsideup/liiklus/inmemory/InMemoryPositionsStorage.java @@ -0,0 +1,68 @@ +package com.github.bsideup.liiklus.inmemory; + +import com.github.bsideup.liiklus.positions.PositionsStorage; +import lombok.RequiredArgsConstructor; +import lombok.Value; +import lombok.experimental.FieldDefaults; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * WARNING: this storage type should only be used for testing and NOT in production + * + */ +@RequiredArgsConstructor +@FieldDefaults(makeFinal = true) +public class InMemoryPositionsStorage implements PositionsStorage { + + ConcurrentMap> storage = new ConcurrentHashMap<>(); + + @Override + public Publisher findAll() { + return Flux.fromIterable(storage.entrySet()) + .map(entry -> new Positions( + entry.getKey().getTopic(), + entry.getKey().getGroupId(), + entry.getValue() + )); + } + + @Override + public CompletionStage> fetch(String topic, String groupId, Set partitions, Map externalPositions) { + ConcurrentMap positions = storage.get(Key.of(topic, groupId)); + + if (positions == null) { + return CompletableFuture.completedFuture(externalPositions); + } + + Map result = new HashMap<>(); + result.putAll(externalPositions); + result.putAll(positions); + + return CompletableFuture.completedFuture(result); + } + + @Override + public CompletionStage update(String topic, String groupId, int partition, long position) { + + storage.computeIfAbsent(Key.of(topic, groupId), __ -> new ConcurrentHashMap<>()).put(partition, position); + + return CompletableFuture.completedFuture(null); + } + + @Value + @RequiredArgsConstructor(staticName = "of") + private static class Key { + String topic; + + String groupId; + } +} diff --git a/inmemory-positions-storage/src/main/java/com/github/bsideup/liiklus/inmemory/config/InMemoryConfiguration.java b/inmemory-positions-storage/src/main/java/com/github/bsideup/liiklus/inmemory/config/InMemoryConfiguration.java new file mode 100644 index 00000000..14206719 --- /dev/null +++ b/inmemory-positions-storage/src/main/java/com/github/bsideup/liiklus/inmemory/config/InMemoryConfiguration.java @@ -0,0 +1,32 @@ +package com.github.bsideup.liiklus.inmemory.config; + +import com.github.bsideup.liiklus.config.ExporterProfile; +import com.github.bsideup.liiklus.config.GatewayProfile; +import com.github.bsideup.liiklus.inmemory.InMemoryPositionsStorage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Slf4j +@Configuration +@ExporterProfile +@GatewayProfile +@ConditionalOnProperty(value = "storage.positions.type", havingValue = "MEMORY") +public class InMemoryConfiguration { + + @Bean + InMemoryPositionsStorage inMemoryPositionsStorage() { + log.warn("\n" + + String.format("%0106d", 0).replace("0", "=") + "\n" + + String.format("%0106d", 0).replace("0", "=") + "\n" + + String.format("%0106d", 0).replace("0", "=") + "\n" + + "=== In-memory position storage is used. Please, DO NOT run it in production if you ACK your positions. ===\n" + + String.format("%0106d", 0).replace("0", "=") + "\n" + + String.format("%0106d", 0).replace("0", "=") + "\n" + + String.format("%0106d", 0).replace("0", "=") + ); + return new InMemoryPositionsStorage(); + } + +} diff --git a/settings.gradle b/settings.gradle index a2e075e3..54fe7000 100644 --- a/settings.gradle +++ b/settings.gradle @@ -3,4 +3,5 @@ include 'app' include 'protocol' include 'kafka-records-storage' -include 'dynamodb-positions-storage' \ No newline at end of file +include 'dynamodb-positions-storage' +include 'inmemory-positions-storage' \ No newline at end of file