Skip to content

Commit

Permalink
[Feature](CDC) Add auto create table (apache#248)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Nov 30, 2023
1 parent e2e7165 commit 0eb3aa7
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.writer;

public enum EventType {
ALTER,
CREATE
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.sink.writer.EventType;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
Expand All @@ -41,13 +43,15 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -67,7 +71,6 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private static final String OP_CREATE = "c"; // insert
private static final String OP_UPDATE = "u"; // update
private static final String OP_DELETE = "d"; // delete

public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; // alter table tbl add cloumn aca int
private static final String addDropDDLRegex
= "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
Expand Down Expand Up @@ -214,44 +217,67 @@ public boolean schemaChangeV2(JsonNode recordRoot) {
return false;
}

// db,table
Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
if(tuple == null){
EventType eventType = extractEventType(recordRoot);
if(eventType == null){
return false;
}

List<String> ddlSqlList = extractDDLList(recordRoot);
if (CollectionUtils.isEmpty(ddlSqlList)) {
LOG.info("ddl can not do schema change:{}", recordRoot);
return false;
}

List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
for (int i = 0; i < ddlSqlList.size(); i++) {
DDLSchema ddlSchema = ddlSchemas.get(i);
String ddlSql = ddlSqlList.get(i);
boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
if(eventType.equals(EventType.CREATE)){
TableSchema tableSchema = extractCreateTableSchema(recordRoot);
status = schemaChangeManager.createTable(tableSchema);
if(status){
String cdcTbl = getCdcTableIdentifier(recordRoot);
String dorisTbl = getCreateTableIdentifier(recordRoot);
tableMapping.put(cdcTbl, dorisTbl);
LOG.info("create table ddl status: {}", status);
}
} else if (eventType.equals(EventType.ALTER)){
// db,table
Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
if(tuple == null){
return false;
}
List<String> ddlSqlList = extractDDLList(recordRoot);
if (CollectionUtils.isEmpty(ddlSqlList)) {
LOG.info("ddl can not do schema change:{}", recordRoot);
return false;
}
List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
for (int i = 0; i < ddlSqlList.size(); i++) {
DDLSchema ddlSchema = ddlSchemas.get(i);
String ddlSql = ddlSqlList.get(i);
boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
}
} else{
LOG.info("Unsupported event type {}", eventType);
}
} catch (Exception ex) {
LOG.warn("schema change error :", ex);
}
return status;
}

protected JsonNode extractTableChange(JsonNode record) throws JsonProcessingException {
JsonNode historyRecord = extractHistoryRecord(record);
JsonNode tableChanges = historyRecord.get("tableChanges");
if(!Objects.isNull(tableChanges)){
JsonNode tableChange = tableChanges.get(0);
return tableChange;
}
return null;
}

/**
* Parse Alter Event
*/
@VisibleForTesting
public List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
public List<String> extractDDLList(JsonNode record) throws IOException{
String dorisTable = getDorisTableIdentifier(record);
JsonNode historyRecord = extractHistoryRecord(record);
JsonNode tableChanges = historyRecord.get("tableChanges");
String ddl = extractJsonNode(historyRecord, "ddl");
if (Objects.isNull(tableChanges) || Objects.isNull(ddl)) {
return new ArrayList<>();
}
LOG.debug("received debezium ddl :{}", ddl);
JsonNode tableChange = tableChanges.get(0);
if (Objects.isNull(tableChange) || !tableChange.get("type").asText().equals("ALTER")) {
JsonNode tableChange = extractTableChange(record);
if (Objects.isNull(tableChange) || Objects.isNull(ddl)) {
return null;
}

Expand Down Expand Up @@ -285,6 +311,47 @@ public List<String> extractDDLList(JsonNode record) throws JsonProcessingExcepti
return SchemaChangeHelper.generateDDLSql(dorisTable);
}

@VisibleForTesting
public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
String dorisTable = getCreateTableIdentifier(record);
JsonNode tableChange = extractTableChange(record);
JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
JsonNode columns = tableChange.get("table").get("columns");
String tblComment = tableChange.get("table").get("comment").asText();
Map<String, FieldSchema> field = new LinkedHashMap<>();
for (JsonNode column : columns) {
buildFieldSchema(field, column);
}
List<String> pkList = new ArrayList<>();
for(JsonNode column : pkColumns){
String fieldName = column.asText();
pkList.add(fieldName);
}

TableSchema tableSchema = new TableSchema();
tableSchema.setFields(field);
tableSchema.setKeys(pkList);
tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
tableSchema.setTableComment(tblComment);

String[] split = dorisTable.split("\\.");
Preconditions.checkArgument(split.length == 2);
tableSchema.setDatabase(split[0]);
tableSchema.setTable(split[1]);
return tableSchema;
}

private List<String> buildDistributeKeys(List<String> primaryKeys, Map<String, FieldSchema> fields) {
if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
return primaryKeys;
}
if(!fields.isEmpty()){
Map.Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
return Collections.singletonList(firstField.getKey());
}
return new ArrayList<>();
}

@VisibleForTesting
public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) {
this.originFieldSchemaMap = originFieldSchemaMap;
Expand Down Expand Up @@ -334,6 +401,12 @@ public String getCdcTableIdentifier(JsonNode record){
return SourceSchema.getString(db, schema, table);
}

public String getCreateTableIdentifier(JsonNode record){
String db = extractJsonNode(record.get("source"), "db");
String table = extractJsonNode(record.get("source"), "table");
return db + "." + table;
}

public String getDorisTableIdentifier(String cdcTableIdentifier){
if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
return dorisOptions.getTableIdentifier();
Expand Down Expand Up @@ -405,6 +478,23 @@ protected String extractTable(JsonNode record) {
return extractJsonNode(record.get("source"), "table");
}

/**
* Parse event type
*/
protected EventType extractEventType(JsonNode record) throws JsonProcessingException {
JsonNode tableChange = extractTableChange(record);
if(tableChange == null || tableChange.get("type") == null){
return null;
}
String type = tableChange.get("type").asText();
if(EventType.ALTER.toString().equalsIgnoreCase(type)){
return EventType.ALTER;
}else if(EventType.CREATE.toString().equalsIgnoreCase(type)){
return EventType.CREATE;
}
return null;
}

private String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null &&
!(record.get(key) instanceof NullNode) ? record.get(key).asText() : null;
Expand All @@ -425,7 +515,7 @@ private Map<String, Object> extractRow(JsonNode recordRow) {
}

private JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException {
if (record.has("historyRecord")) {
if (record != null && record.has("historyRecord")) {
return objectMapper.readTree(record.get("historyRecord").asText());
}
// The ddl passed by some scenes will not be included in the historyRecord, such as DebeziumSourceFunction
Expand All @@ -452,8 +542,6 @@ public String extractDDL(JsonNode record) throws JsonProcessingException {
return null;
}



@VisibleForTesting
public void fillOriginSchema(JsonNode columns) {
if (Objects.nonNull(originFieldSchemaMap)) {
Expand Down Expand Up @@ -623,5 +711,4 @@ private String handleType(String type) {
return type;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class DorisConfigOptions {
.defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
.withDescription("");
public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
.key("doris.request.retriesdoris.deserialize.queue.size")
.key("doris.deserialize.queue.size")
.intType()
.defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
.withDescription("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ private static void syncDatabase(MultipleParameterTool params, DatabaseSync data
.setTableConfig(tableMap)
.setCreateTableOnly(createTableOnly)
.setNewSchemaChange(useNewSchemaChange)
.setSingleSink(singleSink)
.create();
databaseSync.build();
if(StringUtils.isNullOrWhitespaceOnly(jobName)){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public abstract class DatabaseSync {
private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
Expand Down Expand Up @@ -83,6 +84,11 @@ public abstract class DatabaseSync {

public abstract DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env);

/**
* Get the prefix of a specific tableList, for example, mysql is database, oracle is schema
*/
public abstract String getTableListPrefix();

public DatabaseSync() throws SQLException {
registerDriver();
}
Expand Down Expand Up @@ -132,8 +138,7 @@ public void build() throws Exception {
System.out.println("Create table finished.");
System.exit(0);
}

config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|", syncTables) + ")");
config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables));
DataStreamSource<String> streamSource = buildCdcSource(env);
if(singleSink){
streamSource.sinkTo(buildDorisSink());
Expand Down Expand Up @@ -256,6 +261,24 @@ protected boolean isSyncNeeded(String tableName) {
LOG.debug("table {} is synchronized? {}", tableName, sync);
return sync;
}

protected String getSyncTableList(List<String> syncTables){
if(!singleSink){
return syncTables.stream()
.map(v-> getTableListPrefix() + "\\." + v)
.collect(Collectors.joining("|"));
}else{
// includingTablePattern and ^excludingPattern
String includingPattern = String.format("(%s)\\.(%s)", getTableListPrefix(), includingTables);
if (StringUtils.isNullOrWhitespaceOnly(excludingTables)) {
return includingPattern;
}else{
String excludingPattern = String.format("?!(%s\\.(%s))$", getTableListPrefix(), excludingTables);
return String.format("(%s)(%s)", includingPattern, excludingPattern);
}
}
}

/**
* Filter table that many tables merge to one
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
.username(config.get(MySqlSourceOptions.USERNAME))
.password(config.get(MySqlSourceOptions.PASSWORD))
.databaseList(databaseName)
.tableList(databaseName + "." + tableName);
.tableList(tableName)
//default open add newly table
.scanNewlyAddedTableEnabled(true);

config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
config
Expand Down Expand Up @@ -215,6 +217,12 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
}

@Override
public String getTableListPrefix() {
String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
return databaseName;
}

/**
* set chunkkeyColumn,eg: db.table1:column1,db.table2:column2
* @param sourceBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
.port(port)
.databaseList(databaseName)
.schemaList(schemaName)
.tableList(schemaName + "." + tableName)
.tableList(tableName)
.username(username)
.password(password)
.includeSchemaChanges(true)
Expand All @@ -199,12 +199,18 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
.password(password)
.database(databaseName)
.schemaList(schemaName)
.tableList(schemaName + "." + tableName)
.tableList(tableName)
.debeziumProperties(debeziumProperties)
.startupOptions(startupOptions)
.deserializer(schema)
.build();
return env.addSource(oracleSource, "Oracle Source");
}
}

@Override
public String getTableListPrefix() {
String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
return schemaName;
}
}
Loading

0 comments on commit 0eb3aa7

Please sign in to comment.