diff --git a/src/main/java/io/confluent/connect/hdfs/DataWriter.java b/src/main/java/io/confluent/connect/hdfs/DataWriter.java
index 26ee8bb06..249709e14 100644
--- a/src/main/java/io/confluent/connect/hdfs/DataWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/DataWriter.java
@@ -465,15 +465,12 @@ public void open(Collection<TopicPartition> partitions) {
   }
 
   public void close() {
-    // Close any writers we have. We may get assigned the same partitions and end up duplicating
-    // some effort since we'll have to reprocess those messages. It may be possible to hold on to
-    // the TopicPartitionWriter and continue to use the temp file, but this can get significantly
-    // more complex due to potential failures and network partitions. For example, we may get
-    // this close, then miss a few generations of group membership, during which
-    // data may have continued to be processed and we'd have to restart from the recovery stage,
-    // make sure we apply the WAL, and only reuse the temp file if the starting offset is still
-    // valid. For now, we prefer the simpler solution that may result in a bit of wasted effort.
-    for (TopicPartitionWriter writer : topicPartitionWriters.values()) {
+    close(new HashSet<>(topicPartitionWriters.keySet()));
+  }
+
+  public void close(Collection<TopicPartition> partitions) {
+    for (TopicPartition partition : partitions) {
+      TopicPartitionWriter writer = topicPartitionWriters.get(partition);
       try {
         if (writer != null) {
           // In some failure modes, the writer might not have been created for all assignments
@@ -482,8 +479,8 @@ public void close() {
       } catch (ConnectException e) {
         log.warn("Unable to close writer for topic partition {}: ", writer.topicPartition(), e);
       }
+      topicPartitionWriters.remove(partition);
     }
-    topicPartitionWriters.clear();
   }
 
   public void stop() {
diff --git a/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java b/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
index 28f9b91ec..f71d4fc2c 100644
--- a/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
+++ b/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
@@ -166,7 +166,7 @@ public void open(Collection<TopicPartition> partitions) {
   public void close(Collection<TopicPartition> partitions) {
     log.debug("Closing HDFS Sink Task {}", connectorNameAndTaskId);
     if (hdfsWriter != null) {
-      hdfsWriter.close();
+      hdfsWriter.close(partitions);
     }
   }
 
diff --git a/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java b/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
index cad6dd31b..215dbe601 100644
--- a/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -341,6 +342,43 @@ final int record = 12;
     }
   }
 
+  @Test
+  public void testPartialRevocation() throws Exception {
+    setUp();
+
+    Collection<TopicPartition> initialAssignment = new ArrayList<>();
+    initialAssignment.add(TOPIC_PARTITION);
+    initialAssignment.add(TOPIC_PARTITION2);
+    initialAssignment.add(TOPIC_PARTITION3);
+
+    Collection<TopicPartition> revokedPartitions = new ArrayList<>();
+    revokedPartitions.add(TOPIC_PARTITION3);
+
+    String key = "key";
+    Schema schema = createSchema();
+    Struct record = createRecord(schema);
+    Collection<SinkRecord> sinkRecords = Collections.singleton(
+        new SinkRecord(TOPIC_PARTITION.topic(), TOPIC_PARTITION.partition(),
+            Schema.STRING_SCHEMA, key, schema, record, 0));
+
+    HdfsSinkTask task = new HdfsSinkTask();
+    task.initialize(context);
+    task.start(properties);
+
+    // Given 3 owned partitions
+    task.open(initialAssignment);
+
+    // When 1 partition is revoked (partial revocation)
+    task.close(revokedPartitions);
+
+    try {
+      // Should continue processing messages from the 2 remaining partitions (should succeed)
+      task.put(sinkRecords);
+    } finally {
+      task.stop();
+    }
+  }
+
   private void createCommittedFiles() throws IOException {
     String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
     String file1 = FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 0,
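
For context on what the new overload enables: with the consumer's cooperative rebalance protocol, Kafka Connect can revoke only a subset of a task's partitions and call SinkTask.close() with just that subset, while the task keeps the rest. Below is a minimal standalone sketch of the bookkeeping pattern the patch adopts (close and drop writers only for revoked partitions); PartialRevocationSketch and Writer are hypothetical stand-ins for DataWriter and TopicPartitionWriter, with String standing in for TopicPartition.

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Illustration only; not part of the patch.
public class PartialRevocationSketch {

  static final class Writer {
    private final String partition;
    Writer(String partition) { this.partition = partition; }
    void close() { System.out.println("closed writer for " + partition); }
  }

  private final Map<String, Writer> writers = new HashMap<>();

  void open(Collection<String> partitions) {
    // One writer per newly assigned partition.
    for (String partition : partitions) {
      writers.put(partition, new Writer(partition));
    }
  }

  // Close only the revoked partitions; writers for partitions the task
  // still owns stay open and keep processing.
  void close(Collection<String> revoked) {
    for (String partition : revoked) {
      Writer writer = writers.remove(partition);
      if (writer != null) {  // may be absent if creation failed earlier
        writer.close();
      }
    }
  }

  // Full shutdown is just the partial case applied to a copy of the key
  // set (the copy avoids mutating the map while iterating over it).
  void closeAll() {
    close(new HashSet<>(writers.keySet()));
  }

  public static void main(String[] args) {
    PartialRevocationSketch task = new PartialRevocationSketch();
    task.open(List.of("test-0", "test-1", "test-2"));
    task.close(List.of("test-2"));              // partial revocation
    System.out.println(task.writers.keySet());  // test-0 and test-1 remain
    task.closeAll();
  }
}

The patch structures DataWriter the same way: the no-argument close() delegates to close(Collection) with a HashSet copy of the key set, so full shutdown and partial revocation share one code path, and removing entries inside the loop cannot throw ConcurrentModificationException.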