From 1a74d619e95ab0b6c1bb7fc7cb9f9bd907143660 Mon Sep 17 00:00:00 2001
From: yuzelin <747884505@qq.com>
Date: Sat, 30 Mar 2024 12:27:50 +0800
Subject: [PATCH] [cdc][refactor] Refactor records writing in Kafka CDC tests

---
 .../cdc/kafka/KafkaActionITCaseBase.java      | 145 ++++-----
 .../KafkaCanalSyncDatabaseActionITCase.java   | 138 +++--------
 .../KafkaCanalSyncTableActionITCase.java      | 211 ++++------------
 .../KafkaDebeziumSyncTableActionITCase.java   |  21 +-
 .../action/cdc/kafka/KafkaSchemaITCase.java   |  22 +-
 .../kafka/KafkaSyncDatabaseActionITCase.java  | 124 +++-------
 .../cdc/kafka/KafkaSyncTableActionITCase.java | 234 +++---------------
 7 files changed, 259 insertions(+), 636 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 002280079c6d..6ba066c8ff08 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -21,7 +21,7 @@
 import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
 import org.apache.paimon.utils.StringUtils;
 
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.flink.util.DockerImageVersions;
@@ -33,11 +33,15 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -50,10 +54,9 @@
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
 
-import java.io.File;
-import java.io.IOException;
 import java.net.URL;
 import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -67,8 +70,6 @@
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
-import static org.assertj.core.api.Assertions.assertThat;
-
 /** Base test class for Kafka synchronization. */
 public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
@@ -78,7 +79,11 @@ public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
     private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
     private static final Network NETWORK = Network.newNetwork();
-    private static final int zkTimeoutMills = 30000;
+    private static final int ZK_TIMEOUT_MILLIS = 30000;
+
+    protected static KafkaProducer<String, String> KAFKA_PRODUCER;
+    private static KafkaConsumer<String, String> KAFKA_CONSUMER;
+    private static AdminClient ADMIN_CLIENT;
 
     // Timer for scheduling logging task if the test hangs
     private final Timer loggingTimer = new Timer("Debug Logging Timer");
@@ -104,6 +109,42 @@ protected void doStart() {
                     // test run
                     .withEnv("KAFKA_LOG_RETENTION_MS", "-1");
 
+    @BeforeAll
+    public static void beforeAll() {
+        // create KafkaProducer
+        Properties producerProperties = getStandardProps();
+        producerProperties.setProperty("retries", "0");
+        producerProperties.put(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class.getCanonicalName());
+        producerProperties.put(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class.getCanonicalName());
+        KAFKA_PRODUCER = new KafkaProducer<>(producerProperties);
+
+        // create KafkaConsumer
+        Properties consumerProperties = getStandardProps();
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-tests-debugging");
+        consumerProperties.setProperty(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                StringDeserializer.class.getCanonicalName());
+        consumerProperties.setProperty(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                StringDeserializer.class.getCanonicalName());
+        KAFKA_CONSUMER = new KafkaConsumer<>(consumerProperties);
+
+        // create AdminClient
+        ADMIN_CLIENT = AdminClient.create(getStandardProps());
+    }
+
+    @AfterAll
+    public static void afterAll() {
+        // Close Kafka objects
+        KAFKA_PRODUCER.close();
+        KAFKA_CONSUMER.close();
+        ADMIN_CLIENT.close();
+    }
+
     @BeforeEach
     public void setup() {
         // Probe Kafka broker status per 30 seconds
@@ -130,8 +171,7 @@ public void after() throws Exception {
     }
 
     private void deleteTopics() throws ExecutionException, InterruptedException {
-        final AdminClient adminClient = AdminClient.create(getStandardProps());
-        adminClient.deleteTopics(adminClient.listTopics().names().get()).all().get();
+        ADMIN_CLIENT.deleteTopics(ADMIN_CLIENT.listTopics().names().get()).all().get();
     }
 
     // ------------------------ For Debug Logging Purpose ----------------------------------
@@ -156,29 +196,19 @@ private void cancelTimeoutLogger() {
     }
 
     private Map<String, TopicDescription> describeExternalTopics() {
-        try (final AdminClient adminClient = AdminClient.create(getStandardProps())) {
+        try {
             final List<String> topics =
-                    adminClient.listTopics().listings().get().stream()
+                    ADMIN_CLIENT.listTopics().listings().get().stream()
                             .filter(listing -> !listing.isInternal())
                             .map(TopicListing::name)
                             .collect(Collectors.toList());
-            return adminClient.describeTopics(topics).all().get();
+            return ADMIN_CLIENT.describeTopics(topics).all().get();
         } catch (Exception e) {
             throw new RuntimeException("Failed to list Kafka topics", e);
         }
     }
 
     private void logTopicPartitionStatus(Map<String, TopicDescription> topicDescriptions) {
-        final Properties properties = getStandardProps();
-        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-tests-debugging");
-        properties.setProperty(
-                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                StringDeserializer.class.getCanonicalName());
-        properties.setProperty(
-                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                StringDeserializer.class.getCanonicalName());
-        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
         List<TopicPartition> partitions = new ArrayList<>();
         topicDescriptions.forEach(
                 (topic, description) ->
                         description
                                 .partitions()
                                 .forEach(
                                         tpInfo ->
@@ -189,8 +219,9 @@ private void logTopicPartitionStatus(Map<String, TopicDescript
                                                 partitions.add(
                                                         new TopicPartition(
                                                                 topic, tpInfo.partition()))));
-        final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
-        final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
+        final Map<TopicPartition, Long> beginningOffsets =
+                KAFKA_CONSUMER.beginningOffsets(partitions);
+        final Map<TopicPartition, Long> endOffsets = KAFKA_CONSUMER.endOffsets(partitions);
         partitions.forEach(
                 partition ->
                         LOG.info(
@@ -207,8 +238,8 @@ public static Properties getStandardProps() {
         standardProps.put("enable.auto.commit", false);
         standardProps.put("auto.offset.reset", "earliest");
         standardProps.put("max.partition.fetch.bytes", 256);
-        standardProps.put("zookeeper.session.timeout.ms", zkTimeoutMills);
-        standardProps.put("zookeeper.connection.timeout.ms", zkTimeoutMills);
+        standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS);
+        standardProps.put("zookeeper.connection.timeout.ms", ZK_TIMEOUT_MILLIS);
         standardProps.put("default.api.timeout.ms", "120000");
         return standardProps;
     }
@@ -253,11 +284,12 @@ public KafkaSyncDatabaseActionBuilder(Map<String, String> kafkaConfig) {
         }
     }
 
-    public void createTestTopic(String topic, int numPartitions, int replicationFactor) {
+    protected void createTestTopic(String topic, int numPartitions, int replicationFactor) {
         Map<String, Object> properties = new HashMap<>();
         properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
-        try (AdminClient admin = AdminClient.create(properties)) {
-            admin.createTopics(
+        try {
+            ADMIN_CLIENT
+                    .createTopics(
                             Collections.singletonList(
                                     new NewTopic(topic, numPartitions, (short) replicationFactor)))
                     .all()
                     .get();
@@ -271,42 +303,41 @@ public void createTestTopic(String topic, int numPartitions, int replicationFact
         }
     }
 
-    public static List<String> readLines(String resource) throws IOException {
-        final URL url =
-                KafkaCanalSyncTableActionITCase.class.getClassLoader().getResource(resource);
-        assertThat(url).isNotNull();
-        java.nio.file.Path path = new File(url.getFile()).toPath();
-        return Files.readAllLines(path);
+    protected void writeRecordsToKafka(String topic, String resourceDirFormat, Object... args)
+            throws Exception {
+        writeRecordsToKafka(topic, resourceDirFormat, false, args);
     }
 
-    void writeRecordsToKafka(String topic, List<String> lines) throws Exception {
-        writeRecordsToKafka(topic, lines, false);
+    protected void writeRecordsToKafka(
+            String topic, String resourceDirFormat, boolean wait, Object... args) throws Exception {
+        URL url =
+                KafkaCanalSyncTableActionITCase.class
                        .getClassLoader()
+                        .getResource(String.format(resourceDirFormat, args));
+        Files.readAllLines(Paths.get(url.toURI())).stream()
+                .filter(this::isRecordLine)
+                .forEach(r -> send(topic, r, wait));
     }
 
-    void writeRecordsToKafka(String topic, List<String> lines, boolean wait) throws Exception {
-        Properties producerProperties = getStandardProps();
-        producerProperties.setProperty("retries", "0");
-        producerProperties.put(
-                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        producerProperties.put(
-                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerProperties);
-        for (int i = 0; i < lines.size(); i++) {
+    private boolean isRecordLine(String line) {
+        try {
+            objectMapper.readTree(line);
+            return !StringUtils.isEmpty(line);
+        } catch (JsonProcessingException e) {
+            return false;
+        }
+    }
+
+    private void send(String topic, String record, boolean wait) {
+        Future<RecordMetadata> sendFuture =
+                KAFKA_PRODUCER.send(new ProducerRecord<>(topic, record));
+        if (wait) {
             try {
-                JsonNode jsonNode = objectMapper.readTree(lines.get(i));
-                if (!StringUtils.isEmpty(lines.get(i))) {
-                    Future<RecordMetadata> sendFuture =
-                            kafkaProducer.send(new ProducerRecord<>(topic, lines.get(i)));
-                    if (wait) {
-                        sendFuture.get();
-                    }
-                }
-            } catch (Exception e) {
-                // ignore
+                sendFuture.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
             }
         }
-
-        kafkaProducer.close();
     }
 
     /** Kafka container extension for junit5. */
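Note (illustration, not part of the patch): after this file's changes, tests no longer read resource files themselves; they hand writeRecordsToKafka a classpath path template plus arguments, and the helper filters non-JSON lines and writes through the shared static producer. A hypothetical subclass showing both overloads; the topic name and resource paths are invented:

    import org.junit.jupiter.api.Test;

    // Hypothetical subclass: demonstrates the two overloads added by this patch.
    public class DemoKafkaITCase extends KafkaActionITCaseBase {

        @Test
        public void testWriteRecords() throws Exception {
            String topic = "demo_topic"; // hypothetical topic name
            createTestTopic(topic, 1, 1); // now goes through the shared ADMIN_CLIENT

            // Fire-and-forget: %s placeholders are filled from the trailing varargs.
            writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-1.txt", "demo");

            // wait=true blocks on every send future before returning.
            writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-2.txt", true, "demo");
        }
    }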
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 47b3419c8827..fa6bd9301bab 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -58,19 +58,11 @@ public void testSchemaEvolutionMultiTopic() throws Exception {
         List<String> topics = Arrays.asList(topic1, topic2, topic3);
         topics.forEach(topic -> createTestTopic(topic, 1, 1));
 
-        // ---------- Write the Canal json into Kafka -------------------
-
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        topics.get(i),
-                        readLines(
-                                "kafka/canal/database/schemaevolution/topic"
-                                        + i
-                                        + "/canal-data-1.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    topics.get(i),
+                    "kafka/canal/database/schemaevolution/topic%s/canal-data-1.txt",
+                    i);
         }
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
@@ -99,19 +91,11 @@ public void testSchemaEvolutionOneTopic() throws Exception {
         List<String> topics = Collections.singletonList(topic);
         topics.forEach(t -> createTestTopic(t, 1, 1));
 
-        // ---------- Write the Canal json into Kafka -------------------
-
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        topics.get(0),
-                        readLines(
-                                "kafka/canal/database/schemaevolution/topic"
-                                        + i
-                                        + "/canal-data-1.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    topics.get(0),
+                    "kafka/canal/database/schemaevolution/topic%s/canal-data-1.txt",
+                    i);
         }
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
@@ -155,16 +139,10 @@ private void testSchemaEvolutionImpl(List<String> topics, boolean writeOne, int
         waitForResult(expected, table2, rowType2, primaryKeys2);
 
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        writeOne ? topics.get(0) : topics.get(i),
-                        readLines(
-                                "kafka/canal/database/schemaevolution/topic"
-                                        + i
-                                        + "/canal-data-2.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    writeOne ? topics.get(0) : topics.get(i),
+                    "kafka/canal/database/schemaevolution/topic%s/canal-data-2.txt",
+                    i);
         }
 
         rowType1 =
@@ -200,16 +178,10 @@ private void testSchemaEvolutionImpl(List<String> topics, boolean writeOne, int
         waitForResult(expected, table2, rowType2, primaryKeys2);
 
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        writeOne ? topics.get(0) : topics.get(i),
-                        readLines(
-                                "kafka/canal/database/schemaevolution/topic"
-                                        + i
-                                        + "/canal-data-3.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    writeOne ? topics.get(0) : topics.get(i),
+                    "kafka/canal/database/schemaevolution/topic%s/canal-data-3.txt",
+                    i);
         }
 
         rowType1 =
@@ -281,19 +253,9 @@ public void testTableAffixMultiTopic() throws Exception {
         List<String> topics = Arrays.asList(topic1, topic2);
         topics.forEach(topic -> createTestTopic(topic, 1, 1));
 
-        // ---------- Write the Canal json into Kafka -------------------
-
         for (int i = 0; i < topics.size(); i++) {
-            try {
-                writeRecordsToKafka(
-                        topics.get(i),
-                        readLines(
-                                "kafka/canal/database/prefixsuffix/topic"
-                                        + i
-                                        + "/canal-data-1.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    topics.get(i), "kafka/canal/database/prefixsuffix/topic%s/canal-data-1.txt", i);
         }
 
         // try synchronization
@@ -332,19 +294,9 @@ public void testTableAffixOneTopic() throws Exception {
         int fileCount = 2;
         topics.forEach(topic -> createTestTopic(topic, 1, 1));
 
-        // ---------- Write the Canal json into Kafka -------------------
-
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        topics.get(0),
-                        readLines(
-                                "kafka/canal/database/prefixsuffix/topic"
-                                        + i
-                                        + "/canal-data-1.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    topics.get(0), "kafka/canal/database/prefixsuffix/topic%s/canal-data-1.txt", i);
         }
 
         // try synchronization
@@ -388,16 +340,10 @@ private void testTableAffixImpl(List<String> topics, boolean writeOne, int fileC
         waitForResult(expected, table2, rowType2, primaryKeys2);
 
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        writeOne ? topics.get(0) : topics.get(i),
-                        readLines(
-                                "kafka/canal/database/prefixsuffix/topic"
-                                        + i
-                                        + "/canal-data-2.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    writeOne ? topics.get(0) : topics.get(i),
+                    "kafka/canal/database/prefixsuffix/topic%s/canal-data-2.txt",
+                    i);
         }
 
         rowType1 =
                 RowType.of(
@@ -428,16 +374,10 @@ private void testTableAffixImpl(List<String> topics, boolean writeOne, int fileC
         waitForResult(expected, table2, rowType2, primaryKeys2);
 
         for (int i = 0; i < fileCount; i++) {
-            try {
-                writeRecordsToKafka(
-                        writeOne ? topics.get(0) : topics.get(i),
-                        readLines(
-                                "kafka/canal/database/prefixsuffix/topic"
-                                        + i
-                                        + "/canal-data-3.txt"));
-            } catch (Exception e) {
-                throw new Exception("Failed to write canal data to Kafka.", e);
-            }
+            writeRecordsToKafka(
+                    writeOne ? topics.get(0) : topics.get(i),
+                    "kafka/canal/database/prefixsuffix/topic%s/canal-data-3.txt",
+                    i);
         }
 
         rowType1 =
@@ -510,16 +450,8 @@ private void includingAndExcludingTablesImpl(
         final String topic1 = "include_exclude" + UUID.randomUUID();
         List<String> topics = Collections.singletonList(topic1);
         topics.forEach(topic -> createTestTopic(topic, 1, 1));
+        writeRecordsToKafka(topics.get(0), "kafka/canal/database/include/topic0/canal-data-1.txt");
 
-        // ---------- Write the Canal json into Kafka -------------------
-
-        try {
-            writeRecordsToKafka(
-                    topics.get(0),
-                    readLines("kafka/canal/database/include/topic0/canal-data-1.txt"));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
         // try synchronization
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -542,9 +474,7 @@ private void includingAndExcludingTablesImpl(
     public void testTypeMappingToString() throws Exception {
         final String topic = "map-to-string";
         createTestTopic(topic, 1, 1);
-
-        // ---------- Write the Canal json into Kafka -------------------
-        writeRecordsToKafka(topic, readLines("kafka/canal/database/tostring/canal-data-1.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/database/tostring/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -590,10 +520,7 @@ public void testCatalogAndTableConfig() {
     public void testCaseInsensitive() throws Exception {
         final String topic = "case-insensitive";
         createTestTopic(topic, 1, 1);
-
-        // ---------- Write the Canal json into Kafka -------------------
-        writeRecordsToKafka(
-                topic, readLines("kafka/canal/database/case-insensitive/canal-data-1.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/database/case-insensitive/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -628,8 +555,7 @@ public void testCaseInsensitive() throws Exception {
     public void testCannotSynchronizeIncompleteJson() throws Exception {
         final String topic = "incomplete";
         createTestTopic(topic, 1, 1);
-
-        writeRecordsToKafka(topic, readLines("kafka/canal/database/incomplete/canal-data-1.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/database/incomplete/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
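Note (illustration, not part of the patch): the old writeRecordsToKafka wrapped every send in a try/catch that ignored the failure ("// ignore"), so the per-test try/catch blocks removed above never actually fired for send errors. The patched send() propagates instead, but only when the caller asked to wait. A minimal standalone sketch of that contract:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    final class SendPropagationSketch {
        // Mirrors the patched send(): only an awaited send can fail the caller.
        static <T> void await(Future<T> sendFuture, boolean wait) {
            if (wait) {
                try {
                    sendFuture.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Old behavior: exception silently swallowed. New behavior: fail fast.
                    throw new RuntimeException(e);
                }
            }
        }
    }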
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 40743fae67fb..bf611768bbe0 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -74,14 +74,8 @@ public void testSchemaEvolutionWithMissingDdl() throws Exception {
     private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
         final String topic = "schema_evolution";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines =
-                readLines(String.format("kafka/canal/table/%s/canal-data-1.txt", sourceDir));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-1.txt", sourceDir);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -111,13 +105,8 @@ private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exce
         List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[2, 4, four]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(String.format("kafka/canal/table/%s/canal-data-2.txt", sourceDir)));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-2.txt", sourceDir);
+
         rowType =
                 RowType.of(
                         new DataType[] {
@@ -137,13 +126,8 @@ private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exce
                         "+I[1, 6, six, 60]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(String.format("kafka/canal/table/%s/canal-data-3.txt", sourceDir)));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-3.txt", sourceDir);
+
         rowType =
                 RowType.of(
                         new DataType[] {
@@ -164,13 +148,8 @@ private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exce
                         "+I[2, 8, eight, 80000000000]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(String.format("kafka/canal/table/%s/canal-data-4.txt", sourceDir)));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-4.txt", sourceDir);
+
         rowType =
                 RowType.of(
                         new DataType[] {
@@ -195,13 +174,8 @@ private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exce
                         "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(String.format("kafka/canal/table/%s/canal-data-5.txt", sourceDir)));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/%s/canal-data-5.txt", sourceDir);
+
         rowType =
                 RowType.of(
                         new DataType[] {
@@ -232,14 +206,8 @@ private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exce
     public void testMultipleSchemaEvolutions() throws Exception {
         final String topic = "schema_evolution_multiple";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines =
-                readLines("kafka/canal/table/schemaevolutionmultiple/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/schemaevolutionmultiple/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -271,12 +239,8 @@ private void testSchemaEvolutionMultipleImpl(String topic) throws Exception {
         List<String> expected = Collections.singletonList("+I[1, one, 10, string_1]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic, readLines("kafka/canal/table/schemaevolutionmultiple/canal-data-2.txt"));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/schemaevolutionmultiple/canal-data-2.txt");
+
         rowType =
                 RowType.of(
                         new DataType[] {
@@ -300,7 +264,6 @@ private void testSchemaEvolutionMultipleImpl(String topic) throws Exception {
     @Test
     @Timeout(60)
     public void testAllTypes() throws Exception {
-
         // the first round checks for table creation
         // the second round checks for running the action on an existing table
         for (int i = 0; i < 2; i++) {
@@ -312,14 +275,8 @@ public void testAllTypes() throws Exception {
     private void testAllTypesOnce() throws Exception {
         final String topic = "all_type" + UUID.randomUUID();
         createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, "kafka/canal/table/alltype/canal-data.txt");
 
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = readLines("kafka/canal/table/alltype/canal-data.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -578,13 +535,8 @@ private void testAllTypesImpl() throws Exception {
     public void testNotSupportFormat() throws Exception {
         final String topic = "not_support";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = readLines("kafka/canal/table/schemaevolution/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/schemaevolution/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "togg-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -606,13 +558,8 @@ public void testNotSupportFormat() throws Exception {
     public void testKafkaNoNonDdlData() throws Exception {
         final String topic = "no_non_ddl_data";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = readLines("kafka/canal/table/nononddldata/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/nononddldata/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -634,13 +581,8 @@ public void testKafkaNoNonDdlData() throws Exception {
     public void testAssertSchemaCompatible() throws Exception {
         final String topic = "assert_schema_compatible";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = readLines("kafka/canal/table/schemaevolution/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/schemaevolution/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -675,13 +617,8 @@ public void testAssertSchemaCompatible() throws Exception {
     public void testStarUpOptionSpecific() throws Exception {
         final String topic = "start_up_specific";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = readLines("kafka/canal/table/startupmode/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/startupmode/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -716,13 +653,8 @@ public void testStarUpOptionSpecific() throws Exception {
     public void testStarUpOptionLatest() throws Exception {
         final String topic = "start_up_latest";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = readLines("kafka/canal/table/startupmode/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines, true);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/startupmode/canal-data-1.txt", true);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -735,11 +667,8 @@ public void testStarUpOptionLatest() throws Exception {
                         .build();
         runActionWithDefaultEnv(action);
 
-        try {
-            writeRecordsToKafka(topic, readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/startupmode/canal-data-2.txt");
+
         FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
@@ -761,13 +690,8 @@ public void testStarUpOptionLatest() throws Exception {
     public void testStarUpOptionTimestamp() throws Exception {
         final String topic = "start_up_timestamp";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = readLines("kafka/canal/table/startupmode/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines, true);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/startupmode/canal-data-1.txt", true);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -782,11 +706,8 @@ public void testStarUpOptionTimestamp() throws Exception {
                         .build();
         runActionWithDefaultEnv(action);
 
-        try {
-            writeRecordsToKafka(topic, readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/startupmode/canal-data-2.txt");
+
         FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
@@ -808,13 +729,8 @@ public void testStarUpOptionTimestamp() throws Exception {
     public void testStarUpOptionEarliest() throws Exception {
         final String topic = "start_up_earliest";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = readLines("kafka/canal/table/startupmode/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/startupmode/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -827,11 +743,8 @@ public void testStarUpOptionEarliest() throws Exception {
                         .build();
         runActionWithDefaultEnv(action);
 
-        try {
-            writeRecordsToKafka(topic, readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/startupmode/canal-data-2.txt");
+
         FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
@@ -855,13 +768,8 @@ public void testStarUpOptionEarliest() throws Exception {
     public void testStarUpOptionGroup() throws Exception {
         final String topic = "start_up_group";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = readLines("kafka/canal/table/startupmode/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/startupmode/canal-data-1.txt");
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -874,11 +782,8 @@ public void testStarUpOptionGroup() throws Exception {
                         .build();
         runActionWithDefaultEnv(action);
 
-        try {
-            writeRecordsToKafka(topic, readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/startupmode/canal-data-2.txt");
+
         FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
@@ -902,13 +807,8 @@ public void testStarUpOptionGroup() throws Exception {
     public void testComputedColumn() throws Exception {
         String topic = "computed_column";
         createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, "kafka/canal/table/computedcolumn/canal-data-1.txt");
 
-        List<String> lines = readLines("kafka/canal/table/computedcolumn/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -939,9 +839,7 @@ public void testComputedColumn() throws Exception {
     public void testTypeMappingToString() throws Exception {
         final String topic = "map-to-string";
         createTestTopic(topic, 1, 1);
-
-        // ---------- Write the Canal json into Kafka -------------------
-        writeRecordsToKafka(topic, readLines("kafka/canal/table/tostring/canal-data-1.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/table/tostring/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -986,9 +884,7 @@ public void testCatalogAndTableConfig() {
     public void testCDCOperations(boolean ignoreDelete) throws Exception {
         final String topic = "event-insert" + UUID.randomUUID();
         createTestTopic(topic, 1, 1);
-
-        // ---------- Write the Canal json into Kafka -------------------
-        writeRecordsToKafka(topic, readLines("kafka/canal/table/event/event-row.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/table/event/event-row.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -1022,7 +918,7 @@ public void testCDCOperations(boolean ignoreDelete) throws Exception {
                         "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]");
         waitForResult(expectedRow, table, rowType, primaryKeys);
 
-        writeRecordsToKafka(topic, readLines("kafka/canal/table/event/event-insert.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/table/event/event-insert.txt");
 
         // For the INSERT operation
         List<String> expectedInsert =
@@ -1033,7 +929,7 @@ public void testCDCOperations(boolean ignoreDelete) throws Exception {
                         "+I[2, 4, four, NULL, NULL, NULL, NULL]");
         waitForResult(expectedInsert, table, rowType, primaryKeys);
 
-        writeRecordsToKafka(topic, readLines("kafka/canal/table/event/event-update.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/table/event/event-update.txt");
 
         // For the UPDATE operation
         List<String> expectedUpdate =
@@ -1044,7 +940,7 @@ public void testCDCOperations(boolean ignoreDelete) throws Exception {
                         "+I[2, 4, four, NULL, NULL, NULL, NULL]");
         waitForResult(expectedUpdate, table, rowType, primaryKeys);
 
-        writeRecordsToKafka(topic, readLines("kafka/canal/table/event/event-delete.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/table/event/event-delete.txt");
 
         // For the DELETE operation
         List<String> expectedDelete =
@@ -1082,8 +978,7 @@ public void testSyncWithInitialEmptyTopic() throws Exception {
                         .build();
         runActionWithDefaultEnv(action);
 
-        List<String> lines = readLines("kafka/canal/table/initialemptytopic/canal-data-1.txt");
-        writeRecordsToKafka(topic, lines);
+        writeRecordsToKafka(topic, "kafka/canal/table/initialemptytopic/canal-data-1.txt");
 
         RowType rowType =
                 RowType.of(
@@ -1106,8 +1001,7 @@ public void testSyncWithInitialEmptyTopic() throws Exception {
     public void testSynchronizeIncompleteJson() throws Exception {
         String topic = "incomplete";
         createTestTopic(topic, 1, 1);
-        List<String> lines = readLines("kafka/canal/table/incomplete/canal-data-1.txt");
-        writeRecordsToKafka(topic, lines);
+        writeRecordsToKafka(topic, "kafka/canal/table/incomplete/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -1138,8 +1032,7 @@ public void testSynchronizeIncompleteJson() throws Exception {
     public void testSynchronizeNonPkTable() throws Exception {
         String topic = "non_pk";
         createTestTopic(topic, 1, 1);
-        List<String> lines = readLines("kafka/canal/table/nonpk/canal-data-1.txt");
-        writeRecordsToKafka(topic, lines);
+        writeRecordsToKafka(topic, "kafka/canal/table/nonpk/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -1167,7 +1060,7 @@ public void testSynchronizeNonPkTable() throws Exception {
     public void testMissingDecimalPrecision() throws Exception {
         String topic = "missing-decimal-precision";
         createTestTopic(topic, 1, 1);
-        writeRecordsToKafka(topic, readLines("kafka/canal/table/incomplete/canal-data-2.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/table/incomplete/canal-data-2.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -1209,8 +1102,7 @@ public void testComputedColumnWithCaseInsensitive(boolean triggerSchemaRetrieval
                     Collections.singletonList("_id"),
                     Collections.emptyMap());
         } else {
-            List<String> lines = readLines("kafka/canal/table/computedcolumn/canal-data-2.txt");
-            writeRecordsToKafka(topic, lines);
+            writeRecordsToKafka(topic, "kafka/canal/table/computedcolumn/canal-data-2.txt");
         }
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
@@ -1227,8 +1119,7 @@ public void testComputedColumnWithCaseInsensitive(boolean triggerSchemaRetrieval
         runActionWithDefaultEnv(action);
 
         if (triggerSchemaRetrievalException) {
-            List<String> lines = readLines("kafka/canal/table/computedcolumn/canal-data-2.txt");
-            writeRecordsToKafka(topic, lines);
+            writeRecordsToKafka(topic, "kafka/canal/table/computedcolumn/canal-data-2.txt");
         }
 
         RowType rowType =
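Note (illustration, not part of the patch): the wait flag used by the start-up mode tests above matters because records must be acknowledged by the broker before the action starts reading; otherwise a latest-offset or timestamp start point could land before the test data. The underlying send-and-wait pattern, standalone with the plain kafka-clients API (broker address is a placeholder):

    import java.util.Properties;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public final class SyncSendSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                Future<RecordMetadata> future =
                        producer.send(new ProducerRecord<>("demo_topic", "{\"id\":1}"));
                RecordMetadata metadata = future.get(); // blocks until the broker acknowledges
                System.out.println("offset=" + metadata.offset());
            }
        }
    }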
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index ba96331621d7..5b8d0db00a50 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -22,16 +22,13 @@
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
@@ -125,22 +122,10 @@ public void testMessageWithNullValue() throws Exception {
         final String topic = "test_null_value";
         createTestTopic(topic, 1, 1);
 
-        List<String> lines = readLines("kafka/debezium/table/nullvalue/debezium-data-1.txt");
-        writeRecordsToKafka(topic, lines);
-
+        writeRecordsToKafka(topic, "kafka/debezium/table/nullvalue/debezium-data-1.txt");
         // write null value
-        Properties producerProperties = getStandardProps();
-        producerProperties.setProperty("retries", "0");
-        producerProperties.put(
-                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        producerProperties.put(
-                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerProperties);
-        kafkaProducer.send(new ProducerRecord<>(topic, null));
-        kafkaProducer.close();
-
-        lines = readLines("kafka/debezium/table/nullvalue/debezium-data-2.txt");
-        writeRecordsToKafka(topic, lines);
+        KAFKA_PRODUCER.send(new ProducerRecord<>(topic, null));
+        writeRecordsToKafka(topic, "kafka/debezium/table/nullvalue/debezium-data-2.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
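Note (illustration, not part of the patch): the Debezium null-value test above is the one writer that still talks to the producer directly. A resource file cannot carry a null line (Files.readAllLines never yields null, and isRecordLine would drop anything that is not JSON), so the tombstone is published on the shared producer. A fragment as it would appear inside a test of this class; the topic name is the test's own:

    // A null value cannot come from the resource file, so send it directly.
    KAFKA_PRODUCER.send(new ProducerRecord<>("test_null_value", null));
    // With StringSerializer, a null value is transmitted as a Kafka tombstone;
    // the debezium-json sync path has to tolerate it without failing the job.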
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
index 5c998c33cf6b..7642f9447b30 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
@@ -48,13 +48,8 @@ public class KafkaSchemaITCase extends KafkaActionITCaseBase {
     public void testKafkaSchema() throws Exception {
         final String topic = "test_kafka_schema";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = readLines("kafka/canal/table/schemaevolution/canal-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write canal data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/canal/table/schemaevolution/canal-data-1.txt");
+
         Configuration kafkaConfig = Configuration.fromMap(getBasicKafkaConfig());
         kafkaConfig.setString(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.setString(TOPIC.key(), topic);
@@ -76,9 +71,7 @@ public void testKafkaSchema() throws Exception {
     public void testTableOptionsChange() throws Exception {
         final String topic = "test_table_options_change";
         createTestTopic(topic, 1, 1);
-
-        // ---------- Write the Canal json into Kafka -------------------
-        writeRecordsToKafka(topic, readLines("kafka/canal/table/optionschange/canal-data-1.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/table/optionschange/canal-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
@@ -94,7 +87,7 @@ public void testTableOptionsChange() throws Exception {
         waitingTables(tableName);
         jobClient.cancel();
 
-        writeRecordsToKafka(topic, readLines("kafka/canal/table/optionschange/canal-data-2.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/table/optionschange/canal-data-2.txt");
 
         tableConfig.put("sink.savepoint.auto-tag", "true");
         tableConfig.put("tag.num-retained-max", "5");
@@ -119,10 +112,8 @@ public void testTableOptionsChange() throws Exception {
     public void testNewlyAddedTablesOptionsChange() throws Exception {
         final String topic = "test_database_options_change";
         createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, "kafka/canal/database/schemaevolution/topic0/canal-data-1.txt");
 
-        // ---------- Write the Canal json into Kafka -------------------
-        writeRecordsToKafka(
-                topic, readLines("kafka/canal/database/schemaevolution/topic0/canal-data-1.txt"));
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -146,8 +137,7 @@ public void testNewlyAddedTablesOptionsChange() throws Exception {
         tableConfig.put("snapshot.num-retained.max", "10");
         tableConfig.put("changelog-producer", "input");
 
-        writeRecordsToKafka(
-                topic, readLines("kafka/canal/database/schemaevolution/topic1/canal-data-1.txt"));
+        writeRecordsToKafka(topic, "kafka/canal/database/schemaevolution/topic1/canal-data-1.txt");
 
         KafkaSyncDatabaseAction action2 =
                 syncDatabaseActionBuilder(kafkaConfig).withTableConfig(tableConfig).build();
         runActionWithDefaultEnv(action2);
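Note (illustration, not part of the patch): with this refactor all topic administration goes through the single shared ADMIN_CLIENT, and per-test isolation is preserved because the @AfterEach hook deletes every listed topic. The same cleanup pattern, standalone and using only the public kafka-clients admin API (bootstrap address is a placeholder):

    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public final class TopicCleanupSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            try (AdminClient admin = AdminClient.create(props)) {
                // listTopics() excludes internal topics by default
                Set<String> topics = admin.listTopics().names().get();
                admin.deleteTopics(topics).all().get(); // wait until every deletion completes
            }
        }
    }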
to Kafka.", format), e); - } + writeRecordsToKafka( + topics.get(i), + "kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt", + format, + i, + format); } Map kafkaConfig = getBasicKafkaConfig(); @@ -83,19 +77,13 @@ protected void testSchemaEvolutionOneTopic(String format) throws Exception { List topics = Collections.singletonList(topic); topics.forEach(t -> createTestTopic(t, 1, 1)); - // ---------- Write the maxwell json into Kafka ------------------- - for (int i = 0; i < fileCount; i++) { - try { - writeRecordsToKafka( - topics.get(0), - readLines( - String.format( - "kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt", - format, i, format))); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka( + topics.get(0), + "kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt", + format, + i, + format); } Map kafkaConfig = getBasicKafkaConfig(); @@ -150,16 +138,12 @@ private void testSchemaEvolutionImpl( waitForResult(expected2, table2, rowType2, getPrimaryKey(format)); for (int i = 0; i < fileCount; i++) { - try { - writeRecordsToKafka( - writeOne ? topics.get(0) : topics.get(i), - readLines( - String.format( - "kafka/%s/database/schemaevolution/topic%s/%s-data-2.txt", - format, i, format))); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka( + writeOne ? topics.get(0) : topics.get(i), + "kafka/%s/database/schemaevolution/topic%s/%s-data-2.txt", + format, + i, + format); } rowType1 = @@ -244,19 +228,13 @@ protected void testTableAffixMultiTopic(String format) throws Exception { List topics = Arrays.asList(topic1, topic2); topics.forEach(topic -> createTestTopic(topic, 1, 1)); - // ---------- Write the data into Kafka ------------------- - for (int i = 0; i < topics.size(); i++) { - try { - writeRecordsToKafka( - topics.get(i), - readLines( - String.format( - "kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt", - format, i, format))); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka( + topics.get(i), + "kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt", + format, + i, + format); } // try synchronization @@ -298,19 +276,12 @@ protected void testTableAffixOneTopic(String format) throws Exception { int fileCount = 2; topics.forEach(topic -> createTestTopic(topic, 1, 1)); - // ---------- Write the maxwell json into Kafka ------------------- - for (int i = 0; i < fileCount; i++) { - try { - writeRecordsToKafka( - topics.get(0), - readLines( - String.format( - "kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt", - format, i, format))); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka( + topics.get(0), + String.format( + "kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt", + format, i, format)); } // try synchronization @@ -370,16 +341,12 @@ private void testTableAffixImpl( waitForResult(expected, table2, rowType2, getPrimaryKey(format)); for (int i = 0; i < fileCount; i++) { - try { - writeRecordsToKafka( - writeOne ? topics.get(0) : topics.get(i), - readLines( - String.format( - "kafka/%s/database/prefixsuffix/topic%s/%s-data-2.txt", - format, i, format))); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka( + writeOne ? 
topics.get(0) : topics.get(i), + "kafka/%s/database/prefixsuffix/topic%s/%s-data-2.txt", + format, + i, + format); } rowType1 = RowType.of( @@ -470,18 +437,9 @@ private void includingAndExcludingTablesImpl( List topics = Collections.singletonList(topic1); topics.forEach(topic -> createTestTopic(topic, 1, 1)); - // ---------- Write the data into Kafka ------------------- + writeRecordsToKafka( + topics.get(0), "kafka/%s/database/include/topic0/%s-data-1.txt", format, format); - try { - writeRecordsToKafka( - topics.get(0), - readLines( - String.format( - "kafka/%s/database/include/topic0/%s-data-1.txt", - format, format))); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } // try synchronization Map kafkaConfig = getBasicKafkaConfig(); kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); @@ -503,14 +461,8 @@ protected void testCaseInsensitive(String format) throws Exception { final String topic = "case-insensitive"; createTestTopic(topic, 1, 1); - // ---------- Write the data into Kafka ------------------- - writeRecordsToKafka( - topic, - readLines( - String.format( - "kafka/%s/database/case-insensitive/%s-data-1.txt", - format, format))); + topic, "kafka/%s/database/case-insensitive/%s-data-1.txt", format, format); Map kafkaConfig = getBasicKafkaConfig(); kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java index e13ab86e9469..07c8f3779de7 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java @@ -58,16 +58,8 @@ public class KafkaSyncTableActionITCase extends KafkaActionITCaseBase { protected void runSingleTableSchemaEvolution(String sourceDir, String format) throws Exception { final String topic = "schema_evolution"; createTestTopic(topic, 1, 1); - // ---------- Write the data into Kafka ------------------- - List lines = - readLines( - String.format( - "kafka/%s/table/%s/%s-data-1.txt", format, sourceDir, format)); - try { - writeRecordsToKafka(topic, lines); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka(topic, "kafka/%s/table/%s/%s-data-1.txt", format, sourceDir, format); + Map kafkaConfig = getBasicKafkaConfig(); kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); kafkaConfig.put(TOPIC.key(), topic); @@ -101,15 +93,8 @@ private void testSchemaEvolutionImpl(String topic, String sourceDir, String form "+I[102, car battery, 12V car battery, 8.1]"); waitForResult(expected, table, rowType, primaryKeys); - try { - writeRecordsToKafka( - topic, - readLines( - String.format( - "kafka/%s/table/%s/%s-data-2.txt", format, sourceDir, format))); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka(topic, "kafka/%s/table/%s/%s-data-2.txt", format, sourceDir, format); + rowType = RowType.of( new DataType[] { @@ -128,15 +113,8 @@ private void testSchemaEvolutionImpl(String topic, String sourceDir, String form "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]"); waitForResult(expected, table, rowType, primaryKeys); 
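Note (illustration, not part of the patch): testTableAffixOneTopic above keeps an explicit String.format around the path while its siblings pass the raw format string plus varargs. Both are equivalent because the helper itself calls String.format(resourceDirFormat, args); a pre-formatted path survives that call only because it no longer contains % placeholders. A fragment showing the two spellings side by side (values illustrative):

    String format = "canal";
    int i = 0;
    // 1) The helper formats the path: String.format(path, format, i, format).
    writeRecordsToKafka(
            topics.get(0), "kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt", format, i, format);
    // 2) The caller formats the path; the helper's String.format(path) is then
    //    a no-op, which is safe only while the result contains no '%'.
    writeRecordsToKafka(
            topics.get(0),
            String.format("kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt", format, i, format));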
- try { - writeRecordsToKafka( - topic, - readLines( - String.format( - "kafka/%s/table/%s/%s-data-3.txt", format, sourceDir, format))); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka(topic, "kafka/%s/table/%s/%s-data-3.txt", format, sourceDir, format); + rowType = RowType.of( new DataType[] { @@ -161,16 +139,8 @@ private void testSchemaEvolutionImpl(String topic, String sourceDir, String form public void testNotSupportFormat(String format) throws Exception { final String topic = "not_support"; createTestTopic(topic, 1, 1); - // ---------- Write the data into Kafka ------------------- - List lines = - readLines( - String.format( - "kafka/%s/table/schemaevolution/%s-data-1.txt", format, format)); - try { - writeRecordsToKafka(topic, lines); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka(topic, "kafka/%s/table/schemaevolution/%s-data-1.txt", format, format); + Map kafkaConfig = getBasicKafkaConfig(); kafkaConfig.put(VALUE_FORMAT.key(), "togg-json"); kafkaConfig.put(TOPIC.key(), topic); @@ -190,16 +160,8 @@ public void testNotSupportFormat(String format) throws Exception { protected void testAssertSchemaCompatible(String format) throws Exception { final String topic = "assert_schema_compatible"; createTestTopic(topic, 1, 1); - // ---------- Write the data into Kafka ------------------- - List lines = - readLines( - String.format( - "kafka/%s/table/schemaevolution/%s-data-1.txt", format, format)); - try { - writeRecordsToKafka(topic, lines); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka(topic, "kafka/%s/table/schemaevolution/%s-data-1.txt", format, format); + Map kafkaConfig = getBasicKafkaConfig(); kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); kafkaConfig.put(TOPIC.key(), topic); @@ -231,15 +193,8 @@ protected void testAssertSchemaCompatible(String format) throws Exception { protected void testStarUpOptionSpecific(String format) throws Exception { final String topic = "start_up_specific"; createTestTopic(topic, 1, 1); - // ---------- Write the data into Kafka ------------------- - List lines = - readLines( - String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format)); - try { - writeRecordsToKafka(topic, lines); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt", format, format); + Map kafkaConfig = getBasicKafkaConfig(); kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); kafkaConfig.put(TOPIC.key(), topic); @@ -273,15 +228,8 @@ protected void testStarUpOptionSpecific(String format) throws Exception { protected void testStarUpOptionLatest(String format) throws Exception { final String topic = "start_up_latest"; createTestTopic(topic, 1, 1); - // ---------- Write the data into Kafka ------------------- - List lines = - readLines( - String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format)); - try { - writeRecordsToKafka(topic, lines, true); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt", true, format, format); + Map kafkaConfig = getBasicKafkaConfig(); kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); 
kafkaConfig.put(TOPIC.key(), topic); @@ -295,15 +243,8 @@ protected void testStarUpOptionLatest(String format) throws Exception { Thread.sleep(5000); FileStoreTable table = getFileStoreTable(tableName); - try { - writeRecordsToKafka( - topic, - readLines( - String.format( - "kafka/%s/table/startupmode/%s-data-2.txt", format, format))); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + + writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt", format, format); RowType rowType = RowType.of( @@ -326,15 +267,8 @@ protected void testStarUpOptionLatest(String format) throws Exception { public void testStarUpOptionTimestamp(String format) throws Exception { final String topic = "start_up_timestamp"; createTestTopic(topic, 1, 1); - // ---------- Write the data into Kafka ------------------- - List lines = - readLines( - String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format)); - try { - writeRecordsToKafka(topic, lines); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt", format, format); + Map kafkaConfig = getBasicKafkaConfig(); kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); kafkaConfig.put(TOPIC.key(), topic); @@ -348,15 +282,8 @@ public void testStarUpOptionTimestamp(String format) throws Exception { .build(); runActionWithDefaultEnv(action); - try { - writeRecordsToKafka( - topic, - readLines( - String.format( - "kafka/%s/table/startupmode/%s-data-2.txt", format, format))); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt", format, format); + FileStoreTable table = getFileStoreTable(tableName); RowType rowType = @@ -380,15 +307,8 @@ public void testStarUpOptionTimestamp(String format) throws Exception { public void testStarUpOptionEarliest(String format) throws Exception { final String topic = "start_up_earliest"; createTestTopic(topic, 1, 1); - // ---------- Write the data into Kafka ------------------- - List lines = - readLines( - String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format)); - try { - writeRecordsToKafka(topic, lines); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt", format, format); + Map kafkaConfig = getBasicKafkaConfig(); kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); kafkaConfig.put(TOPIC.key(), topic); @@ -400,15 +320,8 @@ public void testStarUpOptionEarliest(String format) throws Exception { .build(); runActionWithDefaultEnv(action); - try { - writeRecordsToKafka( - topic, - readLines( - String.format( - "kafka/%s/table/startupmode/%s-data-2.txt", format, format))); - } catch (Exception e) { - throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); - } + writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt", format, format); + FileStoreTable table = getFileStoreTable(tableName); RowType rowType = @@ -434,15 +347,8 @@ public void testStarUpOptionEarliest(String format) throws Exception { public void testStarUpOptionGroup(String format) throws Exception { final String topic = "start_up_group"; createTestTopic(topic, 1, 1); - // ---------- Write the data into Kafka ------------------- - List 
-        List<String> lines =
-                readLines(
-                        String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt", format, format);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -454,15 +360,8 @@ public void testStarUpOptionGroup(String format) throws Exception {
                         .build();
         runActionWithDefaultEnv(action);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(
-                            String.format(
-                                    "kafka/%s/table/startupmode/%s-data-2.txt", format, format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt", format, format);
+
         FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
@@ -488,16 +387,8 @@ public void testStarUpOptionGroup(String format) throws Exception {
     public void testComputedColumn(String format) throws Exception {
         String topic = "computed_column";
         createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, "kafka/%s/table/computedcolumn/%s-data-1.txt", format, format);
 
-        List<String> lines =
-                readLines(
-                        String.format(
-                                "kafka/%s/table/computedcolumn/%s-data-1.txt", format, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
-        }
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -528,14 +419,8 @@ public void testComputedColumn(String format) throws Exception {
     protected void testCDCOperations(String format) throws Exception {
         String topic = "event";
         createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(topic, "kafka/%s/table/event/event-insert.txt", format);
 
-        List<String> lines =
-                readLines(String.format("kafka/%s/table/event/event-insert.txt", format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
-        }
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
         kafkaConfig.put(TOPIC.key(), topic);
@@ -566,13 +451,8 @@ protected void testCDCOperations(String format) throws Exception {
                         "+I[103, scooter, Big 2-wheel scooter , 5.1]");
         waitForResult(expectedInsert, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(String.format("kafka/%s/table/event/event-update.txt", format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/event/event-update.txt", format);
+
         // For the UPDATE operation
         List<String> expectedUpdate =
                 Arrays.asList(
@@ -581,13 +461,7 @@ protected void testCDCOperations(String format) throws Exception {
                         "+I[103, scooter, Big 2-wheel scooter , 8.1]");
         waitForResult(expectedUpdate, table, rowType, primaryKeys);
 
-        try {
-            writeRecordsToKafka(
-                    topic,
-                    readLines(String.format("kafka/%s/table/event/event-delete.txt", format)));
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/event/event-delete.txt", format);
 
         // For the REPLACE operation
         List<String> expectedReplace =
@@ -600,17 +474,9 @@ protected void testCDCOperations(String format) throws Exception {
     public void testKafkaBuildSchemaWithDelete(String format) throws Exception {
         final String topic = "test_kafka_schema";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the Debezium json into Kafka -------------------
-        List<String> lines =
-                readLines(
-                        String.format(
-                                "kafka/%s/table/schema/schemaevolution/%s-data-4.txt",
-                                format, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
-        }
+        writeRecordsToKafka(
+                topic, "kafka/%s/table/schema/schemaevolution/%s-data-4.txt", format, format);
+
         Configuration kafkaConfig = Configuration.fromMap(getBasicKafkaConfig());
         kafkaConfig.setString(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.setString(TOPIC.key(), topic);
@@ -632,9 +498,7 @@ public void testKafkaBuildSchemaWithDelete(String format) throws Exception {
     public void testWaterMarkSyncTable(String format) throws Exception {
         String topic = "watermark";
         createTestTopic(topic, 1, 1);
-        writeRecordsToKafka(
-                topic,
-                readLines(String.format("kafka/%s/table/watermark/%s-data-1.txt", format, format)));
+        writeRecordsToKafka(topic, "kafka/%s/table/watermark/%s-data-1.txt", format, format);
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
@@ -666,13 +530,7 @@ public void testWaterMarkSyncTable(String format) throws Exception {
     public void testSchemaIncludeRecord(String format) throws Exception {
         String topic = "schema_include";
         createTestTopic(topic, 1, 1);
-
-        List<String> lines = readLines("kafka/debezium/table/schema/include/debezium-data-1.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write debezium data to Kafka.", e);
-        }
+        writeRecordsToKafka(topic, "kafka/debezium/table/schema/include/debezium-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
@@ -706,9 +564,7 @@ public void testSchemaIncludeRecord(String format) throws Exception {
     public void testAllTypesWithSchemaImpl(String format) throws Exception {
         String topic = "schema_include_all_type";
         createTestTopic(topic, 1, 1);
-
-        List<String> lines = readLines("kafka/debezium/table/schema/alltype/debezium-data-1.txt");
-        writeRecordsToKafka(topic, lines);
+        writeRecordsToKafka(topic, "kafka/debezium/table/schema/alltype/debezium-data-1.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
@@ -952,16 +808,8 @@ public void testAllTypesWithSchemaImpl(String format) throws Exception {
     protected void testTableFiledValNull(String format) throws Exception {
         final String topic = "table_filed_val_null";
         createTestTopic(topic, 1, 1);
-        // ---------- Write the data into Kafka -------------------
-        List<String> lines =
-                readLines(
-                        String.format(
-                                "kafka/%s/table/schemaevolution/%s-data-4.txt", format, format));
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
-        }
+        writeRecordsToKafka(topic, "kafka/%s/table/schemaevolution/%s-data-4.txt", format, format);
+
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
         kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
         kafkaConfig.put(TOPIC.key(), topic);
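
Every hunk above funnels through the new varargs overload
writeRecordsToKafka(String topic, String resourceDirFormat, Object... args)
in KafkaActionITCaseBase, which absorbs the readLines + try/catch boilerplate
the tests previously repeated. The overload's body is not part of this
excerpt, so the following is only a minimal sketch of the shape such a helper
could take; it assumes the pre-existing readLines(String) resource reader and
the shared static KAFKA_PRODUCER from the base class, and the real
implementation may additionally validate each record before sending:

    // Sketch only, not the committed implementation. Formats the resource
    // path, reads the test data file, and sends each line as one record.
    void writeRecordsToKafka(String topic, String resourceDirFormat, Object... args)
            throws Exception {
        List<String> lines = readLines(String.format(resourceDirFormat, args));
        for (String line : lines) {
            if (line.isEmpty()) {
                // Data files may contain blank separator lines; skip them.
                continue;
            }
            // Block on each send so all records are in the topic before the
            // synchronization action starts consuming.
            KAFKA_PRODUCER.send(new ProducerRecord<>(topic, line)).get();
        }
    }

A side benefit of centralizing the send like this: a write failure now
surfaces as the producer's own exception with its real cause, rather than the
generic "Failed to write %s data to Kafka." wrapper each test used to throw.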