[improve]The sql_parser model of schema change supports automatic table creation (apache#435)
DongLiang-0 authored Jul 23, 2024
1 parent a57f24b commit db76d2a
Show file tree
Hide file tree
Showing 12 changed files with 641 additions and 74 deletions.
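
Before the diff, a minimal usage sketch of what this change enables, assuming the connector artifact is on the classpath; the DDL string, target identifier, and bucket value below are made-up examples, not values from the commit. The new SQLParserSchemaManager#parseCreateTableStatement turns an upstream CREATE TABLE statement into a Doris TableSchema:

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.sink.schema.SQLParserSchemaManager;
import org.apache.doris.flink.tools.cdc.SourceConnector;

public class AutoCreateTableSketch {
    public static void main(String[] args) {
        SQLParserSchemaManager schemaManager = new SQLParserSchemaManager();
        // A made-up upstream DDL; the parser derives columns, keys, defaults, and comments from it.
        String ddl =
                "CREATE TABLE users ("
                        + "id INT PRIMARY KEY, "
                        + "name VARCHAR(64) COMMENT 'user name')";
        Map<String, String> props = new HashMap<>();
        props.put("table-buckets", "users:10"); // optional per-table bucket mapping
        // The target identifier must be "database.table"; the method asserts exactly two parts.
        TableSchema schema =
                schemaManager.parseCreateTableStatement(
                        SourceConnector.MYSQL, ddl, "db.users", props);
        // With a primary key present the schema uses the UNIQUE model; otherwise DUPLICATE.
        System.out.println(schema);
    }
}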
FieldSchema.java
@@ -69,4 +69,22 @@ public String getDefaultValue() {
public void setDefaultValue(String defaultValue) {
this.defaultValue = defaultValue;
}

@Override
public String toString() {
return "FieldSchema{"
+ "name='"
+ name
+ '\''
+ ", typeString='"
+ typeString
+ '\''
+ ", defaultValue='"
+ defaultValue
+ '\''
+ ", comment='"
+ comment
+ '\''
+ '}';
}
}
TableSchema.java
@@ -106,4 +106,31 @@ public void setTableBuckets(Integer tableBuckets) {
public Integer getTableBuckets() {
return tableBuckets;
}

@Override
public String toString() {
return "TableSchema{"
+ "database='"
+ database
+ '\''
+ ", table='"
+ table
+ '\''
+ ", tableComment='"
+ tableComment
+ '\''
+ ", fields="
+ fields
+ ", keys="
+ String.join(",", keys)
+ ", model="
+ model.name()
+ ", distributeKeys="
+ String.join(",", distributeKeys)
+ ", properties="
+ properties
+ ", tableBuckets="
+ tableBuckets
+ '}';
}
}
SQLParserSchemaManager.java
@@ -18,6 +18,7 @@
package org.apache.doris.flink.sink.schema;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;

import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
@@ -27,22 +28,32 @@
import net.sf.jsqlparser.statement.alter.AlterExpression.ColumnDataType;
import net.sf.jsqlparser.statement.alter.AlterOperation;
import net.sf.jsqlparser.statement.create.table.ColDataType;
import net.sf.jsqlparser.statement.create.table.CreateTable;
import net.sf.jsqlparser.statement.create.table.Index;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Use {@link net.sf.jsqlparser.parser.CCJSqlParserUtil} to parse SQL statements. */
/** Use {@link CCJSqlParserUtil} to parse SQL statements. */
public class SQLParserSchemaManager implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(SQLParserSchemaManager.class);
private static final String DEFAULT = "DEFAULT";
private static final String COMMENT = "COMMENT";
private static final String PRIMARY = "PRIMARY";
private static final String PRIMARY_KEY = "PRIMARY KEY";
private static final String UNIQUE = "UNIQUE";

/**
* Doris' schema change only supports ADD, DROP, and RENAME operations. This method is only used
@@ -96,6 +107,128 @@ public List<String> parserAlterDDLs(
return ddlList;
}

public TableSchema parseCreateTableStatement(
SourceConnector sourceConnector,
String ddl,
String dorisTable,
Map<String, String> tableProperties) {
try {
Statement statement = CCJSqlParserUtil.parse(ddl);
if (statement instanceof CreateTable) {
CreateTable createTable = (CreateTable) statement;
Map<String, FieldSchema> columnFields = new LinkedHashMap<>();
List<String> pkKeys = new ArrayList<>();
createTable
.getColumnDefinitions()
.forEach(
column -> {
String columnName = column.getColumnName();
ColDataType colDataType = column.getColDataType();
String dataType = parseDataType(colDataType, sourceConnector);
List<String> columnSpecs = column.getColumnSpecs();
String defaultValue = extractDefaultValue(columnSpecs);
String comment = extractComment(columnSpecs);
FieldSchema fieldSchema =
new FieldSchema(
columnName, dataType, defaultValue, comment);
columnFields.put(columnName, fieldSchema);
extractColumnPrimaryKey(columnName, columnSpecs, pkKeys);
});

List<Index> indexes = createTable.getIndexes();
extractIndexesPrimaryKey(indexes, pkKeys);

String[] dbTable = dorisTable.split("\\.");
Preconditions.checkArgument(dbTable.length == 2);
TableSchema tableSchema = new TableSchema();
tableSchema.setDatabase(dbTable[0]);
tableSchema.setTable(dbTable[1]);
tableSchema.setModel(pkKeys.isEmpty() ? DataModel.DUPLICATE : DataModel.UNIQUE);
tableSchema.setFields(columnFields);
tableSchema.setKeys(pkKeys);
tableSchema.setTableComment(
extractTableComment(createTable.getTableOptionsStrings()));
tableSchema.setDistributeKeys(
JsonDebeziumChangeUtils.buildDistributeKeys(pkKeys, columnFields));
tableSchema.setProperties(tableProperties);
if (tableProperties.containsKey("table-buckets")) {
String tableBucketsConfig = tableProperties.get("table-buckets");
Map<String, Integer> tableBuckets =
DatabaseSync.getTableBuckets(tableBucketsConfig);
Integer buckets =
JsonDebeziumChangeUtils.getTableSchemaBuckets(
tableBuckets, tableSchema.getTable());
tableSchema.setTableBuckets(buckets);
}
return tableSchema;
} else {
LOG.warn(
"Unsupported statement type. ddl={}, sourceConnector={}, dorisTable={}",
ddl,
sourceConnector.getConnectorName(),
dorisTable);
}
} catch (JSQLParserException e) {
LOG.warn(
"Failed to parse create table statement. ddl={}, sourceConnector={}, dorisTable={}",
ddl,
sourceConnector.getConnectorName(),
dorisTable);
}
return null;
}

private void extractIndexesPrimaryKey(List<Index> indexes, List<String> pkKeys) {
if (CollectionUtils.isEmpty(indexes)) {
return;
}
indexes.stream()
.filter(
index ->
PRIMARY_KEY.equalsIgnoreCase(index.getType())
|| UNIQUE.equalsIgnoreCase(index.getType()))
.flatMap(index -> index.getColumnsNames().stream())
.distinct()
.filter(
primaryKey ->
pkKeys.stream()
.noneMatch(pkKey -> pkKey.equalsIgnoreCase(primaryKey)))
.forEach(pkKeys::add);
}

private void extractColumnPrimaryKey(
String columnName, List<String> columnSpecs, List<String> pkKeys) {
if (CollectionUtils.isEmpty(columnSpecs)) {
return;
}
for (String columnSpec : columnSpecs) {
if (PRIMARY.equalsIgnoreCase(columnSpec)) {
pkKeys.add(columnName);
}
}
}

private String extractTableComment(List<String> tableOptionsStrings) {
if (CollectionUtils.isEmpty(tableOptionsStrings)) {
return null;
}
return extractAdjacentString(tableOptionsStrings, COMMENT);
}

private String parseDataType(ColDataType colDataType, SourceConnector sourceConnector) {
String dataType = colDataType.getDataType();
int length = 0;
int scale = 0;
if (CollectionUtils.isNotEmpty(colDataType.getArgumentsStringList())) {
List<String> argumentsStringList = colDataType.getArgumentsStringList();
length = Integer.parseInt(argumentsStringList.get(0));
if (argumentsStringList.size() == 2) {
scale = Integer.parseInt(argumentsStringList.get(1));
}
}
return JsonDebeziumChangeUtils.buildDorisTypeName(sourceConnector, dataType, length, scale);
}

private String processDropColumnOperation(AlterExpression alterExpression, String dorisTable) {
String dropColumnDDL =
SchemaChangeHelper.buildDropColumnDDL(dorisTable, alterExpression.getColumnName());
@@ -110,19 +243,7 @@ private List<String> processAddColumnOperation(
for (ColumnDataType columnDataType : colDataTypeList) {
String columnName = columnDataType.getColumnName();
ColDataType colDataType = columnDataType.getColDataType();
String datatype = colDataType.getDataType();
Integer length = null;
Integer scale = null;
if (CollectionUtils.isNotEmpty(colDataType.getArgumentsStringList())) {
List<String> argumentsStringList = colDataType.getArgumentsStringList();
length = Integer.parseInt(argumentsStringList.get(0));
if (argumentsStringList.size() == 2) {
scale = Integer.parseInt(argumentsStringList.get(1));
}
}
datatype =
JsonDebeziumChangeUtils.buildDorisTypeName(
sourceConnector, datatype, length, scale);
String datatype = parseDataType(colDataType, sourceConnector);

List<String> columnSpecs = columnDataType.getColumnSpecs();
String defaultValue = extractDefaultValue(columnSpecs);
@@ -161,6 +282,9 @@ private String processRenameColumnOperation(

@VisibleForTesting
public String extractDefaultValue(List<String> columnSpecs) {
if (CollectionUtils.isEmpty(columnSpecs)) {
return null;
}
return extractAdjacentString(columnSpecs, DEFAULT);
}

@@ -185,6 +309,9 @@ private String extractAdjacentString(List<String> columnSpecs, String key) {

@VisibleForTesting
public String extractComment(List<String> columnSpecs) {
if (CollectionUtils.isEmpty(columnSpecs)) {
return null;
}
return extractAdjacentString(columnSpecs, COMMENT);
}

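A short sketch of the hardened spec helpers above, assuming the class's default constructor and that JSqlParser tokenizes the specs as shown (both are assumptions, not part of the commit). extractDefaultValue and extractComment scan a column's spec tokens for DEFAULT/COMMENT and return the adjacent token; with this change they short-circuit to null for null or empty spec lists:

import java.util.Arrays;
import java.util.List;

import org.apache.doris.flink.sink.schema.SQLParserSchemaManager;

public class ColumnSpecsSketch {
    public static void main(String[] args) {
        SQLParserSchemaManager manager = new SQLParserSchemaManager();
        // Hypothetical specs for a column declared as: age INT DEFAULT '18' COMMENT 'user age'
        List<String> specs = Arrays.asList("DEFAULT", "'18'", "COMMENT", "'user age'");
        // Prints the token adjacent to each keyword (exact quoting depends on extractAdjacentString).
        System.out.println(manager.extractDefaultValue(specs));
        System.out.println(manager.extractComment(specs));
        // After this change, a null/empty spec list yields null instead of risking an NPE.
        System.out.println(manager.extractComment(null));
    }
}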
JsonDebeziumChangeUtils.java
@@ -22,6 +22,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -30,7 +31,12 @@
import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Pattern;

import static org.apache.doris.flink.tools.cdc.SourceConnector.MYSQL;
import static org.apache.doris.flink.tools.cdc.SourceConnector.ORACLE;
@@ -95,4 +101,35 @@ public static String buildDorisTypeName(
}
return dorisTypeName;
}

public static List<String> buildDistributeKeys(
List<String> primaryKeys, Map<String, FieldSchema> fields) {
if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
return primaryKeys;
}
if (!fields.isEmpty()) {
Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
return Collections.singletonList(firstField.getKey());
}
return new ArrayList<>();
}

public static Integer getTableSchemaBuckets(
Map<String, Integer> tableBucketsMap, String tableName) {
if (tableBucketsMap != null) {
// First, if the table name has an exact entry in the table-buckets map, use its bucket count.
if (tableBucketsMap.containsKey(tableName)) {
return tableBucketsMap.get(tableName);
}
// Otherwise, treat each map key as a regular expression and return the first match.
for (Entry<String, Integer> entry : tableBucketsMap.entrySet()) {
Pattern pattern = Pattern.compile(entry.getKey());
if (pattern.matcher(tableName).matches()) {
return entry.getValue();
}
}
}
return null;
}
}
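
A small usage sketch of getTableSchemaBuckets, just added above; the table names and bucket counts are hypothetical. Resolution order is exact table name first, then the first map key that matches as a regular expression, and null when nothing matches (i.e. no per-table override):

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;

public class TableBucketsSketch {
    public static void main(String[] args) {
        Map<String, Integer> buckets = new LinkedHashMap<>();
        buckets.put("orders", 32);  // exact table name
        buckets.put("log_.*", 8);   // regular-expression key

        System.out.println(JsonDebeziumChangeUtils.getTableSchemaBuckets(buckets, "orders"));   // 32
        System.out.println(JsonDebeziumChangeUtils.getTableSchemaBuckets(buckets, "log_2024")); // 8
        System.out.println(JsonDebeziumChangeUtils.getTableSchemaBuckets(buckets, "users"));    // null
    }
}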
JsonDebeziumSchemaChange.java
@@ -66,6 +66,9 @@ public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange {
protected SchemaChangeManager schemaChangeManager;
protected JsonDebeziumChangeContext changeContext;
protected SourceConnector sourceConnector;
protected String targetDatabase;
protected String targetTablePrefix;
protected String targetTableSuffix;

public abstract boolean schemaChange(JsonNode recordRoot);

@@ -189,6 +192,11 @@ protected void extractSourceConnector(JsonNode record) {
}
}

protected String getCreateTableIdentifier(JsonNode record) {
String table = extractJsonNode(record.get("source"), "table");
return targetDatabase + "." + targetTablePrefix + table + targetTableSuffix;
}

public Map<String, String> getTableMapping() {
return tableMapping;
}