Incremental Co-operative Rebalancing Support for HDFS Connector (#625) #712

Status: Open · wants to merge 1 commit into base: master
17 changes: 7 additions & 10 deletions src/main/java/io/confluent/connect/hdfs/DataWriter.java
@@ -465,15 +465,12 @@ public void open(Collection<TopicPartition> partitions) {
   }
 
   public void close() {
-    // Close any writers we have. We may get assigned the same partitions and end up duplicating
-    // some effort since we'll have to reprocess those messages. It may be possible to hold on to
-    // the TopicPartitionWriter and continue to use the temp file, but this can get significantly
-    // more complex due to potential failures and network partitions. For example, we may get
-    // this close, then miss a few generations of group membership, during which
-    // data may have continued to be processed and we'd have to restart from the recovery stage,
-    // make sure we apply the WAL, and only reuse the temp file if the starting offset is still
-    // valid. For now, we prefer the simpler solution that may result in a bit of wasted effort.
-    for (TopicPartitionWriter writer : topicPartitionWriters.values()) {
+    close(new HashSet<>(topicPartitionWriters.keySet()));
+  }
+
+  public void close(Collection<TopicPartition> partitions) {
+    for (TopicPartition partition : partitions) {
+      TopicPartitionWriter writer = topicPartitionWriters.get(partition);
       try {
         if (writer != null) {
           // In some failure modes, the writer might not have been created for all assignments
@@ -482,8 +479,8 @@ public void close() {
       } catch (ConnectException e) {
         log.warn("Unable to close writer for topic partition {}: ", writer.topicPartition(), e);
       }
+      topicPartitionWriters.remove(partition);
     }
-    topicPartitionWriters.clear();
   }
 
   public void stop() {
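The substance of the change: DataWriter's blanket close() is re-expressed as a delegation to a new partition-scoped overload, so only the writers for revoked partitions are torn down. Below is a minimal, self-contained sketch of that delegation pattern; the class name PartialCloseSketch and the String stand-in for TopicPartitionWriter are hypothetical, not part of this PR:

// Hypothetical sketch of the delegation pattern introduced by this PR.
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class PartialCloseSketch {
  // Stand-in for DataWriter's map of per-partition writers.
  private final Map<TopicPartition, String> topicPartitionWriters = new HashMap<>();

  // Full close keeps its old semantics by delegating to the partition-scoped overload.
  public void close() {
    close(new HashSet<>(topicPartitionWriters.keySet()));
  }

  // Partial close: only the given (revoked) partitions are removed; the rest stay open.
  public void close(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
      topicPartitionWriters.remove(partition); // the real code closes the writer first
    }
  }

  public static void main(String[] args) {
    PartialCloseSketch writer = new PartialCloseSketch();
    writer.topicPartitionWriters.put(new TopicPartition("test-topic", 0), "writer-0");
    writer.topicPartitionWriters.put(new TopicPartition("test-topic", 1), "writer-1");

    // Cooperative rebalance revokes only partition 1; partition 0 keeps writing.
    writer.close(Arrays.asList(new TopicPartition("test-topic", 1)));
    System.out.println(writer.topicPartitionWriters.keySet()); // prints [test-topic-0]
  }
}

Note the HashSet copy of the key set in close(): the overload removes entries from topicPartitionWriters while iterating, so iterating the live key-set view directly would throw ConcurrentModificationException.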
src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
@@ -166,7 +166,7 @@ public void open(Collection<TopicPartition> partitions) {
   public void close(Collection<TopicPartition> partitions) {
     log.debug("Closing HDFS Sink Task {}", connectorNameAndTaskId);
     if (hdfsWriter != null) {
-      hdfsWriter.close();
+      hdfsWriter.close(partitions);
     }
   }

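HdfsSinkTask now forwards the runtime's revoked set verbatim instead of closing every writer it owns. A minimal sketch of the lifecycle this enables, assuming the standard org.apache.kafka.connect.sink.SinkTask callbacks; the helper class below is illustrative, not part of this PR:

import java.util.Arrays;
import java.util.Collections;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTask;

public class CooperativeLifecycleSketch {
  // Drives open()/close() the way a cooperative rebalance would:
  // close() receives only the revoked subset, never the full assignment.
  static void simulatePartialRevocation(SinkTask task) {
    TopicPartition p0 = new TopicPartition("test-topic", 0);
    TopicPartition p1 = new TopicPartition("test-topic", 1);
    TopicPartition p2 = new TopicPartition("test-topic", 2);

    task.open(Arrays.asList(p0, p1, p2));   // initial assignment: p0, p1, p2
    task.close(Collections.singleton(p2));  // rebalance revokes only p2
    // p0 and p1 stay open; put() keeps flowing for them without reopening files.
  }
}

This mirrors the testPartialRevocation case added below: three partitions opened, one closed, and put() must still succeed for the survivors.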
38 changes: 38 additions & 0 deletions src/test/java/io/confluent/connect/hdfs/HdfsSinkTaskTest.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -341,6 +342,43 @@ final int record = 12;
     }
   }
 
+  @Test
+  public void testPartialRevocation() throws Exception {
+    setUp();
+
+    Collection<TopicPartition> initialAssignment = new ArrayList<>();
+    initialAssignment.add(TOPIC_PARTITION);
+    initialAssignment.add(TOPIC_PARTITION2);
+    initialAssignment.add(TOPIC_PARTITION3);
+
+    Collection<TopicPartition> revokedPartitions = new ArrayList<>();
+    revokedPartitions.add(TOPIC_PARTITION3);
+
+    String key = "key";
+    Schema schema = createSchema();
+    Struct record = createRecord(schema);
+    Collection<SinkRecord> sinkRecords = Collections.singleton(
+        new SinkRecord(TOPIC_PARTITION.topic(), TOPIC_PARTITION.partition(),
+            Schema.STRING_SCHEMA, key, schema, record, 0));
+
+    HdfsSinkTask task = new HdfsSinkTask();
+    task.initialize(context);
+    task.start(properties);
+
+    // Given 3 owned partitions
+    task.open(initialAssignment);
+
+    // When 1 partition is revoked (partial revocation)
+    task.close(revokedPartitions);
+
+    try {
+      // Then the task should keep accepting records for the 2 remaining partitions
+      task.put(sinkRecords);
+    } finally {
+      task.stop();
+    }
+  }
+
   private void createCommittedFiles() throws IOException {
     String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
     String file1 = FileUtils.committedFileName(url, topicsDir, DIRECTORY1, TOPIC_PARTITION, 0,