diff --git a/build.gradle b/build.gradle index 1e4cf1c..7351e87 100644 --- a/build.gradle +++ b/build.gradle @@ -6,8 +6,8 @@ plugins { id 'jacoco' id 'com.github.kt3k.coveralls' version '2.8.1' - id 'org.springframework.boot' version '1.5.1.RELEASE' - id 'pl.allegro.tech.build.axion-release' version '1.5.0' + id 'org.springframework.boot' version '1.5.6.RELEASE' + id 'pl.allegro.tech.build.axion-release' version '1.7.1' } scmVersion { @@ -16,8 +16,6 @@ scmVersion { hooks { pre 'fileUpdate', [file: 'docs/index.md', pattern: { v, c -> /avro-cli-$v/ }, replacement: { v, c -> "avro-cli-$v" }] - pre 'commit', - { v, p -> "release version: $v" } } } @@ -28,25 +26,28 @@ sourceCompatibility = 1.8 targetCompatibility = 1.8 def versions = [ - avro : '1.8.1', - spock : '1.1-groovy-2.4-rc-3', + avro : '1.8.2', + spring_kafka: '1.2.2.RELEASE', + spock : '1.1-groovy-2.4', - checkstyle: '6.3', - gradle : '3.3' + checkstyle : '6.3', + gradle : '4.0.2' ] dependencies { compile 'org.springframework.boot:spring-boot-starter', + "org.springframework.kafka:spring-kafka:${versions.spring_kafka}", "org.apache.avro:avro:${versions.avro}", - 'com.beust:jcommander:1.58', + 'com.beust:jcommander:1.72', 'tech.allegro.schema.json2avro:converter:0.2.5' - testCompile 'com.github.tomakehurst:wiremock:2.5.1', - 'cglib:cglib-nodep:3.2.4', - 'org.objenesis:objenesis:2.5.1', + testCompile 'com.github.tomakehurst:wiremock:2.6.0', + 'cglib:cglib-nodep:3.2.5', + 'org.objenesis:objenesis:2.6', "org.spockframework:spock-core:${versions.spock}", "org.spockframework:spock-spring:${versions.spock}", - 'org.springframework.boot:spring-boot-starter-test' + 'org.springframework.boot:spring-boot-starter-test', + "org.springframework.kafka:spring-kafka-test:${versions.spring_kafka}" } repositories { diff --git a/docs/index.md b/docs/index.md index 4bf45b0..80f9c9b 100644 --- a/docs/index.md +++ b/docs/index.md @@ -66,6 +66,29 @@ Usage: fingerprint [options] Source of schema to read. ``` +### kafka-consume: Consume records from Kafka + +```bash +Usage: kafka-consume [options] + Options: + --bootstrap-servers, -b + Bootstrap servers. + Default: localhost:9092 + --duration + Read duration in ISO-8601 format (PnDTnHnMn.nS). + --message-type, -m + Topic message type. + Default: TEXT + Possible Values: [TEXT] + --offset-reset, -o + Offset reset consumer value. + Default: LATEST + Possible Values: [LATEST, EARLIEST] + * --topic, -t + Kafka topic name. + Default: [] +``` + ### normalize: Normalize schema to canonical parsing form ``` @@ -123,6 +146,7 @@ java -jar avro-cli-0.2.6.jar validate -s http://localhost:8000/schema-no-fields. # Credits - * [Apache Avro™ 1.8.1 Specification](http://avro.apache.org/docs/1.8.1/spec.html) - * [Spring boot](https://projects.spring.io/spring-boot) + * [Apache Avro™ Specification](http://avro.apache.org/docs/current/spec.html) * [JCommander](http://jcommander.org) + * [Spring Boot](https://projects.spring.io/spring-boot) + * [Spring for Apache Kafka](https://projects.spring.io/spring-kafka) diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 54ed702..3ea4220 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 953fa31..6e30eaa 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Sat Jan 21 03:35:37 CET 2017 +#Mon Jul 31 14:32:43 CEST 2017 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-3.3-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.0.2-bin.zip diff --git a/gradlew b/gradlew index 4453cce..cccdd3d 100755 --- a/gradlew +++ b/gradlew @@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS="" # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" -warn ( ) { +warn () { echo "$*" } -die ( ) { +die () { echo echo "$*" echo @@ -155,7 +155,7 @@ if $cygwin ; then fi # Escape application args -save ( ) { +save () { for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done echo " " } diff --git a/settings.gradle b/settings.gradle index fe9cbb1..61456bb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,19 +1 @@ -/* - * This settings file was auto generated by the Gradle buildInit task - * by 'rafal.kluszczynski' at '12/9/16 12:57 AM' with Gradle 3.2.1 - * - * The settings file is used to specify which projects to include in your build. - * In a single project build this file can be empty or even removed. - * - * Detailed information about configuring a multi-project build in Gradle can be found - * in the user guide at https://docs.gradle.org/3.2.1/userguide/multi_project_builds.html - */ - -/* -// To declare projects as part of a multi-project build use the 'include' method -include 'shared' -include 'api' -include 'services:webservice' -*/ - rootProject.name = 'avro-cli' diff --git a/src/main/java/io/github/rkluszczynski/avro/cli/CliCommandService.java b/src/main/java/io/github/rkluszczynski/avro/cli/CliCommandService.java index 7aed7a2..daec7e3 100644 --- a/src/main/java/io/github/rkluszczynski/avro/cli/CliCommandService.java +++ b/src/main/java/io/github/rkluszczynski/avro/cli/CliCommandService.java @@ -98,5 +98,5 @@ private JCommander createCommander(List cliCommands, CliMainParamete private Log log = LogFactory.getLog(CliCommandService.class); - private static final String PROGRAM_NAME = "avro-cli"; + public static final String PROGRAM_NAME = "avro-cli"; } diff --git a/src/main/java/io/github/rkluszczynski/avro/cli/command/CommandNames.java b/src/main/java/io/github/rkluszczynski/avro/cli/command/CommandNames.java index 406bf62..5fecf83 100644 --- a/src/main/java/io/github/rkluszczynski/avro/cli/command/CommandNames.java +++ b/src/main/java/io/github/rkluszczynski/avro/cli/command/CommandNames.java @@ -3,10 +3,16 @@ public enum CommandNames { CONVERT, FINGERPRINT, + KAFKA_CONSUME, NORMALIZE, VALIDATE; + /** + * @return Command name for application. + */ public String getCliCommand() { - return name().toLowerCase(); + return name() + .toLowerCase() + .replace('_', '-'); } } diff --git a/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/ConsumeParameters.java b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/ConsumeParameters.java new file mode 100644 index 0000000..409522b --- /dev/null +++ b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/ConsumeParameters.java @@ -0,0 +1,85 @@ +package io.github.rkluszczynski.avro.cli.command.kafka; + +import avro.shaded.com.google.common.collect.Lists; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.beust.jcommander.converters.EnumConverter; +import io.github.rkluszczynski.avro.cli.command.CliCommandParameters; +import io.github.rkluszczynski.avro.cli.util.DurationGuessConverter; + +import java.time.Duration; +import java.util.List; + +import static io.github.rkluszczynski.avro.cli.command.kafka.MessageTypeParameter.TEXT; +import static io.github.rkluszczynski.avro.cli.command.kafka.OffsetResetParameter.LATEST; + +@Parameters( + commandDescription = "Consume records from Kafka." +) +class ConsumeParameters extends CliCommandParameters { + @Parameter( + names = {"--bootstrap-servers", "-b"}, + description = "Bootstrap servers." + ) + private String bootstrapServers = "localhost:9092"; + + @Parameter( + names = {"--topic", "-t"}, + description = "Kafka topic name.", + required = true + ) + private List topics = Lists.newArrayList(); + + @Parameter( + names = {"--message-type", "-m"}, + converter = MessageTypeParameterConverter.class, + description = "Topic message type." + ) + private MessageTypeParameter messageType = TEXT; + + @Parameter( + names = {"--offset-reset", "-o"}, + converter = OffsetResetParameterConverter.class, + description = "Offset reset consumer value." + ) + private OffsetResetParameter offsetReset = LATEST; + + @Parameter( + names = {"--duration"}, + converter = DurationGuessConverter.class, + description = "Read duration in ISO-8601 format (PnDTnHnMn.nS)." + ) + private Duration duration; + + public String getBootstrapServers() { + return bootstrapServers; + } + + public List getTopics() { + return topics; + } + + public Duration getDuration() { + return duration; + } + + public MessageTypeParameter getMessageType() { + return messageType; + } + + public OffsetResetParameter getOffsetReset() { + return offsetReset; + } + + private static class MessageTypeParameterConverter extends EnumConverter { + private MessageTypeParameterConverter(String optionName, Class clazz) { + super(optionName, clazz); + } + } + + private static class OffsetResetParameterConverter extends EnumConverter { + private OffsetResetParameterConverter(String optionName, Class clazz) { + super(optionName, clazz); + } + } +} diff --git a/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/ExtendedMessageListener.java b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/ExtendedMessageListener.java new file mode 100644 index 0000000..14c3d6a --- /dev/null +++ b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/ExtendedMessageListener.java @@ -0,0 +1,11 @@ +package io.github.rkluszczynski.avro.cli.command.kafka; + +import org.springframework.kafka.listener.MessageListener; + +public abstract class ExtendedMessageListener implements MessageListener { + private volatile long count = 0L; + + protected long incrementAndGet() { + return ++count; + } +} diff --git a/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/KafkaConsumption.java b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/KafkaConsumption.java new file mode 100644 index 0000000..653f17f --- /dev/null +++ b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/KafkaConsumption.java @@ -0,0 +1,65 @@ +package io.github.rkluszczynski.avro.cli.command.kafka; + +import io.github.rkluszczynski.avro.cli.CliMainParameters; +import io.github.rkluszczynski.avro.cli.CommandException; +import io.github.rkluszczynski.avro.cli.command.CliCommand; +import io.github.rkluszczynski.avro.cli.command.CliCommandParameters; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; + +import static io.github.rkluszczynski.avro.cli.command.CommandNames.KAFKA_CONSUME; +import static io.github.rkluszczynski.avro.cli.command.kafka.KafkaMessageConsumer.ofConsumeParameters; +import static java.util.Objects.isNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +@Component +public class KafkaConsumption implements CliCommand { + private final ConsumeParameters consumeParameters = new ConsumeParameters(); + private final CountDownLatch awaitLatch = new CountDownLatch(1); + + @Override + public String execute(CliMainParameters mainParameters) { + final KafkaMessageConsumer messageConsumer = ofConsumeParameters(consumeParameters); + + final KafkaMessageListenerContainer listenerContainer = messageConsumer.getListenerContainer(); + listenerContainer.start(); + + registerContainerShutdownHook(listenerContainer); + + try { + final Duration consumeDuration = consumeParameters.getDuration(); + + if (isNull(consumeDuration)) { + awaitLatch.await(); + } else { + awaitLatch.await(consumeDuration.toMillis(), MILLISECONDS); + } + } catch (InterruptedException ex) { + throw new CommandException("Kafka consumer interrupted!", ex); + } finally { + listenerContainer.stop(); + } + return ""; + } + + private void registerContainerShutdownHook(KafkaMessageListenerContainer listenerContainer) { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + if (listenerContainer.isRunning()) { + listenerContainer.stop(); + } + })); + } + + @Override + public String getCommandName() { + return KAFKA_CONSUME.getCliCommand(); + } + + @Override + public CliCommandParameters getParameters() { + return consumeParameters; + } +} diff --git a/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/KafkaMessageConsumer.java b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/KafkaMessageConsumer.java new file mode 100644 index 0000000..91ec925 --- /dev/null +++ b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/KafkaMessageConsumer.java @@ -0,0 +1,81 @@ +package io.github.rkluszczynski.avro.cli.command.kafka; + +import io.github.rkluszczynski.avro.cli.command.kafka.text.TextMessageListener; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; +import org.springframework.kafka.listener.config.ContainerProperties; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static io.github.rkluszczynski.avro.cli.CliCommandService.PROGRAM_NAME; + +class KafkaMessageConsumer { + private final KafkaMessageListenerContainer listenerContainer; + private final ExtendedMessageListener messageListener; + + private KafkaMessageConsumer(String bootstrapServers, + String[] topics, + MessageTypeParameter messageType, + OffsetResetParameter offsetReset) { + final Map consumerConfig = + consumerConfig(bootstrapServers, UUID.randomUUID().toString(), messageType, offsetReset); + final ConsumerFactory consumerFactory = createConsumerFactory(consumerConfig); + + final ContainerProperties containerProperties = new ContainerProperties(topics); + listenerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); + + messageListener = createMessageListener(messageType); + listenerContainer.setupMessageListener(messageListener); + } + + KafkaMessageListenerContainer getListenerContainer() { + return listenerContainer; + } + + private ExtendedMessageListener createMessageListener(MessageTypeParameter messageType) { + return new TextMessageListener(); + } + + private ConsumerFactory createConsumerFactory(Map consumerConfig) { + return new DefaultKafkaConsumerFactory<>(consumerConfig); + } + + private Map consumerConfig(String bootstrapServers, + String groupId, + MessageTypeParameter messageType, + OffsetResetParameter offsetReset) { + Map consumerConfig = new HashMap<>(); + + consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, String.format("%s-%s", PROGRAM_NAME, groupId)); + + consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + switch (messageType) { + case TEXT: + consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + break; + default: + throw new RuntimeException("Unknown kafka message type!"); + } + consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset.name().toLowerCase()); + consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); + + return consumerConfig; + } + + static KafkaMessageConsumer ofConsumeParameters(ConsumeParameters consumeParameters) { + final List topicsList = consumeParameters.getTopics(); + return new KafkaMessageConsumer( + consumeParameters.getBootstrapServers(), + topicsList.toArray(new String[topicsList.size()]), + consumeParameters.getMessageType(), + consumeParameters.getOffsetReset() + ); + } +} diff --git a/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/MessageTypeParameter.java b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/MessageTypeParameter.java new file mode 100644 index 0000000..6ee22a9 --- /dev/null +++ b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/MessageTypeParameter.java @@ -0,0 +1,5 @@ +package io.github.rkluszczynski.avro.cli.command.kafka; + +enum MessageTypeParameter { + TEXT//, AVRO +} diff --git a/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/OffsetResetParameter.java b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/OffsetResetParameter.java new file mode 100644 index 0000000..a75cf0d --- /dev/null +++ b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/OffsetResetParameter.java @@ -0,0 +1,5 @@ +package io.github.rkluszczynski.avro.cli.command.kafka; + +enum OffsetResetParameter { + LATEST, EARLIEST +} diff --git a/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/text/TextMessageListener.java b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/text/TextMessageListener.java new file mode 100644 index 0000000..1eb54ab --- /dev/null +++ b/src/main/java/io/github/rkluszczynski/avro/cli/command/kafka/text/TextMessageListener.java @@ -0,0 +1,21 @@ +package io.github.rkluszczynski.avro.cli.command.kafka.text; + +import io.github.rkluszczynski.avro.cli.command.kafka.ExtendedMessageListener; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import static java.time.Instant.ofEpochMilli; + +public class TextMessageListener extends ExtendedMessageListener { + + @Override + public void onMessage(ConsumerRecord record) { + log.info(String.format("{%d} offset == %d, partition == %d, timestamp == %s", + incrementAndGet(), record.offset(), record.partition(), ofEpochMilli(record.timestamp()))); + + System.out.printf("%s%n%n", record.value()); + } + + private Log log = LogFactory.getLog(TextMessageListener.class); +} \ No newline at end of file diff --git a/src/main/java/io/github/rkluszczynski/avro/cli/command/validation/ValidationParameters.java b/src/main/java/io/github/rkluszczynski/avro/cli/command/validation/ValidationParameters.java index f4c6c3f..879f919 100644 --- a/src/main/java/io/github/rkluszczynski/avro/cli/command/validation/ValidationParameters.java +++ b/src/main/java/io/github/rkluszczynski/avro/cli/command/validation/ValidationParameters.java @@ -61,7 +61,7 @@ public boolean isOnlyLatestValidator() { } private static class CompatibilityStrategyConverter extends EnumConverter { - public CompatibilityStrategyConverter(String optionName, Class clazz) { + private CompatibilityStrategyConverter(String optionName, Class clazz) { super(optionName, clazz); } } diff --git a/src/main/java/io/github/rkluszczynski/avro/cli/util/DurationGuessConverter.java b/src/main/java/io/github/rkluszczynski/avro/cli/util/DurationGuessConverter.java new file mode 100644 index 0000000..d8a5d0a --- /dev/null +++ b/src/main/java/io/github/rkluszczynski/avro/cli/util/DurationGuessConverter.java @@ -0,0 +1,38 @@ +package io.github.rkluszczynski.avro.cli.util; + +import com.beust.jcommander.IStringConverter; +import io.github.rkluszczynski.avro.cli.CommandException; + +import java.time.Duration; +import java.time.format.DateTimeParseException; + +public class DurationGuessConverter implements IStringConverter { + @Override + public Duration convert(String value) { + try { + return Duration.parse(value); + } catch (DateTimeParseException ex) { + parseException = ex; + + return tryGuessPeriodPrefix(value); + } + } + + private Duration tryGuessPeriodPrefix(String value) { + try { + return Duration.parse("P" + value); + } catch (DateTimeParseException ex) { + return tryGuessPeriodTimePrefix(value); + } + } + + private Duration tryGuessPeriodTimePrefix(String value) { + try { + return Duration.parse("PT" + value); + } catch (DateTimeParseException ex) { + throw new CommandException("Could not guess duration parameter!", parseException); + } + } + + private DateTimeParseException parseException = null; +} diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 603f6c3..a34a0d6 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -1,3 +1,7 @@ -spring.main.banner-mode: "off" +schema-registry.url: http://schema-registry.allegrogroup.com + +spring.main: + banner-mode: 'off' + web-environment: 'false' logging.level.: WARN diff --git a/src/test/groovy/io/github/rkluszczynski/avro/cli/WiremockTrait.groovy b/src/test/groovy/io/github/rkluszczynski/avro/cli/WiremockTrait.groovy new file mode 100644 index 0000000..0e2a580 --- /dev/null +++ b/src/test/groovy/io/github/rkluszczynski/avro/cli/WiremockTrait.groovy @@ -0,0 +1,13 @@ +package io.github.rkluszczynski.avro.cli + +import com.github.tomakehurst.wiremock.WireMockServer + +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig + +trait WiremockTrait { + def createWiremockServer() { + new WireMockServer( + wireMockConfig().dynamicPort() + ) + } +} \ No newline at end of file diff --git a/src/test/groovy/io/github/rkluszczynski/avro/cli/command/KafkaConsumeCommandTest.groovy b/src/test/groovy/io/github/rkluszczynski/avro/cli/command/KafkaConsumeCommandTest.groovy new file mode 100644 index 0000000..e7db248 --- /dev/null +++ b/src/test/groovy/io/github/rkluszczynski/avro/cli/command/KafkaConsumeCommandTest.groovy @@ -0,0 +1,58 @@ +package io.github.rkluszczynski.avro.cli.command + +import io.github.rkluszczynski.avro.cli.BaseTestSpecification +import io.github.rkluszczynski.avro.cli.CliCommandService +import io.github.rkluszczynski.avro.cli.command.kafka.KafkaConsumption +import org.junit.ClassRule +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.test.rule.KafkaEmbedded +import org.springframework.kafka.test.utils.KafkaTestUtils +import spock.lang.Shared +import spock.lang.Unroll + +class KafkaConsumeCommandTest extends BaseTestSpecification { + private commandService = new CliCommandService([new KafkaConsumption()]) + + @ClassRule + @Shared + KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, 'testTopic', 'testTopic0-1', 'testTopic0-2', 'testTopic0-3') + + @Unroll + def 'should consume earliest message from topic with duration string #durationParameter'() { + given: + def senderProperties = KafkaTestUtils.senderProps(embeddedKafka.brokersAsString) + def producerFactory = new DefaultKafkaProducerFactory(senderProperties) + + new KafkaTemplate(producerFactory).send(topicName, 'test message') + + when: + commandService.executeCommand('kafka-consume', + '-b', embeddedKafka.brokersAsString, + '-t', topicName, + '-o', 'earliest', + '--duration', durationParameter + ) + + then: + trimmedOutput().endsWith('test message') + + where: + topicName | durationParameter + 'testTopic0-1' | 'PT4S' + 'testTopic0-2' | 'T4S' + 'testTopic0-3' | '4S' + } + + def 'should fail when duration parameter is not parsable'() { + when: + commandService.executeCommand('kafka-consume', + '-b', embeddedKafka.brokersAsString, + '-t', 'testTopic', + '--duration', 'NOT-PARSABLE-DURATION-PARAMETER' + ) + + then: + trimmedOutput() == 'FAILED [java.time.format.DateTimeParseException] Text cannot be parsed to a Duration' + } +} diff --git a/src/test/groovy/io/github/rkluszczynski/avro/cli/command/KafkaConsumeForeverTest.groovy b/src/test/groovy/io/github/rkluszczynski/avro/cli/command/KafkaConsumeForeverTest.groovy new file mode 100644 index 0000000..66114b9 --- /dev/null +++ b/src/test/groovy/io/github/rkluszczynski/avro/cli/command/KafkaConsumeForeverTest.groovy @@ -0,0 +1,94 @@ +package io.github.rkluszczynski.avro.cli.command + +import io.github.rkluszczynski.avro.cli.CliMainParameters +import io.github.rkluszczynski.avro.cli.CommandException +import io.github.rkluszczynski.avro.cli.command.kafka.KafkaConsumption +import org.junit.ClassRule +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.kafka.test.rule.KafkaEmbedded +import org.springframework.test.context.ContextConfiguration +import spock.lang.Shared +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +@ContextConfiguration +@SpringBootTest +class KafkaConsumeForeverTest extends Specification { + @ClassRule + @Shared + private KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, 'foreverTopic') + + @Shared + protected condition = new PollingConditions(timeout: 5, delay: 0.2) + + @Autowired + private KafkaConsumption kafkaConsumeCommand + + def setup() { + kafkaConsumeCommand.consumeParameters.bootstrapServers = embeddedKafka.brokersAsString + kafkaConsumeCommand.consumeParameters.topics = ['foreverTopic'] + } + + def 'should infinite consumption throw exception when interrupted'() { + setup: + def commandException = null + def commandThread = runInThread { + try { + kafkaConsumeCommand.execute(new CliMainParameters()) + } + catch (Throwable cause) { + commandException = cause + } + } + + when: + commandThread.start() + + and: + condition.eventually { + assert commandThread.state == Thread.State.WAITING + } + + and: + commandThread.interrupt() + commandThread.join() + + then: + commandException instanceof CommandException + commandException.message == 'Kafka consumer interrupted!' + commandException.cause instanceof InterruptedException + } + + def 'should infinite consumption end without output when await latch is disposed'() { + setup: + def commandOutput = null + def commandThread = runInThread { + commandOutput = kafkaConsumeCommand.execute(new CliMainParameters()) + } + + when: + commandThread.start() + + and: + condition.eventually { + assert commandThread.state == Thread.State.WAITING + } + + and: + kafkaConsumeCommand.awaitLatch.countDown() + commandThread.join() + + then: + commandOutput == '' + } + + private Thread runInThread(closure) { + new Thread() { + @Override + void run() { + closure() + } + } + } +} diff --git a/src/test/groovy/io/github/rkluszczynski/avro/cli/command/kafka/ConsumeParametersTest.groovy b/src/test/groovy/io/github/rkluszczynski/avro/cli/command/kafka/ConsumeParametersTest.groovy new file mode 100644 index 0000000..547bf2f --- /dev/null +++ b/src/test/groovy/io/github/rkluszczynski/avro/cli/command/kafka/ConsumeParametersTest.groovy @@ -0,0 +1,17 @@ +package io.github.rkluszczynski.avro.cli.command.kafka + +import spock.lang.Specification + +class ConsumeParametersTest extends Specification { + def 'should works message type enum converter'() { + expect: + new ConsumeParameters.MessageTypeParameterConverter("messageType", MessageTypeParameter.class) + .convert('text') == MessageTypeParameter.TEXT + } + + def 'should works offset reset enum converter'() { + expect: + new ConsumeParameters.OffsetResetParameterConverter("offsetReset", OffsetResetParameter.class) + .convert('latest') == OffsetResetParameter.LATEST + } +} diff --git a/src/test/groovy/io/github/rkluszczynski/avro/cli/command/validation/ValidationParametersTest.groovy b/src/test/groovy/io/github/rkluszczynski/avro/cli/command/validation/ValidationParametersTest.groovy new file mode 100644 index 0000000..0961e65 --- /dev/null +++ b/src/test/groovy/io/github/rkluszczynski/avro/cli/command/validation/ValidationParametersTest.groovy @@ -0,0 +1,11 @@ +package io.github.rkluszczynski.avro.cli.command.validation + +import spock.lang.Specification + +class ValidationParametersTest extends Specification { + def 'should works compatibility strategy enum converter'() { + expect: + new ValidationParameters.CompatibilityStrategyConverter("compatibilityStrategy", CompatibilityStrategy.class) + .convert('full') == CompatibilityStrategy.FULL + } +} diff --git a/src/test/groovy/io/github/rkluszczynski/avro/cli/util/SchemaSourceConverterTest.groovy b/src/test/groovy/io/github/rkluszczynski/avro/cli/util/SchemaSourceConverterTest.groovy index b35307e..4c1b7d6 100644 --- a/src/test/groovy/io/github/rkluszczynski/avro/cli/util/SchemaSourceConverterTest.groovy +++ b/src/test/groovy/io/github/rkluszczynski/avro/cli/util/SchemaSourceConverterTest.groovy @@ -1,17 +1,15 @@ package io.github.rkluszczynski.avro.cli.util import com.github.tomakehurst.wiremock.WireMockServer +import io.github.rkluszczynski.avro.cli.WiremockTrait import org.apache.avro.Schema -import spock.lang.Shared import spock.lang.Specification import static com.github.tomakehurst.wiremock.client.WireMock.aResponse import static com.github.tomakehurst.wiremock.client.WireMock.get import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo -class SchemaSourceConverterTest extends Specification { - @Shared - WireMockServer wireMockServer +class SchemaSourceConverterTest extends Specification implements WiremockTrait { def 'should convert file path to schema'() { expect: @@ -21,7 +19,7 @@ class SchemaSourceConverterTest extends Specification { def 'should convert url content to schema'() { setup: - wireMockServer = new WireMockServer() + WireMockServer wireMockServer = createWiremockServer() wireMockServer.start() wireMockServer.stubFor(get(urlEqualTo('/schema-no-fields')) @@ -37,7 +35,9 @@ class SchemaSourceConverterTest extends Specification { } private checkConvertedSchema(schema) { - schema.type == Schema.Type.RECORD && schema.name == 'testRecord' && - schema.namespace == 'io.github.rkluszczynski.avro.cli' && schema.fields == [] + schema.type == Schema.Type.RECORD && + schema.name == 'testRecord' && + schema.namespace == 'io.github.rkluszczynski.avro.cli' && + schema.fields == [] } }