[CDC] Refactor cdc to reduce redundant code and set default checkpoint interval (#2461)
yuzelin authored Dec 11, 2023
1 parent 973e45f commit f68743c
Showing 34 changed files with 674 additions and 755 deletions.
9 changes: 8 additions & 1 deletion docs/content/cdc-ingestion/overview.md
@@ -99,6 +99,13 @@ store up to 19 digits integer value. So you should ensure the overflow won't occ
9. MySQL BINARY will be mapped to Paimon VARBINARY. This is because the binary value is passed as bytes in binlog, so it
should be mapped to byte type (BYTES or VARBINARY). We choose VARBINARY because it can retain the length information.

## Setting Custom Job Name
## Custom Job Settings

### Checkpointing

Use `-Dexecution.checkpointing.interval=<interval>` to enable checkpointing and set the interval. For versions 0.7 and later,
if you haven't enabled checkpointing, Paimon enables checkpointing by default and sets the checkpoint interval to 180 seconds.

### Job Name

Use `-Dpipeline.name=<job-name>` to set a custom synchronization job name.
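
For readers who want to see what the new checkpointing default amounts to, here is a minimal Java sketch (not Paimon's actual code) of the documented behavior, using standard Flink APIs: if checkpointing has not been configured, enable it with a 180-second interval.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DefaultCheckpointingSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // getCheckpointInterval() returns a non-positive value when checkpointing
        // is disabled, so this branch only applies if the user did not pass
        // -Dexecution.checkpointing.interval=<interval>.
        if (env.getCheckpointConfig().getCheckpointInterval() <= 0) {
            env.enableCheckpointing(180_000L); // 180 seconds, the documented default
        }
    }
}
```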
@@ -26,6 +26,8 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -283,4 +285,15 @@ public static String combinedModeTableList(
excludingPattern = "?!" + excludingPattern;
return String.format("(%s)(%s)", excludingPattern, includingPattern);
}

public static void checkRequiredOptions(
Configuration config, String confName, ConfigOption<?>... configOptions) {
for (ConfigOption<?> configOption : configOptions) {
checkArgument(
config.contains(configOption),
"%s [%s] must be specified.",
confName,
configOption.key());
}
}
}
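
As a usage illustration for the new `checkRequiredOptions` helper, here is a self-contained sketch of the same fail-fast check; the option key and the `kafka_conf` label are illustrative, not the project's actual constants.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

import java.util.Collections;

public class RequiredOptionsSketch {

    // Illustrative option; each CDC source defines its own ConfigOptions.
    private static final ConfigOption<String> BOOTSTRAP_SERVERS =
            ConfigOptions.key("properties.bootstrap.servers").stringType().noDefaultValue();

    public static void main(String[] args) {
        Configuration kafkaConf =
                Configuration.fromMap(
                        Collections.singletonMap(
                                "properties.bootstrap.servers", "localhost:9092"));

        // Same shape as the helper above: report the config name and the missing
        // key if a required option is absent.
        for (ConfigOption<?> option : new ConfigOption<?>[] {BOOTSTRAP_SERVERS}) {
            if (!kafkaConf.contains(option)) {
                throw new IllegalArgumentException(
                        String.format("kafka_conf [%s] must be specified.", option.key()));
            }
        }
        System.out.println("All required options are present.");
    }
}
```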
@@ -38,14 +38,13 @@ public class MessageQueueSchemaUtils {
* Retrieves the Kafka schema for a given topic.
*
* @param consumer The wrapper of message queue consumer to fetch messages.
* @param topic The topic to retrieve the schema for.
* @param dataFormat The data format for the messages in the message queue.
* @param typeMapping Data type mapping options.
* @return The schema for the topic.
* @throws SchemaRetrievalException If unable to retrieve the schema after max retries.
*/
public static Schema getSchema(
ConsumerWrapper consumer, String topic, DataFormat dataFormat, TypeMapping typeMapping)
ConsumerWrapper consumer, DataFormat dataFormat, TypeMapping typeMapping)
throws SchemaRetrievalException {
int retry = 0;
int retryInterval = 1000;
@@ -55,7 +54,7 @@ public static Schema getSchema(

while (true) {
Optional<Schema> schema =
consumer.getRecords(topic, POLL_TIMEOUT_MILLIS).stream()
consumer.getRecords(POLL_TIMEOUT_MILLIS).stream()
.map(recordParser::buildSchema)
.filter(Objects::nonNull)
.findFirst();
@@ -66,7 +65,8 @@

if (retry >= MAX_RETRY) {
throw new SchemaRetrievalException(
String.format("Could not get metadata from server, topic: %s", topic));
String.format(
"Could not get metadata from server, topic: %s", consumer.topic()));
}

sleepSafely(retryInterval);
@@ -86,6 +86,8 @@ private static void sleepSafely(int duration) {
/** Wrap the consumer for different message queues. */
public interface ConsumerWrapper extends AutoCloseable {

List<String> getRecords(String topic, int pollTimeOutMills);
List<String> getRecords(int pollTimeOutMills);

String topic();
}
}
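
To make the interface change concrete — the topic is now fixed per wrapper instead of being passed to every `getRecords` call — here is a rough Kafka-based sketch. It mirrors the revised `ConsumerWrapper` shape but is not the project's actual wrapper class, and it assumes the consumer has already been subscribed or assigned to the topic.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class KafkaConsumerWrapperSketch implements AutoCloseable {

    private final KafkaConsumer<String, String> consumer;
    private final String topic;

    public KafkaConsumerWrapperSketch(KafkaConsumer<String, String> consumer, String topic) {
        // The consumer is assumed to be subscribed/assigned to `topic` already.
        this.consumer = consumer;
        this.topic = topic;
    }

    // Corresponds to the revised getRecords(int): no topic parameter needed.
    public List<String> getRecords(int pollTimeOutMills) {
        List<String> values = new ArrayList<>();
        for (ConsumerRecord<String, String> record :
                consumer.poll(Duration.ofMillis(pollTimeOutMills))) {
            values.add(record.value());
        }
        return values;
    }

    // Corresponds to the new topic() accessor used for error reporting.
    public String topic() {
        return topic;
    }

    @Override
    public void close() {
        consumer.close();
    }
}
```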

This file was deleted.

@@ -19,12 +19,8 @@
package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;

import org.apache.flink.api.common.functions.FlatMapFunction;

import java.util.Map;

/**
@@ -60,15 +56,16 @@ public MessageQueueSyncTableActionBase(
String database,
String table,
Map<String, String> catalogConfig,
Map<String, String> mqConfig) {
super(warehouse, database, table, catalogConfig, mqConfig);
Map<String, String> mqConfig,
SyncJobHandler.SourceType sourceType) {
super(warehouse, database, table, catalogConfig, mqConfig, sourceType);
}

@Override
protected Schema retrieveSchema() throws Exception {
String topic = topic();
try (MessageQueueSchemaUtils.ConsumerWrapper consumer = consumer(topic)) {
return MessageQueueSchemaUtils.getSchema(consumer, topic, getDataFormat(), typeMapping);
try (MessageQueueSchemaUtils.ConsumerWrapper consumer = syncJobHandler.provideConsumer()) {
return MessageQueueSchemaUtils.getSchema(
consumer, syncJobHandler.provideDataFormat(), typeMapping);
}
}

@@ -83,17 +80,4 @@ protected Schema buildPaimonSchema(Schema retrievedSchema) {
metadataConverters,
false);
}

@Override
protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {
boolean caseSensitive = catalog.caseSensitive();
DataFormat format = getDataFormat();
return format.createParser(caseSensitive, typeMapping, computedColumns);
}

protected abstract String topic();

protected abstract MessageQueueSchemaUtils.ConsumerWrapper consumer(String topic);

protected abstract DataFormat getDataFormat();
}
@@ -18,10 +18,8 @@

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
@@ -31,51 +29,41 @@
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;

/** Base {@link Action} for synchronizing into one Paimon database. */
public abstract class SyncDatabaseActionBase extends ActionBase {
public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {

protected final String database;
protected final Configuration cdcSourceConfig;
protected Map<String, String> tableConfig = new HashMap<>();
protected boolean mergeShards = true;
protected MultiTablesSinkMode mode = COMBINED;
protected String tablePrefix = "";
protected String tableSuffix = "";
protected String includingTables = ".*";
@Nullable protected String excludingTables;
protected TypeMapping typeMapping = TypeMapping.defaultMapping();

protected CdcMetadataConverter[] metadataConverters = new CdcMetadataConverter[] {};
protected List<FileStoreTable> tables = new ArrayList<>();

public SyncDatabaseActionBase(
String warehouse,
String database,
Map<String, String> catalogConfig,
Map<String, String> cdcSourceConfig) {
super(warehouse, catalogConfig);
this.database = database;
this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
}

public SyncDatabaseActionBase withTableConfig(Map<String, String> tableConfig) {
this.tableConfig = tableConfig;
return this;
Map<String, String> cdcSourceConfig,
SyncJobHandler.SourceType sourceType) {
super(
warehouse,
database,
catalogConfig,
cdcSourceConfig,
new SyncJobHandler(sourceType, cdcSourceConfig, database));
}

public SyncDatabaseActionBase mergeShards(boolean mergeShards) {
@@ -114,59 +102,37 @@ public SyncDatabaseActionBase excludingTables(@Nullable String excludingTables)
return this;
}

public SyncDatabaseActionBase withTypeMapping(TypeMapping typeMapping) {
this.typeMapping = typeMapping;
return this;
}

public SyncDatabaseActionBase withMetadataColumns(List<String> metadataColumns) {
this.metadataConverters =
metadataColumns.stream()
.map(this::metadataConverter)
.filter(Optional::isPresent)
.map(Optional::get)
.toArray(CdcMetadataConverter[]::new);
return this;
@Override
protected void validateCaseSensitivity() {
AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", database);
AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table prefix", tablePrefix);
AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table suffix", tableSuffix);
}

protected Optional<CdcMetadataConverter<?>> metadataConverter(String column) {
return Optional.empty();
@Override
protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {
return syncJobHandler.provideRecordParser(
caseSensitive, Collections.emptyList(), typeMapping, metadataConverters);
}

protected void checkCdcSourceArgument() {}

protected abstract DataStreamSource<String> buildSource() throws Exception;

protected abstract String sourceName();

protected abstract FlatMapFunction<String, RichCdcMultiplexRecord> recordParse();

@Override
public void build() throws Exception {
checkCdcSourceArgument();
boolean caseSensitive = catalog.caseSensitive();

validateCaseInsensitive(caseSensitive);

catalog.createDatabase(database, true);

protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory() {
NewTableSchemaBuilder schemaBuilder = new NewTableSchemaBuilder(tableConfig, caseSensitive);
Pattern includingPattern = Pattern.compile(includingTables);
Pattern excludingPattern =
excludingTables == null ? null : Pattern.compile(excludingTables);
TableNameConverter tableNameConverter =
new TableNameConverter(caseSensitive, mergeShards, tablePrefix, tableSuffix);

DataStream<RichCdcMultiplexRecord> input =
buildSource().flatMap(recordParse()).name("Parse");
EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
() ->
new RichCdcMultiplexRecordEventParser(
schemaBuilder,
includingPattern,
excludingPattern,
tableNameConverter);
return () ->
new RichCdcMultiplexRecordEventParser(
schemaBuilder, includingPattern, excludingPattern, tableNameConverter);
}

@Override
protected void buildSink(
DataStream<RichCdcMultiplexRecord> input,
EventParser.Factory<RichCdcMultiplexRecord> parserFactory) {
new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
.withInput(input)
.withParserFactory(parserFactory)
@@ -177,27 +143,4 @@ public void build() throws Exception {
.withTableOptions(tableConfig)
.build();
}

protected void validateCaseInsensitive(boolean caseSensitive) {
AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", database);
AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table prefix", tablePrefix);
AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table suffix", tableSuffix);
}

@VisibleForTesting
public Map<String, String> tableConfig() {
return tableConfig;
}

// ------------------------------------------------------------------------
// Flink run methods
// ------------------------------------------------------------------------

protected abstract String jobName();

@Override
public void run() throws Exception {
build();
execute(jobName());
}
}
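
Taken together, the diff moves `SyncDatabaseActionBase` onto a shared `SynchronizationActionBase` parent and pushes source-specific wiring into `SyncJobHandler`, leaving subclasses to override a handful of hooks. A heavily simplified sketch of that template-method shape (the real base class also manages the catalog, table config, checkpointing defaults, and the event-parser factory) could look like:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;

// Simplified sketch of the structure suggested by this refactor; not the actual
// class bodies in the commit.
abstract class SynchronizationActionBaseSketch<RECORD> {

    // Template method: the common pipeline lives in the base class.
    public void build() throws Exception {
        validateCaseSensitivity();
        DataStream<RECORD> parsed = buildSource().flatMap(recordParse()).name("Parse");
        buildSink(parsed);
    }

    public void run(String jobName) throws Exception {
        build();
        execute(jobName);
    }

    // Hooks overridden by the database/table sync subclasses or delegated to a
    // source-specific handler.
    protected abstract void validateCaseSensitivity();

    protected abstract DataStreamSource<String> buildSource() throws Exception;

    protected abstract FlatMapFunction<String, RECORD> recordParse();

    protected abstract void buildSink(DataStream<RECORD> parsed);

    protected abstract void execute(String jobName) throws Exception;
}
```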