Skip to content

Commit

Permalink
[improve] adjust the code framework related to CDC. (apache#319)
Browse files Browse the repository at this point in the history
  • Loading branch information
bingquanzhao authored Feb 29, 2024
1 parent 54c94a9 commit 64fe57a
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.writer.serializer.jsondebezium;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.doris.flink.sink.writer.ChangeEvent;
import org.apache.doris.flink.sink.writer.serializer.DorisRecord;

import java.io.IOException;
import java.util.Map;

/**
 * Base class for handlers of row-level data changes that a CDC connector captures from a source
 * database. Extend it to turn those change messages into records that are synchronized to
 * Doris. Messages are expected to be serialized as JSON.
 */
public abstract class CdcDataChange implements ChangeEvent {

    /**
     * Extracts the "before" row from a change record.
     *
     * <p>NOTE(review): the Debezium JSON implementation reads the record's {@code "before"} node;
     * other implementations should confirm what they treat as the pre-change image.
     *
     * @param record the change record as a JSON tree
     * @return the extracted row as a column-name to value map
     */
    protected abstract Map<String, Object> extractBeforeRow(JsonNode record);

    /**
     * Extracts the "after" row from a change record.
     *
     * <p>NOTE(review): the Debezium JSON implementation reads the record's {@code "after"} node;
     * other implementations should confirm what they treat as the post-change image.
     *
     * @param record the change record as a JSON tree
     * @return the extracted row as a column-name to value map
     */
    protected abstract Map<String, Object> extractAfterRow(JsonNode record);

    /**
     * Serializes one change message into a {@link DorisRecord}.
     *
     * @param record presumably the raw message text - confirm against callers
     * @param recordRoot presumably the parsed JSON tree of the same message - confirm
     * @param op presumably the operation code of the change (the Debezium implementation defines
     *     codes such as {@code "r"} for snapshot read) - confirm
     * @return the record to write to Doris
     * @throws IOException if the message cannot be serialized
     */
    protected abstract DorisRecord serialize(String record, JsonNode recordRoot, String op)
            throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.writer.serializer.jsondebezium;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.doris.flink.sink.writer.ChangeEvent;

import java.io.IOException;

/**
 * Base class for handlers of schema changes that a CDC connector captures from a source database.
 * Extend it to apply those changes to the Doris schema. Messages are expected to be serialized as
 * JSON.
 */
public abstract class CdcSchemaChange implements ChangeEvent {

    /**
     * Handles a schema-change message.
     *
     * @param recordRoot the change message as a JSON tree
     * @return NOTE(review): presumably {@code true} when the change was applied to Doris - confirm
     *     against implementations
     * @throws IOException if processing the message fails
     */
    public abstract boolean schemaChange(JsonNode recordRoot) throws IOException;

    /**
     * Extracts the source database name from a change record.
     *
     * @param record the change record as a JSON tree
     * @return the database name
     */
    protected abstract String extractDatabase(JsonNode record);

    /**
     * Extracts the source table name from a change record.
     *
     * @param record the change record as a JSON tree
     * @return the table name
     */
    protected abstract String extractTable(JsonNode record);

    /**
     * Builds the identifier of the source (CDC) table a change record belongs to.
     *
     * @param record the change record as a JSON tree
     * @return the table identifier; its exact format is defined by the implementation
     */
    protected abstract String getCdcTableIdentifier(JsonNode record);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.writer.ChangeEvent;
import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,7 +40,7 @@
* into doris through stream load.<br>
* Supported data changes include: read, insert, update, delete.
*/
public class JsonDebeziumDataChange implements ChangeEvent {
public class JsonDebeziumDataChange extends CdcDataChange {
private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumDataChange.class);

private static final String OP_READ = "r"; // snapshot read
Expand Down Expand Up @@ -122,11 +121,13 @@ private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException
return updateRow.toString().getBytes(StandardCharsets.UTF_8);
}

private Map<String, Object> extractBeforeRow(JsonNode record) {
/** Returns the row that {@code extractRow} builds from the record's {@code "before"} node. */
@Override
protected Map<String, Object> extractBeforeRow(JsonNode record) {
    JsonNode beforeNode = record.get("before");
    return extractRow(beforeNode);
}

private Map<String, Object> extractAfterRow(JsonNode record) {
/** Returns the row that {@code extractRow} builds from the record's {@code "after"} node. */
@Override
protected Map<String, Object> extractAfterRow(JsonNode record) {
    JsonNode afterNode = record.get("after");
    return extractRow(afterNode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.sink.writer.ChangeEvent;
import org.apache.doris.flink.tools.cdc.SourceSchema;

import java.util.Map;
Expand All @@ -43,7 +42,7 @@
* comment synchronization, supports multi-column changes, and supports column name rename. Need to
* be enabled by configuring use-new-schema-change.
*/
public abstract class JsonDebeziumSchemaChange implements ChangeEvent {
public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange {
protected static String addDropDDLRegex =
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
protected Pattern addDropDDLPattern;
Expand All @@ -69,6 +68,7 @@ protected boolean checkTable(JsonNode recordRoot) {
return sourceTableName.equals(dbTbl);
}

@Override
protected String extractDatabase(JsonNode record) {
if (record.get("source").has("schema")) {
// compatible with schema
Expand All @@ -78,6 +78,7 @@ protected String extractDatabase(JsonNode record) {
}
}

/** Returns the {@code "table"} value found under the record's {@code "source"} node. */
@Override
protected String extractTable(JsonNode record) {
    JsonNode sourceNode = record.get("source");
    return extractJsonNode(sourceNode, "table");
}
Expand All @@ -102,6 +103,7 @@ protected Tuple2<String, String> getDorisTableTuple(JsonNode record) {
}

@VisibleForTesting
@Override
public String getCdcTableIdentifier(JsonNode record) {
String db = extractJsonNode(record.get("source"), "db");
String schema = extractJsonNode(record.get("source"), "schema");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.WriteMode;
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.table.DorisConfigOptions;
import org.slf4j.Logger;
Expand Down Expand Up @@ -159,7 +160,7 @@ public void build() throws Exception {
streamSource.sinkTo(buildDorisSink());
} else {
SingleOutputStreamOperator<Void> parsedStream =
streamSource.process(new ParsingProcessFunction(converter));
streamSource.process(buildProcessFunction());
for (String table : dorisTables) {
OutputTag<String> recordOutputTag =
ParsingProcessFunction.createRecordOutputTag(table);
Expand Down Expand Up @@ -200,16 +201,26 @@ public DorisSink<String> buildDorisSink() {
return buildDorisSink(null);
}

/**
 * Creates the process function that {@code build()} applies to the source stream. It is
 * constructed with this sync's table name converter; being a public, non-final method it can be
 * overridden to supply a different function.
 */
public ParsingProcessFunction buildProcessFunction() {
    ParsingProcessFunction parsingFunction = new ParsingProcessFunction(converter);
    return parsingFunction;
}

/** create doris sink. */
public DorisSink<String> buildDorisSink(String table) {
String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL);

DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes(fenodes).setBenodes(benodes).setUsername(user).setPassword(passwd);
dorisBuilder
.setJdbcUrl(jdbcUrl)
.setFenodes(fenodes)
.setBenodes(benodes)
.setUsername(user)
.setPassword(passwd);
sinkConfig
.getOptional(DorisConfigOptions.AUTO_REDIRECT)
.ifPresent(dorisBuilder::setAutoRedirect);
Expand Down Expand Up @@ -284,21 +295,23 @@ public DorisSink<String> buildDorisSink(String table) {
DorisExecutionOptions executionOptions = executionBuilder.build();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionOptions)
.setSerializer(
JsonDebeziumSchemaSerializer.builder()
.setDorisOptions(dorisBuilder.build())
.setNewSchemaChange(newSchemaChange)
.setExecutionOptions(executionOptions)
.setTableMapping(tableMapping)
.setTableProperties(tableConfig)
.setTargetDatabase(database)
.setTargetTablePrefix(tablePrefix)
.setTargetTableSuffix(tableSuffix)
.build())
.setSerializer(buildSchemaSerializer(dorisBuilder, executionOptions))
.setDorisOptions(dorisBuilder.build());
return builder.build();
}

/**
 * Builds the record serializer that {@code buildDorisSink} installs on the Doris sink.
 *
 * <p>The serializer receives the Doris connection options, the execution options, the
 * schema-change mode, the table mapping and table properties, the target database, and the
 * configured target table prefix and suffix.
 *
 * @param dorisBuilder builder holding the Doris connection options; {@code build()} is called
 *     on it here
 * @param executionOptions execution options shared with the sink
 * @return the serializer for Debezium JSON records
 */
public DorisRecordSerializer<String> buildSchemaSerializer(
        DorisOptions.Builder dorisBuilder, DorisExecutionOptions executionOptions) {
    return JsonDebeziumSchemaSerializer.builder()
            .setDorisOptions(dorisBuilder.build())
            .setNewSchemaChange(newSchemaChange)
            .setExecutionOptions(executionOptions)
            .setTableMapping(tableMapping)
            .setTableProperties(tableConfig)
            .setTargetDatabase(database)
            // The prefix/suffix must be forwarded too; without them the serializer is built
            // with no knowledge of the configured target table naming.
            .setTargetTablePrefix(tablePrefix)
            .setTargetTableSuffix(tableSuffix)
            .build();
}

/** Filter table that need to be synchronized. */
protected boolean isSyncNeeded(String tableName) {
boolean sync = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.tools.cdc;

import org.apache.doris.flink.catalog.doris.FieldSchema;

import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.LinkedHashMap;

/**
 * {@link SourceSchema} variant that fills its column and primary-key metadata from JDBC
 * {@link DatabaseMetaData}.
 */
public abstract class JdbcSourceSchema extends SourceSchema {

    /**
     * Reads the table's columns and primary keys from {@code metaData}.
     *
     * <p>Columns are kept in the order the driver returns them, and each column's type is mapped
     * through {@link #convertToDorisType}. That abstract method is invoked from this constructor,
     * i.e. before any subclass field initializers have run.
     *
     * @param metaData JDBC metadata of the source connection
     * @param databaseName passed as the catalog argument of the metadata queries
     * @param schemaName passed as the schema argument of the metadata queries
     * @param tableName table whose columns and primary keys are read
     * @param tableComment comment handed to the superclass
     * @throws Exception if a metadata query or a result read fails
     */
    public JdbcSourceSchema(
            DatabaseMetaData metaData,
            String databaseName,
            String schemaName,
            String tableName,
            String tableComment)
            throws Exception {
        super(databaseName, schemaName, tableName, tableComment);

        fields = new LinkedHashMap<>();
        try (ResultSet columns = metaData.getColumns(databaseName, schemaName, tableName, null)) {
            while (columns.next()) {
                String columnName = columns.getString("COLUMN_NAME");
                String remarks = columns.getString("REMARKS");
                String typeName = columns.getString("TYPE_NAME");
                Integer precision = nullableInt(columns, "COLUMN_SIZE");
                Integer scale = nullableInt(columns, "DECIMAL_DIGITS");

                String dorisType = convertToDorisType(typeName, precision, scale);
                fields.put(columnName, new FieldSchema(columnName, dorisType, remarks));
            }
        }

        primaryKeys = new ArrayList<>();
        try (ResultSet keys = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) {
            while (keys.next()) {
                primaryKeys.add(keys.getString("COLUMN_NAME"));
            }
        }
    }

    /**
     * Maps a source column type to the Doris type string stored in the field schema.
     *
     * @param fieldType the JDBC {@code TYPE_NAME} of the column
     * @param precision the JDBC {@code COLUMN_SIZE}, or {@code null} if it was SQL NULL
     * @param scale the JDBC {@code DECIMAL_DIGITS}, or {@code null} if it was SQL NULL
     * @return the Doris type string
     */
    public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);

    /** Reads an int column of the current row, mapping SQL NULL to {@code null}. */
    private static Integer nullableInt(ResultSet row, String column) throws Exception {
        int raw = row.getInt(column);
        // wasNull() refers to the column read just above, so it must directly follow getInt().
        return row.wasNull() ? null : Integer.valueOf(raw);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Map;

public class ParsingProcessFunction extends ProcessFunction<String, Void> {
private ObjectMapper objectMapper = new ObjectMapper();
protected ObjectMapper objectMapper = new ObjectMapper();
private transient Map<String, OutputTag<String>> recordOutputTags;
private DatabaseSync.TableNameConverter converter;

Expand All @@ -46,13 +46,17 @@ public void open(Configuration parameters) throws Exception {
public void processElement(
String record, ProcessFunction<String, Void>.Context context, Collector<Void> collector)
throws Exception {
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
String tableName = extractJsonNode(recordRoot.get("source"), "table");
String tableName = getRecordTableName(record);
String dorisName = converter.convert(tableName);
context.output(getRecordOutputTag(dorisName), record);
}

private String extractJsonNode(JsonNode record, String key) {
/**
 * Parses the raw record as JSON and returns the text of the {@code "table"} entry under its
 * {@code "source"} node, or {@code null} when either is missing.
 *
 * @param record the raw record text
 * @throws Exception if the record cannot be parsed
 */
protected String getRecordTableName(String record) throws Exception {
    JsonNode root = objectMapper.readValue(record, JsonNode.class);
    JsonNode sourceNode = root.get("source");
    return extractJsonNode(sourceNode, "table");
}

/**
 * Returns the text of {@code record}'s child named {@code key}, or {@code null} when the record
 * itself is {@code null} or has no such child.
 */
protected String extractJsonNode(JsonNode record, String key) {
    if (record == null) {
        return null;
    }
    JsonNode child = record.get(key);
    return child == null ? null : child.asText();
}

Expand Down
Loading

0 comments on commit 64fe57a

Please sign in to comment.