[flink] CDC ingestion supports watermark strategy (apache#2640)
MonsterChenzhuo authored Jan 10, 2024
1 parent f3b3735 commit 7992ee9
Showing 13 changed files with 518 additions and 1 deletion.
JsonSerdeUtil.java
@@ -208,6 +208,31 @@ public static <T extends JsonNode> T asSpecificNodeType(String json, Class<T> cl
        return clazz.cast(resultNode);
    }

    /** Parses a JSON string and extracts a value of the specified type from the given path keys. */
    public static <T> T extractValue(String json, Class<T> valueType, String... path)
            throws JsonProcessingException {
        JsonNode currentNode = OBJECT_MAPPER_INSTANCE.readTree(json);
        for (String key : path) {
            currentNode = currentNode.get(key);
            if (currentNode == null) {
                throw new IllegalArgumentException("Invalid path or key not found: " + key);
            }
        }
        return OBJECT_MAPPER_INSTANCE.treeToValue(currentNode, valueType);
    }

    /** Checks if a specified node exists in a JSON string. */
    public static boolean isNodeExists(String json, String... path) throws JsonProcessingException {
        JsonNode currentNode = OBJECT_MAPPER_INSTANCE.readTree(json);
        for (String key : path) {
            currentNode = currentNode.get(key);
            if (currentNode == null) {
                return false;
            }
        }
        return true;
    }

    public static boolean isNull(JsonNode jsonNode) {
        return jsonNode == null || jsonNode.isNull();
    }
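For orientation, a small usage sketch of the two new helpers. The record and paths are hypothetical, and the calls assume an enclosing method that declares JsonProcessingException:

    // Hypothetical Debezium-style record, used only to illustrate the helpers.
    String record = "{\"payload\":{\"ts_ms\":1704844800000,\"source\":{\"connector\":\"mysql\"}}}";

    // Walks payload -> ts_ms and converts the node, throwing on a missing key.
    Long tsMs = JsonSerdeUtil.extractValue(record, Long.class, "payload", "ts_ms");

    // Same traversal, but returns false instead of throwing when a key is absent.
    boolean looksLikeDebezium = JsonSerdeUtil.isNodeExists(record, "payload", "source", "connector");
    boolean hasXid = JsonSerdeUtil.isNodeExists(record, "xid"); // false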
SynchronizationActionBase.java
@@ -18,11 +18,14 @@

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.options.Options;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -32,11 +35,20 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static org.apache.paimon.CoreOptions.TagCreationMode.WATERMARK;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
import static org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory.createExtractor;

/** Base {@link Action} for table/database synchronizing job. */
public abstract class SynchronizationActionBase extends ActionBase {

@@ -123,9 +135,32 @@ protected Object buildSource() {

    private DataStreamSource<String> buildDataStreamSource(Object source) {
        if (source instanceof Source) {
            boolean isAutomaticWatermarkCreationEnabled =
                    tableConfig.containsKey(CoreOptions.TAG_AUTOMATIC_CREATION.key())
                            && Objects.equals(
                                    tableConfig.get(CoreOptions.TAG_AUTOMATIC_CREATION.key()),
                                    WATERMARK.toString());

            Options options = Options.fromMap(tableConfig);
            Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
            String watermarkAlignGroup = options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
            WatermarkStrategy<String> watermarkStrategy =
                    isAutomaticWatermarkCreationEnabled
                            ? watermarkAlignGroup != null
                                    ? new CdcWatermarkStrategy(createExtractor(source))
                                            .withWatermarkAlignment(
                                                    watermarkAlignGroup,
                                                    options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
                                                    options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL))
                                    : new CdcWatermarkStrategy(createExtractor(source))
                            : WatermarkStrategy.noWatermarks();
            if (idleTimeout != null) {
                watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
            }
            return env.fromSource(
                    (Source<String, ?, ?>) source,
-                   WatermarkStrategy.noWatermarks(),
+                   watermarkStrategy,
                    syncJobHandler.provideSourceName());
        }
        if (source instanceof SourceFunction) {
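The nested ternary above is compact but dense; here is the same selection logic flattened into if/else form, purely as a readability sketch (not code from the commit):

    WatermarkStrategy<String> watermarkStrategy;
    if (!isAutomaticWatermarkCreationEnabled) {
        // 'tag.automatic-creation' is not set to 'watermark': no event-time watermarks needed.
        watermarkStrategy = WatermarkStrategy.noWatermarks();
    } else if (watermarkAlignGroup != null) {
        // An alignment group is configured: bound watermark drift across sources in the group.
        watermarkStrategy =
                new CdcWatermarkStrategy(createExtractor(source))
                        .withWatermarkAlignment(
                                watermarkAlignGroup,
                                options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
                                options.get(SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
    } else {
        // Watermark-driven tags without cross-source alignment.
        watermarkStrategy = new CdcWatermarkStrategy(createExtractor(source));
    }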
CdcTimestampExtractorFactory.java (new file)
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.watermark;

import org.apache.paimon.utils.JsonSerdeUtil;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.pulsar.source.PulsarSource;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Factory for creating CDC timestamp extractors based on different source types. */
public class CdcTimestampExtractorFactory implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final Map<Class<?>, Supplier<CdcTimestampExtractor>> extractorMap =
            new HashMap<>();

    static {
        extractorMap.put(MongoDBSource.class, MongoDBCdcTimestampExtractor::new);
        extractorMap.put(MySqlSource.class, MysqlCdcTimestampExtractor::new);
        extractorMap.put(PulsarSource.class, MessageQueueCdcTimestampExtractor::new);
        extractorMap.put(KafkaSource.class, MessageQueueCdcTimestampExtractor::new);
    }

    public static CdcTimestampExtractor createExtractor(Object source) {
        Supplier<CdcTimestampExtractor> extractorSupplier = extractorMap.get(source.getClass());
        if (extractorSupplier != null) {
            return extractorSupplier.get();
        }
        throw new IllegalArgumentException(
                "Unsupported source type: " + source.getClass().getName());
    }

    /** Timestamp extractor for MongoDB sources in CDC applications. */
    public static class MongoDBCdcTimestampExtractor implements CdcTimestampExtractor {

        private static final long serialVersionUID = 1L;

        @Override
        public long extractTimestamp(String record) throws JsonProcessingException {
            return JsonSerdeUtil.extractValue(record, Long.class, "payload", "ts_ms");
        }
    }

    /** Timestamp extractor for Kafka/Pulsar sources in CDC applications. */
    public static class MessageQueueCdcTimestampExtractor implements CdcTimestampExtractor {

        private static final long serialVersionUID = 1L;

        @Override
        public long extractTimestamp(String record) throws JsonProcessingException {
            if (JsonSerdeUtil.isNodeExists(record, "mysqlType")) {
                // Canal json
                return JsonSerdeUtil.extractValue(record, Long.class, "ts");
            } else if (JsonSerdeUtil.isNodeExists(record, "pos")) {
                // Ogg json
                String dateTimeString = JsonSerdeUtil.extractValue(record, String.class, "op_ts");
                DateTimeFormatter formatter =
                        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
                LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
                return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
            } else if (JsonSerdeUtil.isNodeExists(record, "xid")) {
                // Maxwell json
                return JsonSerdeUtil.extractValue(record, Long.class, "ts") * 1000;
            } else if (JsonSerdeUtil.isNodeExists(record, "payload", "source", "connector")) {
                // Dbz json
                return JsonSerdeUtil.extractValue(record, Long.class, "payload", "ts_ms");
            } else if (JsonSerdeUtil.isNodeExists(record, "source", "connector")) {
                // Dbz json
                return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms");
            }
            throw new RuntimeException(
                    String.format(
                            "Failed to extract timestamp: The JSON format of the message queue is unsupported. Record details: %s",
                            record));
        }
    }

    /** Timestamp extractor for MySQL sources in CDC applications. */
    public static class MysqlCdcTimestampExtractor implements CdcTimestampExtractor {

        @Override
        public long extractTimestamp(String record) throws JsonProcessingException {
            return JsonSerdeUtil.extractValue(record, Long.class, "source", "ts_ms");
        }
    }

    /** Interface defining the contract for CDC timestamp extraction. */
    public interface CdcTimestampExtractor extends Serializable {

        long extractTimestamp(String record) throws JsonProcessingException;
    }
}
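To make the format sniffing in MessageQueueCdcTimestampExtractor concrete, a sketch with hypothetical minimal records (field values invented; the calls assume an enclosing method that declares JsonProcessingException):

    MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();

    // Canal JSON: detected via "mysqlType"; its "ts" is already in milliseconds.
    long canalTs = extractor.extractTimestamp("{\"mysqlType\":{\"id\":\"INT\"},\"ts\":1704844800000}");

    // Maxwell JSON: detected via "xid"; its "ts" is in seconds, hence the * 1000 above.
    long maxwellTs = extractor.extractTimestamp("{\"xid\":42,\"ts\":1704844800}");

    // Debezium JSON without the envelope: detected via source.connector; uses millisecond "ts_ms".
    long dbzTs = extractor.extractTimestamp("{\"source\":{\"connector\":\"mysql\"},\"ts_ms\":1704844800000}");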
CdcWatermarkStrategy.java (new file)
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.watermark;

import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory.CdcTimestampExtractor;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

/**
* Watermark strategy for CDC sources, generating watermarks based on timestamps extracted from
* records.
*/
public class CdcWatermarkStrategy implements WatermarkStrategy<String> {

    private final CdcTimestampExtractor timestampExtractor;
    private static final long serialVersionUID = 1L;
    private long currentMaxTimestamp;

    public CdcWatermarkStrategy(CdcTimestampExtractor extractor) {
        this.timestampExtractor = extractor;
    }

    @Override
    public WatermarkGenerator<String> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<String>() {

            @Override
            public void onEvent(String record, long timestamp, WatermarkOutput output) {
                long tMs;
                try {
                    tMs = timestampExtractor.extractTimestamp(record);
                } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
                currentMaxTimestamp = Math.max(currentMaxTimestamp, tMs);
                output.emitWatermark(new Watermark(currentMaxTimestamp - 1));
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                long timeMillis = System.currentTimeMillis();
                currentMaxTimestamp = Math.max(timeMillis, currentMaxTimestamp);
                output.emitWatermark(new Watermark(currentMaxTimestamp - 1));
            }
        };
    }
}
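Two things worth noting: onEvent emits the maximum seen timestamp minus one (the usual Flink convention that a watermark t promises no further events at or before t), and onPeriodicEmit advances the watermark to the current processing time, so watermark-driven tag creation keeps progressing even while the source is quiet. A minimal wiring sketch, assuming a KafkaSource<String> named source and the factory above (group name and durations are illustrative):

    WatermarkStrategy<String> strategy =
            new CdcWatermarkStrategy(CdcTimestampExtractorFactory.createExtractor(source))
                    .withWatermarkAlignment(
                            "alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1))
                    .withIdleness(Duration.ofMinutes(1));

    DataStreamSource<String> stream = env.fromSource(source, strategy, "kafka-cdc-source");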
@@ -20,6 +20,8 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.FileSystemCatalogOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -1241,4 +1243,38 @@ public void testComputedColumnWithCaseInsensitive(boolean triggerSchemaRetrieval
                rowType,
                Collections.singletonList("_id"));
    }

    @Test
    @Timeout(60)
    public void testWaterMarkSyncTable() throws Exception {
        String topic = "watermark";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, readLines("kafka/canal/table/watermark/canal-data-1.txt"));

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
        kafkaConfig.put(TOPIC.key(), topic);

        Map<String, String> config = getBasicTableConfig();
        config.put("tag.automatic-creation", "watermark");
        config.put("tag.creation-period", "hourly");
        config.put("scan.watermark.alignment.group", "alignment-group-1");
        config.put("scan.watermark.alignment.max-drift", "20 s");
        config.put("scan.watermark.alignment.update-interval", "1 s");

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
        runActionWithDefaultEnv(action);

        AbstractFileStoreTable table =
                (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
        while (true) {
            if (table.snapshotManager().snapshotCount() > 0
                    && table.snapshotManager().latestSnapshot().watermark() != Long.MIN_VALUE) {
                return;
            }
            Thread.sleep(1000);
        }
    }
}
@@ -18,6 +18,8 @@

package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -218,4 +220,38 @@ public void testRecordWithNestedDataType() throws Exception {
                Collections.singletonList("+I[101, hammer, {\"row_key\":\"value\"}]");
        waitForResult(expected, table, rowType, primaryKeys);
    }

    @Test
    @Timeout(60)
    public void testWaterMarkSyncTable() throws Exception {
        String topic = "watermark";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, readLines("kafka/debezium/table/watermark/debezium-data-1.txt"));

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
        kafkaConfig.put(TOPIC.key(), topic);

        Map<String, String> config = getBasicTableConfig();
        config.put("tag.automatic-creation", "watermark");
        config.put("tag.creation-period", "hourly");
        config.put("scan.watermark.alignment.group", "alignment-group-1");
        config.put("scan.watermark.alignment.max-drift", "20 s");
        config.put("scan.watermark.alignment.update-interval", "1 s");

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
        runActionWithDefaultEnv(action);

        AbstractFileStoreTable table =
                (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
        while (true) {
            if (table.snapshotManager().snapshotCount() > 0
                    && table.snapshotManager().latestSnapshot().watermark() != Long.MIN_VALUE) {
                return;
            }
            Thread.sleep(1000);
        }
    }
}