[flink] supports writing to multiple partitions of kafka in unaware bucket mode.
liming30 committed Dec 6, 2023
1 parent 182d7c4 commit fb09108
Showing 5 changed files with 77 additions and 5 deletions.
@@ -29,6 +29,7 @@
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite.State;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.Restorable;

import java.util.List;
@@ -48,6 +49,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
private final RecordExtractor<T> recordExtractor;

private boolean batchCommitted = false;
private BucketMode bucketMode;

public TableWriteImpl(
FileStoreWrite<T> write,
@@ -92,6 +94,11 @@ public TableWriteImpl<T> withCompactExecutor(ExecutorService compactExecutor) {
return this;
}

public TableWriteImpl<T> withBucketMode(BucketMode bucketMode) {
this.bucketMode = bucketMode;
return this;
}

@Override
public BinaryRow getPartition(InternalRow row) {
keyAndBucketExtractor.setRecord(row);
@@ -136,7 +143,7 @@ public SinkRecord toLogRecord(SinkRecord record) {
keyAndBucketExtractor.setRecord(record.row());
return new SinkRecord(
record.partition(),
- record.bucket(),
+ bucketMode == BucketMode.UNAWARE ? -1 : record.bucket(),
keyAndBucketExtractor.logPrimaryKey(),
record.row());
}
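
The hunks above give TableWriteImpl knowledge of the table's bucket mode. In unaware bucket mode the file store effectively writes everything to a single bucket, so reusing that bucket number as the log-system partition would funnel every Kafka record into one partition; reporting -1 instead marks the partition as unspecified for the log sink. A simplified, self-contained sketch of that mapping (the enum and method here are illustrative stand-ins, not Paimon's actual API beyond BucketMode.UNAWARE):

    // Illustration only: a bucket is a meaningful partition hint only when the table
    // actually assigns buckets to records; otherwise report -1 ("no fixed partition").
    enum IllustrativeBucketMode { FIXED, DYNAMIC, UNAWARE }

    static int logBucket(IllustrativeBucketMode mode, int fileStoreBucket) {
        return mode == IllustrativeBucketMode.UNAWARE ? -1 : fileStoreBucket;
    }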
@@ -81,6 +81,7 @@ public ProducerRecord<byte[], byte[]> serialize(SinkRecord element, @Nullable Lo
} else {
valueBytes = valueSerializer.serialize(new FlinkRowData(element.row()));
}
- return new ProducerRecord<>(topic, element.bucket(), primaryKeyBytes, valueBytes);
+ Integer partition = element.bucket() < 0 ? null : element.bucket();
+ return new ProducerRecord<>(topic, partition, primaryKeyBytes, valueBytes);
}
}
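
For context on the serializer change: ProducerRecord's constructor accepts a nullable Integer partition, and when it is null the producer's configured partitioner picks the partition (key hashing for keyed records and, since Kafka 2.4, sticky partitioning for key-less records), which is what spreads unaware-bucket appends across all partitions of the topic. A minimal sketch of the same mapping outside Paimon's classes (the helper name and byte-array types are illustrative):

    import org.apache.kafka.clients.producer.ProducerRecord;

    // Illustrative helper mirroring the change above: a negative bucket means
    // "no fixed partition", so pass null and let the producer's partitioner decide.
    static ProducerRecord<byte[], byte[]> toProducerRecord(
            String topic, int bucket, byte[] key, byte[] value) {
        Integer partition = bucket < 0 ? null : bucket;
        return new ProducerRecord<>(topic, partition, key, value);
    }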
@@ -63,6 +63,12 @@ public void testNonKeyed() throws Exception {
checkNonKeyed(LogChangelogMode.ALL, 2, 5, 3);
}

@Test
public void testUnawareBucket() throws Exception {
checkNonKeyed(LogChangelogMode.AUTO, -1, 3, 5);
checkNonKeyed(LogChangelogMode.ALL, -1, 5, 3);
}

private void checkKeyed(LogChangelogMode mode, int bucket, int key, int value)
throws Exception {
check(mode, true, bucket, key, value, RowKind.INSERT);
@@ -92,7 +98,11 @@ private void check(
SinkRecord input = testRecord(keyed, bucket, key, value, rowKind);
ProducerRecord<byte[], byte[]> record = serializer.serialize(input, null);

- assertThat(record.partition().intValue()).isEqualTo(bucket);
+ if (bucket >= 0) {
+     assertThat(record.partition().intValue()).isEqualTo(bucket);
+ } else {
+     assertThat(record.partition()).isNull();
+ }

AtomicReference<RowData> rowReference = new AtomicReference<>();
deserializer.deserialize(
@@ -129,7 +139,8 @@ public void close() {}
}

private ConsumerRecord<byte[], byte[]> toConsumerRecord(ProducerRecord<byte[], byte[]> record) {
- return new ConsumerRecord<>(TOPIC, record.partition(), 0, record.key(), record.value());
+ int partition = record.partition() == null ? -1 : record.partition();
+ return new ConsumerRecord<>(TOPIC, partition, 0, record.key(), record.value());
}

private static KafkaLogSerializationSchema createTestSerializationSchema(
@@ -26,12 +26,18 @@
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
@@ -370,4 +376,50 @@ public void testLogWriteReadWithVirtual() throws Exception {
deleteTopicIfExists(topic);
}
}

@Test
@Timeout(120)
public void testAppendOnlyWithUnawareBucket() throws Exception {
String topic = UUID.randomUUID().toString();
createTopicIfNotExists(topic, 2);

try {
// disable checkpointing to test eventual
env.getCheckpointConfig().disableCheckpointing();
env.setParallelism(1);
tEnv.executeSql(
String.format(
"CREATE TABLE T (i INT, j INT) WITH ("
+ "'log.system'='kafka', "
+ "'log.consistency'='eventual', "
+ "'bucket'='-1', "
+ "'kafka.bootstrap.servers'='%s', "
+ "'kafka.topic'='%s',"
+ "'kafka.batch.size'='20')",
getBootstrapServers(), topic));
tEnv.executeSql(
"CREATE TEMPORARY TABLE gen (i INT, j INT) WITH ('connector'='datagen', 'rows-per-second'='2')");
TableResult write = tEnv.executeSql("INSERT INTO T SELECT * FROM gen");
BlockingIterator<Row, Row> read =
BlockingIterator.of(tEnv.executeSql("SELECT * FROM T").collect());
List<Row> collect = read.collect(10);
assertThat(collect).hasSize(10);
write.getJobClient().get().cancel();
read.close();

// check offsets
try (final AdminClient adminClient = AdminClient.create(getStandardProps())) {
Map<TopicPartition, OffsetSpec> topicPartitionOffsets = new HashMap<>(4);
for (int i = 0; i < 2; i++) {
topicPartitionOffsets.put(new TopicPartition(topic, i), OffsetSpec.latest());
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> result =
adminClient.listOffsets(topicPartitionOffsets).all().get();
assertThat(result.values())
.allMatch(partitionOffsetInfo -> partitionOffsetInfo.offset() > 0);
}
} finally {
deleteTopicIfExists(topic);
}
}
}
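
The new IT case writes through a two-partition topic with 'bucket' = '-1' and asserts, via AdminClient.listOffsets, that every partition received records. The same check can also be made with a plain consumer; a hedged sketch, assuming the caller supplies the bootstrap servers, topic name, and partition count:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    // Illustrative only: an end offset greater than 0 for every partition means each
    // partition received at least one record from the unaware-bucket write.
    static boolean allPartitionsNonEmpty(String bootstrapServers, String topic, int partitions) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> topicPartitions = new ArrayList<>();
            for (int i = 0; i < partitions; i++) {
                topicPartitions.add(new TopicPartition(topic, i));
            }
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            return endOffsets.values().stream().allMatch(offset -> offset > 0);
        }
    }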
@@ -146,7 +146,8 @@ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
state.stateValueFilter().filter(table.name(), part, bucket))
.withIOManager(paimonIOManager)
.withIgnorePreviousFiles(ignorePreviousFiles)
- .withExecutionMode(isStreamingMode);
+ .withExecutionMode(isStreamingMode)
+ .withBucketMode(table.bucketMode());

if (metricGroup != null) {
tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup));
