From 6742db80a040e669069834a2c4263db85b6861cd Mon Sep 17 00:00:00 2001 From: Zouxxyy Date: Tue, 16 Jul 2024 18:47:39 +0800 Subject: [PATCH] [spark] Support timestamp_ntz for Spark 3.4+ (#3569) --- docs/content/spark/quick-start.md | 15 +- .../paimon/spark/util/shim/TypeUtils.scala | 25 ++ .../paimon/spark/util/shim/TypeUtils.scala | 25 ++ .../paimon/spark/util/shim/TypeUtils.scala | 25 ++ .../apache/paimon/spark/SparkInternalRow.java | 7 +- .../org/apache/paimon/spark/SparkRow.java | 19 +- .../apache/paimon/spark/SparkTypeUtils.java | 16 +- .../paimon/spark/util/shim/TypeUtils.scala | 24 ++ .../paimon/spark/SparkInternalRowTest.java | 57 +++-- .../apache/paimon/spark/SparkTypeTest.java | 4 +- .../paimon/spark/PaimonSparkTestBase.scala | 13 + .../apache/paimon/spark/sql/DDLTestBase.scala | 227 ++++++++++++++++++ 12 files changed, 426 insertions(+), 31 deletions(-) create mode 100644 paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala create mode 100644 paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala create mode 100644 paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala diff --git a/docs/content/spark/quick-start.md b/docs/content/spark/quick-start.md index 253ed0b2f659..6687d295964f 100644 --- a/docs/content/spark/quick-start.md +++ b/docs/content/spark/quick-start.md @@ -310,7 +310,12 @@ All Spark's data types are available in package `org.apache.spark.sql.types`. TimestampType - TimestampType, LocalZonedTimestamp + LocalZonedTimestamp + true + + + TimestampNTZType(Spark3.4+) + TimestampType true @@ -325,3 +330,11 @@ All Spark's data types are available in package `org.apache.spark.sql.types`. 
+ +{{< hint warning >}} +Due to the previous design, in Spark3.3 and below, Paimon will map both Paimon's TimestampType and LocalZonedTimestamp to Spark's TimestampType, and only TimestampType is handled correctly. + +Therefore, when using Spark3.3 and below to read a Paimon table with LocalZonedTimestamp type written by other engines, such as Flink, the query result of LocalZonedTimestamp type will have a time zone offset, which needs to be adjusted manually. + +When using Spark3.4 and above, all timestamp types can be parsed correctly. +{{< /hint >}} \ No newline at end of file diff --git a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala new file mode 100644 index 000000000000..dcd2d68891fe --- /dev/null +++ b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.util.shim + +object TypeUtils { + + // Since Spark 3.3 and below do not support timestamp ntz, treat Paimon TimestampType as Spark TimestampType + def treatPaimonTimestampTypeAsSparkTimestampType(): Boolean = true +} diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala new file mode 100644 index 000000000000..dcd2d68891fe --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.util.shim + +object TypeUtils { + + // Since Spark 3.3 and below do not support timestamp ntz, treat Paimon TimestampType as Spark TimestampType + def treatPaimonTimestampTypeAsSparkTimestampType(): Boolean = true +} diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala new file mode 100644 index 000000000000..dcd2d68891fe --- /dev/null +++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.util.shim + +object TypeUtils { + + // Since Spark 3.3 and below do not support timestamp ntz, treat Paimon TimestampType as Spark TimestampType + def treatPaimonTimestampTypeAsSparkTimestampType(): Boolean = true +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java index 787a1329a112..9dd6c7b68519 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.spark.util.shim.TypeUtils; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataType; @@ -283,7 +284,11 @@ public static org.apache.spark.sql.catalyst.InternalRow fromPaimon( } public static long fromPaimon(Timestamp timestamp) { - return DateTimeUtils.fromJavaTimestamp(timestamp.toSQLTimestamp()); + if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) { + return DateTimeUtils.fromJavaTimestamp(timestamp.toSQLTimestamp()); + } else { + return timestamp.toMicros(); + } } public static ArrayData fromPaimon(InternalArray array, ArrayType arrayType) { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java index c650719b0f12..b343be247824 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalMap; import 
org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.spark.util.shim.TypeUtils; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DateType; @@ -170,11 +171,21 @@ private static int toPaimonDate(Object object) { private static Timestamp toPaimonTimestamp(Object object) { if (object instanceof java.sql.Timestamp) { - return Timestamp.fromSQLTimestamp((java.sql.Timestamp) object); + java.sql.Timestamp ts = (java.sql.Timestamp) object; + if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) { + return Timestamp.fromSQLTimestamp(ts); + } else { + return Timestamp.fromInstant(ts.toInstant()); + } } else if (object instanceof java.time.Instant) { - LocalDateTime localDateTime = - LocalDateTime.ofInstant((Instant) object, ZoneId.systemDefault()); - return Timestamp.fromLocalDateTime(localDateTime); + Instant instant = (Instant) object; + if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) { + LocalDateTime localDateTime = + LocalDateTime.ofInstant((Instant) object, ZoneId.systemDefault()); + return Timestamp.fromLocalDateTime(localDateTime); + } else { + return Timestamp.fromInstant(instant); + } } else { return Timestamp.fromLocalDateTime((LocalDateTime) object); } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java index 803cf5e54bc7..08fb2de32aa6 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java @@ -18,6 +18,7 @@ package org.apache.paimon.spark; +import org.apache.paimon.spark.util.shim.TypeUtils; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; @@ -150,7 +151,11 @@ public 
DataType visit(TimeType timeType) { @Override public DataType visit(TimestampType timestampType) { - return DataTypes.TimestampType; + if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) { + return DataTypes.TimestampType; + } else { + return DataTypes.TimestampNTZType; + } } @Override @@ -308,13 +313,20 @@ public org.apache.paimon.types.DataType atomic(DataType atomic) { } else if (atomic instanceof org.apache.spark.sql.types.DateType) { return new DateType(); } else if (atomic instanceof org.apache.spark.sql.types.TimestampType) { - return new TimestampType(); + if (TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType()) { + return new TimestampType(); + } else { + return new LocalZonedTimestampType(); + } } else if (atomic instanceof org.apache.spark.sql.types.DecimalType) { return new DecimalType( ((org.apache.spark.sql.types.DecimalType) atomic).precision(), ((org.apache.spark.sql.types.DecimalType) atomic).scale()); } else if (atomic instanceof org.apache.spark.sql.types.BinaryType) { return new VarBinaryType(VarBinaryType.MAX_LENGTH); + } else if (atomic instanceof org.apache.spark.sql.types.TimestampNTZType) { + // Move TimestampNTZType to the end for compatibility with spark3.3 and below + return new TimestampType(); } throw new UnsupportedOperationException( diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala new file mode 100644 index 000000000000..8e70432bd197 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/shim/TypeUtils.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.util.shim + +object TypeUtils { + + def treatPaimonTimestampTypeAsSparkTimestampType(): Boolean = false +} diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java index 14080cd8a1f0..9af886d8369f 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java @@ -37,10 +37,12 @@ import java.time.LocalDateTime; import java.util.AbstractMap; import java.util.Map; +import java.util.TimeZone; import java.util.stream.Collectors; import java.util.stream.Stream; import scala.Function1; +import scala.collection.JavaConverters; import static org.apache.paimon.data.BinaryString.fromString; import static org.apache.paimon.spark.SparkTypeTest.ALL_TYPES; @@ -51,6 +53,8 @@ public class SparkInternalRowTest { @Test public void test() { + TimeZone tz = TimeZone.getDefault(); + TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai")); InternalRow rowData = GenericRow.of( 1, @@ -77,6 +81,7 @@ public void test() { 23567222L, "varbinary_v".getBytes(StandardCharsets.UTF_8), Timestamp.fromLocalDateTime(LocalDateTime.parse("2007-12-03T10:15:30")), + 
Timestamp.fromLocalDateTime(LocalDateTime.parse("2007-12-03T10:15:30")), DateTimeUtils.toInternal(LocalDate.parse("2022-05-02")), Decimal.fromBigDecimal(BigDecimal.valueOf(0.21), 2, 2), Decimal.fromBigDecimal(BigDecimal.valueOf(65782123123.01), 38, 2), @@ -93,32 +98,40 @@ public void test() { sparkConverter.apply(new SparkInternalRow(ALL_TYPES).replace(rowData)); String expected = - "{" - + "\"id\":1," - + "\"name\":\"jingsong\"," - + "\"char\":\"apache\"," - + "\"varchar\":\"paimon\"," - + "\"salary\":22.2," - + "\"locations\":{\"key1\":{\"posX\":1.2,\"posY\":2.3},\"key2\":{\"posX\":2.4,\"posY\":3.5}}," - + "\"strArray\":[\"v1\",\"v5\"]," - + "\"intArray\":[10,30]," - + "\"boolean\":true," - + "\"tinyint\":22," - + "\"smallint\":356," - + "\"bigint\":23567222," - + "\"bytes\":\"dmFyYmluYXJ5X3Y=\"," - + "\"timestamp\":\"2007-12-03 10:15:30\"," - + "\"date\":\"2022-05-02\"," - + "\"decimal\":0.21," - + "\"decimal2\":65782123123.01," - + "\"decimal3\":62123123.5" - + "}"; - assertThat(sparkRow.json()).isEqualTo(expected); + "1," + + "jingsong," + + "apache," + + "paimon," + + "22.2," + + "Map(key2 -> [2.4,3.5], key1 -> [1.2,2.3])," + + "WrappedArray(v1, v5)," + + "WrappedArray(10, 30)," + + "true," + + "22," + + "356," + + "23567222," + + "[B@," + + "2007-12-03 18:15:30.0," + + "2007-12-03T10:15:30," + + "2022-05-02," + + "0.21," + + "65782123123.01," + + "62123123.5"; + assertThat(sparkRowToString(sparkRow)).isEqualTo(expected); SparkRow sparkRowData = new SparkRow(ALL_TYPES, sparkRow); sparkRow = (org.apache.spark.sql.Row) sparkConverter.apply(new SparkInternalRow(ALL_TYPES).replace(sparkRowData)); - assertThat(sparkRow.json()).isEqualTo(expected); + assertThat(sparkRowToString(sparkRow)).isEqualTo(expected); + TimeZone.setDefault(tz); + } + + private String sparkRowToString(org.apache.spark.sql.Row row) { + return JavaConverters.seqAsJavaList(row.toSeq()).stream() + .map(Object::toString) + // Since the toString result of Spark's binary col is unstable, replace it 
+ .map(x -> x.startsWith("[B@") ? "[B@" : x) + .collect(Collectors.joining(",")); } } diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java index 84f509d768b6..fdc7558fd5f4 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java @@ -65,7 +65,8 @@ public class SparkTypeTest { .field("smallint", DataTypes.SMALLINT()) .field("bigint", DataTypes.BIGINT()) .field("bytes", DataTypes.BYTES()) - .field("timestamp", DataTypes.TIMESTAMP()) + .field("timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()) + .field("timestamp_ntz", DataTypes.TIMESTAMP()) .field("date", DataTypes.DATE()) .field("decimal", DataTypes.DECIMAL(2, 2)) .field("decimal2", DataTypes.DECIMAL(38, 2)) @@ -95,6 +96,7 @@ public void testAllTypes() { + "StructField(bigint,LongType,true)," + "StructField(bytes,BinaryType,true)," + "StructField(timestamp,TimestampType,true)," + + "StructField(timestamp_ntz,TimestampNTZType,true)," + "StructField(date,DateType,true)," + "StructField(decimal,DecimalType(2,2),true)," + "StructField(decimal2,DecimalType(38,2),true)," diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala index d431a754d483..b8115132d71c 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala @@ -36,6 +36,7 @@ import org.scalactic.source.Position import org.scalatest.Tag import java.io.File +import java.util.TimeZone import scala.util.Random @@ -101,6 +102,18 @@ class PaimonSparkTestBase withTempDir(file1 => 
withTempDir(file2 => f(file1, file2))) } + protected def withTimeZone(timeZone: String)(f: => Unit): Unit = { + withSQLConf("spark.sql.session.timeZone" -> timeZone) { + val originTimeZone = TimeZone.getDefault + try { + TimeZone.setDefault(TimeZone.getTimeZone(timeZone)) + f + } finally { + TimeZone.setDefault(originTimeZone) + } + } + } + override def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit pos: Position): Unit = { println(testName) diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala index 80118a1b6243..da40171042a1 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala @@ -18,11 +18,17 @@ package org.apache.paimon.spark.sql +import org.apache.paimon.catalog.Identifier +import org.apache.paimon.schema.Schema import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.types.DataTypes import org.apache.spark.sql.Row import org.junit.jupiter.api.Assertions +import java.sql.Timestamp +import java.time.LocalDateTime + abstract class DDLTestBase extends PaimonSparkTestBase { import testImplicits._ @@ -181,4 +187,225 @@ abstract class DDLTestBase extends PaimonSparkTestBase { } } } + + test("Paimon DDL: create table with timestamp/timestamp_ntz") { + Seq("orc", "parquet", "avro").foreach { + format => + Seq(true, false).foreach { + datetimeJava8APIEnabled => + withSQLConf("spark.sql.datetime.java8API.enabled" -> datetimeJava8APIEnabled.toString) { + withTimeZone("Asia/Shanghai") { + withTable("paimon_tbl") { + // Spark support create table with timestamp_ntz since 3.4 + if (gteqSpark3_4) { + sql(s""" + |CREATE TABLE paimon_tbl (id int, binary BINARY, ts timestamp, ts_ntz timestamp_ntz) + |USING paimon + |TBLPROPERTIES 
('file.format'='$format') + |""".stripMargin) + + sql(s"INSERT INTO paimon_tbl VALUES (1, binary('b'), timestamp'2024-01-01 00:00:00', timestamp_ntz'2024-01-01 00:00:00')") + checkAnswer( + sql(s"SELECT ts, ts_ntz FROM paimon_tbl"), + Row( + if (datetimeJava8APIEnabled) + Timestamp.valueOf("2024-01-01 00:00:00").toInstant + else Timestamp.valueOf("2024-01-01 00:00:00"), + LocalDateTime.parse("2024-01-01T00:00:00") + ) + ) + + // change time zone to UTC + withTimeZone("UTC") { + // todo: fix with orc + if (format != "orc") + checkAnswer( + sql(s"SELECT ts, ts_ntz FROM paimon_tbl"), + Row( + if (datetimeJava8APIEnabled) + Timestamp.valueOf("2023-12-31 16:00:00").toInstant + else Timestamp.valueOf("2023-12-31 16:00:00"), + LocalDateTime.parse("2024-01-01T00:00:00") + ) + ) + } + } else { + sql(s""" + |CREATE TABLE paimon_tbl (id int, binary BINARY, ts timestamp) + |USING paimon + |TBLPROPERTIES ('file.format'='$format') + |""".stripMargin) + + sql(s"INSERT INTO paimon_tbl VALUES (1, binary('b'), timestamp'2024-01-01 00:00:00')") + checkAnswer( + sql(s"SELECT ts FROM paimon_tbl"), + Row( + if (datetimeJava8APIEnabled) + Timestamp.valueOf("2024-01-01 00:00:00").toInstant + else Timestamp.valueOf("2024-01-01 00:00:00")) + ) + + // For Spark 3.3 and below, time zone conversion is not supported, + // see TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType + withTimeZone("UTC") { + // todo: fix with orc + if (format != "orc") { + checkAnswer( + sql(s"SELECT ts FROM paimon_tbl"), + Row( + if (datetimeJava8APIEnabled) + Timestamp.valueOf("2024-01-01 00:00:00").toInstant + else Timestamp.valueOf("2024-01-01 00:00:00")) + ) + } + } + } + } + } + } + } + } + } + + test("Paimon DDL: create table with timestamp/timestamp_ntz using table API") { + val identifier = Identifier.create("test", "paimon_tbl") + try { + withTimeZone("Asia/Shanghai") { + val schema = Schema.newBuilder + .column("ts", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()) + .column("ts_ntz", DataTypes.TIMESTAMP()) 
+ .build + catalog.createTable(identifier, schema, false) + sql( + s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-01 00:00:00', timestamp_ntz'2024-01-01 00:00:00')") + + // read by spark + checkAnswer( + sql(s"SELECT ts, ts_ntz FROM paimon_tbl"), + Row( + Timestamp.valueOf("2024-01-01 00:00:00"), + if (gteqSpark3_4) LocalDateTime.parse("2024-01-01T00:00:00") + else Timestamp.valueOf("2024-01-01 00:00:00") + ) + ) + + // read by table api + // Due to previous design, read timestamp ltz type with spark 3.3 and below will cause problems, + // skip testing it + if (gteqSpark3_4) { + val table = catalog.getTable(identifier) + val builder = table.newReadBuilder.withProjection(Array[Int](0, 1)) + val splits = builder.newScan().plan().splits() + builder.newRead + .createReader(splits) + .forEachRemaining( + r => { + Assertions.assertEquals( + Timestamp.valueOf("2023-12-31 16:00:00"), + r.getTimestamp(0, 6).toSQLTimestamp) + Assertions.assertEquals( + Timestamp.valueOf("2024-01-01 00:00:00").toLocalDateTime, + r.getTimestamp(1, 6).toLocalDateTime) + }) + } + + // change time zone to UTC + withTimeZone("UTC") { + // read by spark + checkAnswer( + sql(s"SELECT ts, ts_ntz FROM paimon_tbl"), + Row( + // For Spark 3.3 and below, time zone conversion is not supported, + // see TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType + if (gteqSpark3_4) Timestamp.valueOf("2023-12-31 16:00:00") + else Timestamp.valueOf("2024-01-01 00:00:00"), + if (gteqSpark3_4) LocalDateTime.parse("2024-01-01T00:00:00") + else Timestamp.valueOf("2024-01-01 00:00:00") + ) + ) + + // read by table api + // Due to previous design, read timestamp ltz type with spark 3.3 and below will cause problems, + // skip testing it + if (gteqSpark3_4) { + val table = catalog.getTable(identifier) + val builder = table.newReadBuilder.withProjection(Array[Int](0, 1)) + val splits = builder.newScan().plan().splits() + builder.newRead + .createReader(splits) + .forEachRemaining( + r => { + Assertions.assertEquals( 
+ Timestamp.valueOf("2023-12-31 16:00:00"), + r.getTimestamp(0, 6).toSQLTimestamp) + Assertions.assertEquals( + Timestamp.valueOf("2024-01-01 00:00:00").toLocalDateTime, + r.getTimestamp(1, 6).toLocalDateTime) + }) + } + } + } + } finally { + catalog.dropTable(identifier, true) + } + } + + test("Paimon DDL: select table with timestamp and timestamp_ntz with filter") { + Seq(true, false).foreach { + datetimeJava8APIEnabled => + withSQLConf("spark.sql.datetime.java8API.enabled" -> datetimeJava8APIEnabled.toString) { + withTable("paimon_tbl") { + // Spark support create table with timestamp_ntz since 3.4 + if (gteqSpark3_4) { + sql(s""" + |CREATE TABLE paimon_tbl (ts timestamp, ts_ntz timestamp_ntz) + |USING paimon + |""".stripMargin) + sql( + s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-01 00:00:00', timestamp_ntz'2024-01-01 00:00:00')") + sql( + s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-02 00:00:00', timestamp_ntz'2024-01-02 00:00:00')") + sql( + s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-03 00:00:00', timestamp_ntz'2024-01-03 00:00:00')") + + checkAnswer( + sql(s"SELECT * FROM paimon_tbl where ts_ntz = timestamp_ntz'2024-01-01 00:00:00'"), + Row( + if (datetimeJava8APIEnabled) + Timestamp.valueOf("2024-01-01 00:00:00").toInstant + else Timestamp.valueOf("2024-01-01 00:00:00"), + LocalDateTime.parse("2024-01-01T00:00:00") + ) + ) + + checkAnswer( + sql(s"SELECT * FROM paimon_tbl where ts > timestamp'2024-01-02 00:00:00'"), + Row( + if (datetimeJava8APIEnabled) + Timestamp.valueOf("2024-01-03 00:00:00").toInstant + else Timestamp.valueOf("2024-01-03 00:00:00"), + LocalDateTime.parse("2024-01-03T00:00:00") + ) + ) + } else { + sql(s""" + |CREATE TABLE paimon_tbl (ts timestamp) + |USING paimon + |""".stripMargin) + sql(s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-01 00:00:00')") + sql(s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-02 00:00:00')") + sql(s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-03 00:00:00')") + + checkAnswer( + 
sql(s"SELECT * FROM paimon_tbl where ts = timestamp'2024-01-01 00:00:00'"), + Row( + if (datetimeJava8APIEnabled) + Timestamp.valueOf("2024-01-01 00:00:00").toInstant + else Timestamp.valueOf("2024-01-01 00:00:00")) + ) + } + } + } + } + } }