From be2904332ba010fc55a48788e2f0ae847cea93ed Mon Sep 17 00:00:00 2001 From: xuzifu666 <1206332514@qq.com> Date: Wed, 9 Oct 2024 14:33:46 +0800 Subject: [PATCH] [core] Manifest and Statistic support query by tag name (#4291) --- docs/content/maintenance/system-tables.md | 11 ++++++++++ .../paimon/table/AbstractFileStoreTable.java | 10 ++++++++- .../paimon/table/system/ManifestsTable.java | 10 +++++++-- .../table/system/ManifestsTableTest.java | 21 +++++++++++++++++++ .../spark/sql/AnalyzeTableTestBase.scala | 21 +++++++++++++++++++ 5 files changed, 70 insertions(+), 3 deletions(-) diff --git a/docs/content/maintenance/system-tables.md b/docs/content/maintenance/system-tables.md index 9c8bfc295bd4..05a0c0db7324 100644 --- a/docs/content/maintenance/system-tables.md +++ b/docs/content/maintenance/system-tables.md @@ -272,6 +272,17 @@ SELECT * FROM my_table$manifests /*+ OPTIONS('scan.snapshot-id'='1') */; +--------------------------------+-------------+------------------+-------------------+---------------+ 1 rows in set */ + +-- You can also query the manifest with specified tagName +SELECT * FROM my_table$manifests /*+ OPTIONS('scan.tag-name'='tag1') */; +/* ++--------------------------------+-------------+------------------+-------------------+---------------+ +| file_name | file_size | num_added_files | num_deleted_files | schema_id | ++--------------------------------+-------------+------------------+-------------------+---------------+ +| manifest-f4dcab43-ef6b-4713... 
| 12365| 40 | 0 | 0 | ++--------------------------------+-------------+------------------+-------------------+---------------+ +1 rows in set +*/ ``` ### Aggregation fields Table diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 7682cdf7724a..6aacef6ed841 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -65,6 +65,7 @@ import org.apache.paimon.utils.SimpleFileReader; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.SnapshotNotExistException; +import org.apache.paimon.utils.StringUtils; import org.apache.paimon.utils.TagManager; import javax.annotation.Nullable; @@ -170,8 +171,15 @@ public Identifier identifier() { public Optional statistics() { Snapshot latestSnapshot; Long snapshotId = coreOptions().scanSnapshotId(); + String tagName = coreOptions().scanTagName(); + if (snapshotId == null) { - snapshotId = snapshotManager().latestSnapshotId(); + if (!StringUtils.isEmpty(tagName) && tagManager().tagExists(tagName)) { + return store().newStatsFileHandler() + .readStats(tagManager().tag(tagName).trimToSnapshot()); + } else { + snapshotId = snapshotManager().latestSnapshotId(); + } } if (snapshotId != null && snapshotManager().snapshotExists(snapshotId)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java index 30c5396d177d..4e07c6c58fcd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java @@ -46,6 +46,7 @@ import org.apache.paimon.utils.SerializationUtils; import org.apache.paimon.utils.SnapshotManager; import 
org.apache.paimon.utils.SnapshotNotExistException; +import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; @@ -198,6 +199,7 @@ private static List allManifests(FileStoreTable dataTable) { CoreOptions coreOptions = CoreOptions.fromMap(dataTable.options()); SnapshotManager snapshotManager = dataTable.snapshotManager(); Long snapshotId = coreOptions.scanSnapshotId(); + String tagName = coreOptions.scanTagName(); Snapshot snapshot = null; if (snapshotId != null) { // reminder user with snapshot id range @@ -210,8 +212,12 @@ private static List allManifests(FileStoreTable dataTable) { snapshotId, earliestSnapshotId, latestSnapshotId)); } snapshot = snapshotManager.snapshot(snapshotId); - } else if (snapshotId == null) { - snapshot = snapshotManager.latestSnapshot(); + } else { + if (!StringUtils.isEmpty(tagName) && dataTable.tagManager().tagExists(tagName)) { + snapshot = dataTable.tagManager().tag(tagName).trimToSnapshot(); + } else { + snapshot = snapshotManager.latestSnapshot(); + } } if (snapshot == null) { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java index 32dd0d24888d..4125a3f83dfe 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java @@ -114,6 +114,27 @@ public void testReadManifestsFromSpecifiedSnapshot() throws Exception { assertThat(result).containsExactlyElementsOf(expectedRow); } + @Test + public void testReadManifestsFromSpecifiedTagName() throws Exception { + List expectedRow = getExpectedResult(1L); + table.createTag("tag1", 1L); + manifestsTable = + (ManifestsTable) + manifestsTable.copy( + Collections.singletonMap(CoreOptions.SCAN_TAG_NAME.key(), "tag1")); + List result = read(manifestsTable); + 
assertThat(result).containsExactlyElementsOf(expectedRow); + + expectedRow = getExpectedResult(2L); + table.createTag("tag2", 2L); + manifestsTable = + (ManifestsTable) + manifestsTable.copy( + Collections.singletonMap(CoreOptions.SCAN_TAG_NAME.key(), "tag2")); + result = read(manifestsTable); + assertThat(result).containsExactlyElementsOf(expectedRow); + } + @Test public void testReadManifestsFromNotExistSnapshot() throws Exception { manifestsTable = diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala index 21d43f77fddb..1ccf4e38ec4f 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala @@ -91,6 +91,27 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase { spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS") + // create tag + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag5', snapshot => 5)"), + Row(true) :: Nil) + + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag6', snapshot => 6)"), + Row(true) :: Nil) + + withSQLConf("spark.paimon.scan.tag-name" -> "test_tag5") { + checkAnswer( + sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"), + Row(2, 0, 2, "{ }")) + } + + withSQLConf("spark.paimon.scan.tag-name" -> "test_tag6") { + checkAnswer( + sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"), + Row(5, 0, 4, "{ }")) + } + withSQLConf("spark.paimon.scan.snapshot-id" -> "3") { checkAnswer( sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),