Skip to content

Commit

Permalink
[improve]Improve the entry point for creating tableSchema (apache#439)
Browse files Browse the repository at this point in the history
  • Loading branch information
DongLiang-0 authored Jul 25, 2024
1 parent 67ca940 commit 102a13c
Show file tree
Hide file tree
Showing 12 changed files with 324 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.apache.flink.util.StringUtils;

import org.apache.commons.compress.utils.Lists;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
Expand Down Expand Up @@ -353,15 +353,14 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}

List<String> primaryKeys = getCreateDorisKeys(table.getSchema());
TableSchema schema = new TableSchema();
schema.setDatabase(tablePath.getDatabaseName());
schema.setTable(tablePath.getObjectName());
schema.setTableComment(table.getComment());
schema.setFields(getCreateDorisColumns(table.getSchema()));
schema.setKeys(primaryKeys);
schema.setModel(DataModel.UNIQUE);
schema.setDistributeKeys(primaryKeys);
schema.setProperties(getCreateTableProps(options));
TableSchema schema =
DorisSchemaFactory.createTableSchema(
tablePath.getDatabaseName(),
tablePath.getObjectName(),
getCreateDorisColumns(table.getSchema()),
primaryKeys,
getCreateTableProps(options),
table.getComment());

dorisSystem.createTable(schema);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.catalog.doris;

import org.apache.flink.annotation.VisibleForTesting;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Pattern;

