Skip to content

Commit

Permalink
Group versions (#4)
Browse files Browse the repository at this point in the history
* WIP: group versions

* Move offsets logic to the service out of the records storage, introduce GroupId

* after merge fixes

* extract getOffsetsByGroup

* update deps

* add more tests

* key-per-partition map

* add `isLatest` to Prometheus exporter
  • Loading branch information
bsideup authored Jul 13, 2018
1 parent 4005d52 commit 7ed0650
Show file tree
Hide file tree
Showing 19 changed files with 524 additions and 94 deletions.
4 changes: 4 additions & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ dependencies {
compileOnly 'org.springframework.boot:spring-boot-starter'

compile 'org.reactivestreams:reactive-streams'

testCompileOnly 'org.projectlombok:lombok'
testCompile 'junit:junit'
testCompile 'org.assertj:assertj-core'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.github.bsideup.liiklus.positions;

import lombok.AccessLevel;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Value;

import java.util.Comparator;
import java.util.Optional;
import java.util.regex.MatchResult;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Value
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class GroupId implements Comparable<GroupId> {

public static final Comparator<GroupId> COMPARATOR = Comparator
.comparing(GroupId::getName)
.thenComparing(it -> it.getVersion().orElse(0));

public static final String VERSION_SEPARATOR = "-v";
public static final Pattern VERSION_PATTERN = Pattern.compile("^(.*)-v(\\d+)$");

public static GroupId of(String name, int version) {
return of(name, Optional.of(version));
}

public static GroupId of(String name, Optional<Integer> version) {
if (version.orElse(0) < 0) {
throw new IllegalArgumentException("version must be >= 0");
}
return new GroupId(name, version.filter(it -> it != 0));
}

public static GroupId ofString(String str) {
Matcher matcher = VERSION_PATTERN.matcher(str);

if (matcher.matches()) {
MatchResult result = matcher.toMatchResult();

return GroupId.of(
result.group(1),
Optional.ofNullable(result.group(2)).map(Integer::parseInt)
);
} else {
return GroupId.of(
str,
Optional.empty()
);
}
}

@NonNull
String name;

@NonNull
Optional<Integer> version;

public String asString() {
return name + version.map(it -> VERSION_SEPARATOR + it).orElse("");
}

@Override
public int compareTo(GroupId other) {
return COMPARATOR.compare(this, other);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@

public interface PositionsStorage {

CompletionStage<Void> update(String topic, String groupId, int partition, long position);
CompletionStage<Void> update(String topic, GroupId groupId, int partition, long position);

Publisher<Positions> findAll();

CompletionStage<Map<Integer, Long>> findAll(String topic, String groupId);
CompletionStage<Map<Integer, Long>> findAll(String topic, GroupId groupId);

CompletionStage<Map<Integer, Map<Integer, Long>>> findAllVersionsByGroup(String topic, String groupName);

@Value
class Positions {

String topic;

String groupId;
GroupId groupId;

Map<Integer, Long> values;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public interface RecordsStorage {

CompletionStage<OffsetInfo> publish(Envelope envelope);

Subscription subscribe(String topic, String groupId, Optional<String> autoOffsetReset);
Subscription subscribe(String topic, String groupName, Optional<String> autoOffsetReset);

interface Subscription {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.github.bsideup.liiklus.positions;

import lombok.RequiredArgsConstructor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(Parameterized.class)
@RequiredArgsConstructor
public class GroupIdTest {

@Parameterized.Parameters(name = "{index}: {0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{"hello", GroupId.of("hello", Optional.empty())},
{"hello-", GroupId.of("hello-", Optional.empty())},
{"hello-v", GroupId.of("hello-v", Optional.empty())},
{"hello-v-v", GroupId.of("hello-v-v", Optional.empty())},

{"hello-v1", GroupId.of("hello", Optional.of(1))},
{"hello-v100", GroupId.of("hello", Optional.of(100))},

{"hello-v10-v5", GroupId.of("hello-v10", Optional.of(5))},

{"hello-v-1", GroupId.of("hello-v-1", Optional.empty())},
{"hello-v10-alpha", GroupId.of("hello-v10-alpha", Optional.empty())},
});
}

final String string;

final GroupId object;

@Test
public void testParsing() {
assertThat(GroupId.ofString(string)).isEqualTo(object);
}

@Test
public void testStringRepresentation() {
assertThat(object.asString()).isEqualTo(string);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.github.bsideup.liiklus.positions;

import org.junit.Test;

import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class GroupIdValidationTest {

@Test
public void testWrongExplicitVersion() {
assertThatThrownBy(
() -> GroupId.of("test", -1)
).isInstanceOf(IllegalArgumentException.class);
}

@Test
public void testWrongExplicitOptionalVersion() {
assertThatThrownBy(
() -> GroupId.of("test", Optional.of(-1))
).isInstanceOf(IllegalArgumentException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.github.bsideup.liiklus.config.ExporterProfile;
import com.github.bsideup.liiklus.positions.PositionsStorage;
import com.github.bsideup.liiklus.positions.PositionsStorage.Positions;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.GaugeMetricFamily;
Expand All @@ -15,6 +16,7 @@
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -45,11 +47,28 @@ public List<MetricFamilySamples> collect() {
return Collections.emptyList();
}
return Flux.from(positionsStorage.findAll())
.<MetricFamilySamples>map(positions -> {
val gauge = new GaugeMetricFamily("liiklus_topic_position", "", Arrays.asList("topic", "groupId", "partition"));
.groupBy(it -> it.getGroupId().getName())
.flatMap(it -> it
.sort(Comparator.comparing(Positions::getGroupId).reversed())
.index()
)
.<MetricFamilySamples>map(tuple -> {
val isLatest = tuple.getT1() == 0;
val positions = tuple.getT2();

val gauge = new GaugeMetricFamily("liiklus_topic_position", "", Arrays.asList("topic", "groupName", "groupVersion", "isLatest", "partition"));

for (val entry : positions.getValues().entrySet()) {
gauge.addMetric(Arrays.asList(positions.getTopic(), positions.getGroupId(), entry.getKey().toString()), entry.getValue().doubleValue());
gauge.addMetric(
Arrays.asList(
positions.getTopic(),
positions.getGroupId().getName(),
Integer.toString(positions.getGroupId().getVersion().orElse(0)),
Boolean.toString(isLatest),
entry.getKey().toString()
),
entry.getValue().doubleValue()
);
}

return gauge;
Expand Down
Loading

0 comments on commit 7ed0650

Please sign in to comment.