diff --git a/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXSqlserverReader.java b/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXSqlserverReader.java index 723fc845e..a2259caf2 100644 --- a/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXSqlserverReader.java +++ b/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXSqlserverReader.java @@ -1,19 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.qlangtech.tis.plugin.datax; @@ -63,7 +63,7 @@ public DefaultDescriptor() { @Override public boolean isSupportIncr() { - return false; + return true; } @Override diff --git a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java index 5745b848f..1e1121fff 100644 --- a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java +++ b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java @@ -287,7 +287,11 @@ protected CreateChunjunSinkFunctionResult createSinFunctionResult( return sinkFuncRef.get(); } - + /** + * Sink端是否支持upset操作? 如支持,则会自动过滤binlog 的 updateBefore操作记录 + * + * @return + */ protected abstract boolean supportUpsetDML(); protected final SyncConf createSyncConf(SelectedTab tab, String targetTabName, Supplier> paramsCreator, DataxWriter dataxWriter) { diff --git a/tis-incr/tis-flink-cdc-common/src/main/java/com/qlangtech/plugins/incr/flink/cdc/TISDeserializationSchema.java b/tis-incr/tis-flink-cdc-common/src/main/java/com/qlangtech/plugins/incr/flink/cdc/TISDeserializationSchema.java index 2c9eb770b..4e1377be3 100644 --- a/tis-incr/tis-flink-cdc-common/src/main/java/com/qlangtech/plugins/incr/flink/cdc/TISDeserializationSchema.java +++ b/tis-incr/tis-flink-cdc-common/src/main/java/com/qlangtech/plugins/incr/flink/cdc/TISDeserializationSchema.java @@ -119,7 +119,7 @@ private void extractAfterRow(DTO dto, Struct value, Schema valueSchema) { Struct after = value.getStruct("after"); Map afterVals = new HashMap<>(); - + /**========================== * 设置环境绑定参数值 ==========================*/ @@ -149,22 +149,26 @@ private void extractAfterRow(DTO dto, Struct value, Schema valueSchema) { private void extractBeforeRow(DTO dto, Struct value, Schema valueSchema) { - Schema beforeSchema = valueSchema.field("before").schema(); + Struct before = getBeforeVal(value); - Map beforeVals = new HashMap<>(); - Object beforeVal = null; - for (Field f : beforeSchema.fields()) { - beforeVal = before.get(f.name()); - if (beforeVal == null) { - continue; - } - try { - beforeVals.put(f.name(), rawValConvert.convert(dto, f, beforeVal)); - } catch (Exception e) { - throw new RuntimeException("field:" + f.name() + ",beforeVal:" + beforeVal, e); + if (before != null) { + Schema beforeSchema = valueSchema.field("before").schema(); + Map beforeVals = new HashMap<>(); + Object beforeVal = null; + for (Field f : beforeSchema.fields()) { + beforeVal = before.get(f.name()); + if (beforeVal == null) { + continue; + } + try { + beforeVals.put(f.name(), rawValConvert.convert(dto, f, beforeVal)); + } catch (Exception e) { + throw new RuntimeException("field:" + f.name() + ",beforeVal:" + beforeVal, e); + } } + dto.setBefore(beforeVals); } - dto.setBefore(beforeVals); + } protected Struct getBeforeVal(Struct value) { diff --git a/tis-incr/tis-flink-cdc-mysql-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/cdc/mysql/FlinkCDCMysqlSourceFunction.java b/tis-incr/tis-flink-cdc-mysql-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/cdc/mysql/FlinkCDCMysqlSourceFunction.java index 30fbd2926..5e82718f5 100644 --- a/tis-incr/tis-flink-cdc-mysql-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/cdc/mysql/FlinkCDCMysqlSourceFunction.java +++ b/tis-incr/tis-flink-cdc-mysql-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/cdc/mysql/FlinkCDCMysqlSourceFunction.java @@ -113,7 +113,6 @@ public Object convert(DTO dto, Field field, Object val) { } } - public static class MySQLCDCTypeVisitor extends AbstractRowDataMapper.DefaultTypeVisitor { public MySQLCDCTypeVisitor(IColMetaGetter meta, int colIndex) { super(meta, colIndex); diff --git a/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostgreSQLSourceFunction.java b/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostgreSQLSourceFunction.java index 940e10c4e..a1f997130 100644 --- a/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostgreSQLSourceFunction.java +++ b/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostgreSQLSourceFunction.java @@ -111,7 +111,7 @@ public JobExecutionResult start(TargetResName dataxName, IDataxReader dataSource .password(dsFactory.password) .debeziumProperties(debeziumProperties) .startupOptions(sourceFactory.getStartupOptions()) - .deserializer(new PostgreSQLDeserializationSchema(tabs, flinkColCreator, contextParamValsGetterMapper)) // converts SourceRecord to JSON String + .deserializer(new PostgreSQLDeserializationSchema(tabs, flinkColCreator, contextParamValsGetterMapper,sourceFactory.getRepIdentity())) // converts SourceRecord to JSON String .build(); diff --git a/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostreSQLSourceFactory.java b/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostreSQLSourceFactory.java index 174389810..822ebe1ab 100644 --- a/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostreSQLSourceFactory.java +++ b/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostreSQLSourceFactory.java @@ -54,7 +54,7 @@ public class FlinkCDCPostreSQLSourceFactory extends MQListenerFactory { // * wal2json_rds_streaming and pgoutput. @Override - public IFlinkColCreator createFlinkColCreator() { + public IFlinkColCreator createFlinkColCreator() { IFlinkColCreator flinkColCreator = (meta, colIndex) -> { return meta.getType().accept(new PGCDCTypeVisitor(meta, colIndex)); }; @@ -70,8 +70,17 @@ public IFlinkColCreator createFlinkColCreator() { /** * https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#incremental-snapshot-optionshttps://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#incremental-snapshot-options */ - @FormField(ordinal = 0, type = FormFieldType.ENUM, validate = {Validator.require}) + @FormField(ordinal = 1, type = FormFieldType.ENUM, validate = {Validator.require}) public String startupOptions; + // REPLICA IDENTITY + @FormField(ordinal = 2, advance = true, type = FormFieldType.ENUM, validate = {Validator.require}) + public String replicaIdentity; + + + public ReplicaIdentity getRepIdentity() { + return ReplicaIdentity.parse(this.replicaIdentity); + } + /** * 只支持两种option 'latest' 和 'initial' diff --git a/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/PostgreSQLDeserializationSchema.java b/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/PostgreSQLDeserializationSchema.java index a4b19d26e..8511af7ee 100644 --- a/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/PostgreSQLDeserializationSchema.java +++ b/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/PostgreSQLDeserializationSchema.java @@ -35,16 +35,26 @@ * @create: 2024-02-21 19:02 **/ public class PostgreSQLDeserializationSchema extends TISDeserializationSchema { + private ReplicaIdentity replicaIdentity; + /** + * @param tabs + * @param flinkColCreator + * @param contextParamValsGetterMapper + * @param replicaIdentity pg的binlog 内容的before内容不是默认传输的,用户可以选择不传输before值 + */ public PostgreSQLDeserializationSchema(List tabs, IFlinkColCreator flinkColCreator - , Map>> contextParamValsGetterMapper) { + , Map>> contextParamValsGetterMapper + , ReplicaIdentity replicaIdentity + ) { super(new PGDTOColValProcess(tabs, flinkColCreator), new DefaultTableNameConvert(), contextParamValsGetterMapper); + this.replicaIdentity = replicaIdentity; } @Override protected Struct getBeforeVal(Struct value) { Struct beforeVal = super.getBeforeVal(value); - if (beforeVal == null) { + if (this.replicaIdentity.isShallContainBeforeVals() && beforeVal == null) { throw new IllegalStateException("lack before vals ,for resolve this issue:https://developer.aliyun.com/ask/575334 \n shall execute alter: ALTER TABLE your_table_name REPLICA IDENTITY FULL;"); } return beforeVal; diff --git a/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/ReplicaIdentity.java b/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/ReplicaIdentity.java new file mode 100644 index 000000000..6eaf6e2bc --- /dev/null +++ b/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/ReplicaIdentity.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.plugins.incr.flink.cdc.postgresql; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-11-11 16:31 + **/ +public enum ReplicaIdentity { + FULL(true), DEFAULT(false); + + private boolean shallContainBeforeVals; + + private ReplicaIdentity(boolean shallContainBeforeVals) { + this.shallContainBeforeVals = shallContainBeforeVals; + } + + public boolean isShallContainBeforeVals() { + return this.shallContainBeforeVals; + } + + public static ReplicaIdentity parse(String token) { + for (ReplicaIdentity ri : ReplicaIdentity.values()) { + if (ri.name().equalsIgnoreCase(token)) { + return ri; + } + } + throw new IllegalStateException("token:" + token + " is illegal"); + } + +} diff --git a/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/resources/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostreSQLSourceFactory.json b/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/resources/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostreSQLSourceFactory.json index 1641cb7b8..f0a9b45fb 100644 --- a/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/resources/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostreSQLSourceFactory.json +++ b/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/resources/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostreSQLSourceFactory.json @@ -1,4 +1,18 @@ { + "replicaIdentity": { + "label": "REPLICA IDENTITY", + "dftVal": "DEFAULT", + "enum": [ + { + "val": "DEFAULT", + "label": "DEFAULT" + }, + { + "val": "FULL", + "label": "FULL" + } + ] + }, "decodingPluginName": { "dftVal": "decoderbufs", "label": "解码器", diff --git a/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/resources/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostreSQLSourceFactory.md b/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/resources/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostreSQLSourceFactory.md index dfcd554e9..692c3439c 100644 --- a/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/resources/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostreSQLSourceFactory.md +++ b/tis-incr/tis-flink-cdc-postgresql-plugin/src/main/resources/com/qlangtech/plugins/incr/flink/cdc/postgresql/FlinkCDCPostreSQLSourceFactory.md @@ -9,5 +9,14 @@ Debezium startup options * `Latest`: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started. + +## replicaIdentity + +在 PostgreSQL 中,ALTER TABLE ... REPLICA IDENTITY 命令用于指定在逻辑复制或行级触发器中如何标识已更新或删除的行。https://developer.aliyun.com/ask/575334 + +可选项有以下两个 +* `FULL`: 使用此值需要确保对应的表执行`ALTER TABLE your_table_name REPLICA IDENTITY FULL;`,表记录更新时会带上更新Before值,使用此方式比较耗费性能。 +* `DEFAULT`: 默认值,更新删除操作时不会带上Before值。 + diff --git a/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/plugins/incr/flink/TISFlinkCDCStart.java b/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/plugins/incr/flink/TISFlinkCDCStart.java index 87d7b3c6b..c958d403d 100644 --- a/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/plugins/incr/flink/TISFlinkCDCStart.java +++ b/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/plugins/incr/flink/TISFlinkCDCStart.java @@ -47,10 +47,8 @@ * @create: 2021-10-12 10:26 **/ public class TISFlinkCDCStart { - // static final String dataxName = "mysql_elastic"; - // public static final String TIS_APP_NAME = "tis_app_name"; - private static final Logger logger = LoggerFactory.getLogger(TISFlinkCDCStart.class); + private static final Logger logger = LoggerFactory.getLogger(TISFlinkCDCStart.class); public static void main(String[] args) throws Exception { @@ -58,19 +56,6 @@ public static void main(String[] args) throws Exception { throw new IllegalArgumentException("args length must be 1,now is:" + args.length); } String dataxName = args[0]; - //-classpath /Users/mozhenghua/j2ee_solution/project/plugins/tis-incr/tis-flink-dependency/target/tis-flink-dependency/WEB-INF/lib/*:/Users/mozhenghua/j2ee_solution/project/plugins/tis-incr/tis-flink-cdc-plugin/target/tis-flink-cdc-plugin/WEB-INF/lib/*:/Users/mozhenghua/j2ee_solution/project/plugins/tis-incr/tis-elasticsearch7-sink-plugin/target/tis-elasticsearch7-sink-plugin/WEB-INF/lib/*:/Users/mozhenghua/j2ee_solution/project/plugins/tis-incr/tis-realtime-flink/target/tis-realtime-flink/WEB-INF/lib/*:/Users/mozhenghua/j2ee_solution/project/plugins/tis-incr/tis-realtime-flink-launch/target/tis-realtime-flink-launch.jar:/Users/mozhenghua/j2ee_solution/project/plugins/tis-incr/tis-realtime-flink-launch/target/dependency/*:/Users/mozhenghua/j2ee_solution/project/plugins/tis-datax/tis-datax-elasticsearch-plugin/target/tis-datax-elasticsearch-plugin/WEB-INF/lib/*: - // CenterResource.setNotFetchFromCenterRepository(); - //Thread.currentThread().setContextClassLoader(TIS.get().pluginManager.uberClassLoader); - -// IPluginContext pluginContext = IPluginContext.namedContext(dataxName); -// -// -// List streamFactories = HeteroEnum.INCR_STREAM_CONFIG.getPlugins(pluginContext, null); -// IRCController incrController = null; -// for (IncrStreamFactory factory : streamFactories) { -// incrController = factory.getIncrSync(); -// } -// Objects.requireNonNull(incrController, "stream app:" + dataxName + " incrController can not not be null"); IncrStreamFactory incrStreamFactory = HeteroEnum.getIncrStreamFactory(dataxName); BasicFlinkSourceHandle tableStreamHandle = createFlinkSourceHandle(dataxName); @@ -81,8 +66,6 @@ public static void main(String[] args) throws Exception { public static BasicFlinkSourceHandle createFlinkSourceHandle(String dataxName) { TargetResName name = new TargetResName(dataxName); final String streamSourceHandlerClass = name.getStreamSourceHandlerClass(); - // final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - // TIS.get().extensionLists.clear(BasicFlinkSourceHandle.class); ExtensionList flinkSourceHandles = TIS.get().getExtensionList(BasicFlinkSourceHandle.class); flinkSourceHandles.removeExtensions(); logger.info("start to load extendsion of " + BasicFlinkSourceHandle.class.getSimpleName()); @@ -101,25 +84,13 @@ public static BasicFlinkSourceHandle createFlinkSourceHandle(String dataxName) { } - private static void deploy(TargetResName dataxName, BasicFlinkSourceHandle tableStreamHandle, ReplicasSpec incrSpec, long timestamp) throws Exception { - // FlinkUserCodeClassLoaders - // BasicFlinkSourceHandle tisFlinkSourceHandle = new TISFlinkSourceHandle(); + private static void deploy(TargetResName dataxName + , BasicFlinkSourceHandle tableStreamHandle + , ReplicasSpec incrSpec, long timestamp) throws Exception { + if (tableStreamHandle == null) { throw new IllegalStateException("tableStreamHandle has not been instantiated"); } - // ElasticSearchSinkFactory esSinkFactory = new ElasticSearchSinkFactory(); - - -// IPluginContext pluginContext = IPluginContext.namedContext(dataxName.getName()); -// List sinkFactories = TISSinkFactory.sinkFactory.getPlugins(pluginContext, null); - -// logger.info("sinkFactories size:" + sinkFactories.size()); -// for (TISSinkFactory factory : sinkFactories) { -// sinkFactory = factory; -// break; -// } - -// Objects.requireNonNull(sinkFactory, "sinkFactories.size():" + sinkFactories.size()); IDataxProcessor dataXProcess = DataxProcessor.load(null, dataxName.getName()); DataxReader reader = (DataxReader) dataXProcess.getReader(null); @@ -127,26 +98,16 @@ private static void deploy(TargetResName dataxName, BasicFlinkSourceHandle table tableStreamHandle.setSinkFuncFactory(TISSinkFactory.getIncrSinKFactory(dataxName.getName())); tableStreamHandle.setSourceStreamTableMeta(reader); - // List mqFactories = HeteroEnum.MQ.getPlugins(pluginContext, null); MQListenerFactory mqFactory = HeteroEnum.getIncrSourceListenerFactory(dataxName.getName()); - // IFlinkColCreator sourceFlinkColCreator = mqFactory.createFlinkColCreator(); tableStreamHandle.setSourceFlinkColCreator(mqFactory.createFlinkColCreator()); mqFactory.setConsumerHandle(tableStreamHandle); -// for (MQListenerFactory factory : mqFactories) { -// factory.setConsumerHandle(tableStreamHandle); -// mqFactory = factory; -// } - //Objects.requireNonNull(mqFactory, "mqFactory can not be null, mqFactories size:" + mqFactories.size()); IMQListener mq = mqFactory.create(); - if (reader == null) { throw new IllegalStateException("dataXReader is illegal"); } - // DBConfigGetter rdbmsReader = (DBConfigGetter) reader; - List tabs = reader.getSelectedTabs(); mq.start(dataxName, reader, tabs, dataXProcess); } diff --git a/tis-incr/tis-incr-test/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestFlinkSinkExecutor.java b/tis-incr/tis-incr-test/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestFlinkSinkExecutor.java index f3d91388a..bab983ead 100644 --- a/tis-incr/tis-incr-test/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestFlinkSinkExecutor.java +++ b/tis-incr/tis-incr-test/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestFlinkSinkExecutor.java @@ -186,8 +186,20 @@ public void test() throws Exception { public interface FlinkTestCase { + /** + * 添加测试记录 + * + * @return + */ public List createTestData(); + /** + * 生成对应的校验逻辑 + * + * @param ddl + * @param statement + * @throws SQLException + */ public void verifyRelevantRow(CreateDDL ddl, Statement statement) throws SQLException; }