diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java index 6c78d5cd4..e1a089ff5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java @@ -69,8 +69,6 @@ public class DatabaseSyncConfig { public static final String SINGLE_SINK = "single-sink"; ////////// doris-table-conf ////////// public static final String TABLE_CONF = "table-conf"; - public static final String REPLICATION_NUM = "replication_num"; - public static final String TABLE_BUCKETS = "table-buckets"; ////////// date-converter-conf ////////// public static final String CONVERTERS = "converters"; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java index 912ed6982..6318fc8a5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java @@ -26,8 +26,11 @@ import java.util.Objects; public class DorisTableConfig implements Serializable { - private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change"; + public static final String LIGHT_SCHEMA_CHANGE = "light_schema_change"; // PROPERTIES parameter in doris table creation statement. such as: replication_num=1. + public static final String REPLICATION_NUM = "replication_num"; + public static final String TABLE_BUCKETS = "table-buckets"; + private final Map tableProperties; // The specific parameters extracted from --table-conf need to be parsed and integrated into the // doris table creation statement. such as: table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50". @@ -48,10 +51,9 @@ public DorisTableConfig(Map tableConfig) { if (!tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) { tableConfig.put(LIGHT_SCHEMA_CHANGE, Boolean.toString(true)); } - if (tableConfig.containsKey(DatabaseSyncConfig.TABLE_BUCKETS)) { - this.tableBuckets = - buildTableBucketMap(tableConfig.get(DatabaseSyncConfig.TABLE_BUCKETS)); - tableConfig.remove(DatabaseSyncConfig.TABLE_BUCKETS); + if (tableConfig.containsKey(TABLE_BUCKETS)) { + this.tableBuckets = buildTableBucketMap(tableConfig.get(TABLE_BUCKETS)); + tableConfig.remove(TABLE_BUCKETS); } tableProperties = tableConfig; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java index 4b4e3b26a..e180872af 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java @@ -17,18 +17,15 @@ package org.apache.doris.flink.container.e2e; +import org.apache.doris.flink.container.AbstractContainerTestBase; +import org.apache.doris.flink.container.ContainerUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; - -import org.apache.doris.flink.container.AbstractE2EService; -import org.apache.doris.flink.container.ContainerUtils; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,19 +34,12 @@ import java.util.List; import java.util.UUID; -public class Doris2DorisE2ECase extends AbstractE2EService { +public class Doris2DorisE2ECase extends AbstractContainerTestBase { private static final Logger LOG = LoggerFactory.getLogger(Doris2DorisE2ECase.class); private static final String DATABASE_SOURCE = "test_doris2doris_source"; private static final String DATABASE_SINK = "test_doris2doris_sink"; private static final String TABLE = "test_tbl"; - @Before - public void setUp() throws InterruptedException { - LOG.info("Doris2DorisE2ECase attempting to acquire semaphore."); - SEMAPHORE.acquire(); - LOG.info("Doris2DorisE2ECase semaphore acquired."); - } - @Test public void testDoris2Doris() throws Exception { LOG.info("Start executing the test case of doris to doris."); @@ -163,14 +153,4 @@ private void initializeDorisTable() { ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, sinkInitSql); LOG.info("Initialization of doris table successful."); } - - @After - public void close() { - try { - // Ensure that semaphore is always released - } finally { - LOG.info("Doris2DorisE2ECase releasing semaphore."); - SEMAPHORE.release(); - } - } }