diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java new file mode 100644 index 000000000..c344aae36 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer.serializer.jsondebezium; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.doris.flink.sink.writer.ChangeEvent; +import org.apache.doris.flink.sink.writer.serializer.DorisRecord; + +import java.io.IOException; +import java.util.Map; + +/** + * When the cdc connector captures data changes from the source database, you need to inherit this + * class to synchronize those data changes to Doris. 
Supports data messages serialized to + * JSON. + */ +public abstract class CdcDataChange implements ChangeEvent { + + protected abstract DorisRecord serialize(String record, JsonNode recordRoot, String op) + throws IOException; + + protected abstract Map extractBeforeRow(JsonNode record); + + protected abstract Map extractAfterRow(JsonNode record); +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java new file mode 100644 index 000000000..858a5effd --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer.serializer.jsondebezium; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.doris.flink.sink.writer.ChangeEvent; + +import java.io.IOException; + +/** + * When the cdc connector captures schema changes from the source database, you need to + * inherit this class to synchronize those changes to the Doris schema. 
Supports data messages + * serialized to json + */ +public abstract class CdcSchemaChange implements ChangeEvent { + + protected abstract String extractDatabase(JsonNode record); + + protected abstract String extractTable(JsonNode record); + + public abstract boolean schemaChange(JsonNode recordRoot) throws IOException; + + protected abstract String getCdcTableIdentifier(JsonNode record); +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java index 67aef0201..5075adf8c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.cfg.DorisOptions; -import org.apache.doris.flink.sink.writer.ChangeEvent; import org.apache.doris.flink.sink.writer.serializer.DorisRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +40,7 @@ * into doris through stream load.
* Supported data changes include: read, insert, update, delete. */ -public class JsonDebeziumDataChange implements ChangeEvent { +public class JsonDebeziumDataChange extends CdcDataChange { private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumDataChange.class); private static final String OP_READ = "r"; // snapshot read @@ -122,11 +121,13 @@ private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException return updateRow.toString().getBytes(StandardCharsets.UTF_8); } - private Map extractBeforeRow(JsonNode record) { + @Override + protected Map extractBeforeRow(JsonNode record) { return extractRow(record.get("before")); } - private Map extractAfterRow(JsonNode record) { + @Override + protected Map extractAfterRow(JsonNode record) { return extractRow(record.get("after")); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java index f449857c1..ccb204693 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java @@ -27,7 +27,6 @@ import com.fasterxml.jackson.databind.node.NullNode; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.sink.schema.SchemaChangeManager; -import org.apache.doris.flink.sink.writer.ChangeEvent; import org.apache.doris.flink.tools.cdc.SourceSchema; import java.util.Map; @@ -43,7 +42,7 @@ * comment synchronization, supports multi-column changes, and supports column name rename. Need to * be enabled by configuring use-new-schema-change. 
*/ -public abstract class JsonDebeziumSchemaChange implements ChangeEvent { +public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange { protected static String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*"; protected Pattern addDropDDLPattern; @@ -69,6 +68,7 @@ protected boolean checkTable(JsonNode recordRoot) { return sourceTableName.equals(dbTbl); } + @Override protected String extractDatabase(JsonNode record) { if (record.get("source").has("schema")) { // compatible with schema @@ -78,6 +78,7 @@ protected String extractDatabase(JsonNode record) { } } + @Override protected String extractTable(JsonNode record) { return extractJsonNode(record.get("source"), "table"); } @@ -102,6 +103,7 @@ protected Tuple2 getDorisTableTuple(JsonNode record) { } @VisibleForTesting + @Override public String getCdcTableIdentifier(JsonNode record) { String db = extractJsonNode(record.get("source"), "db"); String schema = extractJsonNode(record.get("source"), "schema"); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 2aa09b681..8627a476e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -35,6 +35,7 @@ import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.writer.WriteMode; +import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer; import org.apache.doris.flink.table.DorisConfigOptions; import org.slf4j.Logger; @@ -159,7 +160,7 @@ public void build() throws Exception { streamSource.sinkTo(buildDorisSink()); } else { SingleOutputStreamOperator 
parsedStream = - streamSource.process(new ParsingProcessFunction(converter)); + streamSource.process(buildProcessFunction()); for (String table : dorisTables) { OutputTag recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table); @@ -200,16 +201,26 @@ public DorisSink buildDorisSink() { return buildDorisSink(null); } + public ParsingProcessFunction buildProcessFunction() { + return new ParsingProcessFunction(converter); + } + /** create doris sink. */ public DorisSink buildDorisSink(String table) { String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES); String benodes = sinkConfig.getString(DorisConfigOptions.BENODES); String user = sinkConfig.getString(DorisConfigOptions.USERNAME); String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, ""); + String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL); DorisSink.Builder builder = DorisSink.builder(); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); - dorisBuilder.setFenodes(fenodes).setBenodes(benodes).setUsername(user).setPassword(passwd); + dorisBuilder + .setJdbcUrl(jdbcUrl) + .setFenodes(fenodes) + .setBenodes(benodes) + .setUsername(user) + .setPassword(passwd); sinkConfig .getOptional(DorisConfigOptions.AUTO_REDIRECT) .ifPresent(dorisBuilder::setAutoRedirect); @@ -284,21 +295,26 @@ public DorisSink buildDorisSink(String table) { DorisExecutionOptions executionOptions = executionBuilder.build(); builder.setDorisReadOptions(DorisReadOptions.builder().build()) .setDorisExecutionOptions(executionOptions) - .setSerializer( - JsonDebeziumSchemaSerializer.builder() - .setDorisOptions(dorisBuilder.build()) - .setNewSchemaChange(newSchemaChange) - .setExecutionOptions(executionOptions) - .setTableMapping(tableMapping) - .setTableProperties(tableConfig) - .setTargetDatabase(database) - .setTargetTablePrefix(tablePrefix) - .setTargetTableSuffix(tableSuffix) - .build()) + .setSerializer(buildSchemaSerializer(dorisBuilder, executionOptions)) 
.setDorisOptions(dorisBuilder.build()); return builder.build(); } + /** Builds the JsonDebeziumSchemaSerializer that is handed to the Doris sink builder. */ + public DorisRecordSerializer buildSchemaSerializer( + DorisOptions.Builder dorisBuilder, DorisExecutionOptions executionOptions) { + return JsonDebeziumSchemaSerializer.builder() + .setDorisOptions(dorisBuilder.build()) + .setNewSchemaChange(newSchemaChange) + .setExecutionOptions(executionOptions) + .setTableMapping(tableMapping) + .setTableProperties(tableConfig) + .setTargetDatabase(database) + .setTargetTablePrefix(tablePrefix) + .setTargetTableSuffix(tableSuffix) + .build(); + } + /** Filter table that need to be synchronized. */ protected boolean isSyncNeeded(String tableName) { boolean sync = true; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java new file mode 100644 index 000000000..86d6336dd --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.tools.cdc; + +import org.apache.doris.flink.catalog.doris.FieldSchema; + +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; + +/** + * JdbcSourceSchema is a subclass of SourceSchema, used to build metadata about jdbc-related + * databases. + */ +public abstract class JdbcSourceSchema extends SourceSchema { + + public JdbcSourceSchema( + DatabaseMetaData metaData, + String databaseName, + String schemaName, + String tableName, + String tableComment) + throws Exception { + super(databaseName, schemaName, tableName, tableComment); + fields = new LinkedHashMap<>(); + try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) { + while (rs.next()) { + String fieldName = rs.getString("COLUMN_NAME"); + String comment = rs.getString("REMARKS"); + String fieldType = rs.getString("TYPE_NAME"); + Integer precision = rs.getInt("COLUMN_SIZE"); + + if (rs.wasNull()) { + precision = null; + } + Integer scale = rs.getInt("DECIMAL_DIGITS"); + if (rs.wasNull()) { + scale = null; + } + String dorisTypeStr = convertToDorisType(fieldType, precision, scale); + fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment)); + } + } + + primaryKeys = new ArrayList<>(); + try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) { + while (rs.next()) { + String fieldName = rs.getString("COLUMN_NAME"); + primaryKeys.add(fieldName); + } + } + } + + public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale); +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java index 95b687c31..787d0ae1a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java +++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java @@ -29,7 +29,7 @@ import java.util.Map; public class ParsingProcessFunction extends ProcessFunction { - private ObjectMapper objectMapper = new ObjectMapper(); + protected ObjectMapper objectMapper = new ObjectMapper(); private transient Map> recordOutputTags; private DatabaseSync.TableNameConverter converter; @@ -46,13 +46,17 @@ public void open(Configuration parameters) throws Exception { public void processElement( String record, ProcessFunction.Context context, Collector collector) throws Exception { - JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class); - String tableName = extractJsonNode(recordRoot.get("source"), "table"); + String tableName = getRecordTableName(record); String dorisName = converter.convert(tableName); context.output(getRecordOutputTag(dorisName), record); } - private String extractJsonNode(JsonNode record, String key) { + protected String getRecordTableName(String record) throws Exception { + JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class); + return extractJsonNode(recordRoot.get("source"), "table"); + } + + protected String extractJsonNode(JsonNode record, String key) { return record != null && record.get(key) != null ? 
record.get(key).asText() : null; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java index c52520251..e09eb00e2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java @@ -23,8 +23,6 @@ import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; -import java.sql.DatabaseMetaData; -import java.sql.ResultSet; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; @@ -33,59 +31,23 @@ import java.util.StringJoiner; public abstract class SourceSchema { - private final String databaseName; - private final String schemaName; - private final String tableName; - private final String tableComment; - private final LinkedHashMap fields; - public final List primaryKeys; + protected final String databaseName; + protected final String schemaName; + protected final String tableName; + protected final String tableComment; + protected LinkedHashMap fields; + public List primaryKeys; public DataModel model = DataModel.UNIQUE; public SourceSchema( - DatabaseMetaData metaData, - String databaseName, - String schemaName, - String tableName, - String tableComment) + String databaseName, String schemaName, String tableName, String tableComment) throws Exception { this.databaseName = databaseName; this.schemaName = schemaName; this.tableName = tableName; this.tableComment = tableComment; - - fields = new LinkedHashMap<>(); - try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) { - while (rs.next()) { - String fieldName = rs.getString("COLUMN_NAME"); - String comment = rs.getString("REMARKS"); - String fieldType = rs.getString("TYPE_NAME"); - String defaultValue = rs.getString("COLUMN_DEF"); - Integer 
precision = rs.getInt("COLUMN_SIZE"); - if (rs.wasNull()) { - precision = null; - } - - Integer scale = rs.getInt("DECIMAL_DIGITS"); - if (rs.wasNull()) { - scale = null; - } - String dorisTypeStr = convertToDorisType(fieldType, precision, scale); - fields.put( - fieldName, new FieldSchema(fieldName, dorisTypeStr, defaultValue, comment)); - } - } - - primaryKeys = new ArrayList<>(); - try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) { - while (rs.next()) { - String fieldName = rs.getString("COLUMN_NAME"); - primaryKeys.add(fieldName); - } - } } - public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale); - public String getTableIdentifier() { return getString(databaseName, schemaName, tableName); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java index bf0eee7f6..f84ca9431 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java @@ -17,11 +17,11 @@ package org.apache.doris.flink.tools.cdc.mysql; -import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.JdbcSourceSchema; import java.sql.DatabaseMetaData; -public class MysqlSchema extends SourceSchema { +public class MysqlSchema extends JdbcSourceSchema { public MysqlSchema( DatabaseMetaData metaData, String databaseName, String tableName, String tableComment) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java index c0857542f..f843b6d25 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java +++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java @@ -17,11 +17,11 @@ package org.apache.doris.flink.tools.cdc.oracle; -import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.JdbcSourceSchema; import java.sql.DatabaseMetaData; -public class OracleSchema extends SourceSchema { +public class OracleSchema extends JdbcSourceSchema { public OracleSchema( DatabaseMetaData metaData, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java index ecb6edfa2..32081164a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java @@ -17,11 +17,11 @@ package org.apache.doris.flink.tools.cdc.postgres; -import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.JdbcSourceSchema; import java.sql.DatabaseMetaData; -public class PostgresSchema extends SourceSchema { +public class PostgresSchema extends JdbcSourceSchema { public PostgresSchema( DatabaseMetaData metaData, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java index ce060ea59..6d5ab9aac 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java @@ -17,11 +17,11 @@ package org.apache.doris.flink.tools.cdc.sqlserver; -import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.JdbcSourceSchema; import java.sql.DatabaseMetaData; -public class 
SqlServerSchema extends SourceSchema { +public class SqlServerSchema extends JdbcSourceSchema { public SqlServerSchema( DatabaseMetaData metaData,