diff --git a/.github/workflows/utitcase-spark-3.x.yml b/.github/workflows/utitcase-spark-3.x.yml index 5edcfe49007a..2d3df5f4d005 100644 --- a/.github/workflows/utitcase-spark-3.x.yml +++ b/.github/workflows/utitcase-spark-3.x.yml @@ -54,7 +54,7 @@ jobs: jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" test_modules="" - for suffix in common_2.12 3.5 3.4 3.3 3.2; do + for suffix in ut 3.5 3.4 3.3 3.2; do test_modules+="org.apache.paimon:paimon-spark-${suffix}," done test_modules="${test_modules%,}" diff --git a/.github/workflows/utitcase-spark-4.x.yml b/.github/workflows/utitcase-spark-4.x.yml index 7fbac23dda4f..c58fd7c03be2 100644 --- a/.github/workflows/utitcase-spark-4.x.yml +++ b/.github/workflows/utitcase-spark-4.x.yml @@ -54,7 +54,7 @@ jobs: jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" test_modules="" - for suffix in common_2.13 4.0; do + for suffix in ut 4.0; do test_modules+="org.apache.paimon:paimon-spark-${suffix}," done test_modules="${test_modules%,}" diff --git a/paimon-spark/paimon-spark-3.2/pom.xml b/paimon-spark/paimon-spark-3.2/pom.xml index 626bb5bae833..0d259cd45928 100644 --- a/paimon-spark/paimon-spark-3.2/pom.xml +++ b/paimon-spark/paimon-spark-3.2/pom.xml @@ -36,6 +36,12 @@ under the License. + + org.apache.paimon + paimon-spark3-common + ${project.version} + + org.apache.paimon paimon-spark-common_${scala.binary.version} @@ -63,7 +69,14 @@ under the License. org.apache.paimon - paimon-spark-common_${scala.binary.version} + paimon-spark3-common + ${project.version} + test + + + + org.apache.paimon + paimon-spark-ut ${project.version} tests test diff --git a/paimon-spark/paimon-spark-3.3/pom.xml b/paimon-spark/paimon-spark-3.3/pom.xml index 689e4131ccd9..51a95b89e626 100644 --- a/paimon-spark/paimon-spark-3.3/pom.xml +++ b/paimon-spark/paimon-spark-3.3/pom.xml @@ -36,6 +36,12 @@ under the License. + + org.apache.paimon + paimon-spark3-common + ${project.version} + + org.apache.paimon paimon-spark-common_${scala.binary.version} @@ -63,7 +69,14 @@ under the License. org.apache.paimon - paimon-spark-common_${scala.binary.version} + paimon-spark3-common + ${project.version} + test + + + + org.apache.paimon + paimon-spark-ut ${project.version} tests test diff --git a/paimon-spark/paimon-spark-3.4/pom.xml b/paimon-spark/paimon-spark-3.4/pom.xml index d1ded508a927..24a2c3b65473 100644 --- a/paimon-spark/paimon-spark-3.4/pom.xml +++ b/paimon-spark/paimon-spark-3.4/pom.xml @@ -36,6 +36,12 @@ under the License. + + org.apache.paimon + paimon-spark3-common + ${project.version} + + org.apache.paimon paimon-spark-common_${scala.binary.version} @@ -63,7 +69,14 @@ under the License. org.apache.paimon - paimon-spark-common_${scala.binary.version} + paimon-spark3-common + ${project.version} + test + + + + org.apache.paimon + paimon-spark-ut ${project.version} tests test @@ -126,7 +139,7 @@ under the License. - org.apache.paimon:paimon-spark-common_${scala.binary.version} + org.apache.paimon:paimon-spark3-common diff --git a/paimon-spark/paimon-spark-3.5/pom.xml b/paimon-spark/paimon-spark-3.5/pom.xml index 92803cda540e..5e005741d407 100644 --- a/paimon-spark/paimon-spark-3.5/pom.xml +++ b/paimon-spark/paimon-spark-3.5/pom.xml @@ -36,6 +36,12 @@ under the License. + + org.apache.paimon + paimon-spark3-common + ${project.version} + + org.apache.paimon paimon-spark-common_${scala.binary.version} @@ -63,7 +69,14 @@ under the License. org.apache.paimon - paimon-spark-common_${scala.binary.version} + paimon-spark3-common + ${project.version} + test + + + + org.apache.paimon + paimon-spark-ut ${project.version} tests test diff --git a/paimon-spark/paimon-spark-4.0/pom.xml b/paimon-spark/paimon-spark-4.0/pom.xml index 9f819f820ce2..488d73c3b30c 100644 --- a/paimon-spark/paimon-spark-4.0/pom.xml +++ b/paimon-spark/paimon-spark-4.0/pom.xml @@ -36,6 +36,12 @@ under the License. + + org.apache.paimon + paimon-spark4-common + ${project.version} + + org.apache.paimon paimon-spark-common_${scala.binary.version} @@ -60,10 +66,16 @@ under the License. + + org.apache.paimon + paimon-spark4-common + ${project.version} + test + org.apache.paimon - paimon-spark-common_${scala.binary.version} + paimon-spark-ut ${project.version} tests test diff --git a/paimon-spark/paimon-spark-common/pom.xml b/paimon-spark/paimon-spark-common/pom.xml index 1cfc53f42d48..64574a6ea15d 100644 --- a/paimon-spark/paimon-spark-common/pom.xml +++ b/paimon-spark/paimon-spark-common/pom.xml @@ -38,18 +38,6 @@ under the License. - - org.apache.paimon - ${paimon-sparkx-common} - ${project.version} - - - * - * - - - - org.apache.spark spark-sql_${scala.binary.version} @@ -72,46 +60,6 @@ under the License. org.apache.paimon paimon-bundle - - - - - org.apache.spark - spark-sql_${scala.binary.version} - ${spark.version} - tests - test - - - - org.apache.spark - spark-catalyst_${scala.binary.version} - ${spark.version} - tests - test - - - - org.apache.spark - spark-core_${scala.binary.version} - ${spark.version} - tests - test - - - - org.apache.spark - spark-hive_${scala.binary.version} - ${spark.version} - test - - - - org.apache.spark - spark-avro_${scala.binary.version} - ${spark.version} - test - @@ -155,21 +103,6 @@ under the License. src/main/antlr4 - - - - org.apache.maven.plugins - maven-jar-plugin - - - prepare-test-jar - test-compile - - test-jar - - - - diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java similarity index 67% rename from paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java rename to paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java index 147c6c2d77c8..e40a4e0f25a3 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java @@ -18,24 +18,15 @@ package org.apache.paimon.spark; -import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.InternalArray; -import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.data.Timestamp; -import org.apache.paimon.spark.util.shim.TypeUtils; +import org.apache.paimon.spark.data.SparkInternalRow; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeChecks; -import org.apache.paimon.types.IntType; -import org.apache.paimon.types.MapType; -import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; -import org.apache.spark.sql.catalyst.util.ArrayBasedMapData; import org.apache.spark.sql.catalyst.util.ArrayData; -import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.apache.spark.sql.catalyst.util.MapData; import org.apache.spark.sql.types.BinaryType; import org.apache.spark.sql.types.BooleanType; @@ -61,19 +52,21 @@ import java.util.Objects; +import static org.apache.paimon.spark.DataConverter.fromPaimon; import static org.apache.paimon.utils.InternalRowUtils.copyInternalRow; -/** Spark {@link org.apache.spark.sql.catalyst.InternalRow} to wrap {@link InternalRow}. */ -public class SparkInternalRow extends org.apache.spark.sql.paimon.shims.InternalRow { +/** AbstractSparkInternalRow. */ +public abstract class AbstractSparkInternalRow extends SparkInternalRow { - private final RowType rowType; + protected RowType rowType; - private InternalRow row; + protected InternalRow row; - public SparkInternalRow(RowType rowType) { + public AbstractSparkInternalRow(RowType rowType) { this.rowType = rowType; } + @Override public SparkInternalRow replace(InternalRow row) { this.row = row; return this; @@ -96,7 +89,7 @@ public void update(int i, Object value) { @Override public org.apache.spark.sql.catalyst.InternalRow copy() { - return new SparkInternalRow(rowType).replace(copyInternalRow(row, rowType)); + return SparkInternalRow.create(rowType).replace(copyInternalRow(row, rowType)); } @Override @@ -255,7 +248,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - SparkInternalRow that = (SparkInternalRow) o; + AbstractSparkInternalRow that = (AbstractSparkInternalRow) o; return Objects.equals(rowType, that.rowType) && Objects.equals(row, that.row); } @@ -263,78 +256,4 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(rowType, row); } - - // ================== static methods ========================================= - - public static Object fromPaimon(Object o, DataType type) { - if (o == null) { - return null; - } - switch (type.getTypeRoot()) { - case TIMESTAMP_WITHOUT_TIME_ZONE: - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return fromPaimon((Timestamp) o); - case CHAR: - case VARCHAR: - return fromPaimon((BinaryString) o); - case DECIMAL: - return fromPaimon((org.apache.paimon.data.Decimal) o); - case ARRAY: - return fromPaimon((InternalArray) o, (ArrayType) type); - case MAP: - case MULTISET: - return fromPaimon((InternalMap) o, type); - case ROW: - return fromPaimon((InternalRow) o, (RowType) type); - default: - return o; - } - } - - public static UTF8String fromPaimon(BinaryString string) { - return UTF8String.fromBytes(string.toBytes()); - } - - public static Decimal fromPaimon(org.apache.paimon.data.Decimal decimal) { - return Decimal.apply(decimal.toBigDecimal()); - } - - public static org.apache.spark.sql.catalyst.InternalRow fromPaimon( - InternalRow row, RowType rowType) { - return new SparkInternalRow(rowType).replace(row); - } - - public static long fromPaimon(Timestamp timestamp) { - if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) { - return DateTimeUtils.fromJavaTimestamp(timestamp.toSQLTimestamp()); - } else { - return timestamp.toMicros(); - } - } - - public static ArrayData fromPaimon(InternalArray array, ArrayType arrayType) { - return fromPaimonArrayElementType(array, arrayType.getElementType()); - } - - private static ArrayData fromPaimonArrayElementType(InternalArray array, DataType elementType) { - return new SparkArrayData(elementType).replace(array); - } - - public static MapData fromPaimon(InternalMap map, DataType mapType) { - DataType keyType; - DataType valueType; - if (mapType instanceof MapType) { - keyType = ((MapType) mapType).getKeyType(); - valueType = ((MapType) mapType).getValueType(); - } else if (mapType instanceof MultisetType) { - keyType = ((MultisetType) mapType).getElementType(); - valueType = new IntType(); - } else { - throw new UnsupportedOperationException("Unsupported type: " + mapType); - } - - return new ArrayBasedMapData( - fromPaimonArrayElementType(map.keyArray(), keyType), - fromPaimonArrayElementType(map.valueArray(), valueType)); - } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java new file mode 100644 index 000000000000..aa1a4c9c6131 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.spark.data.SparkArrayData; +import org.apache.paimon.spark.data.SparkInternalRow; +import org.apache.paimon.spark.util.shim.TypeUtils; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.MultisetType; +import org.apache.paimon.types.RowType; + +import org.apache.spark.sql.catalyst.util.ArrayBasedMapData; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.DateTimeUtils; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.UTF8String; + +/** DataConverter. */ +public class DataConverter { + + public static Object fromPaimon(Object o, DataType type) { + if (o == null) { + return null; + } + switch (type.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return fromPaimon((Timestamp) o); + case CHAR: + case VARCHAR: + return fromPaimon((BinaryString) o); + case DECIMAL: + return fromPaimon((org.apache.paimon.data.Decimal) o); + case ARRAY: + return fromPaimon((InternalArray) o, (ArrayType) type); + case MAP: + case MULTISET: + return fromPaimon((InternalMap) o, type); + case ROW: + return fromPaimon((InternalRow) o, (RowType) type); + default: + return o; + } + } + + public static UTF8String fromPaimon(BinaryString string) { + return UTF8String.fromBytes(string.toBytes()); + } + + public static Decimal fromPaimon(org.apache.paimon.data.Decimal decimal) { + return Decimal.apply(decimal.toBigDecimal()); + } + + public static org.apache.spark.sql.catalyst.InternalRow fromPaimon( + InternalRow row, RowType rowType) { + return SparkInternalRow.create(rowType).replace(row); + } + + public static long fromPaimon(Timestamp timestamp) { + if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) { + return DateTimeUtils.fromJavaTimestamp(timestamp.toSQLTimestamp()); + } else { + return timestamp.toMicros(); + } + } + + public static ArrayData fromPaimon(InternalArray array, ArrayType arrayType) { + return fromPaimonArrayElementType(array, arrayType.getElementType()); + } + + private static ArrayData fromPaimonArrayElementType(InternalArray array, DataType elementType) { + return SparkArrayData.create(elementType).replace(array); + } + + public static MapData fromPaimon(InternalMap map, DataType mapType) { + DataType keyType; + DataType valueType; + if (mapType instanceof MapType) { + keyType = ((MapType) mapType).getKeyType(); + valueType = ((MapType) mapType).getValueType(); + } else if (mapType instanceof MultisetType) { + keyType = ((MultisetType) mapType).getElementType(); + valueType = new IntType(); + } else { + throw new UnsupportedOperationException("Unsupported type: " + mapType); + } + + return new ArrayBasedMapData( + fromPaimonArrayElementType(map.keyArray(), keyType), + fromPaimonArrayElementType(map.valueArray(), valueType)); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkArrayData.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkArrayData.java deleted file mode 100644 index 9934047a1825..000000000000 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkArrayData.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.spark; - -import org.apache.paimon.data.InternalArray; -import org.apache.paimon.types.ArrayType; -import org.apache.paimon.types.BigIntType; -import org.apache.paimon.types.DataType; -import org.apache.paimon.types.DataTypeChecks; -import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.InternalRowUtils; - -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader; -import org.apache.spark.sql.catalyst.util.ArrayData; -import org.apache.spark.sql.catalyst.util.MapData; -import org.apache.spark.sql.types.Decimal; -import org.apache.spark.unsafe.types.CalendarInterval; -import org.apache.spark.unsafe.types.UTF8String; - -import static org.apache.paimon.spark.SparkInternalRow.fromPaimon; -import static org.apache.paimon.utils.InternalRowUtils.copyArray; - -/** Spark {@link ArrayData} to wrap Paimon {@link InternalArray}. */ -public class SparkArrayData extends org.apache.spark.sql.paimon.shims.ArrayData { - - private final DataType elementType; - - private InternalArray array; - - public SparkArrayData(DataType elementType) { - this.elementType = elementType; - } - - public SparkArrayData replace(InternalArray array) { - this.array = array; - return this; - } - - @Override - public int numElements() { - return array.size(); - } - - @Override - public ArrayData copy() { - return new SparkArrayData(elementType).replace(copyArray(array, elementType)); - } - - @Override - public Object[] array() { - Object[] objects = new Object[numElements()]; - for (int i = 0; i < objects.length; i++) { - objects[i] = fromPaimon(InternalRowUtils.get(array, i, elementType), elementType); - } - return objects; - } - - @Override - public void setNullAt(int i) { - throw new UnsupportedOperationException(); - } - - @Override - public void update(int i, Object value) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isNullAt(int ordinal) { - return array.isNullAt(ordinal); - } - - @Override - public boolean getBoolean(int ordinal) { - return array.getBoolean(ordinal); - } - - @Override - public byte getByte(int ordinal) { - return array.getByte(ordinal); - } - - @Override - public short getShort(int ordinal) { - return array.getShort(ordinal); - } - - @Override - public int getInt(int ordinal) { - return array.getInt(ordinal); - } - - @Override - public long getLong(int ordinal) { - if (elementType instanceof BigIntType) { - return array.getLong(ordinal); - } - - return getTimestampMicros(ordinal); - } - - private long getTimestampMicros(int ordinal) { - return fromPaimon(array.getTimestamp(ordinal, DataTypeChecks.getPrecision(elementType))); - } - - @Override - public float getFloat(int ordinal) { - return array.getFloat(ordinal); - } - - @Override - public double getDouble(int ordinal) { - return array.getDouble(ordinal); - } - - @Override - public Decimal getDecimal(int ordinal, int precision, int scale) { - return fromPaimon(array.getDecimal(ordinal, precision, scale)); - } - - @Override - public UTF8String getUTF8String(int ordinal) { - return fromPaimon(array.getString(ordinal)); - } - - @Override - public byte[] getBinary(int ordinal) { - return array.getBinary(ordinal); - } - - @Override - public CalendarInterval getInterval(int ordinal) { - throw new UnsupportedOperationException(); - } - - @Override - public InternalRow getStruct(int ordinal, int numFields) { - return fromPaimon(array.getRow(ordinal, numFields), (RowType) elementType); - } - - @Override - public ArrayData getArray(int ordinal) { - return fromPaimon(array.getArray(ordinal), (ArrayType) elementType); - } - - @Override - public MapData getMap(int ordinal) { - return fromPaimon(array.getMap(ordinal), elementType); - } - - @Override - public Object get(int ordinal, org.apache.spark.sql.types.DataType dataType) { - return SpecializedGettersReader.read(this, ordinal, dataType, true, true); - } -} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java index d4b712fcb8ee..9957f0cdf91f 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java @@ -52,7 +52,7 @@ import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.internal.SessionState; import org.apache.spark.sql.internal.StaticSQLConf; -import org.apache.spark.sql.paimon.shims; +import org.apache.spark.sql.paimon.shims.SparkShimLoader; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.slf4j.Logger; @@ -203,7 +203,8 @@ public Table createTable( return sparkCatalog.createTable(ident, schema, partitions, properties); } else { // delegate to the session catalog - return shims.createTable(asTableCatalog(), ident, schema, partitions, properties); + return SparkShimLoader.getSparkShim() + .createTable(asTableCatalog(), ident, schema, partitions, properties); } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala index 54970bfe3cb2..9a305ca59a0f 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala @@ -21,6 +21,7 @@ package org.apache.paimon.spark import org.apache.paimon.CoreOptions import org.apache.paimon.metastore.MetastoreClient import org.apache.paimon.operation.FileStoreCommit +import org.apache.paimon.spark.data.SparkInternalRow import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink.BatchWriteBuilder import org.apache.paimon.types.RowType @@ -116,7 +117,7 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement { s"the partition schema '${partitionSchema.sql}'." ) table.newReadBuilder.newScan.listPartitions.asScala - .map(binaryRow => SparkInternalRow.fromPaimon(binaryRow, partitionRowType)) + .map(binaryRow => DataConverter.fromPaimon(binaryRow, partitionRowType)) .filter( sparkInternalRow => { partitionCols.zipWithIndex diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala index fa9072df3149..526178e28ec3 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala @@ -20,6 +20,7 @@ package org.apache.paimon.spark import org.apache.paimon.data.{InternalRow => PaimonInternalRow} import org.apache.paimon.reader.RecordReader +import org.apache.paimon.spark.data.SparkInternalRow import org.apache.paimon.spark.schema.PaimonMetadataColumn import org.apache.paimon.table.source.{DataSplit, Split} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala index 94de0bec3b50..59b07a794481 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala @@ -18,10 +18,11 @@ package org.apache.paimon.spark -import org.apache.paimon.data +import org.apache.paimon.data.{InternalRow => PaimonInternalRow} import org.apache.paimon.disk.IOManager import org.apache.paimon.reader.RecordReader import org.apache.paimon.spark.SparkUtils.createIOManager +import org.apache.paimon.spark.data.SparkInternalRow import org.apache.paimon.spark.schema.PaimonMetadataColumn import org.apache.paimon.table.source.{ReadBuilder, Split} import org.apache.paimon.types.RowType @@ -45,13 +46,13 @@ case class PaimonPartitionReaderFactory( val dataFields = new JList(readBuilder.readType().getFields) dataFields.addAll(metadataColumns.map(_.toPaimonDataField).asJava) val rowType = new RowType(dataFields) - new SparkInternalRow(rowType) + SparkInternalRow.create(rowType) } override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { partition match { case paimonInputPartition: PaimonInputPartition => - val readFunc: Split => RecordReader[data.InternalRow] = + val readFunc: Split => RecordReader[PaimonInternalRow] = (split: Split) => readBuilder.newRead().withIOManager(ioManager).createReader(split) PaimonPartitionReader(readFunc, paimonInputPartition, row, metadataColumns) case _ => diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala index 28af4ac0a4fd..8dd464933032 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark +import org.apache.paimon.spark.data.SparkInternalRow import org.apache.paimon.stats.ColStats import org.apache.paimon.types.{DataField, DataType, RowType} @@ -118,8 +119,10 @@ object PaimonColumnStats { def apply(dateType: DataType, paimonColStats: ColStats[_]): PaimonColumnStats = { PaimonColumnStats( paimonColStats.nullCount, - Optional.ofNullable(SparkInternalRow.fromPaimon(paimonColStats.min().orElse(null), dateType)), - Optional.ofNullable(SparkInternalRow.fromPaimon(paimonColStats.max().orElse(null), dateType)), + Optional.ofNullable( + DataConverter + .fromPaimon(paimonColStats.min().orElse(null), dateType)), + Optional.ofNullable(DataConverter.fromPaimon(paimonColStats.max().orElse(null), dateType)), paimonColStats.distinctCount, paimonColStats.avgLen, paimonColStats.maxLen diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala index cd9718cf44eb..41e7fd3c3ce9 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala @@ -20,7 +20,8 @@ package org.apache.paimon.spark.aggregate import org.apache.paimon.data.BinaryRow import org.apache.paimon.manifest.PartitionEntry -import org.apache.paimon.spark.{SparkInternalRow, SparkTypeUtils} +import org.apache.paimon.spark.SparkTypeUtils +import org.apache.paimon.spark.data.SparkInternalRow import org.apache.paimon.table.{DataTable, Table} import org.apache.paimon.utils.{InternalRowUtils, ProjectedRow} @@ -104,7 +105,7 @@ class LocalAggregator(table: Table) { ProjectedRow.from(requiredGroupByIndexMapping.toArray).replaceRow(partitionRow) // `ProjectedRow` does not support `hashCode`, so do a deep copy val genericRow = InternalRowUtils.copyInternalRow(projectedRow, partitionType) - new SparkInternalRow(partitionType).replace(genericRow) + SparkInternalRow.create(partitionType).replace(genericRow) } def update(partitionEntry: PartitionEntry): Unit = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala index f567d925ea57..875bf725c2eb 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala @@ -55,6 +55,9 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] { case s @ ShowColumns(PaimonRelation(table), _, _) if s.resolved => PaimonShowColumnsCommand(table) + +// case s@ShowTableExtended(PaimonRelation(table), _, _) if s.resolved => +// PaimonShowColumnsCommand(table) } private def paimonWriteResolved(query: LogicalPlan, table: NamedRelation): Boolean = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala index b0b1a76e7a1f..3428ed89f004 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{SCALAR_SUBQUERY, SCALAR_SUBQUERY_REFERENCE, TreePattern} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.paimon.shims +import org.apache.spark.sql.paimon.shims.SparkShimLoader import org.apache.spark.sql.types.{DataType, StructType} import scala.collection.mutable.ArrayBuffer @@ -344,7 +344,7 @@ trait MergePaimonScalarSubqueriesBase extends Rule[LogicalPlan] with PredicateHe val Seq(newPlanSupportsHashAggregate, cachedPlanSupportsHashAggregate) = aggregateExpressionsSeq.zip(groupByExpressionSeq).map { case (aggregateExpressions, groupByExpressions) => - shims.Aggregate.supportsHashAggregate( + SparkShimLoader.getSparkShim.supportsHashAggregate( aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes), groupByExpressions) } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala index f252b3bb130b..57a8a8e4abfd 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala @@ -22,7 +22,7 @@ import org.apache.paimon.crosspartition.{GlobalIndexAssigner, KeyPartOrRow} import org.apache.paimon.data.{BinaryRow, GenericRow, InternalRow => PaimonInternalRow, JoinedRow} import org.apache.paimon.disk.IOManager import org.apache.paimon.index.HashBucketAssigner -import org.apache.paimon.spark.{SparkInternalRow, SparkRow} +import org.apache.paimon.spark.{DataConverter, SparkRow} import org.apache.paimon.spark.SparkUtils.createIOManager import org.apache.paimon.spark.util.EncoderUtils import org.apache.paimon.table.FileStoreTable @@ -179,7 +179,7 @@ class GlobalIndexAssignerIterator( extraRow.setField(1, bucket) queue.enqueue( encoderGroup.internalToRow( - SparkInternalRow.fromPaimon(new JoinedRow(row, extraRow), rowType))) + DataConverter.fromPaimon(new JoinedRow(row, extraRow), rowType))) } ) rowIterator.foreach { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala index f557a0cf38ee..da05e37b2c13 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala @@ -28,7 +28,7 @@ import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink.CommitMessage import org.apache.paimon.types.RowKind -import org.apache.spark.sql.{Dataset, Row, SparkSession} +import org.apache.spark.sql.{Column, Dataset, Row, SparkSession} import org.apache.spark.sql.PaimonUtils._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.functions.{col, lit, monotonically_increasing_id, sum} -import org.apache.spark.sql.paimon.shims.ExpressionUtils.{column, convertToExpression} +import org.apache.spark.sql.paimon.shims.SparkShimLoader import org.apache.spark.sql.types.{ByteType, StructField, StructType} import scala.collection.mutable @@ -153,12 +153,12 @@ case class MergeIntoPaimonTable( } if (hasUpdate(matchedActions)) { touchedFilePathsSet ++= findTouchedFiles( - targetDS.join(sourceDS, column(mergeCondition), "inner"), + targetDS.join(sourceDS, toColumn(mergeCondition), "inner"), sparkSession) } if (hasUpdate(notMatchedBySourceActions)) { touchedFilePathsSet ++= findTouchedFiles( - targetDS.join(sourceDS, column(mergeCondition), "left_anti"), + targetDS.join(sourceDS, toColumn(mergeCondition), "left_anti"), sparkSession) } @@ -200,7 +200,7 @@ case class MergeIntoPaimonTable( val sourceDS = createDataset(sparkSession, sourceTable) .withColumn(SOURCE_ROW_COL, lit(true)) - val joinedDS = sourceDS.join(targetDS, column(mergeCondition), "fullOuter") + val joinedDS = sourceDS.join(targetDS, toColumn(mergeCondition), "fullOuter") val joinedPlan = joinedDS.queryExecution.analyzed def resolveOnJoinedPlan(exprs: Seq[Expression]): Seq[Expression] = { @@ -209,9 +209,9 @@ case class MergeIntoPaimonTable( val targetOutput = filteredTargetPlan.output val targetRowNotMatched = resolveOnJoinedPlan( - Seq(convertToExpression(sparkSession, col(SOURCE_ROW_COL).isNull))).head + Seq(toExpression(sparkSession, col(SOURCE_ROW_COL).isNull))).head val sourceRowNotMatched = resolveOnJoinedPlan( - Seq(convertToExpression(sparkSession, col(TARGET_ROW_COL).isNull))).head + Seq(toExpression(sparkSession, col(TARGET_ROW_COL).isNull))).head val matchedExprs = matchedActions.map(_.condition.getOrElse(TrueLiteral)) val notMatchedExprs = notMatchedActions.map(_.condition.getOrElse(TrueLiteral)) val notMatchedBySourceExprs = notMatchedBySourceActions.map(_.condition.getOrElse(TrueLiteral)) @@ -275,7 +275,7 @@ case class MergeIntoPaimonTable( .withColumn(ROW_ID_COL, monotonically_increasing_id()) val sourceDS = createDataset(sparkSession, sourceTable) val count = sourceDS - .join(targetDS, column(mergeCondition), "inner") + .join(targetDS, toColumn(mergeCondition), "inner") .select(col(ROW_ID_COL), lit(1).as("one")) .groupBy(ROW_ID_COL) .agg(sum("one").as("count")) @@ -288,6 +288,14 @@ case class MergeIntoPaimonTable( } } } + + private def toColumn(expr: Expression): Column = { + SparkShimLoader.getSparkShim.column(expr) + } + + private def toExpression(spark: SparkSession, col: Column): Expression = { + SparkShimLoader.getSparkShim.convertToExpression(spark, col) + } } object MergeIntoPaimonTable { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala index f2ea965d1407..47e3f77d0e2c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, Project, SupportsSubquery} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.functions.lit -import org.apache.spark.sql.paimon.shims.ExpressionUtils.column +import org.apache.spark.sql.paimon.shims.SparkShimLoader case class UpdatePaimonTableCommand( relation: DataSourceV2Relation, @@ -133,7 +133,8 @@ case class UpdatePaimonTableCommand( sparkSession: SparkSession, touchedDataSplits: Array[DataSplit]): Seq[CommitMessage] = { val updateColumns = updateExpressions.zip(relation.output).map { - case (update, origin) => column(update).as(origin.name, origin.metadata) + case (update, origin) => + SparkShimLoader.getSparkShim.column(update).as(origin.name, origin.metadata) } val toUpdateScanRelation = createNewRelation(touchedDataSplits, relation) @@ -156,7 +157,7 @@ case class UpdatePaimonTableCommand( } else { If(condition, update, origin) } - column(updated).as(origin.name, origin.metadata) + SparkShimLoader.getSparkShim.column(updated).as(origin.name, origin.metadata) } val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala new file mode 100644 index 000000000000..c6539a493cee --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.data + +import org.apache.paimon.data.InternalArray +import org.apache.paimon.spark.DataConverter +import org.apache.paimon.types.{ArrayType => PaimonArrayType, BigIntType, DataType => PaimonDataType, DataTypeChecks, RowType} +import org.apache.paimon.utils.InternalRowUtils + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.paimon.shims.SparkShimLoader +import org.apache.spark.sql.types.{DataType, Decimal} +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} + +abstract class SparkArrayData extends org.apache.spark.sql.catalyst.util.ArrayData { + + def replace(array: InternalArray): SparkArrayData +} + +abstract class AbstractSparkArrayData extends SparkArrayData { + + val elementType: PaimonDataType + + var paimonArray: InternalArray = _ + + override def replace(array: InternalArray): SparkArrayData = { + this.paimonArray = array + this + } + + override def numElements(): Int = paimonArray.size() + + override def copy(): ArrayData = { + SparkArrayData.create(elementType).replace(InternalRowUtils.copyArray(paimonArray, elementType)) + } + + override def array: Array[Any] = { + Array.range(0, numElements()).map { + i => + DataConverter + .fromPaimon(InternalRowUtils.get(paimonArray, i, elementType), elementType) + } + } + + override def setNullAt(i: Int): Unit = throw new UnsupportedOperationException() + + override def update(i: Int, value: Any): Unit = throw new UnsupportedOperationException() + + override def isNullAt(ordinal: Int): Boolean = paimonArray.isNullAt(ordinal) + + override def getBoolean(ordinal: Int): Boolean = paimonArray.getBoolean(ordinal) + + override def getByte(ordinal: Int): Byte = paimonArray.getByte(ordinal) + + override def getShort(ordinal: Int): Short = paimonArray.getShort(ordinal) + + override def getInt(ordinal: Int): Int = paimonArray.getInt(ordinal) + + override def getLong(ordinal: Int): Long = elementType match { + case _: BigIntType => paimonArray.getLong(ordinal) + case _ => + DataConverter.fromPaimon( + paimonArray.getTimestamp(ordinal, DataTypeChecks.getPrecision(elementType))) + } + + override def getFloat(ordinal: Int): Float = paimonArray.getFloat(ordinal) + + override def getDouble(ordinal: Int): Double = paimonArray.getDouble(ordinal) + + override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal = + DataConverter.fromPaimon(paimonArray.getDecimal(ordinal, precision, scale)) + + override def getUTF8String(ordinal: Int): UTF8String = + DataConverter.fromPaimon(paimonArray.getString(ordinal)) + + override def getBinary(ordinal: Int): Array[Byte] = paimonArray.getBinary(ordinal) + + override def getInterval(ordinal: Int): CalendarInterval = + throw new UnsupportedOperationException() + + override def getStruct(ordinal: Int, numFields: Int): InternalRow = DataConverter + .fromPaimon(paimonArray.getRow(ordinal, numFields), elementType.asInstanceOf[RowType]) + + override def getArray(ordinal: Int): ArrayData = DataConverter.fromPaimon( + paimonArray.getArray(ordinal), + elementType.asInstanceOf[PaimonArrayType]) + + override def getMap(ordinal: Int): MapData = + DataConverter.fromPaimon(paimonArray.getMap(ordinal), elementType) + + override def get(ordinal: Int, dataType: DataType): AnyRef = + SpecializedGettersReader.read(this, ordinal, dataType, true, true) + +} + +object SparkArrayData { + def create(elementType: PaimonDataType): SparkArrayData = { + SparkShimLoader.getSparkShim.createSparkArrayData(elementType) + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala new file mode 100644 index 000000000000..436df4e78464 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.data + +import org.apache.paimon.data.{Decimal => PaimonDecimal, InternalRow => PaimonInternalRow, _} +import org.apache.paimon.spark.DataConverter +import org.apache.paimon.spark.util.shim.TypeUtils +import org.apache.paimon.types.{ArrayType => PaimonArrayType, BigIntType, DataType => PaimonDataType, DataTypeChecks, IntType, MapType => PaimonMapType, MultisetType, RowType} +import org.apache.paimon.types.DataTypeRoot._ +import org.apache.paimon.utils.InternalRowUtils.copyInternalRow + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, MapData} +import org.apache.spark.sql.paimon.shims.SparkShimLoader +import org.apache.spark.sql.types +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} + +import java.util.Objects + +abstract class SparkInternalRow extends InternalRow { + def replace(row: org.apache.paimon.data.InternalRow): SparkInternalRow +} + +//abstract class AbstractSparkInternalRow extends SparkInternalRow { +// +// protected val rowType: RowType +// protected var row: PaimonInternalRow = _ +// +// override def replace(row: PaimonInternalRow): SparkInternalRow = { +// this.row = row +// this +// } +// +// override def numFields: Int = row.getFieldCount +// +// override def setNullAt(ordinal: Int): Unit = throw new UnsupportedOperationException() +// +// override def update(ordinal: Int, value: Any): Unit = throw new UnsupportedOperationException() +// +// override def copy(): InternalRow = { +// SparkInternalRow.create(rowType).replace(copyInternalRow(row, rowType)) +// } +// +// override def isNullAt(ordinal: Int): Boolean = row.isNullAt(ordinal) +// +// override def getBoolean(ordinal: Int): Boolean = row.getBoolean(ordinal) +// +// override def getByte(ordinal: Int): Byte = row.getByte(ordinal) +// +// override def getShort(ordinal: Int): Short = row.getShort(ordinal) +// +// override def getInt(ordinal: Int): Int = row.getInt(ordinal) +// +// override def getLong(ordinal: Int): Long = rowType.getTypeAt(ordinal) match { +// case _: BigIntType => +// row.getLong(ordinal) +// case _ => +// DataConverter.fromPaimon( +// row.getTimestamp(ordinal, DataTypeChecks.getPrecision(rowType.getTypeAt(ordinal)))) +// } +// +// override def getFloat(ordinal: Int): Float = row.getFloat(ordinal) +// +// override def getDouble(ordinal: Int): Double = row.getDouble(ordinal) +// +// override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal = +// DataConverter.fromPaimon(row.getDecimal(ordinal, precision, scale)) +// +// override def getUTF8String(ordinal: Int): UTF8String = +// DataConverter.fromPaimon(row.getString(ordinal)) +// +// override def getBinary(ordinal: Int): Array[Byte] = row.getBinary(ordinal) +// +// override def getInterval(ordinal: Int): CalendarInterval = +// throw new UnsupportedOperationException() +// +// override def getStruct(ordinal: Int, numFields: Int): InternalRow = { +// DataConverter.fromPaimon( +// row.getRow(ordinal, numFields), +// rowType.getTypeAt(ordinal).asInstanceOf[RowType]) +// } +// +// override def getArray(ordinal: Int): ArrayData = { +// DataConverter.fromPaimon( +// row.getArray(ordinal), +// rowType.getTypeAt(ordinal).asInstanceOf[PaimonArrayType]) +// } +// +// override def getMap(ordinal: Int): MapData = { +// DataConverter.fromPaimon(row.getMap(ordinal), rowType.getTypeAt(ordinal)) +// } +// +// override def get(ordinal: Int, dataType: DataType): AnyRef = { +// if (isNullAt(ordinal) || dataType.isInstanceOf[NullType]) { +// null +// } else { +// dataType match { +// case _: BooleanType => getBinary(ordinal) +// case _: ByteType => getByte(ordinal) +// case _: ShortType => getShort(ordinal) +// case _: IntegerType => getInt(ordinal) +// case _: LongType => getLong(ordinal) +// case _: FloatType => getFloat(ordinal) +// case _: DoubleType => getDouble(ordinal) +// case _: StringType | _: CharType | _: VarcharType => getUTF8String(ordinal) +// case dt: DecimalType => getDecimal(ordinal, dt.precision, dt.scale) +// case _: DateType => getInt(ordinal) +// case _: TimestampType => getLong(ordinal) +// case _: CalendarIntervalType => getInterval(ordinal) +// case _: BinaryType => getBinary(ordinal) +// case st: StructType => getStruct(ordinal, st.size) +// case _: ArrayType => getArray(ordinal) +// case _: MapType => getMap(ordinal) +// case _: UserDefinedType[_] => +// get(ordinal, dataType.asInstanceOf[UserDefinedType[_]].sqlType) +// } +// } +// } +// +// private def getAs[T](ordinal: Int, dataType: DataType): T = { +// if (isNullAt(ordinal) || dataType.isInstanceOf[NullType]) { +// null +// } else { +// dataType match { +// case _: BooleanType => row.getBinary(ordinal). +// case _: ByteType => getByte(ordinal) +// case _: ShortType => getShort(ordinal) +// case _: IntegerType => getInt(ordinal) +// case _: LongType => getLong(ordinal) +// case _: FloatType => getFloat(ordinal) +// case _: DoubleType => getDouble(ordinal) +// case _: StringType | _: CharType | _: VarcharType => getUTF8String(ordinal) +// case dt: DecimalType => getDecimal(ordinal, dt.precision, dt.scale) +// case _: DateType => getInt(ordinal) +// case _: TimestampType => getLong(ordinal) +// case _: CalendarIntervalType => getInterval(ordinal) +// case _: BinaryType => getBinary(ordinal) +// case st: StructType => getStruct(ordinal, st.size) +// case _: ArrayType => getArray(ordinal) +// case _: MapType => getMap(ordinal) +// case _: UserDefinedType[_] => +// get(ordinal, dataType.asInstanceOf[UserDefinedType[_]].sqlType) +// } +// } +// } +// +// override def equals(obj: Any): Boolean = { +// if (this == obj) { +// return true +// } +// if (obj == null || getClass != obj.getClass) { +// return false +// } +// val that = obj.asInstanceOf[AbstractSparkInternalRow] +// Objects.equals(rowType, that.rowType) && Objects.equals(row, that.row) +// } +// +// override def hashCode(): Int = Objects.hash(rowType, row) +//} + +object SparkInternalRow { + + def create(rowType: RowType): SparkInternalRow = { + SparkShimLoader.getSparkShim.createSparkInternalRow(rowType) + } + +// def fromPaimon(o: Any, dataType: PaimonDataType): Any = { +// if (o == null) { +// return null +// } +// dataType.getTypeRoot match { +// case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => +// fromPaimon(o.asInstanceOf[Timestamp]) +// case CHAR | VARCHAR => +// fromPaimon(o.asInstanceOf[BinaryString]) +// case DECIMAL => +// fromPaimon(o.asInstanceOf[PaimonDecimal]) +// case ARRAY => +// fromPaimonArray(o.asInstanceOf[InternalArray], dataType.asInstanceOf[PaimonArrayType]) +// case MAP | MULTISET => +// fromPaimonMap(o.asInstanceOf[InternalMap], dataType) +// case ROW => +// fromPaimonRow(o.asInstanceOf[PaimonInternalRow], dataType.asInstanceOf[RowType]) +// } +// } +// +// def fromPaimon(string: BinaryString): UTF8String = { +// UTF8String.fromBytes(string.toBytes) +// } +// +// def fromPaimon(decimal: PaimonDecimal): Decimal = { +// Decimal.apply(decimal.toBigDecimal) +// } +// +// def fromPaimonRow(row: PaimonInternalRow, rowType: RowType): InternalRow = { +// create(rowType).replace(row) +// } +// +// def fromPaimon(timestamp: Timestamp): Long = { +// if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) { +// DateTimeUtils.fromJavaTimestamp(timestamp.toSQLTimestamp) +// } else { +// timestamp.toMicros +// } +// } +// +// def fromPaimonArray(array: InternalArray, arrayType: PaimonArrayType): ArrayData = { +// fromPaimonArrayElementType(array, arrayType.getElementType) +// } +// +// def fromPaimonMap(map: InternalMap, mapType: PaimonDataType): MapData = { +// val (keyType, valueType) = mapType match { +// case mt: PaimonMapType => (mt.getKeyType, mt.getValueType) +// case mst: MultisetType => (mst.getElementType, new IntType()) +// case _ => throw new UnsupportedOperationException("Unsupported type: " + mapType) +// } +// new ArrayBasedMapData( +// fromPaimonArrayElementType(map.keyArray(), keyType), +// fromPaimonArrayElementType(map.valueArray(), valueType)) +// } +// +// private def fromPaimonArrayElementType( +// array: InternalArray, +// elementType: PaimonDataType): ArrayData = { +// SparkArrayData.create(elementType).replace(array); +// } + +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala index 6f47a77ef308..e8f75d394a81 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala @@ -25,14 +25,16 @@ import org.apache.paimon.spark.execution.PaimonStrategy import org.apache.paimon.spark.execution.adaptive.DisableUnnecessaryPaimonBucketedScan import org.apache.spark.sql.SparkSessionExtensions -import org.apache.spark.sql.catalyst.parser.extensions.PaimonSparkSqlExtensionsParser +import org.apache.spark.sql.paimon.shims.SparkShimLoader /** Spark session extension to extends the syntax and adds the rules. */ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) { override def apply(extensions: SparkSessionExtensions): Unit = { // parser extensions - extensions.injectParser { case (_, parser) => new PaimonSparkSqlExtensionsParser(parser) } + extensions.injectParser { + case (_, parser) => SparkShimLoader.getSparkShim.createSparkParser(parser) + } // analyzer extensions extensions.injectResolutionRule(spark => new PaimonAnalysis(spark)) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala similarity index 98% rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala index 9ece186930d7..c1d61e973834 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala @@ -47,8 +47,8 @@ import java.util.Locale * @param delegate * The extension parser. */ -class PaimonSparkSqlExtensionsParser(val delegate: ParserInterface) - extends org.apache.spark.sql.paimon.shims.ParserInterface +abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterface) + extends org.apache.spark.sql.catalyst.parser.ParserInterface with Logging { private lazy val substitutor = new VariableSubstitution() diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala index 265c82866195..2ab3dc494524 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala @@ -23,10 +23,7 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.ExternalCatalog import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION -import org.apache.spark.util.Utils - -import scala.reflect.ClassTag -import scala.util.control.NonFatal +import org.apache.spark.sql.paimon.ReflectUtils object PaimonCatalogUtils { @@ -37,22 +34,10 @@ object PaimonCatalogUtils { } else { "org.apache.spark.sql.catalyst.catalog.InMemoryCatalog" } - reflect[ExternalCatalog, SparkConf, Configuration](externalCatalogClassName, conf, hadoopConf) - } - - private def reflect[T, Arg1 <: AnyRef, Arg2 <: AnyRef]( - className: String, - ctorArg1: Arg1, - ctorArg2: Arg2)(implicit ctorArgTag1: ClassTag[Arg1], ctorArgTag2: ClassTag[Arg2]): T = { - try { - val clazz = Utils.classForName(className) - val ctor = clazz.getDeclaredConstructor(ctorArgTag1.runtimeClass, ctorArgTag2.runtimeClass) - val args = Array[AnyRef](ctorArg1, ctorArg2) - ctor.newInstance(args: _*).asInstanceOf[T] - } catch { - case NonFatal(e) => - throw new IllegalArgumentException(s"Error while instantiating '$className':", e) - } + ReflectUtils.reflect[ExternalCatalog, SparkConf, Configuration]( + externalCatalogClassName, + conf, + hadoopConf) } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/ReflectUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/ReflectUtils.scala new file mode 100644 index 000000000000..bedac542ab8b --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/ReflectUtils.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.paimon + +import org.apache.spark.util.Utils + +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +object ReflectUtils { + + def reflect[T, Arg1 <: AnyRef, Arg2 <: AnyRef](className: String, ctorArg1: Arg1, ctorArg2: Arg2)( + implicit + ctorArgTag1: ClassTag[Arg1], + ctorArgTag2: ClassTag[Arg2]): T = { + try { + val clazz = Utils.classForName(className) + val ctor = clazz.getDeclaredConstructor(ctorArgTag1.runtimeClass, ctorArgTag2.runtimeClass) + val args = Array[AnyRef](ctorArg1, ctorArg2) + ctor.newInstance(args: _*).asInstanceOf[T] + } catch { + case NonFatal(e) => + throw new IllegalArgumentException(s"Error while instantiating '$className':", e) + } + } + +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala new file mode 100644 index 000000000000..100280cec72d --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.paimon.shims + +import org.apache.paimon.spark.data.{SparkArrayData, SparkInternalRow} +import org.apache.paimon.types.{DataType, RowType} + +import org.apache.spark.sql.{Column, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.types.StructType + +import java.util.{Map => JMap} + +trait SparkShim { + + def createSparkParser(delegate: ParserInterface): ParserInterface + + def createSparkInternalRow(rowType: RowType): SparkInternalRow + + def createSparkArrayData(elementType: DataType): SparkArrayData + + def supportsHashAggregate( + aggregateBufferAttributes: Seq[Attribute], + groupingExpression: Seq[Expression]): Boolean + + def createTable( + tableCatalog: TableCatalog, + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: JMap[String, String]): Table + + def column(expr: Expression): Column + + def convertToExpression(spark: SparkSession, column: Column): Expression + +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala new file mode 100644 index 000000000000..de7cb631ac69 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShimLoader.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.paimon.shims + +import java.util.ServiceLoader + +import scala.collection.JavaConverters._ + +object SparkShimLoader { + + private var sparkShim: SparkShim = _ + + def getSparkShim: SparkShim = { + if (sparkShim == null) { + sparkShim = loadSparkShim() + } + sparkShim + } + + private def loadSparkShim(): SparkShim = { + val shims = ServiceLoader.load(classOf[SparkShim]).asScala + if (shims.size != 1) { + throw new IllegalStateException("Exactly one spark shim should be here.") + } + shims.head + } +} diff --git a/paimon-spark/paimon-spark-ut/pom.xml b/paimon-spark/paimon-spark-ut/pom.xml new file mode 100644 index 000000000000..5a3a8b761ea5 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/pom.xml @@ -0,0 +1,103 @@ + + + + 4.0.0 + + + org.apache.paimon + paimon-spark + 1.0-SNAPSHOT + + + paimon-spark-ut + Paimon : Spark : UT + + + 3.5.3 + + + + + org.apache.paimon + ${paimon-sparkx-common} + ${project.version} + test + + + + org.apache.paimon + paimon-spark-common_${scala.binary.version} + ${project.version} + test + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + tests + test + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + tests + test + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + tests + test + + + + org.apache.spark + spark-hive_${scala.binary.version} + ${spark.version} + test + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + prepare-test-jar + test-compile + + test-jar + + + + + + + + diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java similarity index 94% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java index b98213c0e662..fe90019beb10 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.spark.data.SparkInternalRow; import org.apache.paimon.utils.DateTimeUtils; import org.apache.spark.sql.catalyst.CatalystTypeConverters; @@ -48,7 +49,7 @@ import static org.apache.paimon.spark.SparkTypeTest.ALL_TYPES; import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link SparkInternalRow}. */ +/** Test for {@link SparkInternalRowX}. */ public class SparkInternalRowTest { @Test @@ -95,7 +96,7 @@ public void test() { SparkTypeUtils.fromPaimonType(ALL_TYPES))); org.apache.spark.sql.Row sparkRow = (org.apache.spark.sql.Row) - sparkConverter.apply(new SparkInternalRow(ALL_TYPES).replace(rowData)); + sparkConverter.apply(SparkInternalRow.create(ALL_TYPES).replace(rowData)); String expected = "1," @@ -122,7 +123,8 @@ public void test() { SparkRow sparkRowData = new SparkRow(ALL_TYPES, sparkRow); sparkRow = (org.apache.spark.sql.Row) - sparkConverter.apply(new SparkInternalRow(ALL_TYPES).replace(sparkRowData)); + sparkConverter.apply( + SparkInternalRow.create(ALL_TYPES).replace(sparkRowData)); assertThat(sparkRowToString(sparkRow)).isEqualTo(expected); TimeZone.setDefault(tz); } diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTypeTest.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTypeTest.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWithKyroITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteWithKyroITCase.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWithKyroITCase.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteWithKyroITCase.java diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/extensions/CallStatementParserTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/CallStatementParserTest.java similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/extensions/CallStatementParserTest.java rename to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/CallStatementParserTest.java diff --git a/paimon-spark/paimon-spark-common/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/paimon-spark/paimon-spark-ut/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension rename to paimon-spark/paimon-spark-ut/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension diff --git a/paimon-spark/paimon-spark-common/src/test/resources/hive-site.xml b/paimon-spark/paimon-spark-ut/src/test/resources/hive-site.xml similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/resources/hive-site.xml rename to paimon-spark/paimon-spark-ut/src/test/resources/hive-site.xml diff --git a/paimon-spark/paimon-spark-common/src/test/resources/log4j2-test.properties b/paimon-spark/paimon-spark-ut/src/test/resources/log4j2-test.properties similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/resources/log4j2-test.properties rename to paimon-spark/paimon-spark-ut/src/test/resources/log4j2-test.properties diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonTableTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonTableTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonTableTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala similarity index 99% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala index fc787246f9f1..6f2ad2e58146 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala @@ -26,7 +26,6 @@ import org.apache.paimon.table.source.{DataSplit, Split} import org.junit.jupiter.api.Assertions -import java.util import java.util.{HashMap => JHashMap} import scala.collection.JavaConverters._ diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/FastForwardProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FastForwardProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/FastForwardProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FastForwardProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateDatabaseProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateDatabaseProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateDatabaseProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateDatabaseProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ProcedureTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/BucketedTableQueryTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BucketedTableQueryTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/BucketedTableQueryTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BucketedTableQueryTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala similarity index 88% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala index 528dcd3cd107..c9eacc791d0f 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala @@ -27,6 +27,18 @@ import java.util.Objects class DescribeTableTest extends PaimonSparkTestBase { + test(s"Paimon Show: show table extended") { + var comment = "test comment" + spark.sql(s""" + |CREATE TABLE T ( + | id INT COMMENT 'id comment', + | name STRING, + | dt STRING) + |COMMENT '$comment' + |""".stripMargin) + spark.sql(s"SHOW TABLE EXTENDED IN $dbName0 LIKE '*'") + } + test(s"Paimon describe: describe table comment") { var comment = "test comment" spark.sql(s""" diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DisableUnnecessaryPaimonBucketedScanSuite.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DisableUnnecessaryPaimonBucketedScanSuite.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DisableUnnecessaryPaimonBucketedScanSuite.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DisableUnnecessaryPaimonBucketedScanSuite.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/LookupCompactionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LookupCompactionTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/LookupCompactionTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LookupCompactionTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/ObjectTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/ObjectTableTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/ObjectTableTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/ObjectTableTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonShowColumnsTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonShowColumnsTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonShowColumnsTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonShowColumnsTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkVersionSupport.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/spark/paimon/Utils.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/paimon/Utils.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/spark/paimon/Utils.scala rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/paimon/Utils.scala diff --git a/paimon-spark/paimon-spark3-common/pom.xml b/paimon-spark/paimon-spark3-common/pom.xml index 03d29ea05b3a..5fd869f1b393 100644 --- a/paimon-spark/paimon-spark3-common/pom.xml +++ b/paimon-spark/paimon-spark3-common/pom.xml @@ -39,9 +39,35 @@ under the License. - org.apache.spark - spark-sql_${scala.binary.version} - ${spark.version} + org.apache.paimon + paimon-spark-common_${scala.binary.version} + ${project.version} + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-paimon + package + + shade + + + + + org.apache.paimon:paimon-bundle + org.apache.paimon:paimon-spark-common_${scala.binary.version} + + + + + + + + \ No newline at end of file diff --git a/paimon-spark/paimon-spark3-common/src/main/resources/META-INF/services/org.apache.paimon.spark.shims.SparkShim b/paimon-spark/paimon-spark3-common/src/main/resources/META-INF/services/org.apache.paimon.spark.shims.SparkShim new file mode 100644 index 000000000000..e30a67cdc165 --- /dev/null +++ b/paimon-spark/paimon-spark3-common/src/main/resources/META-INF/services/org.apache.paimon.spark.shims.SparkShim @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.paimon.spark.shims.Spark3Shim \ No newline at end of file diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3ArrayData.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3ArrayData.scala new file mode 100644 index 000000000000..cb393d928dcb --- /dev/null +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3ArrayData.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.data + +import org.apache.paimon.types.DataType + +class Spark3ArrayData(override val elementType: DataType) extends AbstractSparkArrayData {} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRow.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRow.scala new file mode 100644 index 000000000000..9c9a1c6bac95 --- /dev/null +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRow.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.data + +import org.apache.paimon.spark.AbstractSparkInternalRow +import org.apache.paimon.types.RowType + +class Spark3InternalRow(rowType: RowType) extends AbstractSparkInternalRow(rowType) {} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala new file mode 100644 index 000000000000..5cec8a2c23a7 --- /dev/null +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.parser.extensions + +import org.apache.spark.sql.catalyst.parser.ParserInterface + +class PaimonSparkSqlExtensionsParser(override val delegate: ParserInterface) + extends AbstractPaimonSparkSqlExtensionsParser(delegate) {} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala new file mode 100644 index 000000000000..290d7b383d2e --- /dev/null +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.paimon.shims + +import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, SparkArrayData, SparkInternalRow} +import org.apache.paimon.types.{DataType, RowType} + +import org.apache.spark.sql.{Column, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.parser.extensions.PaimonSparkSqlExtensionsParser +import org.apache.spark.sql.catalyst.plans.logical.Aggregate +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.paimon.shims.SparkShim +import org.apache.spark.sql.types.StructType + +import java.util.{Map => JMap} + +class Spark3Shim extends SparkShim { + + override def createSparkParser(delegate: ParserInterface): ParserInterface = { + new PaimonSparkSqlExtensionsParser(delegate) + } + + override def createSparkInternalRow(rowType: RowType): SparkInternalRow = { + new Spark3InternalRow(rowType) + } + + override def createSparkArrayData(elementType: DataType): SparkArrayData = { + new Spark3ArrayData(elementType) + } + + override def supportsHashAggregate( + aggregateBufferAttributes: Seq[Attribute], + groupingExpression: Seq[Expression]): Boolean = { + Aggregate.supportsHashAggregate(aggregateBufferAttributes) + } + + override def createTable( + tableCatalog: TableCatalog, + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: JMap[String, String]): Table = { + tableCatalog.createTable(ident, schema, partitions, properties) + } + + override def column(expr: Expression): Column = new Column(expr) + + override def convertToExpression(spark: SparkSession, column: Column): Expression = column.expr + +} diff --git a/paimon-spark/paimon-spark4-common/pom.xml b/paimon-spark/paimon-spark4-common/pom.xml index dcc5b370d59a..a1bccfff64ce 100644 --- a/paimon-spark/paimon-spark4-common/pom.xml +++ b/paimon-spark/paimon-spark4-common/pom.xml @@ -39,9 +39,35 @@ under the License. - org.apache.spark - spark-sql_${scala.binary.version} - ${spark.version} + org.apache.paimon + paimon-spark-common_${scala.binary.version} + ${project.version} - \ No newline at end of file + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-paimon + package + + shade + + + + + org.apache.paimon:paimon-bundle + org.apache.paimon:paimon-spark-common_${scala.binary.version} + + + + + + + + + diff --git a/paimon-spark/paimon-spark4-common/src/main/resources/META-INF/services/org.apache.paimon.spark.shims.SparkShim b/paimon-spark/paimon-spark4-common/src/main/resources/META-INF/services/org.apache.paimon.spark.shims.SparkShim new file mode 100644 index 000000000000..b0df8c67cf9a --- /dev/null +++ b/paimon-spark/paimon-spark4-common/src/main/resources/META-INF/services/org.apache.paimon.spark.shims.SparkShim @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.spark.sql.paimon.shims.Spark4Shim \ No newline at end of file diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala new file mode 100644 index 000000000000..be319c0a9c23 --- /dev/null +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4ArrayData.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.data + +import org.apache.paimon.types.DataType + +import org.apache.spark.unsafe.types.VariantVal + +class Spark4ArrayData(override val elementType: DataType) extends AbstractSparkArrayData { + + override def getVariant(ordinal: Int): VariantVal = throw new UnsupportedOperationException + +} diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala new file mode 100644 index 000000000000..54b0f420ea93 --- /dev/null +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRow.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.data + +import org.apache.paimon.spark.AbstractSparkInternalRow +import org.apache.paimon.types.RowType + +import org.apache.spark.unsafe.types.VariantVal + +class Spark4InternalRow(rowType: RowType) extends AbstractSparkInternalRow(rowType) { + override def getVariant(i: Int): VariantVal = throw new UnsupportedOperationException +} diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala new file mode 100644 index 000000000000..2066bc8dec3e --- /dev/null +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.parser.extensions + +import org.apache.spark.sql.catalyst.parser.{CompoundBody, ParserInterface} + +class PaimonSparkSqlExtensionsParser(override val delegate: ParserInterface) + extends AbstractPaimonSparkSqlExtensionsParser(delegate) { + + def parseScript(sqlScriptText: String): CompoundBody = delegate.parseScript(sqlScriptText) +} diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims.scala deleted file mode 100644 index ee6c9ad35857..000000000000 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.paimon - -import org.apache.spark.sql.{Column, SparkSession} -import org.apache.spark.sql.catalyst.{InternalRow => SparkInternalRow} -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.catalyst.parser.{CompoundBody, ParserInterface => SparkParserInterface} -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate => SparkAggregate} -import org.apache.spark.sql.catalyst.util.{ArrayData => SparkArrayData} -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog => SparkTableCatalog} -import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.internal.{ExpressionUtils => SparkExpressionUtils} -import org.apache.spark.sql.types.StructType -import org.apache.spark.unsafe.types.VariantVal - -import java.util.{Map => JMap} - -/** Shims for Spark 4.x in [[org.apache.spark.sql]]. */ -object shims { - - /** In [[org.apache.spark.sql.catalyst]]. */ - - abstract class ParserInterface extends SparkParserInterface { - val delegate: SparkParserInterface - - def parseScript(sqlScriptText: String): CompoundBody = delegate.parseScript(sqlScriptText) - } - - abstract class ArrayData extends SparkArrayData { - def getVariant(ordinal: Int): VariantVal = throw new UnsupportedOperationException - } - - abstract class InternalRow extends SparkInternalRow { - override def getVariant(i: Int): VariantVal = throw new UnsupportedOperationException - } - - object Aggregate { - def supportsHashAggregate( - aggregateBufferAttributes: Seq[Attribute], - groupingExpression: Seq[Expression]): Boolean = { - SparkAggregate.supportsHashAggregate(aggregateBufferAttributes, groupingExpression) - } - } - - /** In [[org.apache.spark.sql.connector]]. */ - - def createTable( - tableCatalog: SparkTableCatalog, - ident: Identifier, - schema: StructType, - partitions: Array[Transform], - properties: JMap[String, String]): Table = { - tableCatalog.createTable( - ident, - CatalogV2Util.structTypeToV2Columns(schema), - partitions, - properties) - } - - /** In [[org.apache.spark.sql.internal]]. */ - - object ExpressionUtils { - def column(expr: Expression): Column = SparkExpressionUtils.column(expr) - - def convertToExpression(spark: SparkSession, column: Column): Expression = { - spark.expression(column) - } - } -} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala similarity index 51% rename from paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims.scala rename to paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala index 13ade3f3c5ac..af9e25cfd325 100644 --- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala @@ -16,45 +16,44 @@ * limitations under the License. */ -package org.apache.spark.sql.paimon +package org.apache.spark.sql.paimon.shims + +import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, SparkArrayData, SparkInternalRow} +import org.apache.paimon.types.{DataType, RowType} import org.apache.spark.sql.{Column, SparkSession} -import org.apache.spark.sql.catalyst.{InternalRow => SparkInternalRow} import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.catalyst.parser.{ParserInterface => SparkParserInterface} -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate => SparkAggregate} -import org.apache.spark.sql.catalyst.util.{ArrayData => SparkArrayData} -import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog => SparkTableCatalog} +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.parser.extensions.PaimonSparkSqlExtensionsParser +import org.apache.spark.sql.catalyst.plans.logical.Aggregate +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.internal.ExpressionUtils import org.apache.spark.sql.types.StructType import java.util.{Map => JMap} -/** Shims for Spark 3.x in [[org.apache.spark.sql]]. */ -object shims { - - /** In [[org.apache.spark.sql.catalyst]]. */ +class Spark4Shim extends SparkShim { - abstract class ParserInterface extends SparkParserInterface { - val delegate: SparkParserInterface + override def createSparkParser(delegate: ParserInterface): ParserInterface = { + new PaimonSparkSqlExtensionsParser(delegate) + } + override def createSparkInternalRow(rowType: RowType): SparkInternalRow = { + new Spark4InternalRow(rowType) } - abstract class ArrayData extends SparkArrayData {} - - abstract class InternalRow extends SparkInternalRow {} - - object Aggregate { - def supportsHashAggregate( - aggregateBufferAttributes: Seq[Attribute], - groupingExpression: Seq[Expression]): Boolean = { - SparkAggregate.supportsHashAggregate(aggregateBufferAttributes) - } + override def createSparkArrayData(elementType: DataType): SparkArrayData = { + new Spark4ArrayData(elementType) } - /** In [[org.apache.spark.sql.connector]]. */ + def supportsHashAggregate( + aggregateBufferAttributes: Seq[Attribute], + groupingExpression: Seq[Expression]): Boolean = { + Aggregate.supportsHashAggregate(aggregateBufferAttributes, groupingExpression) + } def createTable( - tableCatalog: SparkTableCatalog, + tableCatalog: TableCatalog, ident: Identifier, schema: StructType, partitions: Array[Transform], @@ -62,11 +61,8 @@ object shims { tableCatalog.createTable(ident, schema, partitions, properties) } - /** In [[org.apache.spark.sql.internal]]. */ + def column(expr: Expression): Column = ExpressionUtils.column(expr) - object ExpressionUtils { - def column(expr: Expression): Column = new Column(expr) - - def convertToExpression(spark: SparkSession, column: Column): Expression = column.expr - } + def convertToExpression(spark: SparkSession, column: Column): Expression = + spark.expression(column) } diff --git a/paimon-spark/pom.xml b/paimon-spark/pom.xml index aac73baa5fec..bd6024dbec0f 100644 --- a/paimon-spark/pom.xml +++ b/paimon-spark/pom.xml @@ -39,6 +39,7 @@ under the License. paimon-spark-common + paimon-spark-ut