From 82abc21ae4847a7e800cb8db6375a49da21cc75c Mon Sep 17 00:00:00 2001 From: Zouxxyy Date: Sat, 12 Oct 2024 15:02:50 +0800 Subject: [PATCH] [spark] Support scala 2.13 (#4302) --- ...ink-jdk11.yml => utitcase-flink-jdk11.yml} | 0 ...{unitcase-jdk11.yml => utitcase-jdk11.yml} | 9 ++- .../workflows/utitcase-spark-scala2.13.yml | 63 +++++++++++++++++++ .github/workflows/utitcase-spark.yml | 63 +++++++++++++++++++ .github/workflows/utitcase.yml | 9 ++- paimon-codegen/pom.xml | 10 ++- paimon-e2e-tests/pom.xml | 2 +- paimon-flink/paimon-flink-1.15/pom.xml | 4 +- paimon-flink/paimon-flink-1.16/pom.xml | 2 +- paimon-flink/paimon-flink-1.17/pom.xml | 2 +- paimon-flink/paimon-flink-1.18/pom.xml | 2 +- paimon-flink/paimon-flink-1.19/pom.xml | 2 +- paimon-flink/paimon-flink-cdc/pom.xml | 4 +- paimon-flink/paimon-flink-common/pom.xml | 4 +- paimon-hive/paimon-hive-connector-2.3/pom.xml | 2 +- paimon-hive/paimon-hive-connector-3.1/pom.xml | 2 +- .../paimon-hive-connector-common/pom.xml | 4 +- paimon-spark/paimon-spark-3.2/pom.xml | 48 ++++++-------- paimon-spark/paimon-spark-3.3/pom.xml | 48 ++++++-------- paimon-spark/paimon-spark-3.4/pom.xml | 48 ++++++-------- paimon-spark/paimon-spark-3.5/pom.xml | 24 +++++-- paimon-spark/paimon-spark-common/pom.xml | 46 ++++++-------- .../paimon/spark/PaimonStatistics.scala | 4 +- .../org/apache/paimon/spark/ScanHelper.scala | 2 +- .../analysis/PaimonMergeIntoBase.scala | 2 +- .../catalyst/analysis/PaimonUpdateTable.scala | 2 +- .../EvalSubqueriesForDeleteTable.scala | 5 +- .../MergePaimonScalarSubqueriesBase.scala | 2 +- .../DeleteFromPaimonTableCommand.scala | 2 +- .../spark/commands/MergeIntoPaimonTable.scala | 2 +- .../PaimonAnalyzeTableColumnCommand.scala | 6 +- .../paimon/spark/commands/PaimonCommand.scala | 4 +- .../spark/commands/PaimonSparkWriter.scala | 4 +- .../spark/commands/SparkDataFileMeta.scala | 2 +- .../spark/orphan/SparkOrphanFilesClean.scala | 4 +- .../statistics/StatisticsHelperBase.scala | 4 +- .../paimon/spark/util/OptionUtils.scala | 1 + .../PaimonSqlExtensionsAstBuilder.scala | 6 +- .../paimon/spark/SparkInternalRowTest.java | 10 ++- .../spark/sql/AnalyzeTableTestBase.scala | 14 ++--- pom.xml | 23 ++++++- 41 files changed, 315 insertions(+), 182 deletions(-) rename .github/workflows/{unitcase-flink-jdk11.yml => utitcase-flink-jdk11.yml} (100%) rename .github/workflows/{unitcase-jdk11.yml => utitcase-jdk11.yml} (81%) create mode 100644 .github/workflows/utitcase-spark-scala2.13.yml create mode 100644 .github/workflows/utitcase-spark.yml diff --git a/.github/workflows/unitcase-flink-jdk11.yml b/.github/workflows/utitcase-flink-jdk11.yml similarity index 100% rename from .github/workflows/unitcase-flink-jdk11.yml rename to .github/workflows/utitcase-flink-jdk11.yml diff --git a/.github/workflows/unitcase-jdk11.yml b/.github/workflows/utitcase-jdk11.yml similarity index 81% rename from .github/workflows/unitcase-jdk11.yml rename to .github/workflows/utitcase-jdk11.yml index 1baed87f9027..f1d7c25cbe44 100644 --- a/.github/workflows/unitcase-jdk11.yml +++ b/.github/workflows/utitcase-jdk11.yml @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -name: UTCase and ITCase Non Flink on JDK 11 +name: UTCase and ITCase Others on JDK 11 on: issue_comment: @@ -52,6 +52,11 @@ jobs: . .github/workflows/utils.sh jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" - mvn -T 1C -B clean install -pl '!paimon-e2e-tests,!org.apache.paimon:paimon-hive-connector-3.1' -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone + test_modules="!paimon-e2e-tests,!org.apache.paimon:paimon-hive-connector-3.1," + for suffix in 3.5 3.4 3.3 3.2 common; do + test_modules+="!org.apache.paimon:paimon-spark-${suffix}," + done + test_modules="${test_modules%,}" + mvn -T 1C -B clean install -pl "${test_modules}" -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone env: MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/.github/workflows/utitcase-spark-scala2.13.yml b/.github/workflows/utitcase-spark-scala2.13.yml new file mode 100644 index 000000000000..05ee066c94bd --- /dev/null +++ b/.github/workflows/utitcase-spark-scala2.13.yml @@ -0,0 +1,63 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +name: UTCase and ITCase Spark on Scala 2.13 + +on: + push: + pull_request: + paths-ignore: + - 'docs/**' + - '**/*.md' + +env: + JDK_VERSION: 8 + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }} + cancel-in-progress: true + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v2 + - name: Set up JDK ${{ env.JDK_VERSION }} + uses: actions/setup-java@v2 + with: + java-version: ${{ env.JDK_VERSION }} + distribution: 'adopt' + - name: Build Spark + run: mvn -T 1C -B clean install -DskipTests -Pscala-2.13 + - name: Test Spark + timeout-minutes: 60 + run: | + # run tests with random timezone to find out timezone related bugs + . .github/workflows/utils.sh + jvm_timezone=$(random_timezone) + echo "JVM timezone is set to $jvm_timezone" + test_modules="" + for suffix in common 3.5 3.4 3.3 3.2; do + test_modules+="org.apache.paimon:paimon-spark-${suffix}," + done + test_modules="${test_modules%,}" + mvn -T 1C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pscala-2.13 + env: + MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/.github/workflows/utitcase-spark.yml b/.github/workflows/utitcase-spark.yml new file mode 100644 index 000000000000..0561b3857072 --- /dev/null +++ b/.github/workflows/utitcase-spark.yml @@ -0,0 +1,63 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +name: UTCase and ITCase Spark + +on: + push: + pull_request: + paths-ignore: + - 'docs/**' + - '**/*.md' + +env: + JDK_VERSION: 8 + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }} + cancel-in-progress: true + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v2 + - name: Set up JDK ${{ env.JDK_VERSION }} + uses: actions/setup-java@v2 + with: + java-version: ${{ env.JDK_VERSION }} + distribution: 'adopt' + - name: Build Spark + run: mvn -T 1C -B clean install -DskipTests + - name: Test Spark + timeout-minutes: 60 + run: | + # run tests with random timezone to find out timezone related bugs + . .github/workflows/utils.sh + jvm_timezone=$(random_timezone) + echo "JVM timezone is set to $jvm_timezone" + test_modules="" + for suffix in common 3.5 3.4 3.3 3.2; do + test_modules+="org.apache.paimon:paimon-spark-${suffix}," + done + test_modules="${test_modules%,}" + mvn -T 1C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone + env: + MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/.github/workflows/utitcase.yml b/.github/workflows/utitcase.yml index 431b44332232..7963e7c210cb 100644 --- a/.github/workflows/utitcase.yml +++ b/.github/workflows/utitcase.yml @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -name: UTCase and ITCase Non Flink +name: UTCase and ITCase Others on: push: @@ -53,6 +53,11 @@ jobs: . .github/workflows/utils.sh jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" - mvn -T 1C -B clean install -pl '!paimon-e2e-tests' -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone + test_modules="!paimon-e2e-tests," + for suffix in 3.5 3.4 3.3 3.2 common; do + test_modules+="!org.apache.paimon:paimon-spark-${suffix}," + done + test_modules="${test_modules%,}" + mvn -T 1C -B clean install -pl "${test_modules}" -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone env: MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/paimon-codegen/pom.xml b/paimon-codegen/pom.xml index 8a43f390990f..4a40d487dc25 100644 --- a/paimon-codegen/pom.xml +++ b/paimon-codegen/pom.xml @@ -41,19 +41,19 @@ under the License. org.scala-lang scala-library - ${scala.version} + ${codegen.scala.version} org.scala-lang scala-reflect - ${scala.version} + ${codegen.scala.version} org.scala-lang scala-compiler - ${scala.version} + ${codegen.scala.version} @@ -86,6 +86,10 @@ under the License. + + + ${codegen.scala.version} + diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml index 7ca13535e470..abe9ca896b99 100644 --- a/paimon-e2e-tests/pom.xml +++ b/paimon-e2e-tests/pom.xml @@ -34,7 +34,7 @@ under the License. 2.8.3-10.0 3.1.1 - flink-sql-connector-hive-2.3.10_${scala.binary.version} + flink-sql-connector-hive-2.3.10_${flink.scala.binary.version} diff --git a/paimon-flink/paimon-flink-1.15/pom.xml b/paimon-flink/paimon-flink-1.15/pom.xml index bfc6ec53c404..21c179226bf0 100644 --- a/paimon-flink/paimon-flink-1.15/pom.xml +++ b/paimon-flink/paimon-flink-1.15/pom.xml @@ -87,14 +87,14 @@ under the License. org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner_${flink.scala.binary.version} ${flink.version} test org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner_${flink.scala.binary.version} ${flink.version} test-jar test diff --git a/paimon-flink/paimon-flink-1.16/pom.xml b/paimon-flink/paimon-flink-1.16/pom.xml index 0edfc9e20700..c5a334d6fce6 100644 --- a/paimon-flink/paimon-flink-1.16/pom.xml +++ b/paimon-flink/paimon-flink-1.16/pom.xml @@ -90,7 +90,7 @@ under the License. org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner_${flink.scala.binary.version} ${flink.version} test diff --git a/paimon-flink/paimon-flink-1.17/pom.xml b/paimon-flink/paimon-flink-1.17/pom.xml index 112ecc4c7c7c..06cc4f330964 100644 --- a/paimon-flink/paimon-flink-1.17/pom.xml +++ b/paimon-flink/paimon-flink-1.17/pom.xml @@ -97,7 +97,7 @@ under the License. org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner_${flink.scala.binary.version} ${flink.version} test diff --git a/paimon-flink/paimon-flink-1.18/pom.xml b/paimon-flink/paimon-flink-1.18/pom.xml index 5aaf42eb4971..add1915a5001 100644 --- a/paimon-flink/paimon-flink-1.18/pom.xml +++ b/paimon-flink/paimon-flink-1.18/pom.xml @@ -90,7 +90,7 @@ under the License. org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner_${flink.scala.binary.version} ${flink.version} test diff --git a/paimon-flink/paimon-flink-1.19/pom.xml b/paimon-flink/paimon-flink-1.19/pom.xml index ab8c2540e9cf..371ba54d1c8b 100644 --- a/paimon-flink/paimon-flink-1.19/pom.xml +++ b/paimon-flink/paimon-flink-1.19/pom.xml @@ -97,7 +97,7 @@ under the License. org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner_${flink.scala.binary.version} ${flink.version} test diff --git a/paimon-flink/paimon-flink-cdc/pom.xml b/paimon-flink/paimon-flink-cdc/pom.xml index 48f0d5a13d63..7f5bb3e786da 100644 --- a/paimon-flink/paimon-flink-cdc/pom.xml +++ b/paimon-flink/paimon-flink-cdc/pom.xml @@ -185,14 +185,14 @@ under the License. org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner_${flink.scala.binary.version} ${flink.version} test org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner_${flink.scala.binary.version} ${flink.version} test test-jar diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index ef0f7fe1776a..3463aa9d96db 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -122,7 +122,7 @@ under the License. org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner_${flink.scala.binary.version} ${flink.version} test test-jar @@ -130,7 +130,7 @@ under the License. org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner_${flink.scala.binary.version} ${flink.version} test diff --git a/paimon-hive/paimon-hive-connector-2.3/pom.xml b/paimon-hive/paimon-hive-connector-2.3/pom.xml index e61e493d3a5c..22f93ed0e192 100644 --- a/paimon-hive/paimon-hive-connector-2.3/pom.xml +++ b/paimon-hive/paimon-hive-connector-2.3/pom.xml @@ -81,7 +81,7 @@ under the License. org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner_${flink.scala.binary.version} ${test.flink.version} test diff --git a/paimon-hive/paimon-hive-connector-3.1/pom.xml b/paimon-hive/paimon-hive-connector-3.1/pom.xml index 3e08196a76af..70afb0e4e26e 100644 --- a/paimon-hive/paimon-hive-connector-3.1/pom.xml +++ b/paimon-hive/paimon-hive-connector-3.1/pom.xml @@ -88,7 +88,7 @@ under the License. org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner_${flink.scala.binary.version} ${test.flink.version} test diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml index 0f7da151b884..f22b73dea71e 100644 --- a/paimon-hive/paimon-hive-connector-common/pom.xml +++ b/paimon-hive/paimon-hive-connector-common/pom.xml @@ -111,14 +111,14 @@ under the License. org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner_${flink.scala.binary.version} ${test.flink.version} test org.apache.flink - flink-connector-hive_${scala.binary.version} + flink-connector-hive_${flink.scala.binary.version} ${test.flink.version} test diff --git a/paimon-spark/paimon-spark-3.2/pom.xml b/paimon-spark/paimon-spark-3.2/pom.xml index 24e5198e9b2e..b12f4ba86d00 100644 --- a/paimon-spark/paimon-spark-3.2/pom.xml +++ b/paimon-spark/paimon-spark-3.2/pom.xml @@ -36,6 +36,18 @@ under the License. + + org.apache.paimon + paimon-bundle + ${project.version} + + + * + * + + + + org.apache.paimon paimon-spark-common @@ -60,7 +72,7 @@ under the License. org.apache.spark - spark-sql_2.12 + spark-sql_${scala.binary.version} ${spark.version} @@ -97,7 +109,7 @@ under the License. org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} ${spark.version} @@ -141,7 +153,7 @@ under the License. org.apache.spark - spark-sql_2.12 + spark-sql_${scala.binary.version} ${spark.version} tests test @@ -154,23 +166,11 @@ under the License. org.slf4j slf4j-log4j12 - - org.apache.orc - orc-core - - - org.apache.parquet - parquet-column - - - org.apache.parquet - parquet-hadoop - org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} ${spark.version} tests test @@ -187,15 +187,11 @@ under the License. org.apache.logging.log4j log4j-slf4j2-impl - - org.apache.orc - orc-core - org.apache.spark - spark-catalyst_2.12 + spark-catalyst_${scala.binary.version} ${spark.version} tests test @@ -208,10 +204,6 @@ under the License. org.slf4j slf4j-log4j12 - - org.apache.orc - orc-core - @@ -223,7 +215,7 @@ under the License. org.apache.spark - spark-hive_2.12 + spark-hive_${scala.binary.version} ${spark.version} test @@ -235,10 +227,6 @@ under the License. org.slf4j slf4j-log4j12 - - org.apache.orc - orc-core - com.google.protobuf protobuf-java diff --git a/paimon-spark/paimon-spark-3.3/pom.xml b/paimon-spark/paimon-spark-3.3/pom.xml index 3950ff9f8aca..b89ec3eb4134 100644 --- a/paimon-spark/paimon-spark-3.3/pom.xml +++ b/paimon-spark/paimon-spark-3.3/pom.xml @@ -36,6 +36,18 @@ under the License. + + org.apache.paimon + paimon-bundle + ${project.version} + + + * + * + + + + org.apache.paimon paimon-spark-common @@ -60,7 +72,7 @@ under the License. org.apache.spark - spark-sql_2.12 + spark-sql_${scala.binary.version} ${spark.version} @@ -93,7 +105,7 @@ under the License. org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} ${spark.version} @@ -137,7 +149,7 @@ under the License. org.apache.spark - spark-sql_2.12 + spark-sql_${scala.binary.version} ${spark.version} tests test @@ -150,23 +162,11 @@ under the License. org.slf4j slf4j-log4j12 - - org.apache.orc - orc-core - - - org.apache.parquet - parquet-column - - - org.apache.parquet - parquet-hadoop - org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} ${spark.version} tests test @@ -183,15 +183,11 @@ under the License. org.apache.logging.log4j log4j-slf4j2-impl - - org.apache.orc - orc-core - org.apache.spark - spark-catalyst_2.12 + spark-catalyst_${scala.binary.version} ${spark.version} tests test @@ -204,10 +200,6 @@ under the License. org.slf4j slf4j-log4j12 - - org.apache.orc - orc-core - @@ -218,7 +210,7 @@ under the License. org.apache.spark - spark-hive_2.12 + spark-hive_${scala.binary.version} ${spark.version} test @@ -230,10 +222,6 @@ under the License. org.slf4j slf4j-log4j12 - - org.apache.orc - orc-core - com.google.protobuf protobuf-java diff --git a/paimon-spark/paimon-spark-3.4/pom.xml b/paimon-spark/paimon-spark-3.4/pom.xml index 000acdc26c3c..c7bf217855c1 100644 --- a/paimon-spark/paimon-spark-3.4/pom.xml +++ b/paimon-spark/paimon-spark-3.4/pom.xml @@ -36,6 +36,18 @@ under the License. + + org.apache.paimon + paimon-bundle + ${project.version} + + + * + * + + + + org.apache.paimon paimon-spark-common @@ -60,7 +72,7 @@ under the License. org.apache.spark - spark-sql_2.12 + spark-sql_${scala.binary.version} ${spark.version} @@ -93,7 +105,7 @@ under the License. org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} ${spark.version} @@ -137,7 +149,7 @@ under the License. org.apache.spark - spark-sql_2.12 + spark-sql_${scala.binary.version} ${spark.version} tests test @@ -150,23 +162,11 @@ under the License. org.slf4j slf4j-log4j12 - - org.apache.orc - orc-core - - - org.apache.parquet - parquet-column - - - org.apache.parquet - parquet-hadoop - org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} ${spark.version} tests test @@ -183,15 +183,11 @@ under the License. org.apache.logging.log4j log4j-slf4j2-impl - - org.apache.orc - orc-core - org.apache.spark - spark-catalyst_2.12 + spark-catalyst_${scala.binary.version} ${spark.version} tests test @@ -204,10 +200,6 @@ under the License. org.slf4j slf4j-log4j12 - - org.apache.orc - orc-core - @@ -218,7 +210,7 @@ under the License. org.apache.spark - spark-hive_2.12 + spark-hive_${scala.binary.version} ${spark.version} test @@ -230,10 +222,6 @@ under the License. org.slf4j slf4j-log4j12 - - org.apache.orc - orc-core - com.google.protobuf protobuf-java diff --git a/paimon-spark/paimon-spark-3.5/pom.xml b/paimon-spark/paimon-spark-3.5/pom.xml index 0ba453068c07..08d2181009d6 100644 --- a/paimon-spark/paimon-spark-3.5/pom.xml +++ b/paimon-spark/paimon-spark-3.5/pom.xml @@ -36,6 +36,18 @@ under the License. + + org.apache.paimon + paimon-bundle + ${project.version} + + + * + * + + + + org.apache.paimon paimon-spark-common @@ -60,7 +72,7 @@ under the License. org.apache.spark - spark-sql_2.12 + spark-sql_${scala.binary.version} ${spark.version} @@ -89,7 +101,7 @@ under the License. org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} ${spark.version} @@ -133,7 +145,7 @@ under the License. org.apache.spark - spark-sql_2.12 + spark-sql_${scala.binary.version} ${spark.version} tests test @@ -158,7 +170,7 @@ under the License. org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} ${spark.version} tests test @@ -183,7 +195,7 @@ under the License. org.apache.spark - spark-catalyst_2.12 + spark-catalyst_${scala.binary.version} ${spark.version} tests test @@ -210,7 +222,7 @@ under the License. org.apache.spark - spark-hive_2.12 + spark-hive_${scala.binary.version} ${spark.version} test diff --git a/paimon-spark/paimon-spark-common/pom.xml b/paimon-spark/paimon-spark-common/pom.xml index e9ca8b0e1bee..36139e283261 100644 --- a/paimon-spark/paimon-spark-common/pom.xml +++ b/paimon-spark/paimon-spark-common/pom.xml @@ -38,6 +38,18 @@ under the License. + + org.apache.paimon + paimon-bundle + ${project.version} + + + * + * + + + + org.scala-lang scala-library @@ -56,14 +68,14 @@ under the License. org.apache.spark - spark-avro_2.12 + spark-avro_${scala.binary.version} ${spark.version} test org.apache.spark - spark-sql_2.12 + spark-sql_${scala.binary.version} ${spark.version} @@ -96,7 +108,7 @@ under the License. org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} ${spark.version} @@ -133,7 +145,7 @@ under the License. org.apache.spark - spark-hive_2.12 + spark-hive_${scala.binary.version} ${spark.version} test @@ -145,10 +157,6 @@ under the License. org.slf4j slf4j-log4j12 - - org.apache.orc - orc-core - com.google.protobuf protobuf-java @@ -158,7 +166,7 @@ under the License. org.apache.spark - spark-sql_2.12 + spark-sql_${scala.binary.version} ${spark.version} tests test @@ -171,19 +179,11 @@ under the License. org.slf4j slf4j-log4j12 - - org.apache.orc - orc-core - - - org.apache.parquet - parquet-column - org.apache.spark - spark-core_2.12 + spark-core_${scala.binary.version} ${spark.version} tests test @@ -200,10 +200,6 @@ under the License. org.apache.logging.log4j log4j-slf4j2-impl - - org.apache.orc - orc-core - com.google.protobuf protobuf-java @@ -212,7 +208,7 @@ under the License. org.apache.spark - spark-catalyst_2.12 + spark-catalyst_${scala.binary.version} ${spark.version} tests test @@ -225,10 +221,6 @@ under the License. org.slf4j slf4j-log4j12 - - org.apache.orc - orc-core - diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala index 15a62f266c56..28af4ac0a4fd 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala @@ -129,12 +129,12 @@ object PaimonColumnStats { def apply(v1ColStats: ColumnStat): PaimonColumnStats = { import PaimonImplicits._ PaimonColumnStats( - if (v1ColStats.nullCount.isDefined) OptionalLong.of(v1ColStats.nullCount.get.longValue()) + if (v1ColStats.nullCount.isDefined) OptionalLong.of(v1ColStats.nullCount.get.longValue) else OptionalLong.empty(), v1ColStats.min, v1ColStats.max, if (v1ColStats.distinctCount.isDefined) - OptionalLong.of(v1ColStats.distinctCount.get.longValue()) + OptionalLong.of(v1ColStats.distinctCount.get.longValue) else OptionalLong.empty(), if (v1ColStats.avgLen.isDefined) OptionalLong.of(v1ColStats.avgLen.get.longValue()) else OptionalLong.empty(), diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala index a5d4f10bac33..b5b56ba1d509 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala @@ -76,7 +76,7 @@ trait ScanHelper extends Logging { def closeDataSplit(): Unit = { if (currentSplit.nonEmpty && currentDataFiles.nonEmpty) { val newSplit = - copyDataSplit(currentSplit.get, currentDataFiles, currentDeletionFiles) + copyDataSplit(currentSplit.get, currentDataFiles.toSeq, currentDeletionFiles.toSeq) currentSplits += newSplit } currentDataFiles.clear() diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala index ba6108395a7c..f2530b50c04c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala @@ -53,7 +53,7 @@ trait PaimonMergeIntoBase merge.notMatchedActions.flatMap(_.condition).foreach(checkCondition) val updateActions = merge.matchedActions.collect { case a: UpdateAction => a } - val primaryKeys = v2Table.getTable.primaryKeys().asScala + val primaryKeys = v2Table.getTable.primaryKeys().asScala.toSeq if (primaryKeys.nonEmpty) { checkUpdateActionValidity( AttributeSet(targetOutput), diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala index 123c67a2fc20..ad3912ddb70d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala @@ -41,7 +41,7 @@ object PaimonUpdateTable table.getTable match { case paimonTable: FileStoreTable => - val primaryKeys = paimonTable.primaryKeys().asScala + val primaryKeys = paimonTable.primaryKeys().asScala.toSeq if (!validUpdateAssignment(u.table.outputSet, primaryKeys, assignments)) { throw new RuntimeException("Can't update the primary key column.") } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala index 5d264370adcd..4cf9284f97f6 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala @@ -49,7 +49,10 @@ object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with ExpressionHel plan.transformDown { case d @ DeleteFromPaimonTableCommand(_, table, condition) if SubqueryExpression.hasSubquery(condition) && - isPredicatePartitionColumnsOnly(condition, table.partitionKeys().asScala, resolver) => + isPredicatePartitionColumnsOnly( + condition, + table.partitionKeys().asScala.toSeq, + resolver) => try { d.copy(condition = evalSubquery(condition)) } catch { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala index eca8c9cdfced..a59c2a3fe25b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala @@ -95,7 +95,7 @@ trait MergePaimonScalarSubqueriesBase extends Rule[LogicalPlan] with PredicateHe val newPlan = removeReferences(planWithReferences, cache) val subqueryCTEs = cache.filter(_.merged).map(_.plan.asInstanceOf[CTERelationDef]) if (subqueryCTEs.nonEmpty) { - WithCTE(newPlan, subqueryCTEs) + WithCTE(newPlan, subqueryCTEs.toSeq) } else { newPlan } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala index 2b3888911226..0087ffb740f2 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala @@ -60,7 +60,7 @@ case class DeleteFromPaimonTableCommand( } else { val (partitionCondition, otherCondition) = splitPruePartitionAndOtherPredicates( condition, - table.partitionKeys().asScala, + table.partitionKeys().asScala.toSeq, sparkSession.sessionState.conf.resolver) val partitionPredicate = if (partitionCondition.isEmpty) { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala index 51ae6e086448..c64f20c68aca 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala @@ -243,7 +243,7 @@ case class MergeIntoPaimonTable( val outputFields = mutable.ArrayBuffer(tableSchema.fields: _*) outputFields += StructField(ROW_KIND_COL, ByteType) outputFields ++= metadataCols.map(_.toStructField) - val outputSchema = StructType(outputFields) + val outputSchema = StructType(outputFields.toSeq) val joinedRowEncoder = EncoderUtils.encode(joinedPlan.schema) val outputEncoder = EncoderUtils.encode(outputSchema).resolveAndBind() diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala index 6b3d968136b4..19f73cb6cc68 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala @@ -25,8 +25,8 @@ import org.apache.paimon.stats.{ColStats, Statistics} import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink.BatchWriteBuilder import org.apache.paimon.table.source.DataSplit +import org.apache.paimon.utils.Preconditions.checkState -import org.apache.parquet.Preconditions import org.apache.spark.sql.{PaimonStatsUtils, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.ColumnStat @@ -74,9 +74,7 @@ case class PaimonAnalyzeTableColumnCommand( PaimonStatsUtils.computeColumnStats(sparkSession, relation, attributes) val totalRecordCount = currentSnapshot.totalRecordCount() - Preconditions.checkState( - totalRecordCount >= mergedRecordCount, - s"totalRecordCount: $totalRecordCount should be greater or equal than mergedRecordCount: $mergedRecordCount.") + checkState(totalRecordCount >= mergedRecordCount) val mergedRecordSize = totalSize * (mergedRecordCount.toDouble / totalRecordCount).toLong // convert to paimon stats diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala index 62b8ee8498fa..191d7a766b71 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala @@ -101,7 +101,7 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon filter.foreach(snapshotReader.withFilter) } - snapshotReader.read().splits().asScala.collect { case s: DataSplit => s } + snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toSeq } protected def findTouchedFiles( @@ -232,7 +232,7 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon .as[(String, Long)] .groupByKey(_._1) .mapGroups { - case (filePath, iter) => + (filePath, iter) => val dv = new BitmapDeletionVector() while (iter.hasNext) { dv.delete(iter.next()._2) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index 391ba2b87c93..7d56fe867a1b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -250,7 +250,7 @@ case class PaimonSparkWriter(table: FileStoreTable) { val serializedCommits = deletionVectors .groupByKey(_.partitionAndBucket) .mapGroups { - case (_, iter: Iterator[SparkDeletionVectors]) => + (_, iter: Iterator[SparkDeletionVectors]) => val indexHandler = table.store().newIndexFileHandler() var dvIndexFileMaintainer: AppendDeletionFileMaintainer = null while (iter.hasNext) { @@ -397,7 +397,7 @@ case class PaimonSparkWriter(table: FileStoreTable) { } private def repartitionByPartitionsAndBucket(df: DataFrame): DataFrame = { - val partitionCols = tableSchema.partitionKeys().asScala.map(col) + val partitionCols = tableSchema.partitionKeys().asScala.map(col).toSeq df.repartition(partitionCols ++ Seq(col(BUCKET_COL)): _*) } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala index b380d36c3f81..9c377b47c4fd 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala @@ -58,7 +58,7 @@ object SparkDataFileMeta { file, dvFactory.create(file.fileName())) } - } + }.toSeq def convertToDataSplits( sparkDataFiles: Array[SparkDataFileMeta], diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala index d79105e24eec..488d70e34935 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala @@ -62,7 +62,7 @@ case class SparkOrphanFilesClean( val maxBranchParallelism = Math.min(branches.size(), parallelism) // find snapshots using branch and find manifests(manifest, index, statistics) using snapshot val usedManifestFiles = spark.sparkContext - .parallelize(branches.asScala, maxBranchParallelism) + .parallelize(branches.asScala.toSeq, maxBranchParallelism) .mapPartitions(_.flatMap { branch => safelyGetAllSnapshots(branch).asScala.map(snapshot => (branch, snapshot.toJson)) }) @@ -114,7 +114,7 @@ case class SparkOrphanFilesClean( .toDF("used_name") // find candidate files which can be removed - val fileDirs = listPaimonFileDirs.asScala.map(_.toUri.toString) + val fileDirs = listPaimonFileDirs.asScala.map(_.toUri.toString).toSeq val maxFileDirsParallelism = Math.min(fileDirs.size, parallelism) val candidates = spark.sparkContext .parallelize(fileDirs, maxFileDirsParallelism) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala index 32fa48210c7f..627c6a168819 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala @@ -77,11 +77,11 @@ trait StatisticsHelperBase extends SQLConfHelper { private def toV2Stats(v1Stats: logical.Statistics): Statistics = { new Statistics() { override def sizeInBytes(): OptionalLong = if (v1Stats.sizeInBytes != null) - OptionalLong.of(v1Stats.sizeInBytes.longValue()) + OptionalLong.of(v1Stats.sizeInBytes.longValue) else OptionalLong.empty() override def numRows(): OptionalLong = if (v1Stats.rowCount.isDefined) - OptionalLong.of(v1Stats.rowCount.get.longValue()) + OptionalLong.of(v1Stats.rowCount.get.longValue) else OptionalLong.empty() override def columnStats(): java.util.Map[NamedReference, ColumnStatistics] = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala index af7ff7204cda..5b762ffb49de 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala @@ -38,6 +38,7 @@ object OptionUtils extends SQLConfHelper { case (key, value) => key.stripPrefix(PAIMON_OPTION_PREFIX) -> value } + .toMap .asJava) mergedOptions.putAll(extraOptions) mergedOptions diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala index 397424c636d1..3424129e957b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala @@ -57,8 +57,8 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface) /** Creates a [[PaimonCallStatement]] for a stored procedure call. */ override def visitCall(ctx: CallContext): PaimonCallStatement = withOrigin(ctx) { - val name = toSeq(ctx.multipartIdentifier.parts).map(_.getText) - val args = toSeq(ctx.callArgument).map(typedVisit[PaimonCallArgument]) + val name = ctx.multipartIdentifier.parts.asScala.map(_.getText).toSeq + val args = ctx.callArgument.asScala.map(typedVisit[PaimonCallArgument]).toSeq logical.PaimonCallStatement(name, args) } @@ -89,7 +89,7 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface) /** Returns a multi-part identifier as Seq[String]. */ override def visitMultipartIdentifier(ctx: MultipartIdentifierContext): Seq[String] = withOrigin(ctx) { - ctx.parts.asScala.map(_.getText) + ctx.parts.asScala.map(_.getText).toSeq } private def toBuffer[T](list: java.util.List[T]) = list.asScala diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java index 9af886d8369f..b98213c0e662 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java @@ -104,8 +104,8 @@ public void test() { + "paimon," + "22.2," + "Map(key2 -> [2.4,3.5], key1 -> [1.2,2.3])," - + "WrappedArray(v1, v5)," - + "WrappedArray(10, 30)," + + "[v1, v5]," + + "[10, 30]," + "true," + "22," + "356," @@ -129,6 +129,12 @@ public void test() { private String sparkRowToString(org.apache.spark.sql.Row row) { return JavaConverters.seqAsJavaList(row.toSeq()).stream() + .map( + x -> + (x instanceof scala.collection.Seq) + ? JavaConverters.seqAsJavaList( + (scala.collection.Seq) x) + : x) .map(Object::toString) // Since the toString result of Spark's binary col is unstable, replace it .map(x -> x.startsWith("[B@") ? "[B@" : x) diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala index 1ccf4e38ec4f..4aa73d02600c 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala @@ -403,7 +403,7 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase { spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS") val stats = getScanStatistic("SELECT * FROM T") - Assertions.assertEquals(2L, stats.rowCount.get.longValue()) + Assertions.assertEquals(2L, stats.rowCount.get.longValue) } test("Paimon analyze: spark use col stats") { @@ -418,7 +418,7 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase { spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS") val stats = getScanStatistic("SELECT * FROM T") - Assertions.assertEquals(2L, stats.rowCount.get.longValue()) + Assertions.assertEquals(2L, stats.rowCount.get.longValue) Assertions.assertEquals(if (gteqSpark3_4) 4 else 0, stats.attributeStats.size) } @@ -437,19 +437,19 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase { var sql = "SELECT * FROM T WHERE pt < 1" Assertions.assertEquals( if (gteqSpark3_4) 0L else 4L, - getScanStatistic(sql).rowCount.get.longValue()) + getScanStatistic(sql).rowCount.get.longValue) checkAnswer(spark.sql(sql), Nil) // partition push down hit and select without it sql = "SELECT id FROM T WHERE pt < 1" Assertions.assertEquals( if (gteqSpark3_4) 0L else 4L, - getScanStatistic(sql).rowCount.get.longValue()) + getScanStatistic(sql).rowCount.get.longValue) checkAnswer(spark.sql(sql), Nil) // partition push down not hit sql = "SELECT * FROM T WHERE id < 1" - Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue()) + Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue) checkAnswer(spark.sql(sql), Nil) } @@ -468,10 +468,10 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase { // For col type such as char, varchar that don't have min and max, filter estimation on stats has no effect. var sqlText = "SELECT * FROM T WHERE pt < '1'" - Assertions.assertEquals(4L, getScanStatistic(sqlText).rowCount.get.longValue()) + Assertions.assertEquals(4L, getScanStatistic(sqlText).rowCount.get.longValue) sqlText = "SELECT id FROM T WHERE pt < '1'" - Assertions.assertEquals(4L, getScanStatistic(sqlText).rowCount.get.longValue()) + Assertions.assertEquals(4L, getScanStatistic(sqlText).rowCount.get.longValue) } }) } diff --git a/pom.xml b/pom.xml index 1f0d47860124..28952f46d6c2 100644 --- a/pom.xml +++ b/pom.xml @@ -80,8 +80,12 @@ under the License. 4.1.100.Final 4.9.3 2.8.5 - 2.12.15 2.12 + 2.12 + 2.12.15 + 2.13.14 + ${scala212.version} + ${scala212.version} 1.1.8.4 0.27 1.8.0 @@ -338,6 +342,19 @@ under the License. + + + scala-2.13 + + 2.13 + ${scala213.version} + + + + scala-2.13 + + + @@ -615,10 +632,10 @@ under the License. - org.apache.flink:flink-table-planner_${scala.binary.version} + org.apache.flink:flink-table-planner_${flink.scala.binary.version} - org.apache.flink:flink-table-planner_${scala.binary.version}:*:*:test + org.apache.flink:flink-table-planner_${flink.scala.binary.version}:*:*:test Direct dependencies on flink-table-planner are not allowed.