/**
* Factory that creates doris schema.
*
* <p>In the case where doris schema needs to be created, it is best to create it through this
* factory
*/
public class DorisSchemaFactory {
private static Map<String, Integer> tableBucketMap;

public static TableSchema createTableSchema(
String database,
String table,
Map<String, FieldSchema> columnFields,
List<String> pkKeys,
Map<String, String> tableProperties,
String tableComment) {
TableSchema tableSchema = new TableSchema();
tableSchema.setDatabase(database);
tableSchema.setTable(table);
tableSchema.setModel(
CollectionUtils.isEmpty(pkKeys) ? DataModel.DUPLICATE : DataModel.UNIQUE);
tableSchema.setFields(columnFields);
tableSchema.setKeys(buildKeys(pkKeys, columnFields));
tableSchema.setTableComment(tableComment);
tableSchema.setDistributeKeys(buildDistributeKeys(pkKeys, columnFields));
tableSchema.setProperties(tableProperties);
if (tableProperties.containsKey("table-buckets")) {
if (MapUtils.isEmpty(tableBucketMap)) {
String tableBucketsConfig = tableProperties.get("table-buckets");
tableBucketMap = buildTableBucketMap(tableBucketsConfig);
}
Integer buckets = parseTableSchemaBuckets(tableBucketMap, table);
tableSchema.setTableBuckets(buckets);
}
return tableSchema;
}

private static List<String> buildDistributeKeys(
List<String> primaryKeys, Map<String, FieldSchema> fields) {
return buildKeys(primaryKeys, fields);
}

/**
* Theoretically, the duplicate table of doris does not need to distinguish the key column, but
* in the actual table creation statement, the key column will be automatically added. So if it
* is a duplicate table, primaryKeys is empty, and we uniformly take the first field as the key.
*/
private static List<String> buildKeys(
List<String> primaryKeys, Map<String, FieldSchema> fields) {
if (CollectionUtils.isNotEmpty(primaryKeys)) {
return primaryKeys;
}
if (!fields.isEmpty()) {
Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
return Collections.singletonList(firstField.getKey());
}
return new ArrayList<>();
}

@VisibleForTesting
public static Integer parseTableSchemaBuckets(
Map<String, Integer> tableBucketsMap, String tableName) {
if (tableBucketsMap != null) {
// Firstly, if the table name is in the table-buckets map, set the buckets of the table.
if (tableBucketsMap.containsKey(tableName)) {
return tableBucketsMap.get(tableName);
}
// Secondly, iterate over the map to find a corresponding regular expression match.
for (Entry<String, Integer> entry : tableBucketsMap.entrySet()) {
Pattern pattern = Pattern.compile(entry.getKey());
if (pattern.matcher(tableName).matches()) {
return entry.getValue();
}
}
}
return null;
}

/**
* Build table bucket Map.
*
* @param tableBuckets the string of tableBuckets, eg:student:10,student_info:20,student.*:30
* @return The table name and buckets map. The key is table name, the value is buckets.
*/
@VisibleForTesting
public static Map<String, Integer> buildTableBucketMap(String tableBuckets) {
Map<String, Integer> tableBucketsMap = new LinkedHashMap<>();
String[] tableBucketsArray = tableBuckets.split(",");
for (String tableBucket : tableBucketsArray) {
String[] tableBucketArray = tableBucket.split(":");
tableBucketsMap.put(
tableBucketArray[0].trim(), Integer.parseInt(tableBucketArray[1].trim()));
}
return tableBucketsMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@
import net.sf.jsqlparser.statement.create.table.CreateTable;
import net.sf.jsqlparser.statement.create.table.Index;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -59,7 +58,7 @@ public class SQLParserSchemaManager implements Serializable {
* Doris' schema change only supports ADD, DROP, and RENAME operations. This method is only used
* to parse the above schema change operations.
*/
public List<String> parserAlterDDLs(
public List<String> parseAlterDDLs(
SourceConnector sourceConnector, String ddl, String dorisTable) {
List<String> ddlList = new ArrayList<>();
try {
Expand Down Expand Up @@ -137,30 +136,16 @@ public TableSchema parseCreateTableStatement(

List<Index> indexes = createTable.getIndexes();
extractIndexesPrimaryKey(indexes, pkKeys);

String[] dbTable = dorisTable.split("\\.");
Preconditions.checkArgument(dbTable.length == 2);
TableSchema tableSchema = new TableSchema();
tableSchema.setDatabase(dbTable[0]);
tableSchema.setTable(dbTable[1]);
tableSchema.setModel(pkKeys.isEmpty() ? DataModel.DUPLICATE : DataModel.UNIQUE);
tableSchema.setFields(columnFields);
tableSchema.setKeys(pkKeys);
tableSchema.setTableComment(

return DorisSchemaFactory.createTableSchema(
dbTable[0],
dbTable[1],
columnFields,
pkKeys,
tableProperties,
extractTableComment(createTable.getTableOptionsStrings()));
tableSchema.setDistributeKeys(
JsonDebeziumChangeUtils.buildDistributeKeys(pkKeys, columnFields));
tableSchema.setProperties(tableProperties);
if (tableProperties.containsKey("table-buckets")) {
String tableBucketsConfig = tableProperties.get("table-buckets");
Map<String, Integer> tableBuckets =
DatabaseSync.getTableBuckets(tableBucketsConfig);
Integer buckets =
JsonDebeziumChangeUtils.getTableSchemaBuckets(
tableBuckets, tableSchema.getTable());
tableSchema.setTableBuckets(buckets);
}
return tableSchema;
} else {
LOG.warn(
"Unsupported statement type. ddl={}, sourceConnector={}, dorisTable={}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.SourceSchema;
Expand All @@ -31,17 +30,7 @@
import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Pattern;

import static org.apache.doris.flink.tools.cdc.SourceConnector.MYSQL;
import static org.apache.doris.flink.tools.cdc.SourceConnector.ORACLE;
import static org.apache.doris.flink.tools.cdc.SourceConnector.POSTGRES;
import static org.apache.doris.flink.tools.cdc.SourceConnector.SQLSERVER;

public class JsonDebeziumChangeUtils {

Expand Down Expand Up @@ -101,35 +90,4 @@ public static String buildDorisTypeName(
}
return dorisTypeName;
}

public static List<String> buildDistributeKeys(
List<String> primaryKeys, Map<String, FieldSchema> fields) {
if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
return primaryKeys;
}
if (!fields.isEmpty()) {
Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
return Collections.singletonList(firstField.getKey());
}
return new ArrayList<>();
}

public static Integer getTableSchemaBuckets(
Map<String, Integer> tableBucketsMap, String tableName) {
if (tableBucketsMap != null) {
// Firstly, if the table name is in the table-buckets map, set the buckets of the table.
if (tableBucketsMap.containsKey(tableName)) {
return tableBucketsMap.get(tableName);
}
// Secondly, iterate over the map to find a corresponding regular expression match,
for (Entry<String, Integer> entry : tableBucketsMap.entrySet()) {

Pattern pattern = Pattern.compile(entry.getKey());
if (pattern.matcher(tableName).matches()) {
return entry.getValue();
}
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.exception.IllegalArgumentException;
Expand All @@ -37,7 +37,6 @@
import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.sink.writer.EventType;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -221,37 +220,20 @@ public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessi
JsonNode columns = tableChange.get("table").get("columns");
JsonNode comment = tableChange.get("table").get("comment");
String tblComment = comment == null ? "" : comment.asText();
Map<String, FieldSchema> field = new LinkedHashMap<>();
Map<String, FieldSchema> fields = new LinkedHashMap<>();
for (JsonNode column : columns) {
buildFieldSchema(field, column);
buildFieldSchema(fields, column);
}
List<String> pkList = new ArrayList<>();
for (JsonNode column : pkColumns) {
String fieldName = column.asText();
pkList.add(fieldName);
}
String[] dbTable = dorisTable.split("\\.");
Preconditions.checkArgument(dbTable.length == 2);

TableSchema tableSchema = new TableSchema();
tableSchema.setFields(field);
tableSchema.setKeys(pkList);
tableSchema.setDistributeKeys(JsonDebeziumChangeUtils.buildDistributeKeys(pkList, field));
tableSchema.setTableComment(tblComment);
tableSchema.setProperties(tableProperties);
tableSchema.setModel(pkList.isEmpty() ? DataModel.DUPLICATE : DataModel.UNIQUE);

String[] split = dorisTable.split("\\.");
Preconditions.checkArgument(split.length == 2);
tableSchema.setDatabase(split[0]);
tableSchema.setTable(split[1]);
if (tableProperties.containsKey("table-buckets")) {
String tableBucketsConfig = tableProperties.get("table-buckets");
Map<String, Integer> tableBuckets = DatabaseSync.getTableBuckets(tableBucketsConfig);
Integer buckets =
JsonDebeziumChangeUtils.getTableSchemaBuckets(
tableBuckets, tableSchema.getTable());
tableSchema.setTableBuckets(buckets);
}
return tableSchema;
return DorisSchemaFactory.createTableSchema(
dbTable[0], dbTable[1], fields, pkList, tableProperties, tblComment);
}

private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public boolean schemaChange(JsonNode recordRoot) {
LOG.warn("Failed to get doris table tuple. record={}", recordRoot);
return false;
}
List<String> ddlList = tryParserAlterDDLs(recordRoot);
List<String> ddlList = tryParseAlterDDLs(recordRoot);
status = executeAlterDDLs(ddlList, recordRoot, dorisTableTuple, status);
}
} catch (Exception ex) {
Expand All @@ -110,12 +110,12 @@ public TableSchema tryParseCreateTableStatement(JsonNode record, String dorisTab
}

@VisibleForTesting
public List<String> tryParserAlterDDLs(JsonNode record) throws IOException {
public List<String> tryParseAlterDDLs(JsonNode record) throws IOException {
String dorisTable =
JsonDebeziumChangeUtils.getDorisTableIdentifier(record, dorisOptions, tableMapping);
JsonNode historyRecord = extractHistoryRecord(record);
String ddl = extractJsonNode(historyRecord, "ddl");
extractSourceConnector(record);
return sqlParserSchemaManager.parserAlterDDLs(sourceConnector, ddl, dorisTable);
return sqlParserSchemaManager.parseAlterDDLs(sourceConnector, ddl, dorisTable);
}
}
Loading

0 comments on commit 102a13c

Please sign in to comment.