Skip to content

Commit

Permalink
[core] Manifest and Statistic support query by tag name (#4291)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzifu666 authored Oct 9, 2024
1 parent 7e43ff5 commit be29043
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 3 deletions.
11 changes: 11 additions & 0 deletions docs/content/maintenance/system-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,17 @@ SELECT * FROM my_table$manifests /*+ OPTIONS('scan.snapshot-id'='1') */;
+--------------------------------+-------------+------------------+-------------------+---------------+
1 rows in set
*/

- You can also query the manifest with specified tagName
SELECT * FROM my_table$manifests /*+ OPTIONS('scan.tag-name'='tag1') */;
/*
+--------------------------------+-------------+------------------+-------------------+---------------+
| file_name | file_size | num_added_files | num_deleted_files | schema_id |
+--------------------------------+-------------+------------------+-------------------+---------------+
| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 |
+--------------------------------+-------------+------------------+-------------------+---------------+
1 rows in set
*/
```

### Aggregation fields Table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.paimon.utils.SimpleFileReader;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.TagManager;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -170,8 +171,15 @@ public Identifier identifier() {
public Optional<Statistics> statistics() {
Snapshot latestSnapshot;
Long snapshotId = coreOptions().scanSnapshotId();
String tagName = coreOptions().scanTagName();

if (snapshotId == null) {
snapshotId = snapshotManager().latestSnapshotId();
if (!StringUtils.isEmpty(tagName) && tagManager().tagExists(tagName)) {
return store().newStatsFileHandler()
.readStats(tagManager().tag(tagName).trimToSnapshot());
} else {
snapshotId = snapshotManager().latestSnapshotId();
}
}

if (snapshotId != null && snapshotManager().snapshotExists(snapshotId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

Expand Down Expand Up @@ -198,6 +199,7 @@ private static List<ManifestFileMeta> allManifests(FileStoreTable dataTable) {
CoreOptions coreOptions = CoreOptions.fromMap(dataTable.options());
SnapshotManager snapshotManager = dataTable.snapshotManager();
Long snapshotId = coreOptions.scanSnapshotId();
String tagName = coreOptions.scanTagName();
Snapshot snapshot = null;
if (snapshotId != null) {
// reminder user with snapshot id range
Expand All @@ -210,8 +212,12 @@ private static List<ManifestFileMeta> allManifests(FileStoreTable dataTable) {
snapshotId, earliestSnapshotId, latestSnapshotId));
}
snapshot = snapshotManager.snapshot(snapshotId);
} else if (snapshotId == null) {
snapshot = snapshotManager.latestSnapshot();
} else {
if (!StringUtils.isEmpty(tagName) && dataTable.tagManager().tagExists(tagName)) {
snapshot = dataTable.tagManager().tag(tagName).trimToSnapshot();
} else {
snapshot = snapshotManager.latestSnapshot();
}
}

if (snapshot == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,27 @@ public void testReadManifestsFromSpecifiedSnapshot() throws Exception {
assertThat(result).containsExactlyElementsOf(expectedRow);
}

@Test
public void testReadManifestsFromSpecifiedTagName() throws Exception {
List<InternalRow> expectedRow = getExpectedResult(1L);
table.createTag("tag1", 1L);
manifestsTable =
(ManifestsTable)
manifestsTable.copy(
Collections.singletonMap(CoreOptions.SCAN_TAG_NAME.key(), "tag1"));
List<InternalRow> result = read(manifestsTable);
assertThat(result).containsExactlyElementsOf(expectedRow);

expectedRow = getExpectedResult(2L);
table.createTag("tag2", 2L);
manifestsTable =
(ManifestsTable)
manifestsTable.copy(
Collections.singletonMap(CoreOptions.SCAN_TAG_NAME.key(), "tag2"));
result = read(manifestsTable);
assertThat(result).containsExactlyElementsOf(expectedRow);
}

@Test
public void testReadManifestsFromNotExistSnapshot() throws Exception {
manifestsTable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,27 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {

spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS")

// create tag
checkAnswer(
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag5', snapshot => 5)"),
Row(true) :: Nil)

checkAnswer(
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag6', snapshot => 6)"),
Row(true) :: Nil)

withSQLConf("spark.paimon.scan.tag-name" -> "test_tag5") {
checkAnswer(
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Row(2, 0, 2, "{ }"))
}

withSQLConf("spark.paimon.scan.tag-name" -> "test_tag6") {
checkAnswer(
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Row(5, 0, 4, "{ }"))
}

withSQLConf("spark.paimon.scan.snapshot-id" -> "3") {
checkAnswer(
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Expand Down

0 comments on commit be29043

Please sign in to comment.