From d651ef4ea1ba76442ee92926a7f285a6c30c04af Mon Sep 17 00:00:00 2001
From: Askwang <135721692+Askwang@users.noreply.github.com>
Date: Mon, 15 Jul 2024 23:25:16 +0800
Subject: [PATCH] expire snapshot; file deletion.

---
 .../paimon/predicate/PredicateBuilder.java    | 20 +++--
 .../apache/paimon/utils/ParameterUtils.java   |  7 +-
 .../main/java/org/apache/paimon/Snapshot.java |  3 +-
 .../org/apache/paimon/manifest/FileEntry.java |  1 +
 .../paimon/manifest/ManifestFileMeta.java     |  2 +-
 .../compact/UniversalCompaction.java          |  1 -
 .../paimon/operation/FileDeletionBase.java    | 10 ++-
 .../paimon/table/ExpireSnapshotsImpl.java     | 24 +++++-
 .../org/apache/paimon/utils/TagManager.java   | 33 +++++++-
 .../sink/FixedBucketRowKeyExtractorTest.java  |  5 +-
 .../paimon/spark/PaimonBaseScanBuilder.scala  | 10 +--
 .../spark/sql/AskwangSQLQueryTest.scala       | 79 +++++++++----------
 12 files changed, 125 insertions(+), 70 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index e42194efb32e9..e7645f9f2393d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -280,18 +280,26 @@ public static Object convertJavaObject(DataType literalType, Object o) {
             int scale = decimalType.getScale();
             return Decimal.fromBigDecimal((BigDecimal) o, precision, scale);
         case TIMESTAMP_WITHOUT_TIME_ZONE:
-            Timestamp ts;
+            // Spark's PushedFilters turn a Timestamp literal into a zone-independent Instant.
+            // If the value is an Instant, convert it back to zone-aware local time here.
+            // See DateTimeUtils.
+            Timestamp timestamp;
             System.out.println(ZoneId.systemDefault());
-            if (o instanceof Instant) {
+            if (o instanceof java.sql.Timestamp) {
+                timestamp = Timestamp.fromSQLTimestamp((java.sql.Timestamp) o);
+            } else if (o instanceof Instant) {
                 Instant instant = (Instant) o;
-                LocalDateTime localDateTime = instant.atZone(ZoneId.systemDefault()).toLocalDateTime();
-                ts = Timestamp.fromLocalDateTime(localDateTime);
+                LocalDateTime localDateTime =
+                        instant.atZone(ZoneId.systemDefault()).toLocalDateTime();
+                timestamp = Timestamp.fromLocalDateTime(localDateTime);
+            } else if (o instanceof LocalDateTime) {
+                timestamp = Timestamp.fromLocalDateTime((LocalDateTime) o);
             } else {
                 throw new UnsupportedOperationException("Unsupported object: " + o);
             }
-            return ts;
+            return timestamp;
         case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-            Timestamp timestamp;
+            // Timestamp timestamp;
             if (o instanceof java.sql.Timestamp) {
                 timestamp = Timestamp.fromSQLTimestamp((java.sql.Timestamp) o);
             } else if (o instanceof Instant) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
index efb0c25a44f72..920ed1c5f1f8e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
@@ -44,7 +44,6 @@ public static Map parseCommaSeparatedKeyValues(String keyValues)
         return kvs;
     }
 
-
     public static void parseKeyValueString(Map map, String kvString) {
         String[] kv = kvString.split("=", 2);
         if (kv.length != 2) {
@@ -62,11 +61,7 @@ public Map parseCommaSeparateKeyValuesAskwang(String keyValues)
             String[] kv = kvString.split("=", 2);
             if (kv.length != 2) {
                 throw new IllegalArgumentException(
-                        String.format(
-                                "Invalid key-value string '%s'",
-                                kvString
-                        )
-                );
+                        String.format("Invalid key-value string '%s'", kvString));
             }
             kvs.put(kv[0], kv[1]);
         }
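To make the TIMESTAMP_WITHOUT_TIME_ZONE branch concrete: a minimal, self-contained sketch of the round trip it performs, using plain java.time instead of Paimon's Timestamp, and assuming the JVM zone is Asia/Shanghai (the +08:00 offset implied by the log lines quoted later in this patch).

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class InstantRoundTrip {
    public static void main(String[] args) {
        // With spark.sql.datetime.java8API.enabled=true, a session-local literal
        // '2024-04-11 11:01:00' (Asia/Shanghai) is pushed down as this Instant:
        Instant pushed = Instant.parse("2024-04-11T03:01:00Z");
        // Converting at the default zone recovers the wall-clock time, provided the
        // JVM zone equals the Spark session time zone (assumed here for illustration).
        LocalDateTime local = pushed.atZone(ZoneId.of("Asia/Shanghai")).toLocalDateTime();
        System.out.println(local); // 2024-04-11T11:01
    }
}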
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 6c2656d39447f..5cb5e64e47364 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -374,7 +374,8 @@ public List allManifests(ManifestList manifestList) {
     public List<ManifestFileMeta> dataManifests(ManifestList manifestList) {
         List<ManifestFileMeta> result = new ArrayList<>();
         result.addAll(manifestList.read(baseManifestList));
-        result.addAll(deltaManifests(manifestList));
+        // result.addAll(deltaManifests(manifestList));
+        result.addAll(manifestList.read(deltaManifestList));
         return result;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index e0a6d25b71a34..e98a3a5f8c2b6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -126,6 +126,7 @@ static void mergeEntries(
         }
     }
 
+    // Read the entries of a manifest file and apply each file's ADD or DELETE.
     static <T extends FileEntry> void mergeEntries(Iterable<T> entries, Map<Identifier, T> map) {
         for (T entry : entries) {
             Identifier identifier = entry.identifier();
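A simplified sketch of the ADD/DELETE merge rule the comment above describes, with a hypothetical Entry class standing in for Paimon's FileEntry. The real implementation keys entries on their full identifier (partition, bucket, level, file name) and carries extra checks; this only shows the cancellation semantics.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergeEntriesSketch {

    enum FileKind { ADD, DELETE }

    /** Hypothetical, simplified stand-in for Paimon's FileEntry: a file name plus a kind. */
    static class Entry {
        final String fileName;
        final FileKind kind;

        Entry(String fileName, FileKind kind) {
            this.fileName = fileName;
            this.kind = kind;
        }
    }

    // The merge rule: an ADD registers the file under its identifier,
    // and a later DELETE for the same identifier cancels the earlier ADD.
    static void merge(List<Entry> entries, Map<String, Entry> map) {
        for (Entry entry : entries) {
            if (entry.kind == FileKind.ADD) {
                map.put(entry.fileName, entry);
            } else {
                map.remove(entry.fileName);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Entry> map = new LinkedHashMap<>();
        merge(
                Arrays.asList(
                        new Entry("data-1.parquet", FileKind.ADD),
                        new Entry("data-2.parquet", FileKind.ADD),
                        new Entry("data-1.parquet", FileKind.DELETE)),
                map);
        System.out.println(map.keySet()); // [data-2.parquet]
    }
}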
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 2dc091d889e19..b52645b07344f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -49,7 +49,7 @@
 import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/** Metadata of a manifest file. */
+/** Metadata of a manifest file. Not to be confused with ManifestEntry: a ManifestEntry is the content of a manifest file. */
 public class ManifestFileMeta {
 
     private static final Logger LOG = LoggerFactory.getLogger(ManifestFileMeta.class);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index 28de1de6ac17d..67e99a4279486 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -33,7 +33,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.logging.Level;
 
 /**
  * Universal Compaction Style is a compaction style, targeting the use cases requiring lower write
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index c0b5c289cf7f9..37063f5fb9ac8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -165,6 +165,7 @@ protected void recordDeletionBuckets(ManifestEntry entry) {
                 .add(entry.bucket());
     }
 
+    // the manifestList passed in here is snapshot.deltaManifestList()
     public void cleanUnusedDataFiles(String manifestList, Predicate<ManifestEntry> skipper) {
         // try read manifests
         List<String> manifestFileNames = readManifestFileNames(tryReadManifestList(manifestList));
@@ -193,6 +194,7 @@ protected void doCleanUnusedDataFile(
         dataFileToDelete.forEach(
                 (path, pair) -> {
                     ManifestEntry entry = pair.getLeft();
+                    // skipper.test(entry) == true means the entry is skipped, i.e. not deleted
                     // check whether we should skip the data file
                     if (!skipper.test(entry)) {
                         // delete data files
@@ -295,10 +297,12 @@ public void cleanUnusedManifestList(String manifestName, Set skippingSet
             String fileName = manifest.fileName();
             if (!skippingSet.contains(fileName)) {
                 toDeleteManifests.add(fileName);
+                // record already-deleted manifest files in skippingSet to avoid deleting them twice
                 // to avoid other snapshots trying to delete again
                 skippingSet.add(fileName);
             }
         }
+        // delete the manifest list file itself
         if (!skippingSet.contains(manifestName)) {
             toDeleteManifests.add(manifestName);
         }
@@ -327,7 +331,7 @@ public Predicate dataFileSkipper(
             addMergedDataFiles(cachedTagDataFiles, taggedSnapshots.get(index));
         }
 
-        return entry -> index >= 0 && containsDataFile(cachedTagDataFiles, entry);
+        return entry -> (index >= 0 && containsDataFile(cachedTagDataFiles, entry));
     }
 
     /**
@@ -364,6 +368,7 @@ protected List readManifestFileNames(List manifestFile
     protected void addMergedDataFiles(
             Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot snapshot)
             throws IOException {
+        // read the data files from the snapshot's merged manifests
         for (ManifestEntry entry : readMergedDataFiles(snapshot)) {
             dataFiles
                     .computeIfAbsent(entry.partition(), p -> new HashMap<>())
@@ -373,6 +378,7 @@ protected void addMergedDataFiles(
     }
 
     protected Collection<ManifestEntry> readMergedDataFiles(Snapshot snapshot) throws IOException {
+        // read the manifest files referenced by baseManifestList and deltaManifestList
         // read data manifests
         List<ManifestFileMeta> files = tryReadDataManifests(snapshot);
@@ -384,6 +390,7 @@ protected Collection readMergedDataFiles(Snapshot snapshot)
             FileEntry.mergeEntries(entries, map);
         }
 
+        // only the map values (the merged ManifestEntry objects) are needed
         return map.values();
     }
@@ -408,6 +415,7 @@ public Set manifestSkippingSet(List skippingSnapshots) {
         Set<String> skippingSet = new HashSet<>();
 
         for (Snapshot skippingSnapshot : skippingSnapshots) {
+            // manifest-list and manifest-file
             // data manifests
             skippingSet.add(skippingSnapshot.baseManifestList());
             skippingSet.add(skippingSnapshot.deltaManifestList());
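The "skipper" idea above, reduced to a minimal sketch: assuming a flat set of file names instead of Paimon's partition/bucket-indexed map, the predicate returns true for files a tag still references, and the caller (doCleanUnusedDataFile) deletes a file only when the predicate returns false.

import java.util.Collections;
import java.util.Set;
import java.util.function.Predicate;

public class DataFileSkipperSketch {

    // True for files a tag still references; such entries are skipped, i.e. kept on disk.
    static Predicate<String> dataFileSkipper(Set<String> filesReferencedByTags) {
        return filesReferencedByTags::contains;
    }

    public static void main(String[] args) {
        Predicate<String> skipper = dataFileSkipper(Collections.singleton("data-1.parquet"));
        System.out.println(skipper.test("data-1.parquet")); // true  -> skipped (kept)
        System.out.println(skipper.test("data-9.parquet")); // false -> deleted
    }
}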
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index 7d0150f87ea2e..976b8ae9729a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -94,6 +94,12 @@ public int expire() {
         Preconditions.checkArgument(
                 retainMax >= retainMin, "retainMax must greater than retainMin.");
 
+        // Example: snapshots [1..10], earliest = 1,
+        // retainMin = 2, retainMax = 5, maxDeletes = 4
+        //   min = max(10 - 5 + 1, 1) = 6
+        //   maxExclusive = 10 - 2 + 1 = 9
+        //   maxExclusive = min(9, 1 + 4) = 5
+
         // the min snapshot to retain from 'snapshot.num-retained.max'
         // (the maximum number of snapshots to retain)
         long min = Math.max(latestSnapshotId - retainMax + 1, earliest);
@@ -103,23 +109,29 @@ public int expire() {
         // (the minimum number of completed snapshots to retain)
         long maxExclusive = latestSnapshotId - retainMin + 1;
 
+        // Consumers record the next snapshot they will read. The smallest such id is the
+        // earliest snapshot a reader still needs, so it must not be deleted.
         // the snapshot being read by the consumer cannot be deleted
         maxExclusive =
                 Math.min(maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));
 
         // protected by 'snapshot.expire.limit'
         // (the maximum number of snapshots allowed to expire at a time)
+        // askwang-todo: after applying maxDeletes, min may become greater than maxExclusive;
+        // in the example above this goes straight to expireUntil(1, 5)
         maxExclusive = Math.min(maxExclusive, earliest + maxDeletes);
 
+        // A snapshot is considered NOT expired when
+        //   snapshot(id).timeMillis + 'snapshot.time-retained' >= current time.
+        // E.g. created at 10:00 with a 1-hour retention: at 10:30 it has not expired.
         for (long id = min; id < maxExclusive; id++) {
             // Early exit the loop for 'snapshot.time-retained'
             // (the maximum time of snapshots to retain)
             if (snapshotManager.snapshotExists(id)
                     && olderThanMills <= snapshotManager.snapshot(id).timeMillis()) {
+                // snapshot id has not expired, so the expire range shrinks to [earliest, id)
                 return expireUntil(earliest, id);
             }
         }
-
+        // every snapshot in [min, maxExclusive) has expired
         return expireUntil(earliest, maxExclusive);
     }
 
@@ -155,8 +167,11 @@ public int expireUntil(long earliestId, long endExclusiveId) {
                     "Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
         }
 
+        // snapshots referenced by tags
         List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
 
+        // Files of a snapshot may be deleted only if the next snapshot no longer uses them.
+        // askwang-todo: why iterate in this order?
        // delete merge tree files
        // deleted merge tree files in a snapshot are not used by the next snapshot, so the range of
        // id should be (beginInclusiveId, endExclusiveId]
@@ -168,6 +183,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
             // expire merge tree files and collect changed buckets
             Predicate skipper;
             try {
+                // with no tags, the skipper always returns false
                 skipper = snapshotDeletion.dataFileSkipper(taggedSnapshots, id);
             } catch (Exception e) {
                 LOG.info(
@@ -196,29 +212,35 @@ public int expireUntil(long earliestId, long endExclusiveId) {
 
         // data files and changelog files in bucket directories has been deleted
         // then delete changed bucket directories if they are empty
+        // when switching from bucket=-1 to bucket=2, expiring snapshots/files also removes
+        // files written by the former dynamic bucket mode and any now-empty bucket directories
         if (cleanEmptyDirectories) {
             snapshotDeletion.cleanDataDirectories();
         }
 
+        // collect the snapshot manifests and files that must NOT be deleted,
+        // because tagged snapshots still depend on them
         // delete manifests and indexFiles
         List<Snapshot> skippingSnapshots =
                 TagManager.findOverlappedSnapshots(
                         taggedSnapshots, beginInclusiveId, endExclusiveId);
         skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId));
         Set<String> skippingSet = snapshotDeletion.manifestSkippingSet(skippingSnapshots);
+
         for (long id = beginInclusiveId; id < endExclusiveId; id++) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Ready to delete manifests in snapshot #" + id);
             }
 
             Snapshot snapshot = snapshotManager.snapshot(id);
+            // clean manifests and index files
             snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet);
 
             if (expireConfig.isChangelogDecoupled()) {
                 commitChangelog(new Changelog(snapshot));
             }
+            // snapshot files are deleted over the range [begin, end)
             snapshotManager.fileIO().deleteQuietly(snapshotManager.snapshotPath(id));
         }
 
+        // update the snapshot/EARLIEST hint file
         writeEarliestHint(endExclusiveId);
         return (int) (endExclusiveId - beginInclusiveId);
     }
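The arithmetic from the example comment in expire(), as a runnable check; the constants mirror the illustrative values from the comment, not a real table.

public class ExpireRangeExample {
    public static void main(String[] args) {
        // Mirrors the worked example above (illustrative values only).
        long earliest = 1, latest = 10;
        long retainMin = 2, retainMax = 5, maxDeletes = 4;

        long min = Math.max(latest - retainMax + 1, earliest);        // 6
        long maxExclusive = latest - retainMin + 1;                   // 9
        maxExclusive = Math.min(maxExclusive, earliest + maxDeletes); // 5

        // min (6) > maxExclusive (5): the time-retained loop body never runs, and
        // expiration proceeds directly as expireUntil(earliest = 1, maxExclusive = 5).
        System.out.println("expireUntil(" + earliest + ", " + maxExclusive + ")");
    }
}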
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 0c5658192fd10..2aaefa93da776 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -310,7 +310,7 @@ public SortedMap> tags(Predicate filter) {
 
         for (Path path : paths) {
             String tagName = path.getName().substring(TAG_PREFIX.length());
-            // Predicate filter = s -> true: returns true for any string
+            // Predicate filter = s -> true: returns true for any string, so this branch is never taken
             if (!filter.test(tagName)) {
                 continue;
             }
@@ -412,6 +412,7 @@ public static List findOverlappedSnapshots(
         List<Snapshot> snapshots = new ArrayList<>();
         int right = findPreviousTag(taggedSnapshots, endExclusive);
         if (right >= 0) {
+            // askwang-todo: left may include one extra, non-overlapping tag; see findNextOrEqualTagAskwang
             int left = Math.max(findPreviousOrEqualTag(taggedSnapshots, beginInclusive), 0);
             for (int i = left; i <= right; i++) {
                 snapshots.add(taggedSnapshots.get(i));
@@ -420,6 +421,25 @@ public static List findOverlappedSnapshots(
         return snapshots;
     }
 
+    /**
+     * Find the tagged snapshots whose ids overlap the snapshot range [begin, end), narrowing the
+     * scan of taggedSnapshots to the index range [left, right]. left is the last id <= begin
+     * (optimization: the first id >= begin); right is the last id < end.
+     */
+    private static List<Snapshot> findOverlappedSnapshotsAskwang(
+            List<Snapshot> taggedSnapshot, long beginInclusive, long endExclusive) {
+        List<Snapshot> snapshots = new ArrayList<>();
+        int right = findPreviousTag(taggedSnapshot, endExclusive);
+        if (right >= 0) {
+            int left = Math.max(findNextOrEqualTagAskwang(taggedSnapshot, beginInclusive), 0);
+            for (int i = left; i <= right; i++) {
+                snapshots.add(taggedSnapshot.get(i));
+            }
+        }
+
+        return snapshots;
+    }
+
     public static int findPreviousTag(List<Snapshot> taggedSnapshots, long targetSnapshotId) {
         for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
             if (taggedSnapshots.get(i).id() < targetSnapshotId) {
@@ -429,6 +449,17 @@ public static int findPreviousTag(List taggedSnapshots, long targetSna
         return -1;
     }
 
+    /**
+     * Skip tags that cannot overlap the range [begin, end). Example: range [10, 15) with
+     * taggedSnapshots [7, 9, 11, 12] -> returns the index of 11.
+     */
+    private static int findNextOrEqualTagAskwang(
+            List<Snapshot> taggedSnapshots, long targetSnapshotId) {
+        for (int i = 0; i < taggedSnapshots.size(); i++) {
+            if (taggedSnapshots.get(i).id() >= targetSnapshotId) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
     private static int findPreviousOrEqualTag(
             List<Snapshot> taggedSnapshots, long targetSnapshotId) {
         for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
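The difference the askwang-todo comment points at, run on the example data from the javadoc; plain Long ids stand in for Snapshot objects, and both helpers mirror the loops above.

import java.util.Arrays;
import java.util.List;

public class TagOverlapExample {

    // First index whose id >= target (the Askwang variant); -1 if none.
    static int findNextOrEqualTag(List<Long> ids, long target) {
        for (int i = 0; i < ids.size(); i++) {
            if (ids.get(i) >= target) {
                return i;
            }
        }
        return -1;
    }

    // Last index whose id <= target (the upstream variant); -1 if none.
    static int findPreviousOrEqualTag(List<Long> ids, long target) {
        for (int i = ids.size() - 1; i >= 0; i--) {
            if (ids.get(i) <= target) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Expire range [10, 15), tags at snapshot ids 7, 9, 11, 12.
        List<Long> tagged = Arrays.asList(7L, 9L, 11L, 12L);
        System.out.println(findPreviousOrEqualTag(tagged, 10)); // 1 -> starts at tag 9, one extra tag below begin
        System.out.println(findNextOrEqualTag(tagged, 10));     // 2 -> starts at tag 11
    }
}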
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
index a199396f8bfd9..17f353f5443c7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
@@ -37,7 +37,6 @@
 
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -75,7 +74,7 @@ public void testBucketHashCode() {
         int recordNum = 10;
         for (int i = 1; i <= recordNum; i++) {
             iList.add(i);
-            list.add(bucket(extractor("", "a"), GenericRow.of(i,2,3)));
+            list.add(bucket(extractor("", "a"), GenericRow.of(i, 2, 3)));
         }
 
         System.out.println("origin: " + list.size());
@@ -109,8 +108,6 @@ private FixedBucketRowKeyExtractor extractor(String bk, String pk) {
         return extractor("", bk, pk);
     }
 
-
-
     private FixedBucketRowKeyExtractor extractor(
             String partK, String bk, String pk, int numBucket) {
         RowType rowType =
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 15a96d41dde44..fc87df81e63d1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -51,12 +51,10 @@ abstract class PaimonBaseScanBuilder(table: Table)
    * should be returned from the data source if and only if all of the filters match. That is,
    * filters must be interpreted as ANDed together.
    *
-   * spark-sql INFO V2ScanRelationPushDown:
-   * Pushing operators to z_paimon_pk_table
-   * Pushed Filters: IsNotNull(dt), EqualTo(dt,2024-04-11T03:01:00Z)
-   * Post-Scan Filters: isnotnull(dt#3),(dt#3 = 2024-04-11 11:01:00)
-   * The Post-Scan Filters printed on the Spark side are correct because Spark first runs
-   * translateFilterWithMapping on the filter and then restores it with rebuildExpressionFromFilter.
+   * spark-sql INFO V2ScanRelationPushDown: Pushing operators to z_paimon_pk_table Pushed
+   * Filters: IsNotNull(dt), EqualTo(dt,2024-04-11T03:01:00Z) Post-Scan Filters:
+   * isnotnull(dt#3),(dt#3 = 2024-04-11 11:01:00) The Post-Scan Filters printed on the Spark
+   * side are correct because Spark first runs translateFilterWithMapping on the filter and
+   * then restores it with rebuildExpressionFromFilter.
    */
   override def pushFilters(filters: Array[Filter]): Array[Filter] = {
     // The filters Spark passes in are already converted to UTC, e.g. EqualTo(dt,2024-04-11T03:01:00Z)
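The two renderings of the same literal in the quoted log are simply one Instant printed in UTC (Pushed Filters) versus in the session time zone (Post-Scan Filters); a small sketch, assuming the session zone is Asia/Shanghai.

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class FilterRenderings {
    public static void main(String[] args) {
        Instant pushed = Instant.parse("2024-04-11T03:01:00Z");
        // Pushed Filters render the raw Instant:
        System.out.println(pushed); // 2024-04-11T03:01:00Z
        // Post-Scan Filters render session-local wall-clock time (zone assumed here):
        System.out.println(
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                        .withZone(ZoneId.of("Asia/Shanghai"))
                        .format(pushed)); // 2024-04-11 11:01:00
    }
}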
+ test("writ pk table with pk null long/int type") { withTable("tb") { - spark.conf.set("spark.sql.datetime.java8API.enabled", "false") - spark.sql(s"CREATE TABLE tb (id INT, dt string) " + - s"using paimon " + - s"TBLPROPERTIES ('file.format'='parquet', 'primary-key'='id', 'bucket'='1')") - val ds = sql("INSERT INTO `tb` VALUES (cast(NULL as int),cast(NULL as string))") - val data = sql("SELECT * FROM `tb`") - println(data.show()) - } - } + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "false") { + spark.sql( + s"CREATE TABLE tb (id long, dt string) " + + s"using paimon " + + s"TBLPROPERTIES ('file.format'='parquet', 'primary-key'='id', 'bucket'='1')") + // val ds = sql("INSERT INTO `tb` VALUES (cast(NULL as long),cast(NULL as string))") - // ok - test("writ pk table with pk null string type") { - withTable("tb") { - spark.conf.set("spark.sql.datetime.java8API.enabled", "false") - spark.sql(s"CREATE TABLE tb (id string, dt string) " + - s"using paimon " + - s"TBLPROPERTIES ('file.format'='parquet', 'primary-key'='id', 'bucket'='1')") - val ds = sql("INSERT INTO `tb` VALUES (cast(NULL as string),cast(NULL as string))") - val data = sql("SELECT * FROM `tb`") - println(data.show()) - } - } - - // not ok - test("writ pk table with pk null long type") { - withTable("tb") { - spark.conf.set("spark.sql.datetime.java8API.enabled", "false") - spark.sql(s"CREATE TABLE tb (id long, dt string) " + - s"using paimon " + - s"TBLPROPERTIES ('file.format'='parquet', 'primary-key'='id', 'bucket'='1')") -// val ds = sql("INSERT INTO `tb` VALUES (cast(NULL as long),cast(NULL as string))") - - val query2 = "INSERT INTO `tb` VALUES (cast(NULL as long),cast(NULL as string))" - val query = "INSERT INTO `tb` VALUES (NULL, NULL)" + val query2 = "INSERT INTO `tb` VALUES (cast(NULL as long),cast(NULL as string))" + val query = "INSERT INTO `tb` VALUES (NULL, NULL)" - explainPlan(query, spark) + explainPlan(query, spark) + } } } @@ -113,5 +94,19 @@ class AskwangSQLQueryTest extends PaimonSparkTestBase{ (parser, analyzer, optimizer, planner) } + def showQueryExecutionPlanInfo(analyzedDF: DataFrame): Unit = { + val ana = analyzedDF.queryExecution.analyzed + println("== Analyzed Logical Plan ==") + println(ana) + // println( ana.prettyJson) + println("== Optimized Logical Plan ==") + val opt = analyzedDF.queryExecution.optimizedPlan + println(opt) + // println( opt.prettyJson) + println("== Physical Plan ==") + println(analyzedDF.queryExecution.sparkPlan) + println("== executedPlan ==") + println(analyzedDF.queryExecution.executedPlan) + } }