Skip to content

Commit

Permalink
kafka-consume command (without avro support yet) (#27)
Browse files Browse the repository at this point in the history
support plain kafka consumption (without avro support yet)
  • Loading branch information
rkluszczynski authored Aug 1, 2017
1 parent 596f060 commit 7acc07c
Show file tree
Hide file tree
Showing 24 changed files with 570 additions and 49 deletions.
27 changes: 14 additions & 13 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ plugins {
id 'jacoco'
id 'com.github.kt3k.coveralls' version '2.8.1'

id 'org.springframework.boot' version '1.5.1.RELEASE'
id 'pl.allegro.tech.build.axion-release' version '1.5.0'
id 'org.springframework.boot' version '1.5.6.RELEASE'
id 'pl.allegro.tech.build.axion-release' version '1.7.1'
}

scmVersion {
Expand All @@ -16,8 +16,6 @@ scmVersion {
hooks {
pre 'fileUpdate',
[file: 'docs/index.md', pattern: { v, c -> /avro-cli-$v/ }, replacement: { v, c -> "avro-cli-$v" }]
pre 'commit',
{ v, p -> "release version: $v" }
}
}

Expand All @@ -28,25 +26,28 @@ sourceCompatibility = 1.8
targetCompatibility = 1.8

def versions = [
avro : '1.8.1',
spock : '1.1-groovy-2.4-rc-3',
avro : '1.8.2',
spring_kafka: '1.2.2.RELEASE',
spock : '1.1-groovy-2.4',

checkstyle: '6.3',
gradle : '3.3'
checkstyle : '6.3',
gradle : '4.0.2'
]

dependencies {
compile 'org.springframework.boot:spring-boot-starter',
"org.springframework.kafka:spring-kafka:${versions.spring_kafka}",
"org.apache.avro:avro:${versions.avro}",
'com.beust:jcommander:1.58',
'com.beust:jcommander:1.72',
'tech.allegro.schema.json2avro:converter:0.2.5'

testCompile 'com.github.tomakehurst:wiremock:2.5.1',
'cglib:cglib-nodep:3.2.4',
'org.objenesis:objenesis:2.5.1',
testCompile 'com.github.tomakehurst:wiremock:2.6.0',
'cglib:cglib-nodep:3.2.5',
'org.objenesis:objenesis:2.6',
"org.spockframework:spock-core:${versions.spock}",
"org.spockframework:spock-spring:${versions.spock}",
'org.springframework.boot:spring-boot-starter-test'
'org.springframework.boot:spring-boot-starter-test',
"org.springframework.kafka:spring-kafka-test:${versions.spring_kafka}"
}

repositories {
Expand Down
28 changes: 26 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,29 @@ Usage: fingerprint [options]
Source of schema to read.
```

### kafka-consume: Consume records from Kafka

```bash
Usage: kafka-consume [options]
Options:
--bootstrap-servers, -b
Bootstrap servers.
Default: localhost:9092
--duration
Read duration in ISO-8601 format (PnDTnHnMn.nS).
--message-type, -m
Topic message type.
Default: TEXT
Possible Values: [TEXT]
--offset-reset, -o
Offset reset consumer value.
Default: LATEST
Possible Values: [LATEST, EARLIEST]
* --topic, -t
Kafka topic name.
Default: []
```

### normalize: Normalize schema to canonical parsing form

```
Expand Down Expand Up @@ -123,6 +146,7 @@ java -jar avro-cli-0.2.6.jar validate -s http://localhost:8000/schema-no-fields.

# Credits

* [Apache Avro™ 1.8.1 Specification](http://avro.apache.org/docs/1.8.1/spec.html)
* [Spring boot](https://projects.spring.io/spring-boot)
* [Apache Avro™ Specification](http://avro.apache.org/docs/current/spec.html)
* [JCommander](http://jcommander.org)
* [Spring Boot](https://projects.spring.io/spring-boot)
* [Spring for Apache Kafka](https://projects.spring.io/spring-kafka)
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Sat Jan 21 03:35:37 CET 2017
#Mon Jul 31 14:32:43 CEST 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-3.3-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.0.2-bin.zip
6 changes: 3 additions & 3 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS=""
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"

warn ( ) {
warn () {
echo "$*"
}

die ( ) {
die () {
echo
echo "$*"
echo
Expand Down Expand Up @@ -155,7 +155,7 @@ if $cygwin ; then
fi

# Escape application args
save ( ) {
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
Expand Down
18 changes: 0 additions & 18 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -1,19 +1 @@
/*
* This settings file was auto generated by the Gradle buildInit task
* by 'rafal.kluszczynski' at '12/9/16 12:57 AM' with Gradle 3.2.1
*
* The settings file is used to specify which projects to include in your build.
* In a single project build this file can be empty or even removed.
*
* Detailed information about configuring a multi-project build in Gradle can be found
* in the user guide at https://docs.gradle.org/3.2.1/userguide/multi_project_builds.html
*/

/*
// To declare projects as part of a multi-project build use the 'include' method
include 'shared'
include 'api'
include 'services:webservice'
*/

rootProject.name = 'avro-cli'
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,5 @@ private JCommander createCommander(List<CliCommand> cliCommands, CliMainParamete

private Log log = LogFactory.getLog(CliCommandService.class);

private static final String PROGRAM_NAME = "avro-cli";
public static final String PROGRAM_NAME = "avro-cli";
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@
public enum CommandNames {
CONVERT,
FINGERPRINT,
KAFKA_CONSUME,
NORMALIZE,
VALIDATE;

/**
* @return Command name for application.
*/
public String getCliCommand() {
return name().toLowerCase();
return name()
.toLowerCase()
.replace('_', '-');
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.github.rkluszczynski.avro.cli.command.kafka;

import avro.shaded.com.google.common.collect.Lists;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.EnumConverter;
import io.github.rkluszczynski.avro.cli.command.CliCommandParameters;
import io.github.rkluszczynski.avro.cli.util.DurationGuessConverter;

import java.time.Duration;
import java.util.List;

import static io.github.rkluszczynski.avro.cli.command.kafka.MessageTypeParameter.TEXT;
import static io.github.rkluszczynski.avro.cli.command.kafka.OffsetResetParameter.LATEST;

@Parameters(
commandDescription = "Consume records from Kafka."
)
class ConsumeParameters extends CliCommandParameters {
@Parameter(
names = {"--bootstrap-servers", "-b"},
description = "Bootstrap servers."
)
private String bootstrapServers = "localhost:9092";

@Parameter(
names = {"--topic", "-t"},
description = "Kafka topic name.",
required = true
)
private List<String> topics = Lists.newArrayList();

@Parameter(
names = {"--message-type", "-m"},
converter = MessageTypeParameterConverter.class,
description = "Topic message type."
)
private MessageTypeParameter messageType = TEXT;

@Parameter(
names = {"--offset-reset", "-o"},
converter = OffsetResetParameterConverter.class,
description = "Offset reset consumer value."
)
private OffsetResetParameter offsetReset = LATEST;

@Parameter(
names = {"--duration"},
converter = DurationGuessConverter.class,
description = "Read duration in ISO-8601 format (PnDTnHnMn.nS)."
)
private Duration duration;

public String getBootstrapServers() {
return bootstrapServers;
}

public List<String> getTopics() {
return topics;
}

public Duration getDuration() {
return duration;
}

public MessageTypeParameter getMessageType() {
return messageType;
}

public OffsetResetParameter getOffsetReset() {
return offsetReset;
}

private static class MessageTypeParameterConverter extends EnumConverter<MessageTypeParameter> {
private MessageTypeParameterConverter(String optionName, Class<MessageTypeParameter> clazz) {
super(optionName, clazz);
}
}

private static class OffsetResetParameterConverter extends EnumConverter<OffsetResetParameter> {
private OffsetResetParameterConverter(String optionName, Class<OffsetResetParameter> clazz) {
super(optionName, clazz);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.github.rkluszczynski.avro.cli.command.kafka;

import org.springframework.kafka.listener.MessageListener;

public abstract class ExtendedMessageListener<K, V> implements MessageListener<K, V> {
private volatile long count = 0L;

protected long incrementAndGet() {
return ++count;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.github.rkluszczynski.avro.cli.command.kafka;

import io.github.rkluszczynski.avro.cli.CliMainParameters;
import io.github.rkluszczynski.avro.cli.CommandException;
import io.github.rkluszczynski.avro.cli.command.CliCommand;
import io.github.rkluszczynski.avro.cli.command.CliCommandParameters;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;

import static io.github.rkluszczynski.avro.cli.command.CommandNames.KAFKA_CONSUME;
import static io.github.rkluszczynski.avro.cli.command.kafka.KafkaMessageConsumer.ofConsumeParameters;
import static java.util.Objects.isNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

@Component
public class KafkaConsumption implements CliCommand {
private final ConsumeParameters consumeParameters = new ConsumeParameters();
private final CountDownLatch awaitLatch = new CountDownLatch(1);

@Override
public String execute(CliMainParameters mainParameters) {
final KafkaMessageConsumer messageConsumer = ofConsumeParameters(consumeParameters);

final KafkaMessageListenerContainer<String, String> listenerContainer = messageConsumer.getListenerContainer();
listenerContainer.start();

registerContainerShutdownHook(listenerContainer);

try {
final Duration consumeDuration = consumeParameters.getDuration();

if (isNull(consumeDuration)) {
awaitLatch.await();
} else {
awaitLatch.await(consumeDuration.toMillis(), MILLISECONDS);
}
} catch (InterruptedException ex) {
throw new CommandException("Kafka consumer interrupted!", ex);
} finally {
listenerContainer.stop();
}
return "";
}

private void registerContainerShutdownHook(KafkaMessageListenerContainer<String, String> listenerContainer) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (listenerContainer.isRunning()) {
listenerContainer.stop();
}
}));
}

@Override
public String getCommandName() {
return KAFKA_CONSUME.getCliCommand();
}

@Override
public CliCommandParameters getParameters() {
return consumeParameters;
}
}
Loading

0 comments on commit 7acc07c

Please sign in to comment.