diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 6bdef3301a610e..d0843eb92044f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -235,7 +235,7 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserExcept } KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, maxBatchIntervalS * Config.routine_load_task_timeout_multiplier * 1000, - taskKafkaProgress, isMultiTable()); + taskKafkaProgress, isMultiTable(), -1, false); routineLoadTaskInfoList.add(kafkaTaskInfo); result.add(kafkaTaskInfo); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index f1578269529a12..e3292dc671f8b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -49,16 +49,17 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { private Map partitionIdToOffset; public KafkaTaskInfo(UUID id, long jobId, - long timeoutMs, Map partitionIdToOffset, boolean isMultiTable) { - super(id, jobId, timeoutMs, isMultiTable); + long timeoutMs, Map partitionIdToOffset, boolean isMultiTable, + long lastScheduledTime, boolean isEof) { + super(id, jobId, timeoutMs, isMultiTable, lastScheduledTime, isEof); this.partitionIdToOffset = partitionIdToOffset; } public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map partitionIdToOffset, boolean isMultiTable) { super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), - kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), isMultiTable); + kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), isMultiTable, + kafkaTaskInfo.getLastScheduledTime(), kafkaTaskInfo.getIsEof()); this.partitionIdToOffset = partitionIdToOffset; - this.isEof = kafkaTaskInfo.getIsEof(); } public List getPartitions() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 1ff825d97b9d17..5075311299d603 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -79,17 +79,20 @@ public abstract class RoutineLoadTaskInfo { // so that user or other logic can know the status of the corresponding txn. protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN; - public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean isMultiTable) { + public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean isMultiTable, + long lastScheduledTime, boolean isEof) { this.id = id; this.jobId = jobId; this.createTimeMs = System.currentTimeMillis(); this.timeoutMs = timeoutMs; this.isMultiTable = isMultiTable; + this.lastScheduledTime = lastScheduledTime; + this.isEof = isEof; } public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, long previousBeId, - boolean isMultiTable) { - this(id, jobId, timeoutMs, isMultiTable); + boolean isMultiTable, long lastScheduledTime, boolean isEof) { + this(id, jobId, timeoutMs, isMultiTable, lastScheduledTime, isEof); this.previousBeId = previousBeId; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 20cb626ff37055..63452a5d59ca11 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -225,7 +225,7 @@ public void testProcessTimeOutTasks(@Injectable GlobalTransactionMgr globalTrans Map partitionIdsToOffset = Maps.newHashMap(); partitionIdsToOffset.put(100, 0L); KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, - maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false); + maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false, -1, false); kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - maxBatchIntervalS * 2 * 1000 - 1); routineLoadTaskInfoList.add(kafkaTaskInfo); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java index 95c2423de71fa9..6e11fc5f71a5c5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java @@ -70,7 +70,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1 LinkedBlockingDeque routineLoadTaskInfoQueue = new LinkedBlockingDeque<>(); KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000, - partitionIdToOffset, false); + partitionIdToOffset, false, -1, false); routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1); Map idToRoutineLoadTask = Maps.newHashMap(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 420800a4bb3bd0..c4ec468c651856 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -302,7 +302,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet Map partitionIdToOffset = Maps.newHashMap(); partitionIdToOffset.put(1, 0L); KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, - partitionIdToOffset, false); + partitionIdToOffset, false, -1, false); Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null, @@ -368,7 +368,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi Map partitionIdToOffset = Maps.newHashMap(); partitionIdToOffset.put(1, 0L); KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, - partitionIdToOffset, false); + partitionIdToOffset, false, -1, false); Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null, diff --git a/regression-test/suites/load_p0/routine_load/data/test_eof.csv b/regression-test/suites/load_p0/routine_load/data/test_eof.csv new file mode 100644 index 00000000000000..bc857cabcfdb5c --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/data/test_eof.csv @@ -0,0 +1 @@ +57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10 18:39:10|2023-02-12|2023-01-27 07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city": "New York"} \ No newline at end of file diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy new file mode 100644 index 00000000000000..6eeb9a4e51c7b4 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.ProducerConfig + +suite("test_routine_load_eof","p0") { + def kafkaCsvTpoics = [ + "test_eof", + ] + + String enabled = context.config.otherConfigs.get("enableKafkaTest") + String kafka_port = context.config.otherConfigs.get("kafka_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + def kafka_broker = "${externalEnvIp}:${kafka_port}" + + if (enabled != null && enabled.equalsIgnoreCase("true")) { + def thread = Thread.start { + // define kafka + def props = new Properties() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + // Create kafka producer + def producer = new KafkaProducer<>(props) + + while(true) { + Thread.sleep(1000) + for (String kafkaCsvTopic in kafkaCsvTpoics) { + def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text + def lines = txt.readLines() + lines.each { line -> + logger.info("=====${line}========") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record) + } + } + } + } + + sleep(2 * 1000) + + def jobName = "testEof" + def tableName = "test_routine_load_eof" + try { + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} + ( + k00 INT NOT NULL, + k01 DATE NOT NULL, + k02 BOOLEAN NULL, + k03 TINYINT NULL, + k04 SMALLINT NULL, + k05 INT NULL, + k06 BIGINT NULL, + k07 LARGEINT NULL, + k08 FLOAT NULL, + k09 DOUBLE NULL, + k10 DECIMAL(9,1) NULL, + k11 DECIMALV3(9,1) NULL, + k12 DATETIME NULL, + k13 DATEV2 NULL, + k14 DATETIMEV2 NULL, + k15 CHAR NULL, + k16 VARCHAR NULL, + k17 STRING NULL, + k18 JSON NULL, + kd01 BOOLEAN NOT NULL DEFAULT "TRUE", + kd02 TINYINT NOT NULL DEFAULT "1", + kd03 SMALLINT NOT NULL DEFAULT "2", + kd04 INT NOT NULL DEFAULT "3", + kd05 BIGINT NOT NULL DEFAULT "4", + kd06 LARGEINT NOT NULL DEFAULT "5", + kd07 FLOAT NOT NULL DEFAULT "6.0", + kd08 DOUBLE NOT NULL DEFAULT "7.0", + kd09 DECIMAL NOT NULL DEFAULT "888888888", + kd10 DECIMALV3 NOT NULL DEFAULT "999999999", + kd11 DATE NOT NULL DEFAULT "2023-08-24", + kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00", + kd13 DATEV2 NOT NULL DEFAULT "2023-08-24", + kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00", + kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体", + kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体", + kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体", + kd18 JSON NULL, + + INDEX idx_inverted_k104 (`k05`) USING INVERTED, + INDEX idx_inverted_k110 (`k11`) USING INVERTED, + INDEX idx_inverted_k113 (`k13`) USING INVERTED, + INDEX idx_inverted_k114 (`k14`) USING INVERTED, + INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"), + INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + + INDEX idx_bitmap_k104 (`k02`) USING BITMAP, + INDEX idx_bitmap_k110 (`kd01`) USING BITMAP + + ) + DUPLICATE KEY(k00) + PARTITION BY RANGE(k01) + ( + PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')), + PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')), + PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01')) + ) + DISTRIBUTED BY HASH(k00) BUCKETS 32 + PROPERTIES ( + "bloom_filter_columns"="k05", + "replication_num" = "1" + ); + """ + sql "sync" + + sql """ + CREATE ROUTINE LOAD ${jobName} on ${tableName} + COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18), + COLUMNS TERMINATED BY "|" + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "test_eof", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + + def count = 0 + while (true) { + sleep(1000) + def res = sql "show routine load for ${jobName}" + def state = res[0][8].toString() + if (state != "RUNNING") { + count++ + if (count > 60) { + assertEquals(1, 2) + } + continue; + } + break; + } + sleep(60 * 1000) + def res = sql "show routine load for ${jobName}" + def statistic = res[0][14].toString() + def json = parseJson(res[0][14]) + log.info("routine load statistic: ${res[0][14].toString()}".toString()) + if (json.committedTaskNum > 20) { + assertEquals(1, 2) + } + } finally { + sql "stop routine load for ${jobName}" + sql "DROP TABLE IF EXISTS ${tableName}" + } + thread.interrupt() + } +} \ No newline at end of file