From c41d23c47fd96fe2b1df131ccea7f8e82f45b53b Mon Sep 17 00:00:00 2001 From: askwang <135721692+askwang@users.noreply.github.com> Date: Fri, 11 Oct 2024 15:52:04 +0800 Subject: [PATCH] [core] Sort some system table query result (#4306) --- .../paimon/table/system/BucketsTable.java | 30 +++++-- .../paimon/table/system/PartitionsTable.java | 27 ++++-- .../apache/paimon/table/system/TagsTable.java | 4 +- .../paimon/table/system/TagsTableTest.java | 12 ++- .../spark/sql/PaimonSystemTableTest.scala | 84 +++++++++++++++++++ 5 files changed, 137 insertions(+), 20 deletions(-) create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java index 0ac42485068c..ccc260ef0b79 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java @@ -40,6 +40,7 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.IteratorRecordReader; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.RowDataToObjectArrayConverter; import org.apache.paimon.utils.SerializationUtils; @@ -52,9 +53,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; @@ -181,9 +184,24 @@ public RecordReader createReader(Split split) { new RowDataToObjectArrayConverter( fileStoreTable.schema().logicalPartitionType()); + // sorted by partition and bucket + List> bucketList = + buckets.stream() + .map( + entry -> + Pair.of( + Arrays.toString( + converter.convert(entry.partition())), + entry)) + .sorted( + Comparator.comparing( + (Pair p) -> p.getLeft()) + .thenComparing(p -> p.getRight().bucket())) + .collect(Collectors.toList()); + List results = new ArrayList<>(buckets.size()); - for (BucketEntry entry : buckets) { - results.add(toRow(entry, converter)); + for (Pair pair : bucketList) { + results.add(toRow(pair.getLeft(), pair.getRight())); } Iterator iterator = results.iterator(); @@ -198,13 +216,9 @@ public RecordReader createReader(Split split) { return new IteratorRecordReader<>(iterator); } - private GenericRow toRow( - BucketEntry entry, RowDataToObjectArrayConverter partitionConverter) { - BinaryString partitionId = - BinaryString.fromString( - Arrays.toString(partitionConverter.convert(entry.partition()))); + private GenericRow toRow(String partStr, BucketEntry entry) { return GenericRow.of( - partitionId, + BinaryString.fromString(partStr), entry.bucket(), entry.recordCount(), entry.fileSizeInBytes(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index 5e966eabd9a7..736347b9aff3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -40,6 +40,7 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.IteratorRecordReader; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.RowDataToObjectArrayConverter; import org.apache.paimon.utils.SerializationUtils; @@ -53,9 +54,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; @@ -181,9 +184,21 @@ public RecordReader createReader(Split split) throws IOException { new RowDataToObjectArrayConverter( fileStoreTable.schema().logicalPartitionType()); + // sorted by partition + List> partitionList = + partitions.stream() + .map( + entry -> + Pair.of( + Arrays.toString( + converter.convert(entry.partition())), + entry)) + .sorted(Comparator.comparing(Pair::getLeft)) + .collect(Collectors.toList()); + List results = new ArrayList<>(partitions.size()); - for (PartitionEntry entry : partitions) { - results.add(toRow(entry, converter)); + for (Pair pair : partitionList) { + results.add(toRow(pair.getLeft(), pair.getRight())); } Iterator iterator = results.iterator(); @@ -198,13 +213,9 @@ public RecordReader createReader(Split split) throws IOException { return new IteratorRecordReader<>(iterator); } - private GenericRow toRow( - PartitionEntry entry, RowDataToObjectArrayConverter partitionConverter) { - BinaryString partitionId = - BinaryString.fromString( - Arrays.toString(partitionConverter.convert(entry.partition()))); + private GenericRow toRow(String partStr, PartitionEntry entry) { return GenericRow.of( - partitionId, + BinaryString.fromString(partStr), entry.recordCount(), entry.fileSizeInBytes(), entry.fileCount(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index abce3f33027e..dd7335e38564 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -59,11 +59,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.TreeMap; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; @@ -220,7 +220,7 @@ public RecordReader createReader(Split split) { LeafPredicate predicate = ((TagsSplit) split).tagName; TagManager tagManager = new TagManager(fileIO, location, branch); - Map nameToSnapshot = new LinkedHashMap<>(); + Map nameToSnapshot = new TreeMap<>(); if (predicate != null && predicate.function() instanceof Equal diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java index 8f8029cde785..7ee8cd53d75c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java @@ -40,6 +40,8 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.TreeMap; import static org.assertj.core.api.Assertions.assertThat; @@ -89,11 +91,12 @@ void testTagsTable() throws Exception { } private List getExpectedResult() { - List internalRows = new ArrayList<>(); + Map tagToRows = new TreeMap<>(); for (Pair snapshot : tagManager.tagObjects()) { Tag tag = snapshot.getKey(); String tagName = snapshot.getValue(); - internalRows.add( + tagToRows.put( + tagName, GenericRow.of( BinaryString.fromString(tagName), tag.id(), @@ -109,6 +112,11 @@ private List getExpectedResult() { : BinaryString.fromString( tag.getTagTimeRetained().toString()))); } + + List internalRows = new ArrayList<>(); + for (Map.Entry entry : tagToRows.entrySet()) { + internalRows.add(entry.getValue()); + } return internalRows; } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala new file mode 100644 index 000000000000..64baf6232fd8 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.Row + +class PaimonSystemTableTest extends PaimonSparkTestBase { + + test("system table: sort tags table") { + spark.sql(s""" + |CREATE TABLE T (id STRING, name STRING) + |USING PAIMON + |""".stripMargin) + + spark.sql(s"INSERT INTO T VALUES(1, 'a')") + + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-02')") + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-01')") + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-04')") + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-03')") + + checkAnswer( + spark.sql("select tag_name from `T$tags`"), + Row("2024-10-01") :: Row("2024-10-02") :: Row("2024-10-03") :: Row("2024-10-04") :: Nil) + } + + test("system table: sort partitions table") { + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING,dt STRING,hh STRING) + |PARTITIONED BY (dt, hh) + |TBLPROPERTIES ('primary-key'='a,dt,hh', 'bucket' = '3') + |""".stripMargin) + + spark.sql("INSERT INTO T VALUES(1, 'a', '2024-10-10', '01')") + spark.sql("INSERT INTO T VALUES(3, 'c', '2024-10-10', '23')") + spark.sql("INSERT INTO T VALUES(2, 'b', '2024-10-10', '12')") + spark.sql("INSERT INTO T VALUES(5, 'f', '2024-10-09', '02')") + spark.sql("INSERT INTO T VALUES(4, 'd', '2024-10-09', '01')") + + checkAnswer(spark.sql("select count(*) from `T$partitions`"), Row(5) :: Nil) + checkAnswer( + spark.sql("select partition from `T$partitions`"), + Row("[2024-10-09, 01]") :: Row("[2024-10-09, 02]") :: Row("[2024-10-10, 01]") :: Row( + "[2024-10-10, 12]") :: Row("[2024-10-10, 23]") :: Nil + ) + } + + test("system table: sort buckets table") { + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING,dt STRING,hh STRING) + |PARTITIONED BY (dt, hh) + |TBLPROPERTIES ('primary-key'='a,dt,hh', 'bucket' = '3') + |""".stripMargin) + + spark.sql("INSERT INTO T VALUES(1, 'a', '2024-10-10', '01')") + spark.sql("INSERT INTO T VALUES(2, 'b', '2024-10-10', '01')") + spark.sql("INSERT INTO T VALUES(3, 'c', '2024-10-10', '01')") + spark.sql("INSERT INTO T VALUES(4, 'd', '2024-10-10', '01')") + spark.sql("INSERT INTO T VALUES(5, 'f', '2024-10-10', '01')") + + checkAnswer(spark.sql("select count(*) from `T$partitions`"), Row(1) :: Nil) + checkAnswer( + spark.sql("select partition,bucket from `T$buckets`"), + Row("[2024-10-10, 01]", 0) :: Row("[2024-10-10, 01]", 1) :: Row("[2024-10-10, 01]", 2) :: Nil) + } +}