From 7c29ba6f362df8041a4de1d81a6cf7e445e44e22 Mon Sep 17 00:00:00 2001 From: "taylor.fan" Date: Fri, 18 Oct 2024 18:21:30 +0800 Subject: [PATCH 1/5] [KYUUBI #6726] Support trino stage progress --- .../engine/trino/TrinoProgressMonitor.scala | 163 ++++++++++++++++++ .../trino/operation/ExecuteStatement.scala | 2 + .../trino/operation/TrinoOperation.scala | 23 ++- .../trino/operation/progress/TrinoStage.scala | 34 ++++ .../TrinoOperationProgressSuite.scala | 57 ++++++ 5 files changed, 278 insertions(+), 1 deletion(-) create mode 100644 externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProgressMonitor.scala create mode 100644 externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala create mode 100644 externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProgressMonitor.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProgressMonitor.scala new file mode 100644 index 00000000000..0cb13945aa2 --- /dev/null +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProgressMonitor.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.trino + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.immutable.SortedMap + +import io.trino.client.{StageStats, StatementClient} + +import org.apache.kyuubi.engine.trino.TrinoProgressMonitor.{COLUMN_1_WIDTH, HEADERS} +import org.apache.kyuubi.engine.trino.operation.progress.{TrinoOperationProgressStatus, TrinoStage, TrinoStageProgress} +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TJobExecutionStatus + +class TrinoProgressMonitor(trino: StatementClient) { + + private lazy val progressMap: Map[TrinoStage, TrinoStageProgress] = { + if (trino != null) { + val prestoStats = trino.getStats + val stageQueue = scala.collection.mutable.Queue[StageStats]() + val stages = scala.collection.mutable.ListBuffer[(TrinoStage, TrinoStageProgress)]() + val rootStage = prestoStats.getRootStage + if (rootStage != null) { + stageQueue.enqueue(rootStage) + } + while (stageQueue.nonEmpty) { + val stage = stageQueue.dequeue() + val stageId = stage.getStageId + val stageProgress = TrinoStageProgress( + stage.getTotalSplits, + stage.getCompletedSplits, + stage.getRunningSplits, + stage.getQueuedSplits) + stages.append((TrinoStage(stageId), stageProgress)) + val subStages = asScalaBuffer(stage.getSubStages) + stageQueue.enqueue(subStages: _*) + } + SortedMap(stages: _*) + } else { + SortedMap() + } + } + + def headers: util.List[String] = HEADERS + + def rows: util.List[util.List[String]] = { + val progressRows = progressMap.map { + case (stage, progress) => + val complete = progress.completedSplits + val total = progress.totalSplits + val running = progress.runningSplits + val queued = progress.queuedSplits + var state = + if (total > 0) { + TrinoOperationProgressStatus.PENDING + } else { + TrinoOperationProgressStatus.FINISHED + } + if (complete > 0 || running > 0 || queued > 0) { + state = + if (complete < total) { + TrinoOperationProgressStatus.RUNNING + } else { + TrinoOperationProgressStatus.FINISHED + } + } + val stageName = "Stage-" + stage.stageId + val nameWithProgress = getNameWithProgress(stageName, complete, total) + val pending = total - complete - running + util.Arrays.asList( + nameWithProgress, + "0", + state.toString, + String.valueOf(total), + String.valueOf(complete), + String.valueOf(running), + String.valueOf(pending), + String.valueOf(queued), + "") + }.toList.asJavaCollection + new util.ArrayList[util.List[String]](progressRows) + } + + def footerSummary: String = { + "STAGES: %02d/%02d".format(getCompletedStages, progressMap.keySet.size) + } + + def progressedPercentage: Double = { + if (trino != null && trino.getStats != null) { + val progressPercentage = trino.getStats.getProgressPercentage + progressPercentage.orElse(0.0d) + } else { + 0.0d + } + } + + def executionStatus: TJobExecutionStatus = + if (getCompletedStages == progressMap.keySet.size) { + TJobExecutionStatus.COMPLETE + } else { + TJobExecutionStatus.IN_PROGRESS + } + + private def getNameWithProgress(s: String, complete: Int, total: Int): String = { + if (s == null) return "" + val percent = + if (total == 0) 1.0f + else complete.toFloat / total.toFloat + // lets use the remaining space in column 1 as progress bar + val spaceRemaining = COLUMN_1_WIDTH - s.length - 1 + var trimmedVName = s + // if the vertex name is longer than column 1 width, trim it down + if (s.length > COLUMN_1_WIDTH) { + trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2) + trimmedVName += ".." + } else trimmedVName += " " + val toFill = (spaceRemaining * percent).toInt + s"$trimmedVName${"." * toFill}" + } + + private def getCompletedStages: Int = { + var completed = 0 + progressMap.values.foreach { progress => + val complete = progress.completedSplits + val total = progress.totalSplits + if (total > 0 && complete == total) completed += 1 + } + completed + } + +} + +object TrinoProgressMonitor { + + private val HEADERS: util.List[String] = util.Arrays.asList( + "STAGES", + "ATTEMPT", + "STATUS", + "TOTAL", + "COMPLETED", + "RUNNING", + "PENDING", + "FAILED", + "") + + private val COLUMN_1_WIDTH = 16 + +} diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala index 250b8d64b1e..4f1b42e1d1b 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala @@ -41,6 +41,8 @@ class ExecuteStatement( private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) override def getOperationLog: Option[OperationLog] = Option(operationLog) + override protected def supportProgress: Boolean = true + override protected def beforeRun(): Unit = { OperationLog.setCurrentOperationLog(operationLog) setState(OperationState.PENDING) diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala index 822f1726a3b..6afd8c09841 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperation.scala @@ -24,16 +24,19 @@ import io.trino.client.StatementClient import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.Utils +import org.apache.kyuubi.config.KyuubiConf.SESSION_PROGRESS_ENABLE import org.apache.kyuubi.engine.trino.TrinoContext +import org.apache.kyuubi.engine.trino.TrinoProgressMonitor import org.apache.kyuubi.engine.trino.schema.{SchemaHelper, TrinoTRowSetGenerator} import org.apache.kyuubi.engine.trino.session.TrinoSessionImpl import org.apache.kyuubi.operation.AbstractOperation import org.apache.kyuubi.operation.FetchIterator import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation} import org.apache.kyuubi.operation.OperationState +import org.apache.kyuubi.operation.OperationStatus import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session -import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp} +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TProgressUpdateResp} abstract class TrinoOperation(session: Session) extends AbstractOperation(session) { @@ -45,6 +48,24 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio protected var iter: FetchIterator[List[Any]] = _ + protected def supportProgress: Boolean = false + + private val progressEnable: Boolean = session.sessionManager.getConf.get(SESSION_PROGRESS_ENABLE) + + override def getStatus: OperationStatus = { + if (progressEnable && supportProgress) { + val progressMonitor = new TrinoProgressMonitor(trino) + setOperationJobProgress(new TProgressUpdateResp( + progressMonitor.headers, + progressMonitor.rows, + progressMonitor.progressedPercentage, + progressMonitor.executionStatus, + progressMonitor.footerSummary, + startTime)) + } + super.getStatus + } + override def getResultSetMetadata: TGetResultSetMetadataResp = { val tTableSchema = SchemaHelper.toTTableSchema(schema) val resp = new TGetResultSetMetadataResp diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala new file mode 100644 index 00000000000..91fbed9e2e8 --- /dev/null +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.trino.operation.progress + +case class TrinoStage(stageId: String) extends Comparable[TrinoStage] { + override def compareTo(o: TrinoStage): Int = { + stageId.compareTo(o.stageId) + } +} + +case class TrinoStageProgress( + totalSplits: Int, + completedSplits: Int, + runningSplits: Int, + queuedSplits: Int) + +object TrinoOperationProgressStatus extends Enumeration { + type TrinoOperationProgressStatus = Value + val PENDING, RUNNING, FINISHED = Value +} diff --git a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala new file mode 100644 index 00000000000..38aa8a7d740 --- /dev/null +++ b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.trino.operation + +import scala.collection.JavaConverters._ + +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime + +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TRINO_CONNECTION_CATALOG, SESSION_PROGRESS_ENABLE} +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TExecuteStatementReq, TGetOperationStatusReq} + +class TrinoOperationProgressSuite extends TrinoOperationSuite { + override def withKyuubiConf: Map[String, String] = Map( + ENGINE_TRINO_CONNECTION_CATALOG.key -> "memory", + SESSION_PROGRESS_ENABLE.key -> "true") + + test("get operation progress") { + val sql = "select date '2011-11-11' - interval '1' day" + + withSessionHandle { (client, handle) => + val req = new TExecuteStatementReq() + req.setStatement(sql) + req.setRunAsync(true) + req.setSessionHandle(handle) + val resp = client.ExecuteStatement(req) + //scalastyle:off + eventually(Timeout(25.seconds)) { + val statusReq = new TGetOperationStatusReq(resp.getOperationHandle) + val statusResp = client.GetOperationStatus(statusReq) + val headers = statusResp.getProgressUpdateResponse.getHeaderNames + val progress = statusResp.getProgressUpdateResponse.getProgressedPercentage + val rows = statusResp.getProgressUpdateResponse.getRows + val footerSummary = statusResp.getProgressUpdateResponse.getFooterSummary + val status = statusResp.getProgressUpdateResponse.getStatus + println(rows.get(0).asScala) + println("headers:" + String.join(",", headers)) + println("progress: " + progress) + } + } + } +} From f635b38de3ee5dec4a901cf6f63df17704808614 Mon Sep 17 00:00:00 2001 From: "taylor.fan" Date: Mon, 21 Oct 2024 15:30:14 +0800 Subject: [PATCH 2/5] [KYUUBI #6726] add test case --- .../engine/trino/TrinoProgressMonitor.scala | 31 +++++------------ .../trino/operation/progress/TrinoStage.scala | 8 ++--- .../TrinoOperationProgressSuite.scala | 33 +++++++++++++++---- 3 files changed, 37 insertions(+), 35 deletions(-) diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProgressMonitor.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProgressMonitor.scala index 0cb13945aa2..5328f092ec9 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProgressMonitor.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProgressMonitor.scala @@ -24,17 +24,17 @@ import scala.collection.immutable.SortedMap import io.trino.client.{StageStats, StatementClient} import org.apache.kyuubi.engine.trino.TrinoProgressMonitor.{COLUMN_1_WIDTH, HEADERS} -import org.apache.kyuubi.engine.trino.operation.progress.{TrinoOperationProgressStatus, TrinoStage, TrinoStageProgress} +import org.apache.kyuubi.engine.trino.operation.progress.{TrinoStage, TrinoStageProgress} import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TJobExecutionStatus class TrinoProgressMonitor(trino: StatementClient) { private lazy val progressMap: Map[TrinoStage, TrinoStageProgress] = { if (trino != null) { - val prestoStats = trino.getStats + val trinoStats = trino.getStats val stageQueue = scala.collection.mutable.Queue[StageStats]() val stages = scala.collection.mutable.ListBuffer[(TrinoStage, TrinoStageProgress)]() - val rootStage = prestoStats.getRootStage + val rootStage = trinoStats.getRootStage if (rootStage != null) { stageQueue.enqueue(rootStage) } @@ -42,10 +42,11 @@ class TrinoProgressMonitor(trino: StatementClient) { val stage = stageQueue.dequeue() val stageId = stage.getStageId val stageProgress = TrinoStageProgress( + stage.getState, stage.getTotalSplits, stage.getCompletedSplits, stage.getRunningSplits, - stage.getQueuedSplits) + stage.getFailedTasks) stages.append((TrinoStage(stageId), stageProgress)) val subStages = asScalaBuffer(stage.getSubStages) stageQueue.enqueue(subStages: _*) @@ -64,33 +65,18 @@ class TrinoProgressMonitor(trino: StatementClient) { val complete = progress.completedSplits val total = progress.totalSplits val running = progress.runningSplits - val queued = progress.queuedSplits - var state = - if (total > 0) { - TrinoOperationProgressStatus.PENDING - } else { - TrinoOperationProgressStatus.FINISHED - } - if (complete > 0 || running > 0 || queued > 0) { - state = - if (complete < total) { - TrinoOperationProgressStatus.RUNNING - } else { - TrinoOperationProgressStatus.FINISHED - } - } + val failed = progress.failedTasks val stageName = "Stage-" + stage.stageId val nameWithProgress = getNameWithProgress(stageName, complete, total) val pending = total - complete - running util.Arrays.asList( nameWithProgress, - "0", - state.toString, + progress.state, String.valueOf(total), String.valueOf(complete), String.valueOf(running), String.valueOf(pending), - String.valueOf(queued), + String.valueOf(failed), "") }.toList.asJavaCollection new util.ArrayList[util.List[String]](progressRows) @@ -149,7 +135,6 @@ object TrinoProgressMonitor { private val HEADERS: util.List[String] = util.Arrays.asList( "STAGES", - "ATTEMPT", "STATUS", "TOTAL", "COMPLETED", diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala index 91fbed9e2e8..97ae986772c 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala @@ -23,12 +23,8 @@ case class TrinoStage(stageId: String) extends Comparable[TrinoStage] { } case class TrinoStageProgress( + state: String, totalSplits: Int, completedSplits: Int, runningSplits: Int, - queuedSplits: Int) - -object TrinoOperationProgressStatus extends Enumeration { - type TrinoOperationProgressStatus = Value - val PENDING, RUNNING, FINISHED = Value -} + failedTasks: Int) \ No newline at end of file diff --git a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala index 38aa8a7d740..21481588f68 100644 --- a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala +++ b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala @@ -23,7 +23,7 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TRINO_CONNECTION_CATALOG, SESSION_PROGRESS_ENABLE} -import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TExecuteStatementReq, TGetOperationStatusReq} +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TExecuteStatementReq, TGetOperationStatusReq, TJobExecutionStatus} class TrinoOperationProgressSuite extends TrinoOperationSuite { override def withKyuubiConf: Map[String, String] = Map( @@ -31,7 +31,7 @@ class TrinoOperationProgressSuite extends TrinoOperationSuite { SESSION_PROGRESS_ENABLE.key -> "true") test("get operation progress") { - val sql = "select date '2011-11-11' - interval '1' day" + val sql = "SELECT DECIMAL '1.2' as col1, DECIMAL '1.23' AS col2" withSessionHandle { (client, handle) => val req = new TExecuteStatementReq() @@ -39,7 +39,6 @@ class TrinoOperationProgressSuite extends TrinoOperationSuite { req.setRunAsync(true) req.setSessionHandle(handle) val resp = client.ExecuteStatement(req) - //scalastyle:off eventually(Timeout(25.seconds)) { val statusReq = new TGetOperationStatusReq(resp.getOperationHandle) val statusResp = client.GetOperationStatus(statusReq) @@ -48,9 +47,31 @@ class TrinoOperationProgressSuite extends TrinoOperationSuite { val rows = statusResp.getProgressUpdateResponse.getRows val footerSummary = statusResp.getProgressUpdateResponse.getFooterSummary val status = statusResp.getProgressUpdateResponse.getStatus - println(rows.get(0).asScala) - println("headers:" + String.join(",", headers)) - println("progress: " + progress) + assertResult(Seq( + "STAGES", + "STATUS", + "TOTAL", + "COMPLETED", + "RUNNING", + "PENDING", + "FAILED", + ""))(headers.asScala) + assert(rows.size() == 1) + progress match { + case 100.0 => + assertResult(Seq( + s"Stage-0 ........", + "FINISHED", + "1", + "1", + "0", + "0", + "0", + ""))( + rows.get(0).asScala) + assert("STAGES: 01/01" === footerSummary) + assert(TJobExecutionStatus.COMPLETE === status) + } } } } From 2b1c776e1486b003864de5306c3235fb25274c1f Mon Sep 17 00:00:00 2001 From: "taylor.fan" Date: Mon, 21 Oct 2024 15:33:43 +0800 Subject: [PATCH 3/5] [KYUUBI #6726] reformat code --- .../kyuubi/engine/trino/operation/progress/TrinoStage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala index 97ae986772c..ce1a89ea611 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/progress/TrinoStage.scala @@ -27,4 +27,4 @@ case class TrinoStageProgress( totalSplits: Int, completedSplits: Int, runningSplits: Int, - failedTasks: Int) \ No newline at end of file + failedTasks: Int) From d84904e82b39f1c74ef748689eeb9a5e92f87262 Mon Sep 17 00:00:00 2001 From: "taylor.fan" Date: Mon, 21 Oct 2024 15:45:49 +0800 Subject: [PATCH 4/5] [KYUUBI #6726] reformat code --- .../trino/operation/TrinoOperationProgressSuite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala index 21481588f68..20289c84a12 100644 --- a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala +++ b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala @@ -31,7 +31,8 @@ class TrinoOperationProgressSuite extends TrinoOperationSuite { SESSION_PROGRESS_ENABLE.key -> "true") test("get operation progress") { - val sql = "SELECT DECIMAL '1.2' as col1, DECIMAL '1.23' AS col2" + val sql = "select * from (select item from (SELECT sequence(0, 100, 1) as t) as a " + + "CROSS JOIN UNNEST(t) AS temTable (item)) WHERE random() < 0.1" withSessionHandle { (client, handle) => val req = new TExecuteStatementReq() @@ -62,8 +63,8 @@ class TrinoOperationProgressSuite extends TrinoOperationSuite { assertResult(Seq( s"Stage-0 ........", "FINISHED", - "1", - "1", + "5", + "5", "0", "0", "0", From 6646c9511222ad10b681ad9201445415de472947 Mon Sep 17 00:00:00 2001 From: "taylor.fan" Date: Thu, 21 Nov 2024 17:42:49 +0800 Subject: [PATCH 5/5] [KYUUBI #6726] update test case result --- .../engine/trino/operation/TrinoOperationProgressSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala index 20289c84a12..0132735ff2f 100644 --- a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala +++ b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationProgressSuite.scala @@ -63,8 +63,8 @@ class TrinoOperationProgressSuite extends TrinoOperationSuite { assertResult(Seq( s"Stage-0 ........", "FINISHED", - "5", - "5", + "3", + "3", "0", "0", "0",