diff --git a/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java b/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java index 481f25fe41..e1eacd045c 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java @@ -54,10 +54,12 @@ public FlinkSqlTask(TaskDTO task) { @Override public List explain() { + jobManager.setPlanMode(true); return jobManager.explainSql(task.getStatement()).getSqlExplainResults(); } public ObjectNode getJobPlan() { + jobManager.setPlanMode(true); String planJson = jobManager.getJobPlanJson(task.getStatement()); return JsonUtils.parseObject(planJson); } diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java index f9c82b3846..fe22be6923 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java @@ -20,48 +20,31 @@ package org.dinky.cdc; import org.dinky.assertion.Asserts; -import org.dinky.cdc.utils.FlinkStatementUtil; +import org.dinky.cdc.convert.DataTypeConverter; import org.dinky.data.model.Column; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Schema; import org.dinky.data.model.Table; import org.dinky.executor.CustomTableEnvironment; import org.dinky.utils.JsonUtils; +import org.dinky.utils.SplitUtil; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.StringData; import org.apache.flink.table.operations.ModifyOperation; -import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.BooleanType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.DoubleType; -import org.apache.flink.table.types.logical.FloatType; -import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.SmallIntType; -import org.apache.flink.table.types.logical.TimeType; -import org.apache.flink.table.types.logical.TimestampType; -import org.apache.flink.table.types.logical.TinyIntType; -import org.apache.flink.table.types.logical.VarBinaryType; -import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; -import org.apache.flink.util.OutputTag; -import java.math.BigDecimal; -import java.time.Instant; import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; @@ -70,57 +53,48 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import javax.xml.bind.DatatypeConverter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - public abstract class AbstractSinkBuilder implements SinkBuilder { - protected ObjectMapper objectMapper = new ObjectMapper(); protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class); + protected ObjectMapper objectMapper = new ObjectMapper(); protected FlinkCDCConfig config; + protected StreamExecutionEnvironment env; + protected CustomTableEnvironment customTableEnvironment; protected List modifyOperations = new ArrayList<>(); - private ZoneId sinkTimeZone = ZoneId.of("UTC"); + protected ZoneId sinkTimeZone = ZoneId.systemDefault(); - protected List typeConverterList = null; - - protected AbstractSinkBuilder() { - initTypeConverterList(); - } + protected AbstractSinkBuilder() {} protected AbstractSinkBuilder(FlinkCDCConfig config) { this.config = config; - initTypeConverterList(); } - protected void initTypeConverterList() { - typeConverterList = Lists.newArrayList( - this::convertVarCharType, - this::convertDateType, - this::convertTimeType, - this::convertVarBinaryType, - this::convertBigIntType, - this::convertFloatType, - this::convertDecimalType, - this::convertTimestampType); + protected ZoneId getSinkTimeZone() { + return this.sinkTimeZone; } - public FlinkCDCConfig getConfig() { - return config; + protected void init(StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment) { + this.env = env; + this.customTableEnvironment = customTableEnvironment; + initSinkTimeZone(); } - public void setConfig(FlinkCDCConfig config) { - this.config = config; + private void initSinkTimeZone() { + final String timeZone = config.getSink().get("timezone"); + config.getSink().remove("timezone"); + if (Asserts.isNotNullString(timeZone)) { + sinkTimeZone = ZoneId.of(timeZone); + logger.info("Sink timezone is {}", sinkTimeZone); + } } protected Properties getProperties() { @@ -139,41 +113,81 @@ protected Properties getProperties() { return properties; } - protected SingleOutputStreamOperator deserialize(DataStreamSource dataStreamSource) { + @SuppressWarnings("rawtypes") + protected SingleOutputStreamOperator deserialize(SingleOutputStreamOperator dataStreamSource) { return dataStreamSource .map((MapFunction) value -> objectMapper.readValue(value, Map.class)) - .returns(Map.class); + .returns(Map.class) + .name("Deserializer"); } - protected SingleOutputStreamOperator shunt( - SingleOutputStreamOperator mapOperator, Table table, String schemaFieldName) { - final String tableName = table.getName(); - final String schemaName = table.getSchema(); - return mapOperator.filter((FilterFunction) value -> { - LinkedHashMap source = (LinkedHashMap) value.get("source"); - return tableName.equals(source.get("table").toString()) - && schemaName.equals(source.get(schemaFieldName).toString()); - }); + @SuppressWarnings("rawtypes") + protected SingleOutputStreamOperator partitionByTableAndPrimarykey( + SingleOutputStreamOperator mapOperator, Map tableMap) { + final String schemaFieldName = config.getSchemaFieldName(); + final Map configSplit = config.getSplit(); + mapOperator.partitionCustom( + new Partitioner() { + @Override + public int partition(String key, int numPartitions) { + return Math.abs(key.hashCode()) % numPartitions; + } + }, + map -> { + LinkedHashMap source = (LinkedHashMap) map.get("source"); + String tableName = getMergedTableName(source, schemaFieldName, configSplit); + Table table = tableMap.get(tableName); + List primaryKeys = table.getColumns().stream() + .map(column -> { + if (column.isKeyFlag()) { + return column.getName(); + } + return ""; + }) + .collect(Collectors.toList()); + + return tableName + String.join("_", primaryKeys); + }); + return mapOperator.name("PartitionByPrimaryKey"); + } + + protected String getMergedTableName(LinkedHashMap source, String schemaFieldName, Map split) { + if (Asserts.isNullMap(split)) { + return source.get(schemaFieldName).toString() + "." + + source.get("table").toString(); + } + return SplitUtil.getReValue(source.get(schemaFieldName).toString(), split) + + "." + + SplitUtil.getReValue(source.get("table").toString(), split); } - protected DataStream shunt(SingleOutputStreamOperator processOperator, Table table, OutputTag tag) { - processOperator.forward(); - return processOperator.getSideOutput(tag).forward(); + @SuppressWarnings("rawtypes") + protected SingleOutputStreamOperator shunt( + SingleOutputStreamOperator mapOperator, String schemaName, String tableName) { + final String schemaFieldName = config.getSchemaFieldName(); + return mapOperator + .filter((FilterFunction) value -> { + LinkedHashMap source = (LinkedHashMap) value.get("source"); + return tableName.equals(source.get("table").toString()) + && schemaName.equals(source.get(schemaFieldName).toString()); + }) + .name("Shunt"); } @SuppressWarnings("rawtypes") - protected DataStream buildRowData( + private DataStream buildRowData( SingleOutputStreamOperator filterOperator, List columnNameList, List columnTypeList, String schemaTableName) { return filterOperator .flatMap(sinkRowDataFunction(columnNameList, columnTypeList, schemaTableName)) - .returns(RowData.class); + .returns(RowData.class) + .name("FlatMapRowData"); } @SuppressWarnings("rawtypes") - protected FlatMapFunction sinkRowDataFunction( + private FlatMapFunction sinkRowDataFunction( List columnNameList, List columnTypeList, String schemaTableName) { return (value, out) -> { try { @@ -203,7 +217,7 @@ protected FlatMapFunction sinkRowDataFunction( } @SuppressWarnings("rawtypes") - protected void rowDataCollect( + private void rowDataCollect( List columnNameList, List columnTypeList, Collector out, @@ -218,13 +232,16 @@ protected void rowDataCollect( } @SuppressWarnings("rawtypes") - protected Object buildRowDataValues(Map value, RowKind rowKind, String columnName, LogicalType columnType) { - Map data = getOriginRowData(rowKind, value); - return convertValue(data.get(columnName), columnType); + private Object buildRowDataValues(Map value, RowKind rowKind, String columnName, LogicalType columnType) { + Map data = getOriginData(rowKind, value); + return DataTypeConverter.convertToRowData(data.get(columnName), columnType, sinkTimeZone); } @SuppressWarnings("rawtypes") - protected Map getOriginRowData(RowKind rowKind, Map value) { + private Map getOriginData(RowKind rowKind, Map value) { + if (Asserts.isNullMap(value)) { + return Collections.emptyMap(); + } switch (rowKind) { case INSERT: case UPDATE_AFTER: @@ -238,268 +255,76 @@ protected Map getOriginRowData(RowKind rowKind, Map value) { return Collections.emptyMap(); } - public void addSink( - StreamExecutionEnvironment env, + protected void addSink( DataStream rowDataDataStream, Table table, List columnNameList, List columnTypeList) {} - protected List createInsertOperations( - CustomTableEnvironment customTableEnvironment, Table table, String viewName, String tableName) { - String cdcSqlInsert = FlinkStatementUtil.getCDCInsertSql(table, tableName, viewName, config); - logger.info(cdcSqlInsert); - - List operations = customTableEnvironment.getParser().parse(cdcSqlInsert); - logger.info("Create {} FlinkSQL insert into successful...", tableName); - if (operations.isEmpty()) { - return operations; + protected List getSortedSchemaList() { + final List schemaList = config.getSchemaList(); + if (Asserts.isNullCollection(schemaList)) { + throw new IllegalArgumentException("Schema list is empty, please check your configuration and try again."); } - - try { - Operation operation = operations.get(0); - if (operation instanceof ModifyOperation) { - modifyOperations.add((ModifyOperation) operation); + for (Schema schema : schemaList) { + if (Asserts.isNullCollection(schema.getTables())) { + // if schema tables is empty, throw exception + throw new IllegalArgumentException( + "Schema tables is empty, please check your configuration or check your database permission and try again."); } - - } catch (Exception e) { - logger.error("Translate to plan occur exception: {}", e.toString()); - throw e; + // if schema tables is not empty, sort by table name to keep node sort + schema.setTables(schema.getTables().stream() + .sorted(Comparator.comparing(Table::getName)) + .collect(Collectors.toList())); } - return operations; + return schemaList; } + @SuppressWarnings("rawtypes") @Override - public DataStreamSource build( - CDCBuilder cdcBuilder, + public void build( StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource dataStreamSource) { + init(env, customTableEnvironment); + buildPipeline(dataStreamSource, getSortedSchemaList()); + } - final String timeZone = config.getSink().get("timezone"); - config.getSink().remove("timezone"); - if (Asserts.isNotNullString(timeZone)) { - sinkTimeZone = ZoneId.of(timeZone); - logger.info("Sink timezone is {}", sinkTimeZone); - } - - final List schemaList = config.getSchemaList(); - if (Asserts.isNullCollection(schemaList)) { - logger.warn("Schema list is empty, please check your configuration and try again."); - return dataStreamSource; + protected void buildPipeline(DataStreamSource dataStreamSource, List schemaList) { + SingleOutputStreamOperator mapOperator = deserialize(dataStreamSource); + logger.info("Build deserialize successful..."); + final Map tableMap = new LinkedHashMap<>(); + for (Schema schema : schemaList) { + for (Table table : schema.getTables()) { + tableMap.put(table.getSchemaTableName(), table); + } } - - final String schemaFieldName = config.getSchemaFieldName(); - - if (Asserts.isNotNullCollection(schemaList)) { - SingleOutputStreamOperator mapOperator = deserialize(dataStreamSource); - for (Schema schema : schemaList) { - if (Asserts.isNullCollection(schema.getTables())) { - // if schema tables is empty, throw exception - throw new IllegalArgumentException( - "Schema tables is empty, please check your configuration or check your database permission and try again."); - } - // if schema tables is not empty, sort by table name - List tableList = schema.getTables().stream() - .sorted(Comparator.comparing(Table::getName)) - .collect(Collectors.toList()); - for (Table table : tableList) { - SingleOutputStreamOperator filterOperator = shunt(mapOperator, table, schemaFieldName); - - List columnNameList = new ArrayList<>(); - List columnTypeList = new ArrayList<>(); - - buildColumn(columnNameList, columnTypeList, table.getColumns()); - - DataStream rowDataDataStream = - buildRowData(filterOperator, columnNameList, columnTypeList, table.getSchemaTableName()); - - addSink(env, rowDataDataStream, table, columnNameList, columnTypeList); - } + partitionByTableAndPrimarykey(mapOperator, tableMap); + logger.info("Build partitionBy successful..."); + for (Schema schema : schemaList) { + for (Table table : schema.getTables()) { + SingleOutputStreamOperator singleOutputStreamOperator = + shunt(mapOperator, table.getSchema(), table.getName()); + logger.info("Build shunt successful..."); + final List columnNameList = new ArrayList<>(); + final List columnTypeList = new ArrayList<>(); + buildColumn(columnNameList, columnTypeList, table.getColumns()); + + DataStream rowDataDataStream = buildRowData( + singleOutputStreamOperator, columnNameList, columnTypeList, table.getSchemaTableName()); + logger.info("Build flatRowData successful..."); + addSink(rowDataDataStream, table, columnNameList, columnTypeList); } } - return dataStreamSource; } protected void buildColumn(List columnNameList, List columnTypeList, List columns) { for (Column column : columns) { columnNameList.add(column.getName()); - columnTypeList.add(getLogicalType(column)); + columnTypeList.add(DataTypeConverter.getLogicalType(column)); } } - public LogicalType getLogicalType(Column column) { - switch (column.getJavaType()) { - case BOOLEAN: - case JAVA_LANG_BOOLEAN: - return new BooleanType(); - case BYTE: - case JAVA_LANG_BYTE: - return new TinyIntType(); - case SHORT: - case JAVA_LANG_SHORT: - return new SmallIntType(); - case LONG: - case JAVA_LANG_LONG: - return new BigIntType(); - case FLOAT: - case JAVA_LANG_FLOAT: - return new FloatType(); - case DOUBLE: - case JAVA_LANG_DOUBLE: - return new DoubleType(); - case DECIMAL: - if (column.getPrecision() == null || column.getPrecision() == 0) { - return new DecimalType(38, column.getScale()); - } else { - return new DecimalType(column.getPrecision(), column.getScale()); - } - case INT: - case INTEGER: - return new IntType(); - case TIME: - case LOCALTIME: - return new TimeType(column.isNullable(), column.getPrecision() == null ? 0 : column.getPrecision()); - case DATE: - case LOCAL_DATE: - return new DateType(); - case LOCAL_DATETIME: - case TIMESTAMP: - if (column.getLength() != null) { - return new TimestampType(column.getLength()); - } else { - return new TimestampType(3); - } - case BYTES: - return new VarBinaryType(Integer.MAX_VALUE); - case STRING: - default: - return new VarCharType(); - } - } - - protected Object convertValue(Object value, LogicalType logicalType) { - if (value == null) { - return null; - } - - for (ConvertType convertType : typeConverterList) { - Optional result = convertType.convert(value, logicalType); - if (result.isPresent()) { - return result.get(); - } - } - return value; - } - - protected Optional convertVarBinaryType(Object value, LogicalType logicalType) { - if (logicalType instanceof VarBinaryType) { - // VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC. - if (value instanceof String) { - return Optional.of(DatatypeConverter.parseBase64Binary(value.toString())); - } - - return Optional.of(value); - } - return Optional.empty(); - } - - protected Optional convertBigIntType(Object value, LogicalType logicalType) { - if (logicalType instanceof BigIntType) { - if (value instanceof Integer) { - return Optional.of(((Integer) value).longValue()); - } - - return Optional.of(value); - } - return Optional.empty(); - } - - protected Optional convertFloatType(Object value, LogicalType logicalType) { - if (logicalType instanceof FloatType) { - if (value instanceof Float) { - return Optional.of(value); - } - - if (value instanceof Double) { - return Optional.of(((Double) value).floatValue()); - } - - return Optional.of(Float.parseFloat(value.toString())); - } - return Optional.empty(); - } - - protected Optional convertDecimalType(Object value, LogicalType logicalType) { - if (logicalType instanceof DecimalType) { - final DecimalType decimalType = (DecimalType) logicalType; - return Optional.ofNullable(DecimalData.fromBigDecimal( - new BigDecimal((String) value), decimalType.getPrecision(), decimalType.getScale())); - } - return Optional.empty(); - } - - protected Optional convertTimestampType(Object value, LogicalType logicalType) { - if (logicalType instanceof TimestampType) { - if (value instanceof Integer) { - return Optional.of(Instant.ofEpochMilli(((Integer) value).longValue()) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } else if (value instanceof String) { - return Optional.of( - Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime()); - } else { - TimestampType logicalType1 = (TimestampType) logicalType; - if (logicalType1.getPrecision() == 3) { - return Optional.of(Instant.ofEpochMilli((long) value) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } else if (logicalType1.getPrecision() > 3) { - return Optional.of( - Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } - return Optional.of(Instant.ofEpochSecond(((long) value)) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } - } - return Optional.empty(); - } - - protected Optional convertDateType(Object target, LogicalType logicalType) { - if (logicalType instanceof DateType) { - return Optional.of(StringData.fromString(Instant.ofEpochMilli((long) target) - .atZone(ZoneId.systemDefault()) - .toLocalDate() - .toString())); - } - return Optional.empty(); - } - - protected Optional convertTimeType(Object target, LogicalType logicalType) { - if (logicalType instanceof TimeType) { - return Optional.of(StringData.fromString(Instant.ofEpochMilli((long) target) - .atZone(ZoneId.systemDefault()) - .toLocalTime() - .toString())); - } - return Optional.empty(); - } - - protected Optional convertVarCharType(Object target, LogicalType logicalType) { - if (logicalType instanceof VarCharType) { - return Optional.of(StringData.fromString((String) target)); - } - return Optional.empty(); - } - - @FunctionalInterface - public interface ConvertType { - Optional convert(Object target, LogicalType logicalType); - } - @Override public String getSinkSchemaName(Table table) { return config.getSink().getOrDefault("sink.db", table.getSchema()); @@ -576,21 +401,6 @@ private Map parseMappingRoute(String mappingRoute) { return mappingRules; } - protected List getPKList(Table table) { - if (Asserts.isNullCollection(table.getColumns())) { - return new ArrayList<>(); - } - - return table.getColumns().stream() - .filter(Column::isKeyFlag) - .map(Column::getName) - .collect(Collectors.toList()); - } - - protected ZoneId getSinkTimeZone() { - return this.sinkTimeZone; - } - protected Map getTableTopicMap() { String topicMapStr = this.config.getSink().get("table.topic.map"); Map tableTopicMap = new HashMap<>(); diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/SinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/SinkBuilder.java index 6bcd11a5a3..ecd521f05d 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/SinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/SinkBuilder.java @@ -32,8 +32,7 @@ public interface SinkBuilder { SinkBuilder create(FlinkCDCConfig config); - DataStreamSource build( - CDCBuilder cdcBuilder, + void build( StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource dataStreamSource); diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/convert/DataTypeConverter.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/convert/DataTypeConverter.java new file mode 100644 index 0000000000..91481a571b --- /dev/null +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/convert/DataTypeConverter.java @@ -0,0 +1,522 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.cdc.convert; + +import org.dinky.assertion.Asserts; +import org.dinky.data.model.Column; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; + +import java.math.BigDecimal; +import java.math.MathContext; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.DatatypeConverter; + +public class DataTypeConverter { + + static final long MILLISECONDS_PER_SECOND = TimeUnit.SECONDS.toMillis(1); + static final long NANOSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1); + static final long NANOSECONDS_PER_DAY = TimeUnit.DAYS.toNanos(1); + public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000 + + public static LogicalType getLogicalType(Column column) { + switch (column.getJavaType()) { + case BOOLEAN: + case JAVA_LANG_BOOLEAN: + return new BooleanType(); + case BYTE: + case JAVA_LANG_BYTE: + return new TinyIntType(); + case SHORT: + case JAVA_LANG_SHORT: + return new SmallIntType(); + case LONG: + case JAVA_LANG_LONG: + return new BigIntType(); + case FLOAT: + case JAVA_LANG_FLOAT: + return new FloatType(); + case DOUBLE: + case JAVA_LANG_DOUBLE: + return new DoubleType(); + case DECIMAL: + if (column.getPrecision() == null || column.getPrecision() == 0) { + return new DecimalType(38, column.getScale()); + } else { + return new DecimalType(column.getPrecision(), column.getScale()); + } + case INT: + case INTEGER: + return new IntType(); + case TIME: + case LOCALTIME: + return new TimeType( + column.isNullable(), + column.getLength() == 0 + ? (column.getPrecision() == null ? 0 : column.getPrecision()) + : column.getLength()); + case DATE: + case LOCAL_DATE: + return new DateType(); + case LOCAL_DATETIME: + case TIMESTAMP: + if (Asserts.isNotNull(column.getLength())) { + return new TimestampType(column.getLength()); + } else { + return new TimestampType(3); + } + case BYTES: + return new VarBinaryType(Integer.MAX_VALUE); + // return new BinaryType(Asserts.isNull(column.getLength())? Integer.MAX_VALUE: + // column.getLength()); + case STRING: + default: + return new VarCharType(Asserts.isNull(column.getLength()) ? Integer.MAX_VALUE : column.getLength()); + } + } + + public static Object convertToRow(Object value, LogicalType logicalType, ZoneId timeZone) { + if (Asserts.isNull(value)) { + return Optional.empty(); + } + switch (logicalType.getTypeRoot()) { + case BOOLEAN: + return convertToBoolean(value); + case TINYINT: + return convertToByte(value); + case SMALLINT: + return convertToShort(value); + case INTEGER: + return convertToInt(value); + case BIGINT: + return convertToLong(value); + case DATE: + return convertToDate(value); + case TIME_WITHOUT_TIME_ZONE: + return convertToTime(value, logicalType, timeZone); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return convertToTimestamp(value, logicalType); + case TIMESTAMP_WITH_TIME_ZONE: + return convertToTimestampWithTimeZone(value, logicalType, timeZone); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return convertToTimestampWithLocalTimeZone(value, logicalType); + case FLOAT: + return convertToFloat(value); + case DOUBLE: + return convertToDouble(value); + case CHAR: + case VARCHAR: + return convertToString(value); + case BINARY: + case VARBINARY: + return convertToBinary(value); + case DECIMAL: + return convertToDecimal(value, logicalType); + case ROW: + return value; + case ARRAY: + case MAP: + default: + throw new UnsupportedOperationException("Unsupported type: " + logicalType); + } + } + + public static Object convertToRowData(Object value, LogicalType logicalType, ZoneId timeZone) { + if (Asserts.isNull(value)) { + return Optional.empty(); + } + switch (logicalType.getTypeRoot()) { + case BOOLEAN: + return convertToBoolean(value); + case TINYINT: + return convertToByte(value); + case SMALLINT: + return convertToShort(value); + case INTEGER: + return convertToInt(value); + case BIGINT: + return convertToLong(value); + case DATE: + return convertToDate(value); + case TIME_WITHOUT_TIME_ZONE: + return convertToTime(value, logicalType, timeZone); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return convertToTimestampData(value, logicalType); + case TIMESTAMP_WITH_TIME_ZONE: + return convertToTimestampDataWithTimeZone(value, logicalType, timeZone); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return convertToTimestampDataWithLocalTimeZone(value, logicalType); + case FLOAT: + return convertToFloat(value); + case DOUBLE: + return convertToDouble(value); + case CHAR: + case VARCHAR: + return convertToStringData(value); + case BINARY: + case VARBINARY: + return convertToBinary(value); + case DECIMAL: + return convertToDecimalData(value, logicalType); + case ROW: + return value; + case ARRAY: + case MAP: + default: + throw new UnsupportedOperationException("Unsupported type: " + logicalType); + } + } + + private static Object convertToBoolean(Object obj) { + if (obj instanceof Boolean) { + return obj; + } else if (obj instanceof Byte) { + return (byte) obj == 1; + } else if (obj instanceof Short) { + return (short) obj == 1; + } else if (obj instanceof Number) { + return obj.equals(1); + } else { + return Boolean.parseBoolean(obj.toString()); + } + } + + private static Object convertToByte(Object obj) { + return Byte.parseByte(obj.toString()); + } + + private static Object convertToShort(Object obj) { + return Short.parseShort(obj.toString()); + } + + private static Object convertToInt(Object obj) { + if (obj instanceof Integer) { + return obj; + } else if (obj instanceof Long) { + return ((Long) obj).intValue(); + } else { + return Integer.parseInt(obj.toString()); + } + } + + private static Object convertToLong(Object obj) { + if (obj instanceof Integer) { + return ((Integer) obj).longValue(); + } else if (obj instanceof Long) { + return obj; + } else { + return Long.parseLong(obj.toString()); + } + } + + private static Object convertToFloat(Object obj) { + if (obj instanceof Float) { + return obj; + } else if (obj instanceof Double) { + return ((Double) obj).floatValue(); + } else { + return Float.parseFloat(obj.toString()); + } + } + + private static Object convertToDouble(Object obj) { + if (obj instanceof Float) { + return ((Float) obj).doubleValue(); + } else if (obj instanceof Double) { + return obj; + } else { + return Double.parseDouble(obj.toString()); + } + } + + private static Object convertToDate(Object obj) { + return toLocalDate(obj); + } + + private static LocalDate toLocalDate(Object obj) { + if (obj instanceof Long) { + // Assume the value is the epoch day number + return LocalDate.ofEpochDay((Long) obj); + } + if (obj instanceof Integer) { + // Assume the value is the epoch day number + return LocalDate.ofEpochDay((Integer) obj); + } + throw new IllegalArgumentException("Unable to convert to LocalDate from unexpected value '" + + obj + + "' of type " + + obj.getClass().getName()); + } + + private static Object convertToTime(Object obj, LogicalType logicalType, ZoneId timeZone) { + TimeType timeType = (TimeType) logicalType; + if (obj instanceof Number) { + Number number = (Number) obj; + long value = number.longValue(); + if (value > MILLIS_PER_DAY) { + value = value / 1000; + if (value > MILLIS_PER_DAY) { + value = value / 1000; + } + } + if (timeType.getPrecision() == 0) { + return Instant.ofEpochSecond(value).atZone(timeZone).toLocalTime(); + } + if (timeType.getPrecision() == 3) { + return Instant.ofEpochMilli(value).atZone(timeZone).toLocalTime(); + } + if (timeType.getPrecision() == 6) { + return Instant.ofEpochMilli(value % 1000).atZone(timeZone).toLocalTime(); + } + return Instant.ofEpochMilli(value % 1000000).atZone(timeZone).toLocalTime(); + } + throw new IllegalArgumentException("Unable to convert to LocalTime from unexpected value '" + + obj + + "' of type " + + obj.getClass().getName()); + } + + private static LocalTime toLocalTime(Object obj, ZoneId timeZone) { + if (obj == null) { + return null; + } + if (obj instanceof Long) { + return Instant.ofEpochMilli((long) obj).atZone(timeZone).toLocalTime(); + } + if (obj instanceof LocalTime) { + return (LocalTime) obj; + } + if (obj instanceof LocalDateTime) { + return ((LocalDateTime) obj).toLocalTime(); + } + if (obj instanceof java.sql.Date) { + throw new IllegalArgumentException( + "Unable to convert to LocalDate from a java.sql.Date value '" + obj + "'"); + } + if (obj instanceof java.sql.Time) { + java.sql.Time time = (java.sql.Time) obj; + long millis = (int) (time.getTime() % MILLISECONDS_PER_SECOND); + int nanosOfSecond = (int) (millis * NANOSECONDS_PER_MILLISECOND); + return LocalTime.of(time.getHours(), time.getMinutes(), time.getSeconds(), nanosOfSecond); + } + if (obj instanceof java.sql.Timestamp) { + java.sql.Timestamp timestamp = (java.sql.Timestamp) obj; + return LocalTime.of( + timestamp.getHours(), timestamp.getMinutes(), timestamp.getSeconds(), timestamp.getNanos()); + } + if (obj instanceof java.util.Date) { + java.util.Date date = (java.util.Date) obj; + long millis = (int) (date.getTime() % MILLISECONDS_PER_SECOND); + int nanosOfSecond = (int) (millis * NANOSECONDS_PER_MILLISECOND); + return LocalTime.of(date.getHours(), date.getMinutes(), date.getSeconds(), nanosOfSecond); + } + if (obj instanceof Duration) { + Long value = ((Duration) obj).toNanos(); + if (value >= 0 && value <= NANOSECONDS_PER_DAY) { + return LocalTime.ofNanoOfDay(value); + } else { + throw new IllegalArgumentException( + "Time values must use number of milliseconds greater than 0 and less than 86400000000000"); + } + } + throw new IllegalArgumentException("Unable to convert to LocalTime from unexpected value '" + + obj + + "' of type " + + obj.getClass().getName()); + } + + private static Object convertToTimestamp(Object obj, LogicalType logicalType) { + if (obj instanceof Integer) { + return Instant.ofEpochMilli(((Integer) obj).longValue()) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } else if (obj instanceof String) { + return Instant.parse((String) obj).atZone(ZoneId.systemDefault()).toLocalDateTime(); + } else if (obj instanceof Long) { + TimestampType logicalType1 = (TimestampType) logicalType; + if (logicalType1.getPrecision() == 3) { + return Instant.ofEpochMilli((long) obj) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } else if (logicalType1.getPrecision() > 3) { + return Instant.ofEpochMilli(((long) obj) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } + return Instant.ofEpochSecond(((long) obj)) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } + throw new IllegalArgumentException("Unable to convert to TIMESTAMP from unexpected value '" + + obj + + "' of type " + + obj.getClass().getName()); + } + + private static Object convertToTimestampData(Object obj, LogicalType logicalType) { + return TimestampData.fromLocalDateTime((LocalDateTime) convertToTimestamp(obj, logicalType)); + } + + private static Object convertToTimestampWithTimeZone(Object obj, LogicalType logicalType, ZoneId timeZone) { + if (obj instanceof Integer) { + return Instant.ofEpochMilli(((Integer) obj).longValue()) + .atZone(timeZone) + .toLocalDateTime(); + } else if (obj instanceof String) { + return Instant.parse((String) obj).atZone(timeZone).toLocalDateTime(); + } else if (obj instanceof Long) { + TimestampType logicalType1 = (TimestampType) logicalType; + if (logicalType1.getPrecision() == 3) { + return Instant.ofEpochMilli((long) obj).atZone(timeZone).toLocalDateTime(); + } else if (logicalType1.getPrecision() > 3) { + return Instant.ofEpochMilli(((long) obj) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) + .atZone(timeZone) + .toLocalDateTime(); + } + return Instant.ofEpochSecond(((long) obj)).atZone(timeZone).toLocalDateTime(); + } + throw new IllegalArgumentException("Unable to convert to TIMESTAMP from unexpected value '" + + obj + + "' of type " + + obj.getClass().getName()); + } + + private static Object convertToTimestampDataWithTimeZone(Object obj, LogicalType logicalType, ZoneId timeZone) { + return TimestampData.fromLocalDateTime( + (LocalDateTime) convertToTimestampWithTimeZone(obj, logicalType, timeZone)); + } + + private static Object convertToTimestampWithLocalTimeZone(Object obj, LogicalType logicalType) { + if (obj instanceof Integer) { + return Instant.ofEpochMilli(((Integer) obj).longValue()) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } else if (obj instanceof String) { + return Instant.parse((String) obj).atZone(ZoneId.systemDefault()).toLocalDateTime(); + } else if (obj instanceof Long) { + TimestampType logicalType1 = (TimestampType) logicalType; + if (logicalType1.getPrecision() == 3) { + return Instant.ofEpochMilli((long) obj) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } else if (logicalType1.getPrecision() > 3) { + return Instant.ofEpochMilli(((long) obj) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } + return Instant.ofEpochSecond(((long) obj)) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } + throw new IllegalArgumentException("Unable to convert to TIMESTAMP from unexpected value '" + + obj + + "' of type " + + obj.getClass().getName()); + } + + private static Object convertToTimestampDataWithLocalTimeZone(Object obj, LogicalType logicalType) { + return TimestampData.fromLocalDateTime((LocalDateTime) convertToTimestampWithLocalTimeZone(obj, logicalType)); + } + + private static Object convertToString(Object obj) { + return String.valueOf(obj); + } + + private static Object convertToStringData(Object obj) { + return StringData.fromString(String.valueOf(obj)); + } + + private static Object convertToBinary(Object obj) { + if (obj instanceof byte[]) { + return obj; + } else if (obj instanceof ByteBuffer) { + ByteBuffer byteBuffer = (ByteBuffer) obj; + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return bytes; + } else if (obj instanceof String) { + return DatatypeConverter.parseBase64Binary(String.valueOf(obj)); + } else { + throw new UnsupportedOperationException( + "Unsupported BYTES value type: " + obj.getClass().getSimpleName()); + } + } + + private static Object convertToDecimal(Object obj, LogicalType logicalType) { + DecimalType decimalType = (DecimalType) logicalType; + if (obj instanceof BigDecimal) { + return obj; + } else if (obj instanceof Number) { + Number number = (Number) obj; + return BigDecimal.valueOf(number.longValue(), decimalType.getScale()); + } else if (obj instanceof String) { + MathContext mathContext = new MathContext(((DecimalType) logicalType).getPrecision()); + return new BigDecimal(String.valueOf(obj), mathContext); + } else { + throw new UnsupportedOperationException( + "Unsupported Decimal value type: " + obj.getClass().getSimpleName()); + } + } + + private static Object convertToDecimalData(Object obj, LogicalType logicalType) { + DecimalType decimalType = (DecimalType) logicalType; + if (obj instanceof BigDecimal) { + return DecimalData.fromBigDecimal( + new BigDecimal(String.valueOf(obj)), decimalType.getPrecision(), decimalType.getScale()); + } else if (obj instanceof Number) { + Number number = (Number) obj; + return DecimalData.fromBigDecimal( + BigDecimal.valueOf(number.longValue(), decimalType.getScale()), + decimalType.getPrecision(), + decimalType.getScale()); + } else if (obj instanceof String) { + return DecimalData.fromBigDecimal( + new BigDecimal(String.valueOf(obj)), decimalType.getPrecision(), decimalType.getScale()); + } else { + throw new UnsupportedOperationException( + "Unsupported Decimal value type: " + obj.getClass().getSimpleName()); + } + } +} diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java index 6caf75a0d7..e975dbc265 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java @@ -40,7 +40,6 @@ import org.dinky.assertion.Asserts; import org.dinky.cdc.AbstractSinkBuilder; -import org.dinky.cdc.CDCBuilder; import org.dinky.cdc.SinkBuilder; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Schema; @@ -63,12 +62,10 @@ import org.apache.flink.util.OutputTag; import java.io.Serializable; -import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.stream.Collectors; /** * MysqlCDCBuilder @@ -76,6 +73,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements Serializable { public static final String KEY_WORD = "datastream-kafka"; + private Properties kafkaProducerConfig; public KafkaSinkBuilder() {} @@ -85,7 +83,6 @@ public KafkaSinkBuilder(FlinkCDCConfig config) { @Override public void addSink( - StreamExecutionEnvironment env, DataStream rowDataDataStream, Table table, List columnNameList, @@ -102,18 +99,93 @@ public SinkBuilder create(FlinkCDCConfig config) { } @Override - public DataStreamSource build( - CDCBuilder cdcBuilder, + public void build( StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource dataStreamSource) { - Properties kafkaProducerConfig = getProperties(); + init(env, customTableEnvironment); + + kafkaProducerConfig = getProperties(); if (Asserts.isNotNullString(config.getSink().get("topic"))) { + singleTopicSink(dataStreamSource); + } else { + multipleTopicSink(dataStreamSource); + } + } + + private void singleTopicSink(DataStreamSource dataStreamSource) { + org.apache.flink.connector.kafka.sink.KafkaSinkBuilder kafkaSinkBuilder = KafkaSink.builder() + .setBootstrapServers(config.getSink().get("brokers")) + .setRecordSerializer(KafkaRecordSerializationSchema.builder() + .setTopic(config.getSink().get("topic")) + .setValueSerializationSchema(new SimpleStringSchema()) + .build()) + .setTransactionalIdPrefix( + config.getSink().get("transactional.id.prefix") == null + ? "" + : config.getSink().get("transactional.id.prefix")) + .setDeliverGuarantee(DeliveryGuarantee.valueOf( + config.getSink().get("delivery.guarantee") == null + ? "NONE" + : config.getSink().get("delivery.guarantee"))); + if (!kafkaProducerConfig.isEmpty()) { + kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); + } + KafkaSink kafkaSink = kafkaSinkBuilder.build(); + dataStreamSource.sinkTo(kafkaSink); + } + + private void multipleTopicSink(DataStreamSource dataStreamSource) { + final Map> tagMap = new LinkedHashMap<>(); + final Map tableMap = new LinkedHashMap<>(); + final Map tableTopicMap = this.getTableTopicMap(); + final ObjectMapper objectMapper = new ObjectMapper(); + + SingleOutputStreamOperator mapOperator = deserialize(dataStreamSource); + logger.info("Build deserialize successful..."); + + for (Schema schema : getSortedSchemaList()) { + for (Table table : schema.getTables()) { + String sinkTableName = getSinkTableName(table); + OutputTag outputTag = new OutputTag(sinkTableName) {}; + tagMap.put(table, outputTag); + tableMap.put(table.getSchemaTableName(), table); + } + } + partitionByTableAndPrimarykey(mapOperator, tableMap); + logger.info("Build partitionBy successful..."); + final String schemaFieldName = config.getSchemaFieldName(); + SingleOutputStreamOperator process = mapOperator.process(new ProcessFunction() { + @Override + public void processElement(Map map, ProcessFunction.Context ctx, Collector out) + throws Exception { + LinkedHashMap source = (LinkedHashMap) map.get("source"); + try { + String result = objectMapper.writeValueAsString(map); + Table table = tableMap.get(source.get(schemaFieldName).toString() + "." + + source.get("table").toString()); + OutputTag outputTag = tagMap.get(table); + ctx.output(outputTag, result); + } catch (Exception e) { + out.collect(objectMapper.writeValueAsString(map)); + } + } + }); + logger.info("Build shunt successful..."); + tagMap.forEach((k, v) -> { + String topic = getSinkTableName(k); + if (tableTopicMap != null) { + String tableName = k.getName(); + String newTopic = tableTopicMap.get(tableName); + if (Asserts.isNotNullString(newTopic)) { + topic = newTopic; + } + } org.apache.flink.connector.kafka.sink.KafkaSinkBuilder kafkaSinkBuilder = KafkaSink.builder() .setBootstrapServers(config.getSink().get("brokers")) .setRecordSerializer(KafkaRecordSerializationSchema.builder() - .setTopic(config.getSink().get("topic")) + .setTopic(topic) .setValueSerializationSchema(new SimpleStringSchema()) .build()) .setTransactionalIdPrefix( @@ -127,90 +199,9 @@ public DataStreamSource build( if (!kafkaProducerConfig.isEmpty()) { kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); } + KafkaSink kafkaSink = kafkaSinkBuilder.build(); - dataStreamSource.sinkTo(kafkaSink); - } else { - Map> tagMap = new LinkedHashMap<>(); - Map tableMap = new LinkedHashMap<>(); - Map tableTopicMap = this.getTableTopicMap(); - ObjectMapper objectMapper = new ObjectMapper(); - SingleOutputStreamOperator mapOperator = dataStreamSource - .map(x -> objectMapper.readValue(x, Map.class)) - .returns(Map.class); - final List schemaList = config.getSchemaList(); - - final String schemaFieldName = config.getSchemaFieldName(); - if (Asserts.isNotNullCollection(schemaList)) { - for (Schema schema : schemaList) { - if (Asserts.isNullCollection(schema.getTables())) { - logger.error("Schema:{} tables is empty", schema.getName()); - // if schema tables is empty, throw exception - throw new IllegalArgumentException( - "Schema tables is empty, please check your configuration or check your database permission and try again."); - } - // if schema tables is not empty, sort by table name - List
tableList = schema.getTables().stream() - .sorted(Comparator.comparing(Table::getName)) - .collect(Collectors.toList()); - for (Table table : tableList) { - String sinkTableName = getSinkTableName(table); - OutputTag outputTag = new OutputTag(sinkTableName) {}; - tagMap.put(table, outputTag); - tableMap.put(table.getSchemaTableName(), table); - } - } - SingleOutputStreamOperator process = mapOperator.process(new ProcessFunction() { - - @Override - public void processElement(Map map, ProcessFunction.Context ctx, Collector out) - throws Exception { - LinkedHashMap source = (LinkedHashMap) map.get("source"); - try { - String result = objectMapper.writeValueAsString(map); - Table table = - tableMap.get(source.get(schemaFieldName).toString() + "." - + source.get("table").toString()); - OutputTag outputTag = tagMap.get(table); - ctx.output(outputTag, result); - } catch (Exception e) { - out.collect(objectMapper.writeValueAsString(map)); - } - } - }); - tagMap.forEach((k, v) -> { - String topic = getSinkTableName(k); - if (tableTopicMap != null) { - String tableName = k.getName(); - String newTopic = tableTopicMap.get(tableName); - if (Asserts.isNotNullString(newTopic)) { - topic = newTopic; - } - } - - org.apache.flink.connector.kafka.sink.KafkaSinkBuilder kafkaSinkBuilder = - KafkaSink.builder() - .setBootstrapServers(config.getSink().get("brokers")) - .setRecordSerializer(KafkaRecordSerializationSchema.builder() - .setTopic(topic) - .setValueSerializationSchema(new SimpleStringSchema()) - .build()) - .setTransactionalIdPrefix( - config.getSink().get("transactional.id.prefix") == null - ? "" - : config.getSink().get("transactional.id.prefix")) - .setDeliverGuarantee(DeliveryGuarantee.valueOf( - config.getSink().get("delivery.guarantee") == null - ? "NONE" - : config.getSink().get("delivery.guarantee"))); - if (!kafkaProducerConfig.isEmpty()) { - kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); - } - - KafkaSink kafkaSink = kafkaSinkBuilder.build(); - process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic); - }); - } - } - return dataStreamSource; + process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic); + }); } } diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java similarity index 80% rename from dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java rename to dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java index 83bfdbb81f..c4aac7f2c6 100644 --- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java @@ -21,16 +21,13 @@ import org.dinky.assertion.Asserts; import org.dinky.cdc.AbstractSinkBuilder; -import org.dinky.cdc.CDCBuilder; import org.dinky.cdc.SinkBuilder; +import org.dinky.cdc.convert.DataTypeConverter; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Schema; import org.dinky.data.model.Table; import org.dinky.executor.CustomTableEnvironment; -import org.dinky.utils.ObjectConvertUtil; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -43,12 +40,10 @@ import java.io.Serializable; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import java.util.Comparator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -78,55 +73,38 @@ public SinkBuilder create(FlinkCDCConfig config) { @SuppressWarnings("rawtypes") @Override - public DataStreamSource build( - CDCBuilder cdcBuilder, + public void build( StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource dataStreamSource) { try { - SingleOutputStreamOperator mapOperator = - dataStreamSource.map((MapFunction) value -> objectMapper.readValue(value, Map.class)); - final List schemaList = config.getSchemaList(); - final String schemaFieldName = config.getSchemaFieldName(); - if (!Asserts.isNotNullCollection(schemaList)) { - return dataStreamSource; - } - - for (Schema schema : schemaList) { - if (Asserts.isNullCollection(schema.getTables())) { - // if schema tables is empty, throw exception - throw new IllegalArgumentException( - "Schema tables is empty, please check your configuration or check your database permission and try again."); + init(env, customTableEnvironment); + + SingleOutputStreamOperator mapOperator = deserialize(dataStreamSource); + logger.info("Build deserialize successful..."); + List sortedSchemaList = getSortedSchemaList(); + final Map tableMap = new LinkedHashMap<>(); + for (Schema schema : sortedSchemaList) { + for (Table table : schema.getTables()) { + tableMap.put(table.getSchemaTableName(), table); } - // if schema tables is not empty, sort by table name - List
tableList = schema.getTables().stream() - .sorted(Comparator.comparing(Table::getName)) - .collect(Collectors.toList()); - for (Table table : tableList) { + } + partitionByTableAndPrimarykey(mapOperator, tableMap); + logger.info("Build partitionBy successful..."); + for (Schema schema : sortedSchemaList) { + for (Table table : schema.getTables()) { final String tableName = table.getName(); final String schemaName = table.getSchema(); - SingleOutputStreamOperator filterOperator = mapOperator.filter((FilterFunction) value -> { - LinkedHashMap source = (LinkedHashMap) value.get("source"); - return tableName.equals(source.get("table").toString()) - && schemaName.equals(source.get(schemaFieldName).toString()); - }); - String topic = getSinkTableName(table); - if (Asserts.isNotNullString(config.getSink().get("topic"))) { - topic = config.getSink().get("topic"); - } else { - Map tableTopicMap = this.getTableTopicMap(); - if (tableTopicMap != null) { - String newTopic = tableTopicMap.get(tableName); - if (Asserts.isNotNullString(newTopic)) { - topic = newTopic; - } - } - } + SingleOutputStreamOperator singleOutputStreamOperator = + shunt(mapOperator, schemaName, tableName); + logger.info("Build shunt successful..."); + List columnNameList = new LinkedList<>(); List columnTypeList = new LinkedList<>(); buildColumn(columnNameList, columnTypeList, table.getColumns()); - SingleOutputStreamOperator stringOperator = - filterOperator.process(new ProcessFunction() { + + SingleOutputStreamOperator stringOperator = singleOutputStreamOperator + .process(new ProcessFunction() { @Override public void processElement(Map value, Context context, Collector collector) @@ -200,14 +178,29 @@ public void processElement(Map value, Context context, Collector collect collector.collect(objectMapper.writeValueAsString(after)); } } - }); - stringOperator.addSink(new FlinkKafkaProducer(topic, new SimpleStringSchema(), getProperties())); + }) + .name("SerializerWithMetadata"); + + String topic = getSinkTableName(table); + if (Asserts.isNotNullString(config.getSink().get("topic"))) { + topic = config.getSink().get("topic"); + } else { + Map tableTopicMap = this.getTableTopicMap(); + if (tableTopicMap != null) { + String newTopic = tableTopicMap.get(tableName); + if (Asserts.isNotNullString(newTopic)) { + topic = newTopic; + } + } + } + + stringOperator.addSink( + new FlinkKafkaProducer(topic, new SimpleStringSchema(), getProperties())); } } } catch (Exception ex) { logger.error("kafka sink error:", ex); } - return dataStreamSource; } private void initializeObjectMapper() { @@ -219,11 +212,6 @@ private void initializeObjectMapper() { objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); } - @Override - protected Object convertValue(Object value, LogicalType logicalType) { - return ObjectConvertUtil.convertValue(value, logicalType); - } - @SuppressWarnings("rawtypes") private void convertAttr( List columnNameList, @@ -237,7 +225,8 @@ private void convertAttr( for (int i = 0; i < columnNameList.size(); i++) { String columnName = columnNameList.get(i); Object columnNameValue = value.remove(columnName); - Object columnNameNewVal = convertValue(columnNameValue, columnTypeList.get(i)); + Object columnNameNewVal = + DataTypeConverter.convertToRowData(columnNameValue, columnTypeList.get(i), getSinkTimeZone()); value.put(columnName, columnNameNewVal); } value.put("__op", op); diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/print/PrintSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/print/PrintSinkBuilder.java new file mode 100644 index 0000000000..e7b8957678 --- /dev/null +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/print/PrintSinkBuilder.java @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.cdc.print; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import org.dinky.cdc.AbstractSinkBuilder; +import org.dinky.cdc.SinkBuilder; +import org.dinky.data.model.FlinkCDCConfig; +import org.dinky.data.model.Table; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; + +import java.io.Serializable; +import java.util.List; + +public class PrintSinkBuilder extends AbstractSinkBuilder implements Serializable { + + public static final String KEY_WORD = "datastream-print"; + + public PrintSinkBuilder() {} + + public PrintSinkBuilder(FlinkCDCConfig config) { + super(config); + } + + @Override + protected void addSink( + DataStream rowDataDataStream, + Table table, + List columnNameList, + List columnTypeList) { + rowDataDataStream + .addSink(new RichSinkFunction() { + @Override + public void invoke(RowData value, Context context) throws Exception { + System.out.println(value.toString()); + } + }) + .name(String.format("Print Sink(table=[%s.%s])", getSinkSchemaName(table), getSinkTableName(table))); + } + + @Override + public String getHandle() { + return KEY_WORD; + } + + @Override + public SinkBuilder create(FlinkCDCConfig config) { + return new PrintSinkBuilder(config); + } +} diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java index d95d4917f8..ccfbdc1ec6 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java @@ -21,25 +21,25 @@ import org.dinky.assertion.Asserts; import org.dinky.cdc.AbstractSinkBuilder; -import org.dinky.cdc.CDCBuilder; +import org.dinky.cdc.convert.DataTypeConverter; +import org.dinky.cdc.utils.FlinkStatementUtil; +import org.dinky.data.model.Column; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Schema; import org.dinky.data.model.Table; -import org.dinky.executor.CustomTableEnvironment; import org.dinky.utils.JsonUtils; import org.dinky.utils.LogUtil; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.operations.Operation; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.Row; @@ -48,18 +48,13 @@ import org.apache.flink.util.OutputTag; import java.io.Serializable; -import java.math.BigDecimal; -import java.time.ZoneId; import java.util.ArrayList; -import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.stream.Collectors; public abstract class AbstractSqlSinkBuilder extends AbstractSinkBuilder implements Serializable { - protected ZoneId sinkTimeZone = ZoneId.of("UTC"); protected AbstractSqlSinkBuilder() {} @@ -68,7 +63,7 @@ protected AbstractSqlSinkBuilder(FlinkCDCConfig config) { } @SuppressWarnings("rawtypes") - protected FlatMapFunction sqlSinkRowFunction( + private FlatMapFunction sinkRowFunction( List columnNameList, List columnTypeList, String schemaTableName) { return (value, out) -> { try { @@ -107,13 +102,16 @@ private void rowCollect( Map value) { Row row = Row.withPositions(rowKind, columnNameList.size()); for (int i = 0; i < columnNameList.size(); i++) { - row.setField(i, convertValue(value.get(columnNameList.get(i)), columnTypeList.get(i))); + row.setField( + i, + DataTypeConverter.convertToRow( + value.get(columnNameList.get(i)), columnTypeList.get(i), sinkTimeZone)); } out.collect(row); } @SuppressWarnings("rawtypes") - protected DataStream buildRow( + private DataStream buildRow( DataStream filterOperator, List columnNameList, List columnTypeList, @@ -121,31 +119,24 @@ protected DataStream buildRow( TypeInformation[] typeInformation = TypeConversions.fromDataTypeToLegacyInfo( TypeConversions.fromLogicalToDataType(columnTypeList.toArray(new LogicalType[0]))); - return filterOperator.flatMap( - sqlSinkRowFunction(columnNameList, columnTypeList, schemaTableName), - new RowTypeInfo(typeInformation, columnNameList.toArray(new String[0]))); - } - - @Override - protected Optional convertDecimalType(Object value, LogicalType logicalType) { - if (logicalType instanceof DecimalType) { - return Optional.of(new BigDecimal(String.valueOf(value))); - } - return Optional.empty(); + return filterOperator + .flatMap( + sinkRowFunction(columnNameList, columnTypeList, schemaTableName), + new RowTypeInfo(typeInformation, columnNameList.toArray(new String[0]))) + .name("FlatMapRow"); } @SuppressWarnings("rawtypes") - protected void addTableSinkForTags( - CustomTableEnvironment customTableEnvironment, - Map> tagMap, - SingleOutputStreamOperator processOperator) { + private void addTableSinkForTags( + Map> tagMap, SingleOutputStreamOperator processOperator) { tagMap.forEach((table, tag) -> { final String schemaTableName = table.getSchemaTableName(); try { - DataStream filterOperator = shunt(processOperator, table, tag); - logger.info("Build {} shunt successful...", schemaTableName); - List columnNameList = new ArrayList<>(); - List columnTypeList = new ArrayList<>(); + processOperator.forward(); + DataStream filterOperator = + processOperator.getSideOutput(tag).forward(); + final List columnNameList = new ArrayList<>(); + final List columnTypeList = new ArrayList<>(); buildColumn(columnNameList, columnTypeList, table.getColumns()); DataStream rowDataDataStream = buildRow( filterOperator, columnNameList, columnTypeList, schemaTableName) @@ -153,7 +144,7 @@ protected void addTableSinkForTags( logger.info("Build {} flatMap successful...", schemaTableName); logger.info("Start build {} sink...", schemaTableName); - addTableSink(customTableEnvironment, rowDataDataStream, table); + addTableSink(rowDataDataStream, table); } catch (Exception e) { logger.error("Build {} cdc sync failed...", schemaTableName); logger.error(LogUtil.getError(e)); @@ -162,123 +153,63 @@ protected void addTableSinkForTags( } @SuppressWarnings("rawtypes") - protected SingleOutputStreamOperator createMapSingleOutputStreamOperator( - DataStreamSource dataStreamSource, Map> tagMap, Map tableMap) { + private SingleOutputStreamOperator shunt( + SingleOutputStreamOperator mapOperator, + Map> tagMap, + Map tableMap) { final String schemaFieldName = config.getSchemaFieldName(); - SingleOutputStreamOperator mapOperator = - dataStreamSource.map(x -> objectMapper.readValue(x, Map.class)).returns(Map.class); - Map split = config.getSplit(); - partitionByTableAndPrimarykey(mapOperator, tableMap); - return mapOperator.process(new ProcessFunction() { - @Override - public void processElement(Map map, ProcessFunction.Context ctx, Collector out) { - LinkedHashMap source = (LinkedHashMap) map.get("source"); - try { - String tableName = createTableName(source, schemaFieldName, split); - OutputTag outputTag = tagMap.get(tableMap.get(tableName)); - ctx.output(outputTag, map); - } catch (Exception e) { - logger.error(e.getMessage(), e); - out.collect(map); - } - } - }); + final Map configSplit = config.getSplit(); + return mapOperator + .process(new ProcessFunction() { + @Override + public void processElement(Map map, ProcessFunction.Context ctx, Collector out) { + LinkedHashMap source = (LinkedHashMap) map.get("source"); + try { + String tableName = getMergedTableName(source, schemaFieldName, configSplit); + OutputTag outputTag = tagMap.get(tableMap.get(tableName)); + ctx.output(outputTag, map); + } catch (Exception e) { + logger.error(e.getMessage(), e); + out.collect(map); + } + } + }) + .name("Shunt"); } - protected abstract void addTableSink( - CustomTableEnvironment customTableEnvironment, DataStream rowDataDataStream, Table table); - - /** - * @param source - * @param schemaFieldName - * @param split must keep for flink use. - * @return - */ - protected abstract String createTableName(LinkedHashMap source, String schemaFieldName, Map split); + protected abstract void addTableSink(DataStream rowDataDataStream, Table table); @SuppressWarnings("rawtypes") - @Override - public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { - final String timeZone = config.getSink().get("timezone"); - config.getSink().remove("timezone"); - if (Asserts.isNotNullString(timeZone)) { - sinkTimeZone = ZoneId.of(timeZone); - logger.info("Sink timezone is {}", sinkTimeZone); - } - - final List schemaList = config.getSchemaList(); - if (Asserts.isNullCollection(schemaList)) { - return dataStreamSource; - } - - executeCatalogStatement(customTableEnvironment); + protected void buildPipeline(DataStreamSource dataStreamSource, List schemaList) { + executeCatalogStatement(); + SingleOutputStreamOperator mapOperator = deserialize(dataStreamSource); logger.info("Build deserialize successful..."); - Map> tagMap = new LinkedHashMap<>(); - Map tableMap = new LinkedHashMap<>(); + final Map> tagMap = new LinkedHashMap<>(); + final Map tableMap = new LinkedHashMap<>(); for (Schema schema : schemaList) { - if (Asserts.isNullCollection(schema.getTables())) { - // if schema tables is empty, throw exception - throw new IllegalArgumentException( - "Schema tables is empty, please check your configuration or check your database permission and try again."); - } - // if schema tables is not empty, sort by table name - List
tableList = schema.getTables().stream() - .sorted(Comparator.comparing(Table::getName)) - .collect(Collectors.toList()); - for (Table table : tableList) { - String sinkTableName = getSinkTableName(table); - OutputTag outputTag = new OutputTag(sinkTableName) {}; + for (Table table : schema.getTables()) { + OutputTag outputTag = new OutputTag(getSinkTableName(table)) {}; tagMap.put(table, outputTag); tableMap.put(table.getSchemaTableName(), table); } } - SingleOutputStreamOperator processOperator = - createMapSingleOutputStreamOperator(dataStreamSource, tagMap, tableMap); - addTableSinkForTags(customTableEnvironment, tagMap, processOperator); - + partitionByTableAndPrimarykey(mapOperator, tableMap); + logger.info("Build partitionBy successful..."); + SingleOutputStreamOperator singleOutputStreamOperator = shunt(mapOperator, tagMap, tableMap); + logger.info("Build shunt successful..."); + addTableSinkForTags(tagMap, singleOutputStreamOperator); + logger.info("Build sink successful..."); List> trans = customTableEnvironment.getPlanner().translate(modifyOperations); for (Transformation item : trans) { env.addOperator(item); } logger.info("A total of {} table cdc sync were build successful...", trans.size()); - return dataStreamSource; } - protected void partitionByTableAndPrimarykey( - SingleOutputStreamOperator mapOperator, Map tableMap) { - mapOperator.partitionCustom( - new Partitioner() { - @Override - public int partition(String key, int numPartitions) { - return Math.abs(key.hashCode()) % numPartitions; - } - }, - map -> { - LinkedHashMap source = (LinkedHashMap) map.get("source"); - String tableName = createTableName(source, config.getSchemaFieldName(), config.getSplit()); - Table table = tableMap.get(tableName); - List primaryKeys = table.getColumns().stream() - .map(column -> { - if (column.isKeyFlag()) { - return column.getName(); - } - return ""; - }) - .collect(Collectors.toList()); - - return tableName + String.join("_", primaryKeys); - }); - mapOperator.name("PartitionByPrimarykey"); - } - - protected void executeCatalogStatement(CustomTableEnvironment customTableEnvironment) {} + protected void executeCatalogStatement() {} /** * replace view name middle to under line for flink use view name @@ -292,4 +223,32 @@ public static String replaceViewNameMiddleLineToUnderLine(String viewName) { } return viewName; } + + protected List createInsertOperations(Table table, String viewName, String tableName) { + String cdcSqlInsert = FlinkStatementUtil.getCDCInsertSql(table, tableName, viewName, config); + logger.info(cdcSqlInsert); + + List operations = customTableEnvironment.getParser().parse(cdcSqlInsert); + logger.info("Create {} FlinkSQL insert into successful...", tableName); + if (operations.isEmpty()) { + return operations; + } + + Operation operation = operations.get(0); + if (operation instanceof ModifyOperation) { + modifyOperations.add((ModifyOperation) operation); + } + return operations; + } + + protected List getPKList(Table table) { + if (Asserts.isNullCollection(table.getColumns())) { + return new ArrayList<>(); + } + + return table.getColumns().stream() + .filter(Column::isKeyFlag) + .map(Column::getName) + .collect(Collectors.toList()); + } } diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java index b01ff11d06..b869276bd7 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java @@ -23,26 +23,15 @@ import org.dinky.cdc.utils.FlinkStatementUtil; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Table; -import org.dinky.executor.CustomTableEnvironment; -import org.dinky.utils.SplitUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.types.Row; import java.io.Serializable; -import java.time.Instant; -import java.time.LocalDate; -import java.util.Arrays; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Optional; public class SQLSinkBuilder extends AbstractSqlSinkBuilder implements Serializable { @@ -55,19 +44,7 @@ private SQLSinkBuilder(FlinkCDCConfig config) { super(config); } - @Override - protected void initTypeConverterList() { - typeConverterList = Arrays.asList( - this::convertDateType, - this::convertTimestampType, - this::convertFloatType, - this::convertDecimalType, - this::convertBigIntType, - this::convertVarBinaryType); - } - - private String addSourceTableView( - CustomTableEnvironment customTableEnvironment, DataStream rowDataDataStream, Table table) { + private String addSourceTableView(DataStream rowDataDataStream, Table table) { // Because the name of the view on Flink is not allowed to have -, it needs to be replaced with - here_ String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline()); @@ -78,18 +55,14 @@ private String addSourceTableView( } @Override - protected void addTableSink( - CustomTableEnvironment customTableEnvironment, DataStream rowDataDataStream, Table table) { - String viewName = addSourceTableView(customTableEnvironment, rowDataDataStream, table); + protected void addTableSink(DataStream rowDataDataStream, Table table) { + final String viewName = addSourceTableView(rowDataDataStream, table); + final String sinkSchemaName = getSinkSchemaName(table); + final String sinkTableName = getSinkTableName(table); - // 下游库名称 - String sinkSchemaName = getSinkSchemaName(table); - // 下游表名称 - String sinkTableName = getSinkTableName(table); - - // 这个地方要根据下游表的数量进行生成 + // Multiple sinks and single sink if (CollectionUtils.isEmpty(config.getSinks())) { - addSinkInsert(customTableEnvironment, table, viewName, sinkTableName, sinkSchemaName, sinkTableName); + addSinkInsert(table, viewName, sinkTableName, sinkSchemaName, sinkTableName); } else { for (int index = 0; index < config.getSinks().size(); index++) { String tableName = sinkTableName; @@ -98,27 +71,20 @@ protected void addTableSink( } config.setSink(config.getSinks().get(index)); - addSinkInsert(customTableEnvironment, table, viewName, tableName, sinkSchemaName, sinkTableName); + addSinkInsert(table, viewName, tableName, sinkSchemaName, sinkTableName); } } } private List addSinkInsert( - CustomTableEnvironment customTableEnvironment, - Table table, - String viewName, - String tableName, - String sinkSchemaName, - String sinkTableName) { + Table table, String viewName, String tableName, String sinkSchemaName, String sinkTableName) { String pkList = StringUtils.join(getPKList(table), "."); - String flinkDDL = FlinkStatementUtil.getFlinkDDL(table, tableName, config, sinkSchemaName, sinkTableName, pkList); logger.info(flinkDDL); customTableEnvironment.executeSql(flinkDDL); logger.info("Create {} FlinkSQL DDL successful...", tableName); - - return createInsertOperations(customTableEnvironment, table, viewName, tableName); + return createInsertOperations(table, viewName, tableName); } @Override @@ -130,57 +96,4 @@ public String getHandle() { public SinkBuilder create(FlinkCDCConfig config) { return new SQLSinkBuilder(config); } - - @Override - protected String createTableName(LinkedHashMap source, String schemaFieldName, Map split) { - return SplitUtil.getReValue(source.get(schemaFieldName).toString(), split) - + "." - + SplitUtil.getReValue(source.get("table").toString(), split); - } - - @Override - protected Optional convertDateType(Object value, LogicalType logicalType) { - if (logicalType instanceof DateType) { - if (value instanceof Integer) { - return Optional.of(LocalDate.ofEpochDay((Integer) value)); - } - if (value instanceof Long) { - return Optional.of( - Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDate()); - } - return Optional.of( - Instant.parse(value.toString()).atZone(sinkTimeZone).toLocalDate()); - } - return Optional.empty(); - } - - @Override - protected Optional convertTimestampType(Object value, LogicalType logicalType) { - if (logicalType instanceof TimestampType) { - if (value instanceof Integer) { - return Optional.of(Instant.ofEpochMilli(((Integer) value).longValue()) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } else if (value instanceof String) { - return Optional.of( - Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime()); - } else { - TimestampType logicalType1 = (TimestampType) logicalType; - if (logicalType1.getPrecision() == 3) { - return Optional.of(Instant.ofEpochMilli((long) value) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } else if (logicalType1.getPrecision() > 3) { - return Optional.of( - Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } - return Optional.of(Instant.ofEpochSecond(((long) value)) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } - } - return Optional.empty(); - } } diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java index 3d4ce976ee..bc59a989a4 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java @@ -24,20 +24,11 @@ import org.dinky.cdc.utils.FlinkStatementUtil; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Table; -import org.dinky.executor.CustomTableEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.types.Row; import java.io.Serializable; -import java.time.Instant; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Optional; public class SQLCatalogSinkBuilder extends AbstractSqlSinkBuilder implements Serializable { @@ -50,18 +41,7 @@ private SQLCatalogSinkBuilder(FlinkCDCConfig config) { } @Override - protected void initTypeConverterList() { - typeConverterList = Arrays.asList( - this::convertDateType, - this::convertTimestampType, - this::convertDecimalType, - this::convertBigIntType, - this::convertVarBinaryType); - } - - @Override - public void addTableSink( - CustomTableEnvironment customTableEnvironment, DataStream rowDataDataStream, Table table) { + public void addTableSink(DataStream rowDataDataStream, Table table) { String catalogName = config.getSink().get("catalog.name"); String sinkSchemaName = getSinkSchemaName(table); @@ -74,7 +54,7 @@ public void addTableSink( viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream)); logger.info("Create {} temporaryView successful...", viewName); - createInsertOperations(customTableEnvironment, table, viewName, sinkTableName); + createInsertOperations(table, viewName, sinkTableName); } @Override @@ -87,48 +67,8 @@ public SinkBuilder create(FlinkCDCConfig config) { return new SQLCatalogSinkBuilder(config); } - protected void executeCatalogStatement(CustomTableEnvironment customTableEnvironment) { - logger.info("Build catalog successful..."); + protected void executeCatalogStatement() { customTableEnvironment.executeSql(FlinkStatementUtil.getCreateCatalogStatement(config)); - } - - @Override - protected String createTableName(LinkedHashMap source, String schemaFieldName, Map split) { - return source.get(schemaFieldName).toString() + "." - + source.get("table").toString(); - } - - @Override - protected Optional convertDateType(Object value, LogicalType logicalType) { - if (logicalType instanceof DateType) { - if (value instanceof Integer) { - return Optional.of(Instant.ofEpochMilli(((Integer) value).longValue()) - .atZone(sinkTimeZone) - .toLocalDate()); - } - return Optional.of( - Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDate()); - } - return Optional.empty(); - } - - @Override - protected Optional convertTimestampType(Object value, LogicalType logicalType) { - if (logicalType instanceof TimestampType) { - if (value instanceof Integer) { - return Optional.of(Instant.ofEpochMilli(((Integer) value).longValue()) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } - - if (value instanceof String) { - return Optional.of( - Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime()); - } - - return Optional.of( - Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime()); - } - return Optional.empty(); + logger.info("Build catalog successful..."); } } diff --git a/dinky-cdc/dinky-cdc-core/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder b/dinky-cdc/dinky-cdc-core/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder new file mode 100644 index 0000000000..0a2728a1c7 --- /dev/null +++ b/dinky-cdc/dinky-cdc-core/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder @@ -0,0 +1,3 @@ +org.dinky.cdc.print.PrintSinkBuilder +org.dinky.cdc.kafka.KafkaSinkBuilder +org.dinky.cdc.kafka.KafkaSinkJsonBuilder \ No newline at end of file diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSchemaEvolutionSinkBuilder.java b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSchemaEvolutionSinkBuilder.java index 7623c26e4e..43d7b65626 100644 --- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSchemaEvolutionSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSchemaEvolutionSinkBuilder.java @@ -21,7 +21,6 @@ import org.dinky.assertion.Asserts; import org.dinky.cdc.AbstractSinkBuilder; -import org.dinky.cdc.CDCBuilder; import org.dinky.cdc.SinkBuilder; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Schema; @@ -41,12 +40,10 @@ import org.apache.flink.util.OutputTag; import java.io.Serializable; -import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.stream.Collectors; public class DorisSchemaEvolutionSinkBuilder extends AbstractSinkBuilder implements Serializable { @@ -70,12 +67,13 @@ public SinkBuilder create(FlinkCDCConfig config) { @SuppressWarnings("rawtypes") @Override - public DataStreamSource build( - CDCBuilder cdcBuilder, + public void build( StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource dataStreamSource) { + init(env, customTableEnvironment); + Map sink = config.getSink(); Properties properties = getProperties(); @@ -83,51 +81,46 @@ public DataStreamSource build( properties.setProperty("format", "json"); properties.setProperty("read_json_by_line", "true"); - final List schemaList = config.getSchemaList(); - if (!Asserts.isNotNullCollection(schemaList)) { - return dataStreamSource; - } - - SingleOutputStreamOperator mapOperator = - dataStreamSource.map(x -> objectMapper.readValue(x, Map.class)).returns(Map.class); - final String schemaFieldName = config.getSchemaFieldName(); + SingleOutputStreamOperator mapOperator = deserialize(dataStreamSource); + logger.info("Build deserialize successful..."); - Map> tagMap = new LinkedHashMap<>(); - Map tableMap = new LinkedHashMap<>(); - for (Schema schema : schemaList) { - if (Asserts.isNullCollection(schema.getTables())) { - // if schema tables is empty, throw exception - throw new IllegalArgumentException( - "Schema tables is empty, please check your configuration or check your database permission and try again."); + List sortedSchemaList = getSortedSchemaList(); + final Map tableMap = new LinkedHashMap<>(); + for (Schema schema : sortedSchemaList) { + for (Table table : schema.getTables()) { + tableMap.put(table.getSchemaTableName(), table); } - // if schema tables is not empty, sort by table name - List
tableList = schema.getTables().stream() - .sorted(Comparator.comparing(Table::getName)) - .collect(Collectors.toList()); - + } + partitionByTableAndPrimarykey(mapOperator, tableMap); + logger.info("Build partitionBy successful..."); + Map> tagMap = new LinkedHashMap<>(); + for (Schema schema : getSortedSchemaList()) { for (Table table : schema.getTables()) { OutputTag outputTag = new OutputTag(getSinkTableName(table)) {}; tagMap.put(table, outputTag); - tableMap.put(table.getSchemaTableName(), table); } } - - SingleOutputStreamOperator process = mapOperator.process(new ProcessFunction() { - - @Override - public void processElement(Map map, Context ctx, Collector out) throws Exception { - LinkedHashMap source = (LinkedHashMap) map.get("source"); - String result = objectMapper.writeValueAsString(map); - try { - Table table = tableMap.get(source.get(schemaFieldName).toString() - + "." - + source.get("table").toString()); - ctx.output(tagMap.get(table), result); - } catch (Exception e) { - out.collect(result); - } - } - }); + final String schemaFieldName = config.getSchemaFieldName(); + SingleOutputStreamOperator process = mapOperator + .process(new ProcessFunction() { + + @Override + public void processElement(Map map, Context ctx, Collector out) throws Exception { + LinkedHashMap source = (LinkedHashMap) map.get("source"); + String result = objectMapper.writeValueAsString(map); + try { + Table table = + tableMap.get(source.get(schemaFieldName).toString() + + "." + + source.get("table").toString()); + ctx.output(tagMap.get(table), result); + } catch (Exception e) { + out.collect(result); + } + } + }) + .name("Shunt"); + logger.info("Build shunt successful..."); tagMap.forEach((table, v) -> { DorisOptions dorisOptions = DorisOptions.builder() @@ -195,7 +188,6 @@ public void processElement(Map map, Context ctx, Collector out) throws E "Doris Schema Evolution Sink(table=[%s.%s])", getSinkSchemaName(table), getSinkTableName(table))); }); - return dataStreamSource; } @Override diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkBuilder.java b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkBuilder.java index d384e1f69d..c00d33e33e 100644 --- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkBuilder.java @@ -31,7 +31,6 @@ import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; @@ -67,7 +66,6 @@ public SinkBuilder create(FlinkCDCConfig config) { @Override public void addSink( - StreamExecutionEnvironment env, DataStream rowDataDataStream, Table table, List columnNameList, diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java deleted file mode 100644 index 8cdd0df10a..0000000000 --- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.cdc.kafka; - -import org.dinky.assertion.Asserts; -import org.dinky.cdc.AbstractSinkBuilder; -import org.dinky.cdc.CDCBuilder; -import org.dinky.cdc.SinkBuilder; -import org.dinky.data.model.FlinkCDCConfig; -import org.dinky.data.model.Schema; -import org.dinky.data.model.Table; -import org.dinky.executor.CustomTableEnvironment; - -import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.connector.base.DeliveryGuarantee; -import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; -import org.apache.flink.connector.kafka.sink.KafkaSink; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.util.Collector; -import org.apache.flink.util.OutputTag; - -import java.io.Serializable; -import java.util.Comparator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; - -public class KafkaSinkBuilder extends AbstractSinkBuilder implements Serializable { - - public static final String KEY_WORD = "datastream-kafka"; - public static final String TRANSACTIONAL_ID = "transactional.id"; - - public KafkaSinkBuilder() {} - - public KafkaSinkBuilder(FlinkCDCConfig config) { - super(config); - } - - @Override - public String getHandle() { - return KEY_WORD; - } - - @Override - public SinkBuilder create(FlinkCDCConfig config) { - return new KafkaSinkBuilder(config); - } - - @SuppressWarnings("rawtypes") - @Override - public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { - Properties kafkaProducerConfig = getProperties(); - if (Asserts.isNotNullString(config.getSink().get("topic"))) { - org.apache.flink.connector.kafka.sink.KafkaSinkBuilder kafkaSinkBuilder = - KafkaSink.builder() - .setBootstrapServers(config.getSink().get("brokers")) - .setRecordSerializer(KafkaRecordSerializationSchema.builder() - .setTopic(config.getSink().get("topic")) - .setValueSerializationSchema(new SimpleStringSchema()) - .build()) - .setTransactionalIdPrefix(config.getSink().get("transactional.id.prefix") == null - ? "" - : config.getSink().get("transactional.id.prefix")) - .setDeliverGuarantee( - DeliveryGuarantee.valueOf(config.getSink().get("delivery.guarantee") == null - ? "NONE" - : config.getSink().get("delivery.guarantee"))); - if (!kafkaProducerConfig.isEmpty()) { - kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); - } - KafkaSink kafkaSink = kafkaSinkBuilder.build(); - dataStreamSource.sinkTo(kafkaSink); - } else { - Map> tagMap = new LinkedHashMap<>(); - Map tableTopicMap = this.getTableTopicMap(); - Map tableMap = new LinkedHashMap<>(); - ObjectMapper objectMapper = new ObjectMapper(); - SingleOutputStreamOperator mapOperator = dataStreamSource - .map(x -> objectMapper.readValue(x, Map.class)) - .returns(Map.class); - final List schemaList = config.getSchemaList(); - - final String schemaFieldName = config.getSchemaFieldName(); - if (Asserts.isNotNullCollection(schemaList)) { - for (Schema schema : schemaList) { - if (Asserts.isNullCollection(schema.getTables())) { - // if schema tables is empty, throw exception - throw new IllegalArgumentException( - "Schema tables is empty, please check your configuration or check your database permission and try again."); - } - // if schema tables is not empty, sort by table name - List
tableList = schema.getTables().stream() - .sorted(Comparator.comparing(Table::getName)) - .collect(Collectors.toList()); - for (Table table : tableList) { - String sinkTableName = getSinkTableName(table); - OutputTag outputTag = new OutputTag(sinkTableName) {}; - tagMap.put(table, outputTag); - tableMap.put(table.getSchemaTableName(), table); - } - } - SingleOutputStreamOperator process = mapOperator.process(new ProcessFunction() { - - @Override - public void processElement(Map map, ProcessFunction.Context ctx, Collector out) - throws Exception { - LinkedHashMap source = (LinkedHashMap) map.get("source"); - try { - String result = objectMapper.writeValueAsString(map); - Table table = - tableMap.get(source.get(schemaFieldName).toString() - + "." - + source.get("table").toString()); - OutputTag outputTag = tagMap.get(table); - ctx.output(outputTag, result); - } catch (Exception e) { - out.collect(objectMapper.writeValueAsString(map)); - } - } - }); - - tagMap.forEach((k, v) -> { - String topic = getSinkTableName(k); - if (tableTopicMap != null) { - String tableName = k.getName(); - String newTopic = tableTopicMap.get(tableName); - if (Asserts.isNotNullString(newTopic)) { - topic = newTopic; - } - } - org.apache.flink.connector.kafka.sink.KafkaSinkBuilder kafkaSinkBuilder = - KafkaSink.builder() - .setBootstrapServers(config.getSink().get("brokers")) - .setRecordSerializer(KafkaRecordSerializationSchema.builder() - .setTopic(topic) - .setValueSerializationSchema(new SimpleStringSchema()) - .build()) - .setTransactionalIdPrefix(config.getSink().get("transactional.id.prefix") == null - ? "" - : config.getSink().get("transactional.id.prefix")) - .setDeliverGuarantee( - DeliveryGuarantee.valueOf(config.getSink().get("delivery.guarantee") == null - ? "NONE" - : config.getSink().get("delivery.guarantee"))); - if (!kafkaProducerConfig.isEmpty()) { - kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); - } - - KafkaSink kafkaSink = kafkaSinkBuilder.build(); - process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic); - }); - } - } - return dataStreamSource; - } -} diff --git a/dinky-cdc/dinky-cdc-plus/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder b/dinky-cdc/dinky-cdc-plus/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder index 763c78ccf0..6bc1916a78 100644 --- a/dinky-cdc/dinky-cdc-plus/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder +++ b/dinky-cdc/dinky-cdc-plus/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder @@ -1,4 +1,2 @@ -org.dinky.cdc.kafka.KafkaSinkBuilder -org.dinky.cdc.kafka.KafkaSinkJsonBuilder org.dinky.cdc.doris.DorisSinkBuilder org.dinky.cdc.doris.DorisSchemaEvolutionSinkBuilder \ No newline at end of file diff --git a/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java b/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java index 49973e1678..db12198b4b 100644 --- a/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java +++ b/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java @@ -58,6 +58,7 @@ static ResultBuilder build( case DESCRIBE: return new ShowResultBuilder(id); case INSERT: + case EXECUTE: return isMockSinkFunction ? new MockResultBuilder(id, maxRowNum, isAutoCancel) : new InsertResultBuilder(); diff --git a/dinky-core/src/main/java/org/dinky/executor/Executor.java b/dinky-core/src/main/java/org/dinky/executor/Executor.java index d1c42d9959..969db68577 100644 --- a/dinky-core/src/main/java/org/dinky/executor/Executor.java +++ b/dinky-core/src/main/java/org/dinky/executor/Executor.java @@ -270,7 +270,7 @@ public ObjectNode getStreamGraphJsonNode(StreamGraph streamGraph) { } public StreamGraph getStreamGraph() { - return environment.getStreamGraph(); + return environment.getStreamGraph(false); } public StreamGraph getStreamGraphFromCustomStatements(List statements) { @@ -278,17 +278,11 @@ public StreamGraph getStreamGraphFromCustomStatements(List statements) { return getStreamGraph(); } - public ObjectNode getStreamGraphFromDataStream(List statements) { - statements.forEach(this::executeSql); - return getStreamGraphJsonNode(getStreamGraph()); - } - - public JobPlanInfo getJobPlanInfo(List statements) { + public JobPlanInfo getJobPlanInfoFromStatements(List statements) { return tableEnvironment.getJobPlanInfo(statements); } - public JobPlanInfo getJobPlanInfoFromDataStream(List statements) { - statements.forEach(this::executeSql); + public JobPlanInfo getJobPlanInfo() { StreamGraph streamGraph = getStreamGraph(); return new JobPlanInfo(JsonPlanGenerator.generatePlan(streamGraph.getJobGraph())); } diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index 3203b9ac1c..4a3e3846c7 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -132,7 +132,7 @@ public JobStatementPlan parseStatements(String[] statements) { jobStatementPlan.addJobStatement(statement, JobStatementType.DDL, operationType); } } - if (jobManager.getConfig().isMockSinkFunction()) { + if (!jobManager.isPlanMode() && jobManager.getConfig().isMockSinkFunction()) { MockStatementExplainer.build(executor.getCustomTableEnvironment()).jobStatementPlanMock(jobStatementPlan); } return jobStatementPlan; diff --git a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java index ad9a316fae..4106cd7f4b 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java @@ -79,6 +79,9 @@ private void mockSink(JobStatementPlan jobStatementPlan) { // mock insert table ddl List jobStatementList = jobStatementPlan.getJobStatementList(); for (int i = 0; i < jobStatementList.size(); i++) { + if (!jobStatementList.get(i).getSqlType().equals(SqlType.CREATE)) { + continue; + } SqlNode sqlNode = tableEnv.parseSql(jobStatementList.get(i).getStatement()); if (sqlNode instanceof SqlCreateTable) { SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode; diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java index 835cf2285d..743afb9570 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java @@ -193,6 +193,6 @@ public StreamGraph getStreamGraph() { @Override public JobPlanInfo getJobPlanInfo() { - return executor.getJobPlanInfo(null); + return executor.getJobPlanInfo(); } } diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java index 7207151dfe..1360bcc962 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java @@ -155,7 +155,7 @@ public StreamGraph getStreamGraph() { @Override public JobPlanInfo getJobPlanInfo() { - return executor.getJobPlanInfo(null); + return executor.getJobPlanInfo(); } private boolean inferStatementSet() { diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java index bc3640c3b8..07cae22b8f 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java @@ -27,6 +27,7 @@ import org.dinky.data.result.InsertResult; import org.dinky.data.result.ResultBuilder; import org.dinky.data.result.SqlExplainResult; +import org.dinky.executor.CustomTableResultImpl; import org.dinky.executor.Executor; import org.dinky.gateway.Gateway; import org.dinky.gateway.result.GatewayResult; @@ -43,6 +44,8 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.types.Row; import java.time.LocalDateTime; import java.util.ArrayList; @@ -55,6 +58,7 @@ public class JobPipelineRunner extends AbstractJobRunner { private List statements; + private TableResult tableResult; public JobPipelineRunner(JobManager jobManager) { this.jobManager = jobManager; @@ -64,7 +68,7 @@ public JobPipelineRunner(JobManager jobManager) { @Override public void run(JobStatement jobStatement) throws Exception { statements.add(jobStatement); - jobManager.getExecutor().executeSql(jobStatement.getStatement()); + tableResult = jobManager.getExecutor().executeSql(jobStatement.getStatement()); if (statements.size() == 1) { if (jobManager.isUseGateway()) { processWithGateway(); @@ -130,7 +134,9 @@ public SqlExplainResult explain(JobStatement jobStatement) { @Override public StreamGraph getStreamGraph(JobStatement jobStatement) { - explain(jobStatement); + statements.add(jobStatement); + // pipeline job execute to generate stream graph. + jobManager.getExecutor().executeSql(jobStatement.getStatement()); if (statements.size() == 1) { return jobManager.getExecutor().getStreamGraph(); } else { @@ -141,9 +147,11 @@ public StreamGraph getStreamGraph(JobStatement jobStatement) { @Override public JobPlanInfo getJobPlanInfo(JobStatement jobStatement) { - explain(jobStatement); + statements.add(jobStatement); + // pipeline job execute to generate stream graph. + jobManager.getExecutor().executeSql(jobStatement.getStatement()); if (statements.size() == 1) { - return jobManager.getExecutor().getJobPlanInfo(statements); + return jobManager.getExecutor().getJobPlanInfo(); } else { throw new DinkyException( "Only one pipeline job is explained. The statement has be skipped: " + jobStatement.getStatement()); @@ -194,6 +202,14 @@ private void processWithoutGateway() throws Exception { add(job.getJobId()); } }); + final List rowList = new ArrayList<>(); + tableResult.getResolvedSchema().getColumns().forEach(column -> rowList.add(Row.of(-1))); + tableResult = CustomTableResultImpl.builder() + .resultKind(tableResult.getResultKind()) + .schema(tableResult.getResolvedSchema()) + .data(rowList) + .jobClient(jobClient) + .build(); } if (config.isUseResult()) { IResult result = ResultBuilder.build( @@ -202,8 +218,9 @@ private void processWithoutGateway() throws Exception { config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), - executor.getTimeZone()) - .getResult(null); + executor.getTimeZone(), + jobManager.getConfig().isMockSinkFunction()) + .getResultWithPersistence(tableResult, jobManager.getHandler()); job.setResult(result); } } diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java index 619027f1b6..9b2f1be2ee 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java @@ -194,7 +194,7 @@ public JobPlanInfo getJobPlanInfo(JobStatement jobStatement) { return null; } if (!statements.isEmpty()) { - return jobManager.getExecutor().getJobPlanInfo(statements); + return jobManager.getExecutor().getJobPlanInfoFromStatements(statements); } throw new DinkyException("None jobs in statement."); } diff --git a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java index b8ff7316bb..7240bebbda 100644 --- a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java +++ b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java @@ -27,6 +27,7 @@ import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Schema; import org.dinky.data.model.Table; +import org.dinky.executor.CustomTableResultImpl; import org.dinky.executor.Executor; import org.dinky.metadata.driver.Driver; import org.dinky.trans.AbstractOperation; @@ -37,8 +38,15 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.ResultKind; import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.AtomicDataType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.types.Row; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.LinkedList; @@ -74,6 +82,7 @@ public Operation create(String statement) { @Override public TableResult execute(Executor executor) { + final CustomTableResultImpl.Builder tableResultBuilder = CustomTableResultImpl.builder(); logger.info("Start build CDCSOURCE Task..."); CDCSource cdcSource = CDCSource.build(statement); FlinkCDCConfig config = cdcSource.buildFlinkCDCConfig(); @@ -190,16 +199,26 @@ public TableResult execute(Executor executor) { } DataStreamSource streamSource = cdcBuilder.build(streamExecutionEnvironment); logger.info("Build {} successful...", config.getType()); - sinkBuilder.build( - cdcBuilder, streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource); + sinkBuilder.build(streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource); logger.info("Build CDCSOURCE Task successful!"); + final List columns = new ArrayList<>(); + final List rowList = new ArrayList<>(); + for (Schema schema : config.getSchemaList()) { + for (Table table : schema.getTables()) { + columns.add((Column) Column.physical( + "default_catalog.default_database." + sinkBuilder.getSinkTableName(table), + new AtomicDataType(new BigIntType()))); + rowList.add(Row.of(-1)); + } + } + tableResultBuilder.schema(ResolvedSchema.of(columns)).data(rowList).resultKind(ResultKind.SUCCESS); } catch (Exception e) { logger.error(e.getMessage(), e); } - return null; + return tableResultBuilder.build(); } - Driver checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws Exception { + private Driver checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws Exception { Map sink = config.getSink(); String autoCreate = sink.get(FlinkCDCConfig.AUTO_CREATE); if (!Asserts.isEqualsIgnoreCase(autoCreate, "true") || Asserts.isNullString(schemaName)) {