diff --git a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationCdcTest.java b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationCdcTest.java
index 31939b00..451731ab 100644
--- a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationCdcTest.java
+++ b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationCdcTest.java
@@ -120,7 +120,7 @@ private void runTest(String branch, boolean useSchema) {
             .config("value.converter.schemas.enable", useSchema)
             .config("iceberg.tables", String.format("%s.%s", TEST_DB, TEST_TABLE))
             .config("iceberg.tables.cdc-field", "op")
-            .config("iceberg.control.commit.interval-ms", 1000)
+            .config("iceberg.control.commit.interval-ms", 5000)
             .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
             .config("iceberg.kafka.auto.offset.reset", "earliest");
 
diff --git a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationDynamicTableTest.java b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationDynamicTableTest.java
index bc5163b8..13cf5250 100644
--- a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationDynamicTableTest.java
+++ b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationDynamicTableTest.java
@@ -97,7 +97,7 @@ private void runTest(String branch, boolean useSchema) {
             .config("value.converter.schemas.enable", useSchema)
             .config("iceberg.tables.dynamic-enabled", true)
             .config("iceberg.tables.route-field", "payload")
-            .config("iceberg.control.commit.interval-ms", 1000)
+            .config("iceberg.control.commit.interval-ms", 5000)
             .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
             .config("iceberg.kafka.auto.offset.reset", "earliest");
 
diff --git a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationMultiTableTest.java b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationMultiTableTest.java
index 072495ca..e38d0772 100644
--- a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationMultiTableTest.java
+++ b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationMultiTableTest.java
@@ -101,7 +101,7 @@ private void runTest(String branch, boolean useSchema) {
             .config("iceberg.tables.route-field", "type")
             .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1")
             .config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2")
-            .config("iceberg.control.commit.interval-ms", 1000)
+            .config("iceberg.control.commit.interval-ms", 5000)
             .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
             .config("iceberg.kafka.auto.offset.reset", "earliest");
 
diff --git a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationTest.java b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationTest.java
index 3ab50dc3..249b3229 100644
--- a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationTest.java
+++ b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationTest.java
@@ -229,7 +229,7 @@ private void runTest(String branch, boolean useSchema, Map<String, String> extra
             .config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
             .config("value.converter.schemas.enable", useSchema)
             .config("iceberg.tables", String.format("%s.%s", TEST_DB, TEST_TABLE))
-            .config("iceberg.control.commit.interval-ms", 1000)
+            .config("iceberg.control.commit.interval-ms", 5000)
             .config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
             .config("iceberg.kafka.auto.offset.reset", "earliest");
 
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkTask.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkTask.java
index b4aee3ad..d1da18f8 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkTask.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkTask.java
@@ -18,27 +18,14 @@
  */
 package io.tabular.iceberg.connect;
 
-import io.tabular.iceberg.connect.channel.Coordinator;
-import io.tabular.iceberg.connect.channel.CoordinatorThread;
-import io.tabular.iceberg.connect.channel.KafkaClientFactory;
-import io.tabular.iceberg.connect.channel.KafkaUtils;
-import io.tabular.iceberg.connect.channel.NotRunningException;
-import io.tabular.iceberg.connect.channel.Worker;
-import io.tabular.iceberg.connect.data.IcebergWriterFactory;
+import io.tabular.iceberg.connect.channel.Task;
+import io.tabular.iceberg.connect.channel.TaskImpl;
 import io.tabular.iceberg.connect.data.Utilities;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.Map;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.ConsumerGroupDescription;
-import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.slf4j.Logger;
@@ -49,21 +36,7 @@ public class IcebergSinkTask extends SinkTask {
   private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkTask.class);
 
   private IcebergSinkConfig config;
-  private Catalog catalog;
-  private CoordinatorThread coordinatorThread;
-  private Worker worker;
-
-  static class TopicPartitionComparator implements Comparator<TopicPartition> {
-
-    @Override
-    public int compare(TopicPartition o1, TopicPartition o2) {
-      int result = o1.topic().compareTo(o2.topic());
-      if (result == 0) {
-        result = Integer.compare(o1.partition(), o2.partition());
-      }
-      return result;
-    }
-  }
+  private Task task;
 
   @Override
   public String version() {
@@ -79,43 +52,8 @@ public void start(Map<String, String> props) {
   public void open(Collection<TopicPartition> partitions) {
     // destroy any state if KC re-uses object
     clearObjectState();
-    catalog = Utilities.loadCatalog(config);
-    KafkaClientFactory clientFactory = new KafkaClientFactory(config.kafkaProps());
-
-    ConsumerGroupDescription groupDesc;
-    try (Admin admin = clientFactory.createAdmin()) {
-      groupDesc = KafkaUtils.consumerGroupDescription(config.connectGroupId(), admin);
-    }
-
-    if (groupDesc.state() == ConsumerGroupState.STABLE) {
-      Collection<MemberDescription> members = groupDesc.members();
-      if (isLeader(members, partitions)) {
-        LOG.info("Task elected leader, starting commit coordinator");
-        Coordinator coordinator = new Coordinator(catalog, config, members, clientFactory);
-        coordinatorThread = new CoordinatorThread(coordinator);
-        coordinatorThread.start();
-      }
-    }
-
-    LOG.info("Starting commit worker");
-    IcebergWriterFactory writerFactory = new IcebergWriterFactory(catalog, config);
-    worker = new Worker(config, clientFactory, writerFactory, context);
-    worker.syncCommitOffsets();
-    worker.start();
-  }
-
-  @VisibleForTesting
-  boolean isLeader(Collection<MemberDescription> members, Collection<TopicPartition> partitions) {
-    // there should only be one task assigned partition 0 of the first topic,
-    // so elect that one the leader
-    TopicPartition firstTopicPartition =
-        members.stream()
-            .flatMap(member -> member.assignment().topicPartitions().stream())
-            .min(new TopicPartitionComparator())
-            .orElseThrow(
-                () -> new ConnectException("No partitions assigned, cannot determine leader"));
-
-    return partitions.contains(firstTopicPartition);
+    task = new TaskImpl(context, config);
   }
 
   @Override
@@ -128,57 +66,21 @@ private void close() {
   }
 
   private void clearObjectState() {
-    if (worker != null) {
-      worker.stop();
-      worker = null;
-    }
-
-    if (coordinatorThread != null) {
-      coordinatorThread.terminate();
-      coordinatorThread = null;
-    }
-
-    if (catalog != null) {
-      if (catalog instanceof AutoCloseable) {
-        try {
-          ((AutoCloseable) catalog).close();
-        } catch (Exception e) {
-          LOG.warn("An error occurred closing catalog instance, ignoring...", e);
-        }
-      }
-      catalog = null;
-    }
+    Utilities.close(task);
+    task = null;
   }
 
   @Override
   public void put(Collection<SinkRecord> sinkRecords) {
-    if (sinkRecords != null && !sinkRecords.isEmpty() && worker != null) {
-      worker.save(sinkRecords);
-    }
-    processControlEvents();
-  }
-
-  @Override
-  public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
-    processControlEvents();
-  }
-
-  private void processControlEvents() {
-    if (coordinatorThread != null && coordinatorThread.isTerminated()) {
-      throw new NotRunningException("Coordinator unexpectedly terminated");
-    }
-    if (worker != null) {
-      worker.process();
+    if (task != null) {
+      task.put(sinkRecords);
     }
   }
 
   @Override
   public Map<TopicPartition, OffsetAndMetadata> preCommit(
       Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
-    if (worker == null) {
-      return ImmutableMap.of();
-    }
-    return worker.commitOffsets();
+    return ImmutableMap.of();
   }
 
   @Override
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java
index 38367b15..3559ab62 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java
@@ -27,9 +27,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Pair;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -46,7 +48,6 @@ public abstract class Channel {
   private static final Logger LOG = LoggerFactory.getLogger(Channel.class);
 
   private final String controlTopic;
-  private final String controlGroupId;
   private final String groupId;
   private final Producer<String, byte[]> producer;
   private final Consumer<String, byte[]> consumer;
@@ -60,22 +61,26 @@ public Channel(
       IcebergSinkConfig config,
       KafkaClientFactory clientFactory) {
     this.controlTopic = config.controlTopic();
-    this.controlGroupId = config.controlGroupId();
     this.groupId = config.controlGroupId();
 
     String transactionalId = name + config.transactionalSuffix();
-    this.producer = clientFactory.createProducer(transactionalId);
+    Pair<UUID, Producer<String, byte[]>> pair = clientFactory.createProducer(transactionalId);
+    this.producer = pair.second();
     this.consumer = clientFactory.createConsumer(consumerGroupId);
+    consumer.subscribe(ImmutableList.of(controlTopic));
     this.admin = clientFactory.createAdmin();
 
-    this.producerId = UUID.randomUUID().toString();
+    this.producerId = pair.first().toString();
   }
 
   protected void send(Event event) {
-    send(ImmutableList.of(event), ImmutableMap.of());
+    send(ImmutableList.of(event), ImmutableMap.of(), null);
   }
 
-  protected void send(List<Event> events, Map<TopicPartition, Offset> sourceOffsets) {
+  protected void send(
+      List<Event> events,
+      Map<TopicPartition, Offset> sourceOffsets,
+      ConsumerGroupMetadata consumerGroupMetadata) {
     Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Maps.newHashMap();
     sourceOffsets.forEach((k, v) -> offsetsToCommit.put(k, new OffsetAndMetadata(v.offset())));
 
@@ -94,10 +99,9 @@ protected void send(List<Event> events, Map<TopicPartition, Offset> sourceOffset
     producer.beginTransaction();
     try {
       recordList.forEach(producer::send);
+      producer.flush();
       if (!sourceOffsets.isEmpty()) {
-        // TODO: this doesn't fence zombies
-        producer.sendOffsetsToTransaction(
-            offsetsToCommit, new ConsumerGroupMetadata(controlGroupId));
+        producer.sendOffsetsToTransaction(offsetsToCommit, consumerGroupMetadata);
       }
       producer.commitTransaction();
     } catch (Exception e) {
@@ -111,9 +115,7 @@ protected void send(List<Event> events, Map<TopicPartition, Offset> sourceOffset
     }
   }
 
-  protected abstract boolean receive(Envelope envelope);
-
-  protected void consumeAvailable(Duration pollDuration) {
+  protected void consumeAvailable(Duration pollDuration, Function<Envelope, Boolean> receiveFn) {
     ConsumerRecords<String, byte[]> records = consumer.poll(pollDuration);
     while (!records.isEmpty()) {
       records.forEach(
@@ -126,7 +128,7 @@ record -> {
 
             if (event.groupId().equals(groupId)) {
               LOG.debug("Received event of type: {}", event.type().name());
-              if (receive(new Envelope(event, record.partition(), record.offset()))) {
+              if (receiveFn.apply(new Envelope(event, record.partition(), record.offset()))) {
                 LOG.info("Handled event of type: {}", event.type().name());
               }
             }
@@ -152,13 +154,6 @@ protected Admin admin() {
     return admin;
   }
 
-  public void start() {
-    consumer.subscribe(ImmutableList.of(controlTopic));
-
-    // initial poll with longer duration so the consumer will initialize...
-    consumeAvailable(Duration.ofMillis(1000));
-  }
-
   public void stop() {
     LOG.info("Channel stopping");
     producer.close();
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Committable.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Committable.java
new file mode 100644
index 00000000..26fca4db
--- /dev/null
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Committable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.tabular.iceberg.connect.channel;
+
+import io.tabular.iceberg.connect.data.Offset;
+import io.tabular.iceberg.connect.data.WriterResult;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.common.TopicPartition;
+
+class Committable {
+
+  private final ImmutableMap<TopicPartition, Offset> offsetsByTopicPartition;
+  private final ImmutableList<WriterResult> writerResults;
+
+  Committable(
+      Map<TopicPartition, Offset> offsetsByTopicPartition, List<WriterResult> writerResults) {
+    this.offsetsByTopicPartition = ImmutableMap.copyOf(offsetsByTopicPartition);
+    this.writerResults = ImmutableList.copyOf(writerResults);
+  }
+
+  public Map<TopicPartition, Offset> offsetsByTopicPartition() {
+    return offsetsByTopicPartition;
+  }
+
+  public List<WriterResult> writerResults() {
+    return writerResults;
+  }
+}
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommittableSupplier.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommittableSupplier.java
new file mode 100644
index 00000000..d9660ce0
--- /dev/null
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommittableSupplier.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.tabular.iceberg.connect.channel;
+
+interface CommittableSupplier {
+  Committable committable();
+}
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Committer.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Committer.java
new file mode 100644
index 00000000..a9812bb3
--- /dev/null
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Committer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.tabular.iceberg.connect.channel;
+
+interface Committer {
+  void commit(CommittableSupplier committableSupplier);
+}
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java
new file mode 100644
index 00000000..d8c299a0
--- /dev/null
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.tabular.iceberg.connect.channel;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import io.tabular.iceberg.connect.IcebergSinkConfig;
+import io.tabular.iceberg.connect.data.Offset;
+import io.tabular.iceberg.connect.events.CommitReadyPayload;
+import io.tabular.iceberg.connect.events.CommitRequestPayload;
+import io.tabular.iceberg.connect.events.CommitResponsePayload;
+import io.tabular.iceberg.connect.events.Event;
+import io.tabular.iceberg.connect.events.EventType;
+import io.tabular.iceberg.connect.events.TableName;
+import io.tabular.iceberg.connect.events.TopicPartitionOffset;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CommitterImpl extends Channel implements Committer, AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CommitterImpl.class);
+  private final SinkTaskContext context;
+  private final IcebergSinkConfig config;
+  private final Optional<CoordinatorThread> maybeCoordinatorThread;
+
+  public CommitterImpl(SinkTaskContext context, IcebergSinkConfig config, Catalog catalog) {
+    this(context, config, catalog, new KafkaClientFactory(config.kafkaProps()));
+  }
+
+  private CommitterImpl(
+      SinkTaskContext context,
+      IcebergSinkConfig config,
+      Catalog catalog,
+      KafkaClientFactory 
kafkaClientFactory) { + this( + context, + config, + kafkaClientFactory, + new CoordinatorThreadFactoryImpl(catalog, kafkaClientFactory)); + } + + @VisibleForTesting + CommitterImpl( + SinkTaskContext context, + IcebergSinkConfig config, + KafkaClientFactory clientFactory, + CoordinatorThreadFactory coordinatorThreadFactory) { + // pass transient consumer group ID to which we never commit offsets + super( + "committer", + IcebergSinkConfig.DEFAULT_CONTROL_GROUP_PREFIX + UUID.randomUUID(), + config, + clientFactory); + + this.context = context; + this.config = config; + + this.maybeCoordinatorThread = coordinatorThreadFactory.create(context, config); + + // The source-of-truth for source-topic offsets is the control-group-id + Map stableConsumerOffsets = + fetchStableConsumerOffsets(config.controlGroupId()); + // Rewind kafka connect consumer to avoid duplicates + context.offset(stableConsumerOffsets); + + consumeAvailable( + // initial poll with longer duration so the consumer will initialize... + Duration.ofMillis(1000), + envelope -> + receive( + envelope, + // CommittableSupplier that always returns empty committables + () -> new Committable(ImmutableMap.of(), ImmutableList.of()))); + } + + private Map fetchStableConsumerOffsets(String groupId) { + try { + ListConsumerGroupOffsetsResult response = + admin() + .listConsumerGroupOffsets( + groupId, new ListConsumerGroupOffsetsOptions().requireStable(true)); + return response.partitionsToOffsetAndMetadata().get().entrySet().stream() + .filter(entry -> context.assignment().contains(entry.getKey())) + .collect(toMap(Map.Entry::getKey, entry -> entry.getValue().offset())); + } catch (InterruptedException | ExecutionException e) { + throw new ConnectException(e); + } + } + + private void throwExceptionIfCoordinatorIsTerminated() { + if (maybeCoordinatorThread.map(CoordinatorThread::isTerminated).orElse(false)) { + throw new IllegalStateException("Coordinator unexpectedly terminated"); + } + } + + private boolean receive(Envelope envelope, CommittableSupplier committableSupplier) { + if (envelope.event().type() == EventType.COMMIT_REQUEST) { + UUID commitId = ((CommitRequestPayload) envelope.event().payload()).commitId(); + sendCommitResponse(commitId, committableSupplier); + return true; + } + return false; + } + + private void sendCommitResponse(UUID commitId, CommittableSupplier committableSupplier) { + Committable committable = committableSupplier.committable(); + + List events = Lists.newArrayList(); + + committable + .writerResults() + .forEach( + writerResult -> { + Event commitResponse = + new Event( + config.controlGroupId(), + EventType.COMMIT_RESPONSE, + new CommitResponsePayload( + writerResult.partitionStruct(), + commitId, + TableName.of(writerResult.tableIdentifier()), + writerResult.dataFiles(), + writerResult.deleteFiles())); + + events.add(commitResponse); + }); + + // include all assigned topic partitions even if no messages were read + // from a partition, as the coordinator will use that to determine + // when all data for a commit has been received + List assignments = + context.assignment().stream() + .map( + topicPartition -> { + Offset offset = + committable.offsetsByTopicPartition().getOrDefault(topicPartition, null); + return new TopicPartitionOffset( + topicPartition.topic(), + topicPartition.partition(), + offset == null ? null : offset.offset(), + offset == null ? 
null : offset.timestamp()); + }) + .collect(toList()); + + Event commitReady = + new Event( + config.controlGroupId(), + EventType.COMMIT_READY, + new CommitReadyPayload(commitId, assignments)); + events.add(commitReady); + + Map offsets = committable.offsetsByTopicPartition(); + send(events, offsets, new ConsumerGroupMetadata(config.controlGroupId())); + send(ImmutableList.of(), offsets, new ConsumerGroupMetadata(config.connectGroupId())); + } + + @Override + public void commit(CommittableSupplier committableSupplier) { + throwExceptionIfCoordinatorIsTerminated(); + consumeAvailable(Duration.ZERO, envelope -> receive(envelope, committableSupplier)); + } + + @Override + public void close() throws IOException { + stop(); + maybeCoordinatorThread.ifPresent(CoordinatorThread::terminate); + } +} diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java index 439ba4d2..6fb6e2c3 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java @@ -56,7 +56,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class Coordinator extends Channel { +public class Coordinator extends Channel implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -88,6 +88,9 @@ public Coordinator( String.format(OFFSETS_SNAPSHOT_PROP_FMT, config.controlTopic(), config.controlGroupId()); this.exec = ThreadPools.newWorkerPool("iceberg-committer", config.commitThreads()); this.commitState = new CommitState(config); + + // initial poll with longer duration so the consumer will initialize... 
+ consumeAvailable(Duration.ofMillis(1000), this::receive); } public void process() { @@ -103,15 +106,14 @@ public void process() { LOG.info("Started new commit with commit-id={}", commitState.currentCommitId().toString()); } - consumeAvailable(POLL_DURATION); + consumeAvailable(POLL_DURATION, this::receive); if (commitState.isCommitTimedOut()) { commit(true); } } - @Override - protected boolean receive(Envelope envelope) { + private boolean receive(Envelope envelope) { switch (envelope.event().type()) { case COMMIT_RESPONSE: commitState.addResponse(envelope); @@ -297,4 +299,10 @@ private Map lastCommittedOffsetsForTable(Table table, String bran } return ImmutableMap.of(); } + + @Override + public void close() throws IOException { + exec.shutdownNow(); + stop(); + } } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThread.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThread.java index 46a3dc5c..55c63fcc 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThread.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThread.java @@ -35,13 +35,6 @@ public CoordinatorThread(Coordinator coordinator) { @Override public void run() { - try { - coordinator.start(); - } catch (Exception e) { - LOG.error("Coordinator error during start, exiting thread", e); - terminated = true; - } - while (!terminated) { try { coordinator.process(); @@ -52,7 +45,7 @@ public void run() { } try { - coordinator.stop(); + coordinator.close(); } catch (Exception e) { LOG.error("Coordinator error during stop, ignoring", e); } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThreadFactory.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThreadFactory.java new file mode 100644 index 00000000..350cbfc1 --- /dev/null +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThreadFactory.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.tabular.iceberg.connect.channel; + +import io.tabular.iceberg.connect.IcebergSinkConfig; +import java.util.Optional; +import org.apache.kafka.connect.sink.SinkTaskContext; + +interface CoordinatorThreadFactory { + Optional create(SinkTaskContext context, IcebergSinkConfig config); +} diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThreadFactoryImpl.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThreadFactoryImpl.java new file mode 100644 index 00000000..176afd04 --- /dev/null +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThreadFactoryImpl.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.tabular.iceberg.connect.channel; + +import io.tabular.iceberg.connect.IcebergSinkConfig; +import java.util.Collection; +import java.util.Comparator; +import java.util.Optional; +import org.apache.iceberg.catalog.Catalog; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CoordinatorThreadFactoryImpl implements CoordinatorThreadFactory { + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorThreadFactoryImpl.class); + private final KafkaClientFactory kafkaClientFactory; + private final Catalog catalog; + + CoordinatorThreadFactoryImpl(Catalog catalog, KafkaClientFactory kafkaClientFactory) { + this.kafkaClientFactory = kafkaClientFactory; + this.catalog = catalog; + } + + private static class TopicPartitionComparator implements Comparator { + @Override + public int compare(TopicPartition o1, TopicPartition o2) { + int result = o1.topic().compareTo(o2.topic()); + if (result == 0) { + result = Integer.compare(o1.partition(), o2.partition()); + } + return result; + } + } + + private boolean isLeader( + Collection members, Collection partitions) { + // there should only be one task assigned partition 0 of the first topic, + // so elect that one the leader + TopicPartition firstTopicPartition = + members.stream() + .flatMap(member -> member.assignment().topicPartitions().stream()) + .min(new TopicPartitionComparator()) + .orElseThrow( + () -> new ConnectException("No partitions assigned, cannot determine leader")); + + return partitions.contains(firstTopicPartition); + } + + @Override + public Optional create(SinkTaskContext context, IcebergSinkConfig config) { + CoordinatorThread thread = 
null; + + ConsumerGroupDescription groupDesc; + try (Admin admin = kafkaClientFactory.createAdmin()) { + groupDesc = KafkaUtils.consumerGroupDescription(config.connectGroupId(), admin); + } + + if (groupDesc.state() == ConsumerGroupState.STABLE) { + Collection members = groupDesc.members(); + if (isLeader(members, context.assignment())) { + LOG.info("Task elected leader, starting commit coordinator"); + Coordinator coordinator = new Coordinator(catalog, config, members, kafkaClientFactory); + thread = new CoordinatorThread(coordinator); + thread.start(); + } + } + + return Optional.ofNullable(thread); + } +} diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/KafkaClientFactory.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/KafkaClientFactory.java index 0951c5e4..086d6bd6 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/KafkaClientFactory.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/KafkaClientFactory.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.UUID; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Pair; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -40,14 +41,17 @@ public KafkaClientFactory(Map kafkaProps) { this.kafkaProps = kafkaProps; } - public Producer createProducer(String transactionalId) { + public Pair> createProducer(String transactionalId) { + UUID producerId = UUID.randomUUID(); + Map producerProps = Maps.newHashMap(kafkaProps); producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); KafkaProducer result = new KafkaProducer<>(producerProps, new StringSerializer(), new ByteArraySerializer()); result.initTransactions(); - return result; + + return Pair.of(producerId, result); } public Consumer createConsumer(String consumerGroupId) { diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Task.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Task.java new file mode 100644 index 00000000..0ce438df --- /dev/null +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Task.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.tabular.iceberg.connect.channel; + +import java.util.Collection; +import org.apache.kafka.connect.sink.SinkRecord; + +public interface Task { + void put(Collection sinkRecords); +} diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/TaskImpl.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/TaskImpl.java new file mode 100644 index 00000000..f7b7e83d --- /dev/null +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/TaskImpl.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.tabular.iceberg.connect.channel; + +import io.tabular.iceberg.connect.IcebergSinkConfig; +import io.tabular.iceberg.connect.data.Utilities; +import java.util.Collection; +import org.apache.iceberg.catalog.Catalog; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTaskContext; + +public class TaskImpl implements Task, AutoCloseable { + + private final Catalog catalog; + private final Writer writer; + private final Committer committer; + + public TaskImpl(SinkTaskContext context, IcebergSinkConfig config) { + this.catalog = Utilities.loadCatalog(config); + this.writer = new Worker(config, catalog); + this.committer = new CommitterImpl(context, config, catalog); + } + + @Override + public void put(Collection sinkRecords) { + writer.write(sinkRecords); + committer.commit(writer); + } + + @Override + public void close() throws Exception { + Utilities.close(writer); + Utilities.close(committer); + Utilities.close(catalog); + } +} diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java index 512471c7..7fb9d899 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java @@ -19,7 +19,6 @@ package io.tabular.iceberg.connect.channel; import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toMap; import io.tabular.iceberg.connect.IcebergSinkConfig; import io.tabular.iceberg.connect.data.IcebergWriterFactory; @@ -27,87 +26,42 @@ import io.tabular.iceberg.connect.data.RecordWriter; import io.tabular.iceberg.connect.data.Utilities; import io.tabular.iceberg.connect.data.WriterResult; -import io.tabular.iceberg.connect.events.CommitReadyPayload; -import io.tabular.iceberg.connect.events.CommitRequestPayload; -import io.tabular.iceberg.connect.events.CommitResponsePayload; -import io.tabular.iceberg.connect.events.Event; -import io.tabular.iceberg.connect.events.EventType; -import io.tabular.iceberg.connect.events.TableName; -import 
io.tabular.iceberg.connect.events.TopicPartitionOffset; -import java.time.Duration; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.UUID; -import java.util.concurrent.ExecutionException; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; -import org.apache.kafka.connect.sink.SinkTaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class Worker extends Channel { +// TODO: rename to WriterImpl later, minimize changes for clearer commit history for now +class Worker implements Writer, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(Worker.class); private final IcebergSinkConfig config; private final IcebergWriterFactory writerFactory; - private final SinkTaskContext context; - private final String controlGroupId; private final Map writers; private final Map sourceOffsets; - public Worker( - IcebergSinkConfig config, - KafkaClientFactory clientFactory, - IcebergWriterFactory writerFactory, - SinkTaskContext context) { - // pass transient consumer group ID to which we never commit offsets - super( - "worker", - IcebergSinkConfig.DEFAULT_CONTROL_GROUP_PREFIX + UUID.randomUUID(), - config, - clientFactory); + Worker(IcebergSinkConfig config, Catalog catalog) { + this(config, new IcebergWriterFactory(catalog, config)); + } + @VisibleForTesting + Worker(IcebergSinkConfig config, IcebergWriterFactory writerFactory) { this.config = config; this.writerFactory = writerFactory; - this.context = context; - this.controlGroupId = config.controlGroupId(); this.writers = Maps.newHashMap(); this.sourceOffsets = Maps.newHashMap(); } - public void syncCommitOffsets() { - Map offsets = - commitOffsets().entrySet().stream() - .collect(toMap(Entry::getKey, entry -> entry.getValue().offset())); - context.offset(offsets); - } - - public Map commitOffsets() { - try { - ListConsumerGroupOffsetsResult response = admin().listConsumerGroupOffsets(controlGroupId); - return response.partitionsToOffsetAndMetadata().get().entrySet().stream() - .filter(entry -> context.assignment().contains(entry.getKey())) - .collect(toMap(Entry::getKey, Entry::getValue)); - } catch (InterruptedException | ExecutionException e) { - throw new ConnectException(e); - } - } - - public void process() { - consumeAvailable(Duration.ZERO); - } - @Override - protected boolean receive(Envelope envelope) { - Event event = envelope.event(); - if (event.type() != EventType.COMMIT_REQUEST) { - return false; - } - + public Committable committable() { List writeResults = writers.values().stream().flatMap(writer -> writer.complete().stream()).collect(toList()); Map offsets = Maps.newHashMap(sourceOffsets); @@ -115,60 +69,21 @@ protected boolean receive(Envelope envelope) { writers.clear(); sourceOffsets.clear(); - // include all assigned topic partitions even if no messages were read - // from a partition, as the coordinator will use that to determine - // when all data for a commit has been received - List 
assignments = - context.assignment().stream() - .map( - tp -> { - Offset offset = offsets.get(tp); - if (offset == null) { - offset = Offset.NULL_OFFSET; - } - return new TopicPartitionOffset( - tp.topic(), tp.partition(), offset.offset(), offset.timestamp()); - }) - .collect(toList()); - - UUID commitId = ((CommitRequestPayload) event.payload()).commitId(); - - List events = - writeResults.stream() - .map( - writeResult -> - new Event( - config.controlGroupId(), - EventType.COMMIT_RESPONSE, - new CommitResponsePayload( - writeResult.partitionStruct(), - commitId, - TableName.of(writeResult.tableIdentifier()), - writeResult.dataFiles(), - writeResult.deleteFiles()))) - .collect(toList()); - - Event readyEvent = - new Event( - config.controlGroupId(), - EventType.COMMIT_READY, - new CommitReadyPayload(commitId, assignments)); - events.add(readyEvent); - - send(events, offsets); - context.requestCommit(); - - return true; + return new Committable(offsets, writeResults); } @Override - public void stop() { - super.stop(); + public void close() throws IOException { writers.values().forEach(RecordWriter::close); + writers.clear(); + sourceOffsets.clear(); } - public void save(Collection sinkRecords) { - sinkRecords.forEach(this::save); + @Override + public void write(Collection sinkRecords) { + if (sinkRecords != null && !sinkRecords.isEmpty()) { + sinkRecords.forEach(this::save); + } } private void save(SinkRecord record) { diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Writer.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Writer.java new file mode 100644 index 00000000..53f7b5db --- /dev/null +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Writer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.tabular.iceberg.connect.channel; + +import java.util.Collection; +import org.apache.kafka.connect.sink.SinkRecord; + +interface Writer extends CommittableSupplier { + void write(Collection sinkRecords); +} diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Offset.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Offset.java index 0ea6261c..6efa09be 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Offset.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Offset.java @@ -19,15 +19,16 @@ package io.tabular.iceberg.connect.data; import java.util.Objects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class Offset implements Comparable { - - public static final Offset NULL_OFFSET = new Offset(null, null); +public class Offset { private final Long offset; private final Long timestamp; public Offset(Long offset, Long timestamp) { + Preconditions.checkNotNull(offset, "offset cannot be null"); + this.offset = offset; this.timestamp = timestamp; } @@ -41,13 +42,24 @@ public Long timestamp() { } @Override - public int compareTo(Offset other) { - if (Objects.equals(this.offset, other.offset)) { - return 0; + public boolean equals(Object o) { + if (this == o) { + return true; } - if (this.offset == null || (other.offset != null && other.offset > this.offset)) { - return -1; + if (o == null || getClass() != o.getClass()) { + return false; } - return 1; + Offset offset1 = (Offset) o; + return Objects.equals(offset, offset1.offset) && Objects.equals(timestamp, offset1.timestamp); + } + + @Override + public int hashCode() { + return Objects.hash(offset, timestamp); + } + + @Override + public String toString() { + return "Offset{" + "offset=" + offset + ", timestamp=" + timestamp + '}'; } } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java index 84fe76d7..01aebae8 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java @@ -264,5 +264,20 @@ private static Class dynamicallyLoad(String className) { return configClass; } + public static void close(C closeable) { + if (closeable != null) { + if (closeable instanceof AutoCloseable) { + try { + ((AutoCloseable) closeable).close(); + } catch (Exception e) { + LOG.warn( + "An error occurred while trying to close {} instance, ignoring...", + closeable.getClass().getSimpleName(), + e); + } + } + } + } + private Utilities() {} } diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/IcebergSinkTaskTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/IcebergSinkTaskTest.java deleted file mode 100644 index 22a5913e..00000000 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/IcebergSinkTaskTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package io.tabular.iceberg.connect; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.List; -import java.util.Optional; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; -import org.apache.kafka.clients.admin.MemberAssignment; -import org.apache.kafka.clients.admin.MemberDescription; -import org.apache.kafka.common.TopicPartition; -import org.junit.jupiter.api.Test; - -public class IcebergSinkTaskTest { - - @Test - public void testIsLeader() { - IcebergSinkTask task = new IcebergSinkTask(); - - MemberAssignment assignment1 = - new MemberAssignment( - ImmutableSet.of(new TopicPartition("topic1", 0), new TopicPartition("topic2", 1))); - MemberDescription member1 = - new MemberDescription(null, Optional.empty(), null, null, assignment1); - - MemberAssignment assignment2 = - new MemberAssignment( - ImmutableSet.of(new TopicPartition("topic2", 0), new TopicPartition("topic1", 1))); - MemberDescription member2 = - new MemberDescription(null, Optional.empty(), null, null, assignment2); - - List members = ImmutableList.of(member1, member2); - - List assignments = - ImmutableList.of(new TopicPartition("topic2", 1), new TopicPartition("topic1", 0)); - assertThat(task.isLeader(members, assignments)).isTrue(); - - assignments = - ImmutableList.of(new TopicPartition("topic2", 0), new TopicPartition("topic1", 1)); - assertThat(task.isLeader(members, assignments)).isFalse(); - } -} diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/ChannelTestBase.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/ChannelTestBase.java index 7bfcfd65..25f4a68c 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/ChannelTestBase.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/ChannelTestBase.java @@ -28,6 +28,7 @@ import io.tabular.iceberg.connect.IcebergSinkConfig; import io.tabular.iceberg.connect.TableSinkConfig; import java.io.IOException; +import java.util.UUID; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Namespace; @@ -36,6 +37,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.TopicDescription; @@ -53,11 +55,13 @@ public class ChannelTestBase { protected static final String SRC_TOPIC_NAME = "src-topic"; protected static final String CTL_TOPIC_NAME = "ctl-topic"; + protected static final TopicPartition CTL_TOPIC_PARTITION = new TopicPartition(CTL_TOPIC_NAME, 0); protected static final String CONTROL_CONSUMER_GROUP_ID = "cg-connector"; protected InMemoryCatalog catalog; protected Table table; protected IcebergSinkConfig config; protected KafkaClientFactory clientFactory; + protected UUID 
producerId; protected MockProducer producer; protected MockConsumer consumer; protected Admin admin; @@ -107,14 +111,15 @@ public void before() { admin = mock(Admin.class); when(admin.describeTopics(anyCollection())).thenReturn(describeResult); + producerId = UUID.randomUUID(); producer = new MockProducer<>(false, new StringSerializer(), new ByteArraySerializer()); producer.initTransactions(); consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); clientFactory = mock(KafkaClientFactory.class); - when(clientFactory.createProducer(any())).thenReturn(producer); when(clientFactory.createConsumer(any())).thenReturn(consumer); + when(clientFactory.createProducer(any())).thenReturn(Pair.of(producerId, producer)); when(clientFactory.createAdmin()).thenReturn(admin); } @@ -124,8 +129,7 @@ public void after() throws IOException { } protected void initConsumer() { - TopicPartition tp = new TopicPartition(CTL_TOPIC_NAME, 0); - consumer.rebalance(ImmutableList.of(tp)); - consumer.updateBeginningOffsets(ImmutableMap.of(tp, 0L)); + consumer.rebalance(ImmutableList.of(CTL_TOPIC_PARTITION)); + consumer.updateBeginningOffsets(ImmutableMap.of(CTL_TOPIC_PARTITION, 0L)); } } diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitterImplTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitterImplTest.java new file mode 100644 index 00000000..f87e5324 --- /dev/null +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitterImplTest.java @@ -0,0 +1,587 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.tabular.iceberg.connect.channel; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.tabular.iceberg.connect.IcebergSinkConfig; +import io.tabular.iceberg.connect.data.Offset; +import io.tabular.iceberg.connect.data.WriterResult; +import io.tabular.iceberg.connect.events.CommitCompletePayload; +import io.tabular.iceberg.connect.events.CommitReadyPayload; +import io.tabular.iceberg.connect.events.CommitRequestPayload; +import io.tabular.iceberg.connect.events.CommitResponsePayload; +import io.tabular.iceberg.connect.events.Event; +import io.tabular.iceberg.connect.events.EventTestUtil; +import io.tabular.iceberg.connect.events.EventType; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; +import org.apache.kafka.clients.admin.internals.CoordinatorKey; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +class CommitterImplTest { + private static final String SOURCE_TOPIC = "source-topic-name"; + private static final TopicPartition SOURCE_TP0 = new TopicPartition(SOURCE_TOPIC, 0); + private static final TopicPartition SOURCE_TP1 = new TopicPartition(SOURCE_TOPIC, 1); + // note: only partition=0 is assigned + private static final Set ASSIGNED_SOURCE_TOPIC_PARTITIONS = + ImmutableSet.of(SOURCE_TP0); + private static final String CONNECTOR_NAME = "connector-name"; + private static final String TABLE_1_NAME = "db.tbl1"; + private static final TableIdentifier TABLE_1_IDENTIFIER = 
TableIdentifier.parse(TABLE_1_NAME); + private static final String CONTROL_TOPIC = "control-topic-name"; + private static final TopicPartition CONTROL_TOPIC_PARTITION = + new TopicPartition(CONTROL_TOPIC, 0); + private KafkaClientFactory kafkaClientFactory; + private UUID producerId; + private MockProducer<String, byte[]> producer; + private MockConsumer<String, byte[]> consumer; + private Admin admin; + + @BeforeEach + public void before() { + admin = mock(Admin.class); + + producerId = UUID.randomUUID(); + producer = new MockProducer<>(false, new StringSerializer(), new ByteArraySerializer()); + producer.initTransactions(); + + consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + + kafkaClientFactory = mock(KafkaClientFactory.class); + when(kafkaClientFactory.createConsumer(any())).thenReturn(consumer); + when(kafkaClientFactory.createProducer(any())).thenReturn(Pair.of(producerId, producer)); + when(kafkaClientFactory.createAdmin()).thenReturn(admin); + } + + @AfterEach + public void after() { + producer.close(); + consumer.close(); + admin.close(); + } + + private void initConsumer() { + consumer.rebalance(ImmutableList.of(CONTROL_TOPIC_PARTITION)); + consumer.updateBeginningOffsets(ImmutableMap.of(CONTROL_TOPIC_PARTITION, 0L)); + } + + private static IcebergSinkConfig makeConfig(int taskId) { + return new IcebergSinkConfig( + ImmutableMap.of( + "name", + CONNECTOR_NAME, + "iceberg.catalog.catalog-impl", + "org.apache.iceberg.inmemory.InMemoryCatalog", + "iceberg.tables", + TABLE_1_NAME, + "iceberg.control.topic", + CONTROL_TOPIC, + IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP, + "-txn-" + UUID.randomUUID() + "-" + taskId)); + } + + private static final IcebergSinkConfig CONFIG = makeConfig(1); + + private SinkTaskContext mockContext() { + SinkTaskContext mockContext = mock(SinkTaskContext.class); + when(mockContext.assignment()).thenReturn(ASSIGNED_SOURCE_TOPIC_PARTITIONS); + return mockContext; + } + + private static DynConstructors.Ctor<CoordinatorKey> ctorCoordinatorKey() { + return DynConstructors.builder(CoordinatorKey.class) + .hiddenImpl( + "org.apache.kafka.clients.admin.internals.CoordinatorKey", + FindCoordinatorRequest.CoordinatorType.class, + String.class) + .build(); + } + + private static DynConstructors.Ctor<ListConsumerGroupOffsetsResult> + ctorListConsumerGroupOffsetsResult() { + return DynConstructors.builder(ListConsumerGroupOffsetsResult.class) + .hiddenImpl("org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult", Map.class) + .build(); + } + + private final CoordinatorKey coordinatorKey = + ctorCoordinatorKey() + .newInstance(FindCoordinatorRequest.CoordinatorType.GROUP, "fakeCoordinatorKey"); + + @SuppressWarnings("deprecation") + private static ListConsumerGroupOffsetsOptions listOffsetResultMatcher() { + return argThat(x -> x.topicPartitions() == null && x.requireStable()); + } + + private ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult( + Map<TopicPartition, Long> consumerOffsets) { + return ctorListConsumerGroupOffsetsResult() + .newInstance( + ImmutableMap.of( + coordinatorKey, + KafkaFuture.completedFuture( + consumerOffsets.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())))))); + } + + private void whenAdminListConsumerGroupOffsetsThenReturn( + Map<String, Map<TopicPartition, Long>> consumersOffsets) { + consumersOffsets.forEach( + (consumerGroup, consumerOffsets) -> { + when(admin.listConsumerGroupOffsets(eq(consumerGroup), listOffsetResultMatcher())) + .thenReturn(listConsumerGroupOffsetsResult(consumerOffsets)); + }); + } + + private static class NoOpCoordinatorThreadFactory
implements CoordinatorThreadFactory { + int numTimesCalled = 0; + + @Override + public Optional<CoordinatorThread> create(SinkTaskContext context, IcebergSinkConfig config) { + numTimesCalled += 1; + CoordinatorThread mockThread = mock(CoordinatorThread.class); + Mockito.doNothing().when(mockThread).start(); + Mockito.doNothing().when(mockThread).terminate(); + return Optional.of(mockThread); + } + } + + private static class TerminatedCoordinatorThreadFactory implements CoordinatorThreadFactory { + @Override + public Optional<CoordinatorThread> create(SinkTaskContext context, IcebergSinkConfig config) { + CoordinatorThread mockThread = mock(CoordinatorThread.class); + Mockito.doNothing().when(mockThread).start(); + Mockito.doNothing().when(mockThread).terminate(); + Mockito.doReturn(true).when(mockThread).isTerminated(); + return Optional.of(mockThread); + } + } + + private static String toPath(ContentFile<?> contentFile) { + return contentFile.path().toString(); + } + + private static <F extends ContentFile<F>> void assertSameContentFiles( + List<F> actual, List<F> expected) { + assertThat(actual.stream().map(CommitterImplTest::toPath).collect(Collectors.toList())) + .containsExactlyElementsOf( + expected.stream().map(CommitterImplTest::toPath).collect(Collectors.toList())); + } + + private void assertCommitResponse( + ProducerRecord<String, byte[]> producerRecord, + UUID expectedProducerId, + UUID expectedCommitId, + TableIdentifier expectedTableIdentifier, + List<DataFile> expectedDataFiles, + List<DeleteFile> expectedDeleteFiles) { + assertThat(producerRecord.key()).isEqualTo(expectedProducerId.toString()); + + Event event = Event.decode(producerRecord.value()); + assertThat(event.type()).isEqualTo(EventType.COMMIT_RESPONSE); + assertThat(event.payload()).isInstanceOf(CommitResponsePayload.class); + CommitResponsePayload commitResponsePayload = (CommitResponsePayload) event.payload(); + assertThat(commitResponsePayload.commitId()).isEqualTo(expectedCommitId); + assertThat(commitResponsePayload.tableName().toIdentifier()).isEqualTo(expectedTableIdentifier); + assertSameContentFiles(commitResponsePayload.dataFiles(), expectedDataFiles); + assertSameContentFiles(commitResponsePayload.deleteFiles(), expectedDeleteFiles); + } + + private void assertCommitReady( + ProducerRecord<String, byte[]> producerRecord, + UUID expectedProducerId, + UUID expectedCommitId, + Map<TopicPartition, Pair<Long, Long>> expectedAssignments) { + assertThat(producerRecord.key()).isEqualTo(expectedProducerId.toString()); + + Event event = Event.decode(producerRecord.value()); + assertThat(event.type()).isEqualTo(EventType.COMMIT_READY); + assertThat(event.payload()).isInstanceOf(CommitReadyPayload.class); + CommitReadyPayload commitReadyPayload = (CommitReadyPayload) event.payload(); + assertThat(commitReadyPayload.commitId()).isEqualTo(expectedCommitId); + assertThat( + commitReadyPayload.assignments().stream() + .map( + x -> + Pair.of( + new TopicPartition(x.topic(), x.partition()), + Pair.of(x.offset(), x.timestamp()))) + .collect(Collectors.toList())) + .isEqualTo( + expectedAssignments.entrySet().stream() + .map(e -> Pair.of(e.getKey(), e.getValue())) + .collect(Collectors.toList())); + } + + @Test + public void + testShouldRewindOffsetsToStableControlGroupConsumerOffsetsForAssignedPartitionsOnConstruction() + throws IOException { + SinkTaskContext mockContext = mockContext(); + + ArgumentCaptor<Map<TopicPartition, Long>> offsetArgumentCaptor = + ArgumentCaptor.forClass(Map.class); + + IcebergSinkConfig config = makeConfig(1); + + NoOpCoordinatorThreadFactory coordinatorThreadFactory = new NoOpCoordinatorThreadFactory(); + + whenAdminListConsumerGroupOffsetsThenReturn( + ImmutableMap.of( + 
config.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L), + config.connectGroupId(), ImmutableMap.of(SOURCE_TP0, 90L, SOURCE_TP1, 80L))); + + try (CommitterImpl ignored = + new CommitterImpl(mockContext, config, kafkaClientFactory, coordinatorThreadFactory)) { + initConsumer(); + + verify(mockContext).offset(offsetArgumentCaptor.capture()); + assertThat(offsetArgumentCaptor.getAllValues()) + .isEqualTo(ImmutableList.of(ImmutableMap.of(SOURCE_TP0, 110L))); + } + } + + @Test + public void testCommitShouldThrowExceptionIfCoordinatorIsTerminated() throws IOException { + SinkTaskContext mockContext = mockContext(); + IcebergSinkConfig config = makeConfig(0); + + whenAdminListConsumerGroupOffsetsThenReturn( + ImmutableMap.of( + config.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); + + TerminatedCoordinatorThreadFactory coordinatorThreadFactory = + new TerminatedCoordinatorThreadFactory(); + + CommittableSupplier committableSupplier = + () -> { + throw new NotImplementedException("Should not be called"); + }; + + try (CommitterImpl committerImpl = + new CommitterImpl(mockContext, config, kafkaClientFactory, coordinatorThreadFactory)) { + initConsumer(); + Committer committer = committerImpl; + + assertThatThrownBy(() -> committer.commit(committableSupplier)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Coordinator unexpectedly terminated"); + + assertThat(producer.history()).isEmpty(); + assertThat(producer.consumerGroupOffsetsHistory()).isEmpty(); + } + } + + @Test + public void testCommitShouldDoNothingIfThereAreNoMessages() throws IOException { + SinkTaskContext mockContext = mockContext(); + + NoOpCoordinatorThreadFactory coordinatorThreadFactory = new NoOpCoordinatorThreadFactory(); + + whenAdminListConsumerGroupOffsetsThenReturn( + ImmutableMap.of( + CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); + + CommittableSupplier committableSupplier = + () -> { + throw new NotImplementedException("Should not be called"); + }; + + try (CommitterImpl committerImpl = + new CommitterImpl(mockContext, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { + initConsumer(); + Committer committer = committerImpl; + + committer.commit(committableSupplier); + + assertThat(producer.history()).isEmpty(); + assertThat(producer.consumerGroupOffsetsHistory()).isEmpty(); + } + } + + @Test + public void testCommitShouldDoNothingIfThereIsNoCommitRequestMessage() throws IOException { + SinkTaskContext mockContext = mockContext(); + + NoOpCoordinatorThreadFactory coordinatorThreadFactory = new NoOpCoordinatorThreadFactory(); + + whenAdminListConsumerGroupOffsetsThenReturn( + ImmutableMap.of( + CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); + + CommittableSupplier committableSupplier = + () -> { + throw new NotImplementedException("Should not be called"); + }; + + try (CommitterImpl committerImpl = + new CommitterImpl(mockContext, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { + initConsumer(); + Committer committer = committerImpl; + + consumer.addRecord( + new ConsumerRecord<>( + CONTROL_TOPIC, + CONTROL_TOPIC_PARTITION.partition(), + 0, + UUID.randomUUID().toString(), + Event.encode( + new Event( + CONFIG.controlGroupId(), + EventType.COMMIT_COMPLETE, + new CommitCompletePayload(UUID.randomUUID(), 100L))))); + + committer.commit(committableSupplier); + + assertThat(producer.history()).isEmpty(); + assertThat(producer.consumerGroupOffsetsHistory()).isEmpty(); + } + } + + @Test + public 
void testCommitShouldRespondToCommitRequest() throws IOException { + SinkTaskContext mockContext = mockContext(); + + NoOpCoordinatorThreadFactory coordinatorThreadFactory = new NoOpCoordinatorThreadFactory(); + UUID commitId = UUID.randomUUID(); + + whenAdminListConsumerGroupOffsetsThenReturn( + ImmutableMap.of( + CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); + + List dataFiles = ImmutableList.of(EventTestUtil.createDataFile()); + List deleteFiles = ImmutableList.of(); + Types.StructType partitionStruct = Types.StructType.of(); + Map sourceOffsets = ImmutableMap.of(SOURCE_TP0, new Offset(100L, 200L)); + CommittableSupplier committableSupplier = + () -> + new Committable( + sourceOffsets, + ImmutableList.of( + new WriterResult(TABLE_1_IDENTIFIER, dataFiles, deleteFiles, partitionStruct))); + + try (CommitterImpl committerImpl = + new CommitterImpl(mockContext, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { + initConsumer(); + Committer committer = committerImpl; + + consumer.addRecord( + new ConsumerRecord<>( + CONTROL_TOPIC_PARTITION.topic(), + CONTROL_TOPIC_PARTITION.partition(), + 0, + UUID.randomUUID().toString(), + Event.encode( + new Event( + CONFIG.controlGroupId(), + EventType.COMMIT_REQUEST, + new CommitRequestPayload(commitId))))); + + committer.commit(committableSupplier); + + assertThat(producer.transactionCommitted()).isTrue(); + assertThat(producer.history()).hasSize(2); + assertCommitResponse( + producer.history().get(0), + producerId, + commitId, + TABLE_1_IDENTIFIER, + dataFiles, + deleteFiles); + assertCommitReady( + producer.history().get(1), + producerId, + commitId, + ImmutableMap.of(SOURCE_TP0, Pair.of(100L, 200L))); + + assertThat(producer.consumerGroupOffsetsHistory()).hasSize(2); + Map expectedConsumerOffset = + ImmutableMap.of(SOURCE_TP0, new OffsetAndMetadata(100L)); + assertThat(producer.consumerGroupOffsetsHistory().get(0)) + .isEqualTo(ImmutableMap.of(CONFIG.controlGroupId(), expectedConsumerOffset)); + assertThat(producer.consumerGroupOffsetsHistory().get(1)) + .isEqualTo(ImmutableMap.of(CONFIG.connectGroupId(), expectedConsumerOffset)); + } + } + + @Test + public void testCommitWhenCommittableIsEmpty() throws IOException { + SinkTaskContext mockContext = mockContext(); + + NoOpCoordinatorThreadFactory coordinatorThreadFactory = new NoOpCoordinatorThreadFactory(); + + UUID commitId = UUID.randomUUID(); + + whenAdminListConsumerGroupOffsetsThenReturn( + ImmutableMap.of( + CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); + + CommittableSupplier committableSupplier = + () -> new Committable(ImmutableMap.of(), ImmutableList.of()); + + try (CommitterImpl committerImpl = + new CommitterImpl(mockContext, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { + initConsumer(); + Committer committer = committerImpl; + + consumer.addRecord( + new ConsumerRecord<>( + CONTROL_TOPIC_PARTITION.topic(), + CONTROL_TOPIC_PARTITION.partition(), + 0, + UUID.randomUUID().toString(), + Event.encode( + new Event( + CONFIG.controlGroupId(), + EventType.COMMIT_REQUEST, + new CommitRequestPayload(commitId))))); + + committer.commit(committableSupplier); + + assertThat(producer.transactionCommitted()).isTrue(); + assertThat(producer.history()).hasSize(1); + assertCommitReady( + producer.history().get(0), + producerId, + commitId, + ImmutableMap.of(SOURCE_TP0, Pair.of(null, null))); + + assertThat(producer.consumerGroupOffsetsHistory()).hasSize(0); + } + } + + @Test + public void 
testCommitShouldCommitOffsetsOnlyForPartitionsWeMadeProgressOn() throws IOException { + SinkTaskContext mockContext = mockContext(); + + NoOpCoordinatorThreadFactory coordinatorThreadFactory = new NoOpCoordinatorThreadFactory(); + + TopicPartition sourceTp0 = new TopicPartition(SOURCE_TOPIC, 0); + TopicPartition sourceTp1 = new TopicPartition(SOURCE_TOPIC, 1); + Set sourceTopicPartitions = ImmutableSet.of(sourceTp0, sourceTp1); + + when(mockContext.assignment()).thenReturn(sourceTopicPartitions); + + UUID commitId = UUID.randomUUID(); + + whenAdminListConsumerGroupOffsetsThenReturn( + ImmutableMap.of( + CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); + + List dataFiles = ImmutableList.of(EventTestUtil.createDataFile()); + List deleteFiles = ImmutableList.of(); + Types.StructType partitionStruct = Types.StructType.of(); + CommittableSupplier committableSupplier = + () -> + new Committable( + ImmutableMap.of(sourceTp1, new Offset(100L, 200L)), + ImmutableList.of( + new WriterResult(TABLE_1_IDENTIFIER, dataFiles, deleteFiles, partitionStruct))); + + try (CommitterImpl committerImpl = + new CommitterImpl(mockContext, CONFIG, kafkaClientFactory, coordinatorThreadFactory)) { + initConsumer(); + Committer committer = committerImpl; + + consumer.addRecord( + new ConsumerRecord<>( + CONTROL_TOPIC_PARTITION.topic(), + CONTROL_TOPIC_PARTITION.partition(), + 0, + UUID.randomUUID().toString(), + Event.encode( + new Event( + CONFIG.controlGroupId(), + EventType.COMMIT_REQUEST, + new CommitRequestPayload(commitId))))); + + committer.commit(committableSupplier); + + assertThat(producer.transactionCommitted()).isTrue(); + assertThat(producer.history()).hasSize(2); + assertCommitResponse( + producer.history().get(0), + producerId, + commitId, + TABLE_1_IDENTIFIER, + dataFiles, + deleteFiles); + assertCommitReady( + producer.history().get(1), + producerId, + commitId, + ImmutableMap.of( + sourceTp0, Pair.of(null, null), + sourceTp1, Pair.of(100L, 200L))); + + assertThat(producer.consumerGroupOffsetsHistory()).hasSize(2); + Map expectedConsumerOffset = + ImmutableMap.of(sourceTp1, new OffsetAndMetadata(100L)); + assertThat(producer.consumerGroupOffsetsHistory().get(0)) + .isEqualTo(ImmutableMap.of(CONFIG.controlGroupId(), expectedConsumerOffset)); + assertThat(producer.consumerGroupOffsetsHistory().get(1)) + .isEqualTo(ImmutableMap.of(CONFIG.connectGroupId(), expectedConsumerOffset)); + } + } +} diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java index 353036fd..d263206d 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java @@ -54,6 +54,7 @@ import org.apache.kafka.clients.admin.MemberAssignment; import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -70,6 +71,8 @@ public void testCommitAppend() { table.refresh(); assertThat(producer.history()).hasSize(3); + assertThat(consumer.committed(ImmutableSet.of(CTL_TOPIC_PARTITION))) + .isEqualTo(ImmutableMap.of(CTL_TOPIC_PARTITION, new OffsetAndMetadata(3L))); assertCommitTable(1, commitId, ts); assertCommitComplete(2, commitId, ts); 
@@ -97,6 +100,8 @@ public void testCommitDelta() { ts); assertThat(producer.history()).hasSize(3); + assertThat(consumer.committed(ImmutableSet.of(CTL_TOPIC_PARTITION))) + .isEqualTo(ImmutableMap.of(CTL_TOPIC_PARTITION, new OffsetAndMetadata(3L))); assertCommitTable(1, commitId, ts); assertCommitComplete(2, commitId, ts); @@ -120,6 +125,8 @@ public void testCommitNoFiles() { UUID commitId = coordinatorTest(ImmutableList.of(), ImmutableList.of(), ts); assertThat(producer.history()).hasSize(2); + assertThat(consumer.committed(ImmutableSet.of(CTL_TOPIC_PARTITION))) + .isEqualTo(ImmutableMap.of(CTL_TOPIC_PARTITION, new OffsetAndMetadata(3L))); assertCommitComplete(1, commitId, ts); List snapshots = ImmutableList.copyOf(table.snapshots()); @@ -143,6 +150,8 @@ public void testCommitError() { // no commit messages sent assertThat(producer.history()).hasSize(1); + assertThat(consumer.committed(ImmutableSet.of(CTL_TOPIC_PARTITION))) + .isEqualTo(ImmutableMap.of()); List snapshots = ImmutableList.copyOf(table.snapshots()); Assertions.assertEquals(0, snapshots.size()); @@ -284,7 +293,6 @@ public void testCommitMultiPartitionSpecAppendDataFiles() { } final Coordinator coordinator = new Coordinator(catalog, config, members, clientFactory); - coordinator.start(); initConsumer(); // start a new commit immediately and wait for all workers to respond infinitely @@ -442,7 +450,6 @@ private UUID coordinatorTest(Function> eventsFn) { when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE); Coordinator coordinator = new Coordinator(catalog, config, ImmutableList.of(), clientFactory); - coordinator.start(); // init consumer after subscribe() initConsumer(); diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorThreadFactoryImplTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorThreadFactoryImplTest.java new file mode 100644 index 00000000..6be1d9b6 --- /dev/null +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorThreadFactoryImplTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.tabular.iceberg.connect.channel; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.tabular.iceberg.connect.IcebergSinkConfig; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.util.Pair; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; +import org.apache.kafka.clients.admin.MemberAssignment; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class CoordinatorThreadFactoryImplTest { + + private static final String CONNECTOR_NAME = "connector-name"; + private static final String TABLE_1_NAME = "db.tbl1"; + private static final String CONTROL_TOPIC = "control-topic-name"; + private static final IcebergSinkConfig BASIC_CONFIGS = + new IcebergSinkConfig( + ImmutableMap.of( + "name", + CONNECTOR_NAME, + "iceberg.catalog.catalog-impl", + "org.apache.iceberg.inmemory.InMemoryCatalog", + "iceberg.tables", + TABLE_1_NAME, + "iceberg.control.topic", + CONTROL_TOPIC, + IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP, + "-txn-" + UUID.randomUUID() + "-" + 0)); + + private static final String TOPIC_0 = "source-topic-name-0"; + private static final String TOPIC_1 = "source-topic-name-1"; + private static final TopicPartition T0P0 = new TopicPartition(TOPIC_0, 0); + private static final TopicPartition T0P1 = new TopicPartition(TOPIC_0, 1); + private static final TopicPartition T1P0 = new TopicPartition(TOPIC_1, 0); + private static final TopicPartition T1P1 = new TopicPartition(TOPIC_1, 1); + private static final Set LEADER_ASSIGNMENT = ImmutableSet.of(T0P0, T1P1); + private static final Set NON_LEADER_ASSIGNMENT = ImmutableSet.of(T0P1, T1P0); + private static final List MEMBER_DESCRIPTIONS = + ImmutableList.of( + new MemberDescription(null, null, null, new MemberAssignment(LEADER_ASSIGNMENT)), + new MemberDescription(null, null, null, new MemberAssignment(NON_LEADER_ASSIGNMENT))); + + private KafkaClientFactory kafkaClientFactory; + private UUID producerId; + private MockProducer producer; + private MockConsumer consumer; + private Admin admin; + + @BeforeEach + public void before() { + admin = mock(Admin.class); + + producerId 
= UUID.randomUUID(); + producer = new MockProducer<>(false, new StringSerializer(), new ByteArraySerializer()); + producer.initTransactions(); + + consumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + kafkaClientFactory = mock(KafkaClientFactory.class); + when(kafkaClientFactory.createConsumer(any())).thenReturn(consumer); + when(kafkaClientFactory.createProducer(any())).thenReturn(Pair.of(producerId, producer)); + when(kafkaClientFactory.createAdmin()).thenReturn(admin); + } + + @AfterEach + public void after() { + producer.close(); + consumer.close(); + admin.close(); + } + + private void whenAdminDescribeConsumerGroupThenReturn( + ConsumerGroupState consumerGroupState, List memberDescriptions) { + String connectGroupId = BASIC_CONFIGS.connectGroupId(); + + when(admin.describeConsumerGroups(eq(ImmutableList.of(connectGroupId)))) + .thenReturn( + new DescribeConsumerGroupsResult( + ImmutableMap.of( + connectGroupId, + KafkaFuture.completedFuture( + new ConsumerGroupDescription( + connectGroupId, + true, + memberDescriptions, + null, + consumerGroupState, + mock(Node.class)))))); + } + + private void whenAdminDescribeConsumerGroupThenReturn(ConsumerGroupState consumerGroupState) { + whenAdminDescribeConsumerGroupThenReturn(consumerGroupState, MEMBER_DESCRIPTIONS); + } + + @Test + public void testShouldReturnEmptyIfNotLeader() { + whenAdminDescribeConsumerGroupThenReturn(ConsumerGroupState.STABLE); + + CoordinatorThreadFactoryImpl coordinatorThreadFactory = + new CoordinatorThreadFactoryImpl(mock(Catalog.class), kafkaClientFactory); + + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + when(sinkTaskContext.assignment()).thenReturn(NON_LEADER_ASSIGNMENT); + + Optional maybeCoordinatorThread = + coordinatorThreadFactory.create(sinkTaskContext, BASIC_CONFIGS); + try { + assertThat(maybeCoordinatorThread).isEmpty(); + } finally { + maybeCoordinatorThread.ifPresent(CoordinatorThread::terminate); + } + } + + @Test + public void testShouldReturnEmptyIfLeaderButGroupIsNotStable() { + whenAdminDescribeConsumerGroupThenReturn(ConsumerGroupState.UNKNOWN); + + CoordinatorThreadFactoryImpl coordinatorThreadFactory = + new CoordinatorThreadFactoryImpl(mock(Catalog.class), kafkaClientFactory); + + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + when(sinkTaskContext.assignment()).thenReturn(LEADER_ASSIGNMENT); + + Optional maybeCoordinatorThread = + coordinatorThreadFactory.create(sinkTaskContext, BASIC_CONFIGS); + try { + assertThat(maybeCoordinatorThread).isEmpty(); + } finally { + maybeCoordinatorThread.ifPresent(CoordinatorThread::terminate); + } + } + + @Test + public void testShouldReturnThreadIfLeaderAndGroupIsStable() { + whenAdminDescribeConsumerGroupThenReturn(ConsumerGroupState.STABLE); + + CoordinatorThreadFactoryImpl coordinatorThreadFactory = + new CoordinatorThreadFactoryImpl(mock(Catalog.class), kafkaClientFactory); + + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + when(sinkTaskContext.assignment()).thenReturn(LEADER_ASSIGNMENT); + + Optional maybeCoordinatorThread = + coordinatorThreadFactory.create(sinkTaskContext, BASIC_CONFIGS); + try { + assertThat(maybeCoordinatorThread).isPresent(); + } finally { + maybeCoordinatorThread.ifPresent(CoordinatorThread::terminate); + } + } + + @Test + public void testShouldThrowExceptionIfNoPartitionsAssigned() { + // This could happen if a connector is configured with a topics.regex that doesn't match any + // topics in cluster + + whenAdminDescribeConsumerGroupThenReturn( + 
ConsumerGroupState.STABLE, + ImmutableList.of( + new MemberDescription(null, null, null, new MemberAssignment(ImmutableSet.of())), + new MemberDescription(null, null, null, new MemberAssignment(ImmutableSet.of())))); + + CoordinatorThreadFactoryImpl coordinatorThreadFactory = + new CoordinatorThreadFactoryImpl(mock(Catalog.class), kafkaClientFactory); + + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + when(sinkTaskContext.assignment()).thenReturn(ImmutableSet.of()); + + assertThatThrownBy(() -> coordinatorThreadFactory.create(sinkTaskContext, BASIC_CONFIGS)) + .isInstanceOf(ConnectException.class) + .hasMessage("No partitions assigned, cannot determine leader"); + } +} diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/WorkerTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/WorkerTest.java index c7954900..78a627fd 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/WorkerTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/WorkerTest.java @@ -24,52 +24,42 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import io.tabular.iceberg.connect.IcebergSinkConfig; import io.tabular.iceberg.connect.data.IcebergWriter; import io.tabular.iceberg.connect.data.IcebergWriterFactory; import io.tabular.iceberg.connect.data.WriterResult; -import io.tabular.iceberg.connect.events.CommitReadyPayload; -import io.tabular.iceberg.connect.events.CommitRequestPayload; -import io.tabular.iceberg.connect.events.CommitResponsePayload; -import io.tabular.iceberg.connect.events.Event; import io.tabular.iceberg.connect.events.EventTestUtil; -import io.tabular.iceberg.connect.events.EventType; import java.util.Map; -import java.util.UUID; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.types.Types.StructType; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; -import org.apache.kafka.connect.sink.SinkTaskContext; import org.junit.jupiter.api.Test; -public class WorkerTest extends ChannelTestBase { - +public class WorkerTest { + private static final String SRC_TOPIC_NAME = "src-topic"; private static final String TABLE_NAME = "db.tbl"; private static final String FIELD_NAME = "fld"; @Test public void testStaticRoute() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); when(config.tables()).thenReturn(ImmutableList.of(TABLE_NAME)); Map value = ImmutableMap.of(FIELD_NAME, "val"); - workerTest(value); + workerTest(config, value); } @Test public void testDynamicRoute() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); when(config.dynamicTablesEnabled()).thenReturn(true); when(config.tablesRouteField()).thenReturn(FIELD_NAME); Map value = ImmutableMap.of(FIELD_NAME, TABLE_NAME); - workerTest(value); + workerTest(config, value); } - private void workerTest(Map value) { - SinkTaskContext context = mock(SinkTaskContext.class); - when(context.assignment()).thenReturn(ImmutableSet.of(new TopicPartition(SRC_TOPIC_NAME, 0))); - + private void workerTest(IcebergSinkConfig config, Map value) { WriterResult writeResult = new WriterResult( TableIdentifier.parse(TABLE_NAME), @@ -82,38 +72,21 @@ private void 
workerTest(Map value) { IcebergWriterFactory writerFactory = mock(IcebergWriterFactory.class); when(writerFactory.createWriter(any(), any(), anyBoolean())).thenReturn(writer); - Worker worker = new Worker(config, clientFactory, writerFactory, context); - worker.start(); - - // init consumer after subscribe() - initConsumer(); + Writer worker = new Worker(config, writerFactory); // save a record SinkRecord rec = new SinkRecord(SRC_TOPIC_NAME, 0, null, "key", null, value, 0L); - worker.save(ImmutableList.of(rec)); - - UUID commitId = UUID.randomUUID(); - Event commitRequest = - new Event( - config.controlGroupId(), EventType.COMMIT_REQUEST, new CommitRequestPayload(commitId)); - byte[] bytes = Event.encode(commitRequest); - consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", bytes)); - - worker.process(); - - assertThat(producer.history()).hasSize(2); + worker.write(ImmutableList.of(rec)); - Event event = Event.decode(producer.history().get(0).value()); - assertThat(event.type()).isEqualTo(EventType.COMMIT_RESPONSE); - CommitResponsePayload responsePayload = (CommitResponsePayload) event.payload(); - assertThat(responsePayload.commitId()).isEqualTo(commitId); + Committable committable = worker.committable(); - event = Event.decode(producer.history().get(1).value()); - assertThat(event.type()).isEqualTo(EventType.COMMIT_READY); - CommitReadyPayload readyPayload = (CommitReadyPayload) event.payload(); - assertThat(readyPayload.commitId()).isEqualTo(commitId); - assertThat(readyPayload.assignments()).hasSize(1); + assertThat(committable.offsetsByTopicPartition()).hasSize(1); // offset should be one more than the record offset - assertThat(readyPayload.assignments().get(0).offset()).isEqualTo(1L); + assertThat( + committable + .offsetsByTopicPartition() + .get(committable.offsetsByTopicPartition().keySet().iterator().next()) + .offset()) + .isEqualTo(1L); } } diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/OffsetTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/OffsetTest.java deleted file mode 100644 index 6b1cffa2..00000000 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/OffsetTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package io.tabular.iceberg.connect.data; - -import static org.assertj.core.api.Assertions.assertThat; - -import org.junit.jupiter.api.Test; - -public class OffsetTest { - - @Test - public void testOffsetEquals() { - assertThat(new Offset(null, null).compareTo(new Offset(null, null))).isEqualTo(0); - assertThat(new Offset(1L, null).compareTo(new Offset(1L, null))).isEqualTo(0); - } - - @Test - public void testOffsetLessThan() { - assertThat(new Offset(null, null).compareTo(new Offset(1L, null))).isEqualTo(-1); - assertThat(new Offset(1L, null).compareTo(new Offset(2L, null))).isEqualTo(-1); - } - - @Test - public void testOffsetGreaterThan() { - assertThat(new Offset(1L, null).compareTo(new Offset(null, null))).isEqualTo(1); - assertThat(new Offset(2L, null).compareTo(new Offset(1L, null))).isEqualTo(1); - } -}