Skip to content

Commit

Permalink
implement an in-memory positions storage (for testing)
Browse files Browse the repository at this point in the history
  • Loading branch information
bsideup committed Mar 5, 2018
1 parent 07d4d3e commit ea0d293
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 1 deletion.
2 changes: 2 additions & 0 deletions app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {

runtime project(":kafka-records-storage")
runtime project(":dynamodb-positions-storage")
runtime project(":inmemory-positions-storage")

compile 'org.lognet:grpc-spring-boot-starter'
compile 'io.grpc:grpc-netty'
Expand All @@ -35,4 +36,5 @@ dependencies {

testCompile project(":kafka-records-storage")
testCompile project(":dynamodb-positions-storage")
testCompile project(":inmemory-positions-storage")
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.github.bsideup.liiklus.dynamodb.DynamoDBPositionsStorage;
import com.github.bsideup.liiklus.dynamodb.config.DynamoDBConfiguration.DynamoDBProperties;
import lombok.SneakyThrows;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Profile;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
Expand All @@ -16,6 +17,7 @@
@Profile("test")
@org.springframework.boot.test.context.TestConfiguration
@Order(Ordered.HIGHEST_PRECEDENCE)
@ConditionalOnProperty(value = "storage.positions.type", havingValue = "DYNAMODB")
public class TestConfiguration {

@SneakyThrows
Expand Down
14 changes: 14 additions & 0 deletions inmemory-positions-storage/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
plugins {
id "java"
}

dependencies {
compileOnly 'org.projectlombok:lombok'
compileOnly 'org.springframework.boot:spring-boot-starter'

compile project(":api")

compile 'io.projectreactor:reactor-core'

testCompileOnly 'org.projectlombok:lombok'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.github.bsideup.liiklus.inmemory;

import com.github.bsideup.liiklus.positions.PositionsStorage;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.experimental.FieldDefaults;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* WARNING: this storage type should only be used for testing and NOT in production
*
*/
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true)
public class InMemoryPositionsStorage implements PositionsStorage {

ConcurrentMap<Key, ConcurrentMap<Integer, Long>> storage = new ConcurrentHashMap<>();

@Override
public Publisher<Positions> findAll() {
return Flux.fromIterable(storage.entrySet())
.map(entry -> new Positions(
entry.getKey().getTopic(),
entry.getKey().getGroupId(),
entry.getValue()
));
}

@Override
public CompletionStage<Map<Integer, Long>> fetch(String topic, String groupId, Set<Integer> partitions, Map<Integer, Long> externalPositions) {
ConcurrentMap<Integer, Long> positions = storage.get(Key.of(topic, groupId));

if (positions == null) {
return CompletableFuture.completedFuture(externalPositions);
}

Map<Integer, Long> result = new HashMap<>();
result.putAll(externalPositions);
result.putAll(positions);

return CompletableFuture.completedFuture(result);
}

@Override
public CompletionStage<Void> update(String topic, String groupId, int partition, long position) {

storage.computeIfAbsent(Key.of(topic, groupId), __ -> new ConcurrentHashMap<>()).put(partition, position);

return CompletableFuture.completedFuture(null);
}

@Value
@RequiredArgsConstructor(staticName = "of")
private static class Key {
String topic;

String groupId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.github.bsideup.liiklus.inmemory.config;

import com.github.bsideup.liiklus.config.ExporterProfile;
import com.github.bsideup.liiklus.config.GatewayProfile;
import com.github.bsideup.liiklus.inmemory.InMemoryPositionsStorage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
@ExporterProfile
@GatewayProfile
@ConditionalOnProperty(value = "storage.positions.type", havingValue = "MEMORY")
public class InMemoryConfiguration {

@Bean
InMemoryPositionsStorage inMemoryPositionsStorage() {
log.warn("\n" +
String.format("%0106d", 0).replace("0", "=") + "\n" +
String.format("%0106d", 0).replace("0", "=") + "\n" +
String.format("%0106d", 0).replace("0", "=") + "\n" +
"=== In-memory position storage is used. Please, DO NOT run it in production if you ACK your positions. ===\n" +
String.format("%0106d", 0).replace("0", "=") + "\n" +
String.format("%0106d", 0).replace("0", "=") + "\n" +
String.format("%0106d", 0).replace("0", "=")
);
return new InMemoryPositionsStorage();
}

}
3 changes: 2 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ include 'app'
include 'protocol'

include 'kafka-records-storage'
include 'dynamodb-positions-storage'
include 'dynamodb-positions-storage'
include 'inmemory-positions-storage'

0 comments on commit ea0d293

Please sign in to comment.