Skip to content

Commit

Permalink
[cdc] Add AWS DMS CDC format support (apache#4433)
Browse files Browse the repository at this point in the history
  • Loading branch information
Moonlight-CL authored Nov 4, 2024
1 parent f561137 commit d7dd184
Show file tree
Hide file tree
Showing 23 changed files with 742 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.format.dms;

import org.apache.paimon.flink.action.cdc.format.AbstractJsonDataFormat;
import org.apache.paimon.flink.action.cdc.format.RecordParserFactory;

/**
 * Data format for AWS DMS JSON messages read from a message queue.
 *
 * <p>The JSON handling is inherited from {@link AbstractJsonDataFormat}; this class only
 * contributes the factory that creates a {@link DMSRecordParser} for each parsing task.
 */
public class DMSDataFormat extends AbstractJsonDataFormat {

    /**
     * Supplies the factory used to build the record parser for this format.
     *
     * @return a factory that creates a new {@link DMSRecordParser} from the given type mapping
     *     and computed columns
     */
    @Override
    protected RecordParserFactory parser() {
        return (typeMapping, computedColumns) ->
                new DMSRecordParser(typeMapping, computedColumns);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.format.dms;

import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;

/**
 * {@link DataFormatFactory} for the AWS DMS JSON format. It is registered under {@link
 * #IDENTIFIER} and hands out {@link DMSDataFormat} instances.
 */
public class DMSDataFormatFactory implements DataFormatFactory {

    /** Name under which this format is looked up. */
    public static final String IDENTIFIER = "aws-dms-json";

    /**
     * Returns the identifier of this format.
     *
     * @return {@link #IDENTIFIER}
     */
    @Override
    public String identifier() {
        return IDENTIFIER;
    }

    /**
     * Creates the data format handled by this factory.
     *
     * @return a new {@link DMSDataFormat}
     */
    @Override
    public DataFormat create() {
        return new DMSDataFormat();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.format.dms;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.Pair;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * The {@code DMSRecordParser} class extends the abstract {@link AbstractJsonRecordParser} and is
 * designed to parse records from AWS DMS's JSON change data capture (CDC) format. AWS DMS is a CDC
 * solution for RDBMS that captures row-level changes to database tables and outputs them in JSON
 * format. This parser extracts relevant information from the DMS-JSON format and converts it into a
 * list of {@link RichCdcMultiplexRecord} objects.
 *
 * <p>Each message is expected to carry a {@code metadata} object (record type, operation, schema
 * name and table name) and a {@code data} object holding the row. The supported operations are
 * {@code load}, {@code insert}, {@code update} and {@code delete}:
 *
 * <ul>
 *   <li>{@code load} and {@code insert} produce one {@link RowKind#INSERT} record;
 *   <li>{@code delete} produces one {@link RowKind#DELETE} record;
 *   <li>{@code update} produces a {@link RowKind#DELETE} record built from the before image
 *       followed by a {@link RowKind#INSERT} record built from the new values.
 * </ul>
 *
 * <p>For updates, fields of {@code data} whose name starts with {@code BI_} are treated as the
 * before image of the column named by the remainder of the field name.
 */
public class DMSRecordParser extends AbstractJsonRecordParser {

    private static final String FIELD_DATA = "data";
    private static final String FIELD_METADATA = "metadata";
    private static final String FIELD_TYPE = "record-type";
    private static final String FIELD_OP = "operation";
    private static final String FIELD_DATABASE = "schema-name";
    private static final String FIELD_TABLE = "table-name";

    private static final String OP_LOAD = "load";
    private static final String OP_INSERT = "insert";
    private static final String OP_UPDATE = "update";
    private static final String OP_DELETE = "delete";

    /** Prefix marking a field of {@code data} as the before-image value of a column. */
    private static final String BEFORE_PREFIX = "BI_";

    /**
     * Creates a parser.
     *
     * @param typeMapping type mapping options, forwarded to the parent parser
     * @param computedColumns computed columns, forwarded to the parent parser
     */
    public DMSRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
        super(typeMapping, computedColumns);
    }

    /**
     * Reads the table name from {@code metadata."table-name"}.
     *
     * @return the table name as text
     */
    @Override
    protected @Nullable String getTableName() {
        return metadataText(FIELD_TABLE);
    }

    /**
     * Converts the current message into CDC records.
     *
     * @return an empty list when {@link #isDDL()} is true, otherwise the records for the
     *     operation named in {@code metadata.operation}
     * @throws UnsupportedOperationException if the operation is not one of {@code load}, {@code
     *     insert}, {@code update} or {@code delete}
     */
    @Override
    protected List<RichCdcMultiplexRecord> extractRecords() {
        if (isDDL()) {
            return Collections.emptyList();
        }

        JsonNode dataNode = getAndCheck(dataField());
        String operation = metadataText(FIELD_OP);
        List<RichCdcMultiplexRecord> records = new ArrayList<>();

        switch (operation) {
            case OP_LOAD:
            case OP_INSERT:
                processRecord(dataNode, RowKind.INSERT, records);
                break;
            case OP_UPDATE:
                // Left is the new row, right is the before image; emit the retraction first.
                Pair<JsonNode, JsonNode> dataAndBeforeNodes = splitBeforeAndData(dataNode);
                processRecord(dataAndBeforeNodes.getRight(), RowKind.DELETE, records);
                processRecord(dataAndBeforeNodes.getLeft(), RowKind.INSERT, records);
                break;
            case OP_DELETE:
                processRecord(dataNode, RowKind.DELETE, records);
                break;
            default:
                throw new UnsupportedOperationException("Unknown record operation: " + operation);
        }

        return records;
    }

    /**
     * Reads the database name from {@code metadata."schema-name"}.
     *
     * @return the database name as text
     */
    @Override
    protected @Nullable String getDatabaseName() {
        return metadataText(FIELD_DATABASE);
    }

    /**
     * This parser does not name a primary-key field.
     *
     * @return always {@code null}
     */
    @Override
    protected String primaryField() {
        return null;
    }

    /**
     * Names the top-level field that holds the row values.
     *
     * @return {@code "data"}
     */
    @Override
    protected String dataField() {
        return FIELD_DATA;
    }

    /**
     * Names the format handled by this parser.
     *
     * @return {@code "aws-dms-json"}
     */
    @Override
    protected String format() {
        return "aws-dms-json";
    }

    /**
     * Tells whether the current message should be skipped by {@link #extractRecords()}.
     *
     * @return {@code true} when {@code metadata."record-type"} is anything other than {@code
     *     "data"}
     */
    @Override
    protected boolean isDDL() {
        return !"data".equals(metadataText(FIELD_TYPE));
    }

    /** Returns the text of {@code fieldName} inside the message's {@code metadata} object. */
    private String metadataText(String fieldName) {
        return getAndCheck(FIELD_METADATA).get(fieldName).asText();
    }

    /**
     * Splits the {@code data} node of an update into the new row and the before image.
     *
     * <p>The new row is a copy of {@code dataNode} without the {@code BI_}-prefixed fields. The
     * before image is a copy of {@code dataNode} in which every {@code BI_<column>} field is
     * removed and its value stored under {@code <column>}; columns without a {@code BI_}
     * counterpart keep their current value.
     *
     * @param dataNode the {@code data} node of the update message; it is not modified
     * @return a pair whose left element is the new row and whose right element is the before image
     */
    private Pair<JsonNode, JsonNode> splitBeforeAndData(JsonNode dataNode) {
        JsonNode newDataNode = dataNode.deepCopy();
        JsonNode beforeDataNode = dataNode.deepCopy();

        Iterator<Map.Entry<String, JsonNode>> newDataFields = newDataNode.fields();
        while (newDataFields.hasNext()) {
            if (newDataFields.next().getKey().startsWith(BEFORE_PREFIX)) {
                newDataFields.remove();
            }
        }

        // Collect the before-image values first and write them back only after the iteration
        // has finished. Calling set() while iterating inserts a new key whenever a BI_ field
        // has no matching column, which makes the iterator fail with
        // ConcurrentModificationException.
        List<Pair<String, JsonNode>> beforeImages = new ArrayList<>();
        Iterator<Map.Entry<String, JsonNode>> beforeDataFields = beforeDataNode.fields();
        while (beforeDataFields.hasNext()) {
            Map.Entry<String, JsonNode> next = beforeDataFields.next();
            String fieldName = next.getKey();
            if (fieldName.startsWith(BEFORE_PREFIX)) {
                String column = fieldName.substring(BEFORE_PREFIX.length());
                beforeImages.add(Pair.of(column, next.getValue()));
                beforeDataFields.remove();
            }
        }
        for (Pair<String, JsonNode> beforeImage : beforeImages) {
            ((ObjectNode) beforeDataNode).set(beforeImage.getLeft(), beforeImage.getRight());
        }

        return Pair.of(newDataNode, beforeDataNode);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ org.apache.paimon.flink.action.cdc.format.debezium.DebeziumJsonDataFormatFactory
org.apache.paimon.flink.action.cdc.format.json.JsonDataFormatFactory
org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellDataFormatFactory
org.apache.paimon.flink.action.cdc.format.ogg.OggDataFormatFactory
org.apache.paimon.flink.action.cdc.format.dms.DMSDataFormatFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.kafka;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.Map;

/**
 * IT cases for {@link KafkaSyncDatabaseAction} that run the scenarios inherited from {@link
 * KafkaSyncDatabaseActionITCase} with the {@code "aws-dms"} format name.
 */
public class KafkaAWSDMSSyncDatabaseActionITCase extends KafkaSyncDatabaseActionITCase {

    /** Format name handed to every inherited test scenario. */
    private static final String FORMAT = "aws-dms";

    /**
     * Builds the action builder used by the inherited scenarios, with {@code id} set as the
     * primary key.
     *
     * @param kafkaConfig Kafka configuration passed to the builder
     * @return the configured builder
     */
    @Override
    protected KafkaSyncDatabaseActionBuilder syncDatabaseActionBuilder(
            Map<String, String> kafkaConfig) {
        KafkaSyncDatabaseActionBuilder actionBuilder =
                new KafkaSyncDatabaseActionBuilder(kafkaConfig);
        actionBuilder.withPrimaryKeys("id");
        return actionBuilder;
    }

    /** Runs the inherited multi-topic schema evolution scenario. */
    @Test
    @Timeout(60)
    public void testSchemaEvolutionMultiTopic() throws Exception {
        testSchemaEvolutionMultiTopic(FORMAT);
    }

    /** Runs the inherited single-topic schema evolution scenario. */
    @Test
    @Timeout(60)
    public void testSchemaEvolutionOneTopic() throws Exception {
        testSchemaEvolutionOneTopic(FORMAT);
    }

    /** Runs the inherited empty-topic scenario. */
    @Test
    public void testTopicIsEmpty() {
        testTopicIsEmpty(FORMAT);
    }

    /** Runs the inherited multi-topic table affix scenario. */
    @Test
    @Timeout(60)
    public void testTableAffixMultiTopic() throws Exception {
        testTableAffixMultiTopic(FORMAT);
    }

    /** Runs the inherited single-topic table affix scenario. */
    @Test
    @Timeout(60)
    public void testTableAffixOneTopic() throws Exception {
        testTableAffixOneTopic(FORMAT);
    }

    /** Runs the inherited including-tables scenario. */
    @Test
    @Timeout(60)
    public void testIncludingTables() throws Exception {
        testIncludingTables(FORMAT);
    }

    /** Runs the inherited excluding-tables scenario. */
    @Test
    @Timeout(60)
    public void testExcludingTables() throws Exception {
        testExcludingTables(FORMAT);
    }

    /** Runs the inherited scenario that combines including and excluding tables. */
    @Test
    @Timeout(60)
    public void testIncludingAndExcludingTables() throws Exception {
        testIncludingAndExcludingTables(FORMAT);
    }
}
Loading

0 comments on commit d7dd184

Please sign in to comment.