Skip to content

Commit

Permalink
add form input property replicaIdentity ,to resolve issue datavane/ti…
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Nov 11, 2024
1 parent cd8cd27 commit 8f91f0a
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 81 deletions.
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.plugin.datax;
Expand Down Expand Up @@ -63,7 +63,7 @@ public DefaultDescriptor() {

@Override
public boolean isSupportIncr() {
return false;
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,11 @@ protected CreateChunjunSinkFunctionResult createSinFunctionResult(
return sinkFuncRef.get();
}


/**
 * Sink端是否支持upsert操作? 如支持,则会自动过滤binlog 的 updateBefore操作记录
 * (Whether the sink supports upsert semantics; if so, updateBefore binlog records are filtered out automatically.)
 *
 * @return true if the sink supports upsert-style DML
 */
protected abstract boolean supportUpsetDML();

protected final SyncConf createSyncConf(SelectedTab tab, String targetTabName, Supplier<Map<String, Object>> paramsCreator, DataxWriter dataxWriter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private void extractAfterRow(DTO dto, Struct value, Schema valueSchema) {
Struct after = value.getStruct("after");

Map<String, Object> afterVals = new HashMap<>();

/**==========================
* 设置环境绑定参数值
==========================*/
Expand Down Expand Up @@ -149,22 +149,26 @@ private void extractAfterRow(DTO dto, Struct value, Schema valueSchema) {


private void extractBeforeRow(DTO dto, Struct value, Schema valueSchema) {
Schema beforeSchema = valueSchema.field("before").schema();

Struct before = getBeforeVal(value);
Map<String, Object> beforeVals = new HashMap<>();
Object beforeVal = null;
for (Field f : beforeSchema.fields()) {
beforeVal = before.get(f.name());
if (beforeVal == null) {
continue;
}
try {
beforeVals.put(f.name(), rawValConvert.convert(dto, f, beforeVal));
} catch (Exception e) {
throw new RuntimeException("field:" + f.name() + ",beforeVal:" + beforeVal, e);
if (before != null) {
Schema beforeSchema = valueSchema.field("before").schema();
Map<String, Object> beforeVals = new HashMap<>();
Object beforeVal = null;
for (Field f : beforeSchema.fields()) {
beforeVal = before.get(f.name());
if (beforeVal == null) {
continue;
}
try {
beforeVals.put(f.name(), rawValConvert.convert(dto, f, beforeVal));
} catch (Exception e) {
throw new RuntimeException("field:" + f.name() + ",beforeVal:" + beforeVal, e);
}
}
dto.setBefore(beforeVals);
}
dto.setBefore(beforeVals);

}

protected Struct getBeforeVal(Struct value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ public Object convert(DTO dto, Field field, Object val) {
}
}


public static class MySQLCDCTypeVisitor extends AbstractRowDataMapper.DefaultTypeVisitor {
public MySQLCDCTypeVisitor(IColMetaGetter meta, int colIndex) {
super(meta, colIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public JobExecutionResult start(TargetResName dataxName, IDataxReader dataSource
.password(dsFactory.password)
.debeziumProperties(debeziumProperties)
.startupOptions(sourceFactory.getStartupOptions())
.deserializer(new PostgreSQLDeserializationSchema(tabs, flinkColCreator, contextParamValsGetterMapper)) // converts SourceRecord to JSON String
.deserializer(new PostgreSQLDeserializationSchema(tabs, flinkColCreator, contextParamValsGetterMapper,sourceFactory.getRepIdentity())) // converts SourceRecord to JSON String
.build();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class FlinkCDCPostreSQLSourceFactory extends MQListenerFactory {
// * wal2json_rds_streaming and pgoutput.

@Override
public IFlinkColCreator<FlinkCol> createFlinkColCreator() {
public IFlinkColCreator<FlinkCol> createFlinkColCreator() {
IFlinkColCreator<FlinkCol> flinkColCreator = (meta, colIndex) -> {
return meta.getType().accept(new PGCDCTypeVisitor(meta, colIndex));
};
Expand All @@ -70,8 +70,17 @@ public IFlinkColCreator<FlinkCol> createFlinkColCreator() {
/**
* https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#incremental-snapshot-optionshttps://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#incremental-snapshot-options
*/
@FormField(ordinal = 0, type = FormFieldType.ENUM, validate = {Validator.require})
@FormField(ordinal = 1, type = FormFieldType.ENUM, validate = {Validator.require})
public String startupOptions;
// REPLICA IDENTITY
@FormField(ordinal = 2, advance = true, type = FormFieldType.ENUM, validate = {Validator.require})
public String replicaIdentity;


public ReplicaIdentity getRepIdentity() {
return ReplicaIdentity.parse(this.replicaIdentity);
}


/**
* 只支持两种option 'latest' 和 'initial'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,26 @@
* @create: 2024-02-21 19:02
**/
public class PostgreSQLDeserializationSchema extends TISDeserializationSchema {
private ReplicaIdentity replicaIdentity;

/**
 * @param tabs selected tables to deserialize
 * @param flinkColCreator creator of Flink column mappings
 * @param contextParamValsGetterMapper per-table context parameter value getters
 * @param replicaIdentity PostgreSQL WAL change events do not carry the "before" row image by default; this setting records whether the user enabled transmission of before-values (REPLICA IDENTITY FULL)
 */
public PostgreSQLDeserializationSchema(List<ISelectedTab> tabs, IFlinkColCreator<FlinkCol> flinkColCreator
, Map<String /*tableName*/, Map<String, Function<RunningContext, Object>>> contextParamValsGetterMapper) {
, Map<String /*tableName*/, Map<String, Function<RunningContext, Object>>> contextParamValsGetterMapper
, ReplicaIdentity replicaIdentity
) {
super(new PGDTOColValProcess(tabs, flinkColCreator), new DefaultTableNameConvert(), contextParamValsGetterMapper);
this.replicaIdentity = replicaIdentity;
}

@Override
protected Struct getBeforeVal(Struct value) {
Struct beforeVal = super.getBeforeVal(value);
if (beforeVal == null) {
if (this.replicaIdentity.isShallContainBeforeVals() && beforeVal == null) {
throw new IllegalStateException("lack before vals ,for resolve this issue:https://developer.aliyun.com/ask/575334 \n shall execute alter: ALTER TABLE your_table_name REPLICA IDENTITY FULL;");
}
return beforeVal;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.plugins.incr.flink.cdc.postgresql;

/**
* @author: 百岁([email protected])
* @create: 2024-11-11 16:31
**/
/**
 * PostgreSQL REPLICA IDENTITY setting for logical replication. It controls whether
 * UPDATE/DELETE change events read from the WAL are expected to carry the full
 * "before" row image.
 */
public enum ReplicaIdentity {
    /**
     * Table was altered with {@code ALTER TABLE ... REPLICA IDENTITY FULL}:
     * change events contain the before-values of the row.
     */
    FULL(true),
    /**
     * Default replica identity: change events do not contain before-values.
     */
    DEFAULT(false);

    // Immutable per enum constant; was non-final in the original.
    private final boolean shallContainBeforeVals;

    // Enum constructors are implicitly private; the explicit modifier is redundant.
    ReplicaIdentity(boolean shallContainBeforeVals) {
        this.shallContainBeforeVals = shallContainBeforeVals;
    }

    /**
     * @return true if change events are expected to contain the "before" row values
     */
    public boolean isShallContainBeforeVals() {
        return this.shallContainBeforeVals;
    }

    /**
     * Resolves a case-insensitive token (e.g. "FULL", "default") to its enum constant.
     *
     * @param token the enum name, matched case-insensitively
     * @return the matching constant
     * @throws IllegalStateException if the token matches no constant
     */
    public static ReplicaIdentity parse(String token) {
        for (ReplicaIdentity ri : ReplicaIdentity.values()) {
            if (ri.name().equalsIgnoreCase(token)) {
                return ri;
            }
        }
        throw new IllegalStateException("token:" + token + " is illegal");
    }

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
{
"replicaIdentity": {
"label": "REPLICA IDENTITY",
"dftVal": "DEFAULT",
"enum": [
{
"val": "DEFAULT",
"label": "DEFAULT"
},
{
"val": "FULL",
"label": "FULL"
}
]
},
"decodingPluginName": {
"dftVal": "decoderbufs",
"label": "解码器",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,14 @@ Debezium startup options

* `Latest`:
Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.

## replicaIdentity

在 PostgreSQL 中,ALTER TABLE ... REPLICA IDENTITY 命令用于指定在逻辑复制或行级触发器中如何标识已更新或删除的行。https://developer.aliyun.com/ask/575334

可选项有以下两个
* `FULL`: 使用此值需要确保对应的表执行`ALTER TABLE your_table_name REPLICA IDENTITY FULL;`,表记录更新时会带上更新Before值,使用此方式比较耗费性能。
* `DEFAULT`: 默认值,更新删除操作时不会带上Before值。


Original file line number Diff line number Diff line change
Expand Up @@ -47,30 +47,15 @@
* @create: 2021-10-12 10:26
**/
public class TISFlinkCDCStart {
// static final String dataxName = "mysql_elastic";
// public static final String TIS_APP_NAME = "tis_app_name";
private static final Logger logger = LoggerFactory.getLogger(TISFlinkCDCStart.class);

private static final Logger logger = LoggerFactory.getLogger(TISFlinkCDCStart.class);

public static void main(String[] args) throws Exception {

if (args.length != 1) {
throw new IllegalArgumentException("args length must be 1,now is:" + args.length);
}
String dataxName = args[0];
//-classpath /Users/mozhenghua/j2ee_solution/project/plugins/tis-incr/tis-flink-dependency/target/tis-flink-dependency/WEB-INF/lib/*:/Users/mozhenghua/j2ee_solution/project/plugins/tis-incr/tis-flink-cdc-plugin/target/tis-flink-cdc-plugin/WEB-INF/lib/*:/Users/mozhenghua/j2ee_solution/project/plugins/tis-incr/tis-elasticsearch7-sink-plugin/target/tis-elasticsearch7-sink-plugin/WEB-INF/lib/*:/Users/mozhenghua/j2ee_solution/project/plugins/tis-incr/tis-realtime-flink/target/tis-realtime-flink/WEB-INF/lib/*:/Users/mozhenghua/j2ee_solution/project/plugins/tis-incr/tis-realtime-flink-launch/target/tis-realtime-flink-launch.jar:/Users/mozhenghua/j2ee_solution/project/plugins/tis-incr/tis-realtime-flink-launch/target/dependency/*:/Users/mozhenghua/j2ee_solution/project/plugins/tis-datax/tis-datax-elasticsearch-plugin/target/tis-datax-elasticsearch-plugin/WEB-INF/lib/*:
// CenterResource.setNotFetchFromCenterRepository();
//Thread.currentThread().setContextClassLoader(TIS.get().pluginManager.uberClassLoader);

// IPluginContext pluginContext = IPluginContext.namedContext(dataxName);
//
//
// List<IncrStreamFactory> streamFactories = HeteroEnum.INCR_STREAM_CONFIG.getPlugins(pluginContext, null);
// IRCController incrController = null;
// for (IncrStreamFactory factory : streamFactories) {
// incrController = factory.getIncrSync();
// }
// Objects.requireNonNull(incrController, "stream app:" + dataxName + " incrController can not not be null");

IncrStreamFactory incrStreamFactory = HeteroEnum.getIncrStreamFactory(dataxName);
BasicFlinkSourceHandle tableStreamHandle = createFlinkSourceHandle(dataxName);
Expand All @@ -81,8 +66,6 @@ public static void main(String[] args) throws Exception {
public static BasicFlinkSourceHandle createFlinkSourceHandle(String dataxName) {
TargetResName name = new TargetResName(dataxName);
final String streamSourceHandlerClass = name.getStreamSourceHandlerClass();
// final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// TIS.get().extensionLists.clear(BasicFlinkSourceHandle.class);
ExtensionList<BasicFlinkSourceHandle> flinkSourceHandles = TIS.get().getExtensionList(BasicFlinkSourceHandle.class);
flinkSourceHandles.removeExtensions();
logger.info("start to load extendsion of " + BasicFlinkSourceHandle.class.getSimpleName());
Expand All @@ -101,52 +84,30 @@ public static BasicFlinkSourceHandle createFlinkSourceHandle(String dataxName) {
}


private static void deploy(TargetResName dataxName, BasicFlinkSourceHandle tableStreamHandle, ReplicasSpec incrSpec, long timestamp) throws Exception {
// FlinkUserCodeClassLoaders
// BasicFlinkSourceHandle tisFlinkSourceHandle = new TISFlinkSourceHandle();
private static void deploy(TargetResName dataxName
, BasicFlinkSourceHandle tableStreamHandle
, ReplicasSpec incrSpec, long timestamp) throws Exception {

if (tableStreamHandle == null) {
throw new IllegalStateException("tableStreamHandle has not been instantiated");
}
// ElasticSearchSinkFactory esSinkFactory = new ElasticSearchSinkFactory();


// IPluginContext pluginContext = IPluginContext.namedContext(dataxName.getName());
// List<TISSinkFactory> sinkFactories = TISSinkFactory.sinkFactory.getPlugins(pluginContext, null);

// logger.info("sinkFactories size:" + sinkFactories.size());
// for (TISSinkFactory factory : sinkFactories) {
// sinkFactory = factory;
// break;
// }

// Objects.requireNonNull(sinkFactory, "sinkFactories.size():" + sinkFactories.size());

IDataxProcessor dataXProcess = DataxProcessor.load(null, dataxName.getName());
DataxReader reader = (DataxReader) dataXProcess.getReader(null);

tableStreamHandle.setSinkFuncFactory(TISSinkFactory.getIncrSinKFactory(dataxName.getName()));
tableStreamHandle.setSourceStreamTableMeta(reader);

// List<MQListenerFactory> mqFactories = HeteroEnum.MQ.getPlugins(pluginContext, null);
MQListenerFactory mqFactory = HeteroEnum.getIncrSourceListenerFactory(dataxName.getName());
// IFlinkColCreator<FlinkCol> sourceFlinkColCreator = mqFactory.createFlinkColCreator();
tableStreamHandle.setSourceFlinkColCreator(mqFactory.createFlinkColCreator());
mqFactory.setConsumerHandle(tableStreamHandle);

// for (MQListenerFactory factory : mqFactories) {
// factory.setConsumerHandle(tableStreamHandle);
// mqFactory = factory;
// }
//Objects.requireNonNull(mqFactory, "mqFactory can not be null, mqFactories size:" + mqFactories.size());

IMQListener mq = mqFactory.create();


if (reader == null) {
throw new IllegalStateException("dataXReader is illegal");
}
// DBConfigGetter rdbmsReader = (DBConfigGetter) reader;

List<ISelectedTab> tabs = reader.getSelectedTabs();
mq.start(dataxName, reader, tabs, dataXProcess);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,20 @@ public void test() throws Exception {


public interface FlinkTestCase {
/**
 * Supplies the test records for this test case.
 *
 * @return list of DTO change records used as test input
 */
public List<DTO> createTestData();

/**
 * Runs the verification logic for this test case: checks the rows relevant to
 * the records from {@link #createTestData()} against the target table.
 *
 * @param ddl create-table DDL describing the table under test
 * @param statement JDBC statement used to issue verification queries
 * @throws SQLException if a verification query fails
 */
public void verifyRelevantRow(CreateDDL ddl, Statement statement) throws SQLException;
}

Expand Down

0 comments on commit 8f91f0a

Please sign in to comment.