Commit c41d23c

[core] Sort some system table query result (apache#4306)

askwang authored Oct 11, 2024
1 parent 9456654 commit c41d23c
Showing 5 changed files with 137 additions and 20 deletions.
BucketsTable.java

@@ -40,6 +40,7 @@
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ProjectedRow;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.apache.paimon.utils.SerializationUtils;
@@ -52,9 +53,11 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;

 import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

@@ -181,9 +184,24 @@ public RecordReader<InternalRow> createReader(Split split) {
                 new RowDataToObjectArrayConverter(
                         fileStoreTable.schema().logicalPartitionType());

+        // sorted by partition and bucket
+        List<Pair<String, BucketEntry>> bucketList =
+                buckets.stream()
+                        .map(
+                                entry ->
+                                        Pair.of(
+                                                Arrays.toString(
+                                                        converter.convert(entry.partition())),
+                                                entry))
+                        .sorted(
+                                Comparator.comparing(
+                                                (Pair<String, BucketEntry> p) -> p.getLeft())
+                                        .thenComparing(p -> p.getRight().bucket()))
+                        .collect(Collectors.toList());
+
         List<InternalRow> results = new ArrayList<>(buckets.size());
-        for (BucketEntry entry : buckets) {
-            results.add(toRow(entry, converter));
+        for (Pair<String, BucketEntry> pair : bucketList) {
+            results.add(toRow(pair.getLeft(), pair.getRight()));
         }

         Iterator<InternalRow> iterator = results.iterator();
@@ -198,13 +216,9 @@ public RecordReader<InternalRow> createReader(Split split) {
         return new IteratorRecordReader<>(iterator);
     }

-    private GenericRow toRow(
-            BucketEntry entry, RowDataToObjectArrayConverter partitionConverter) {
-        BinaryString partitionId =
-                BinaryString.fromString(
-                        Arrays.toString(partitionConverter.convert(entry.partition())));
+    private GenericRow toRow(String partStr, BucketEntry entry) {
         return GenericRow.of(
-                partitionId,
+                BinaryString.fromString(partStr),
                 entry.bucket(),
                 entry.recordCount(),
                 entry.fileSizeInBytes(),
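The sort keys each bucket entry by the string form of its partition (Arrays.toString over the converted values) and then by bucket number, so rows come back in lexicographic partition order, then bucket order. A minimal self-contained sketch of the same Comparator chaining — PartitionBucket is a hypothetical stand-in, not a Paimon type:

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    public class BucketSortSketch {
        // Hypothetical stand-in for a (partition, bucket) pair; not a Paimon type.
        static final class PartitionBucket {
            final String partition;
            final int bucket;
            PartitionBucket(String partition, int bucket) {
                this.partition = partition;
                this.bucket = bucket;
            }
        }

        public static void main(String[] args) {
            List<PartitionBucket> entries = Arrays.asList(
                    new PartitionBucket("[2024-10-10, 01]", 2),
                    new PartitionBucket("[2024-10-09, 01]", 0),
                    new PartitionBucket("[2024-10-10, 01]", 0));

            // Same two-level sort as the diff: partition string first, then bucket.
            List<PartitionBucket> sorted = entries.stream()
                    .sorted(Comparator.comparing((PartitionBucket p) -> p.partition)
                            .thenComparing(p -> p.bucket))
                    .collect(Collectors.toList());

            sorted.forEach(p -> System.out.println(p.partition + " -> " + p.bucket));
            // [2024-10-09, 01] -> 0
            // [2024-10-10, 01] -> 0
            // [2024-10-10, 01] -> 2
        }
    }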
PartitionsTable.java

@@ -40,6 +40,7 @@
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ProjectedRow;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.apache.paimon.utils.SerializationUtils;
@@ -53,9 +54,11 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;

 import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

@@ -181,9 +184,21 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
                 new RowDataToObjectArrayConverter(
                         fileStoreTable.schema().logicalPartitionType());

+        // sorted by partition
+        List<Pair<String, PartitionEntry>> partitionList =
+                partitions.stream()
+                        .map(
+                                entry ->
+                                        Pair.of(
+                                                Arrays.toString(
+                                                        converter.convert(entry.partition())),
+                                                entry))
+                        .sorted(Comparator.comparing(Pair::getLeft))
+                        .collect(Collectors.toList());
+
         List<InternalRow> results = new ArrayList<>(partitions.size());
-        for (PartitionEntry entry : partitions) {
-            results.add(toRow(entry, converter));
+        for (Pair<String, PartitionEntry> pair : partitionList) {
+            results.add(toRow(pair.getLeft(), pair.getRight()));
         }

         Iterator<InternalRow> iterator = results.iterator();
@@ -198,13 +213,9 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
         return new IteratorRecordReader<>(iterator);
     }

-    private GenericRow toRow(
-            PartitionEntry entry, RowDataToObjectArrayConverter partitionConverter) {
-        BinaryString partitionId =
-                BinaryString.fromString(
-                        Arrays.toString(partitionConverter.convert(entry.partition())));
+    private GenericRow toRow(String partStr, PartitionEntry entry) {
         return GenericRow.of(
-                partitionId,
+                BinaryString.fromString(partStr),
                 entry.recordCount(),
                 entry.fileSizeInBytes(),
                 entry.fileCount(),
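Note that the buckets diff spells out the lambda parameter type ((Pair<String, BucketEntry> p) -> p.getLeft()) while this one can use the plain method reference Pair::getLeft: once thenComparing is chained, Java can no longer infer the comparator's type argument from the target type, so the first key extractor must carry an explicit type. A small demonstration using java.util.AbstractMap.SimpleEntry as a stand-in:

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Comparator;

    public class InferenceSketch {
        public static void main(String[] args) {
            // Compiles: a single comparing call, target typing fixes the type argument.
            Comparator<SimpleEntry<String, Integer>> byKey =
                    Comparator.comparing(SimpleEntry::getKey);

            // With chaining, the receiver type must be known up front, so the
            // lambda parameter is annotated explicitly (as in the buckets diff):
            Comparator<SimpleEntry<String, Integer>> byKeyThenValue =
                    Comparator.comparing((SimpleEntry<String, Integer> e) -> e.getKey())
                            .thenComparing(SimpleEntry::getValue);

            System.out.println(byKey.compare(
                    new SimpleEntry<>("a", 1), new SimpleEntry<>("b", 2))); // negative
            System.out.println(byKeyThenValue.compare(
                    new SimpleEntry<>("a", 2), new SimpleEntry<>("a", 1))); // positive
        }
    }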
TagsTable.java

@@ -59,11 +59,11 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.TreeMap;

 import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

@@ -220,7 +220,7 @@ public RecordReader<InternalRow> createReader(Split split) {
         LeafPredicate predicate = ((TagsSplit) split).tagName;
         TagManager tagManager = new TagManager(fileIO, location, branch);

-        Map<String, Tag> nameToSnapshot = new LinkedHashMap<>();
+        Map<String, Tag> nameToSnapshot = new TreeMap<>();

         if (predicate != null
                 && predicate.function() instanceof Equal
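Replacing LinkedHashMap (iteration in insertion order) with TreeMap (iteration in natural key order) is the whole fix for the tags table: rows now come out sorted by tag name. A minimal illustration of the difference:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.TreeMap;

    public class MapOrderSketch {
        public static void main(String[] args) {
            Map<String, Integer> insertion = new LinkedHashMap<>();
            Map<String, Integer> sorted = new TreeMap<>();
            for (String tag : new String[] {"2024-10-02", "2024-10-01", "2024-10-03"}) {
                insertion.put(tag, 0);
                sorted.put(tag, 0);
            }
            System.out.println(insertion.keySet()); // [2024-10-02, 2024-10-01, 2024-10-03]
            System.out.println(sorted.keySet());    // [2024-10-01, 2024-10-02, 2024-10-03]
        }
    }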
TagsTableTest.java

@@ -40,6 +40,8 @@
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;

 import static org.assertj.core.api.Assertions.assertThat;

@@ -89,11 +91,12 @@ void testTagsTable() throws Exception {
     }

     private List<InternalRow> getExpectedResult() {
-        List<InternalRow> internalRows = new ArrayList<>();
+        Map<String, InternalRow> tagToRows = new TreeMap<>();
         for (Pair<Tag, String> snapshot : tagManager.tagObjects()) {
             Tag tag = snapshot.getKey();
             String tagName = snapshot.getValue();
-            internalRows.add(
+            tagToRows.put(
+                    tagName,
                     GenericRow.of(
                             BinaryString.fromString(tagName),
                             tag.id(),
@@ -109,6 +112,11 @@
                             : BinaryString.fromString(
                                     tag.getTagTimeRetained().toString())));
         }
+
+        List<InternalRow> internalRows = new ArrayList<>();
+        for (Map.Entry<String, InternalRow> entry : tagToRows.entrySet()) {
+            internalRows.add(entry.getValue());
+        }
         return internalRows;
     }
 }
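Since TreeMap.values() already iterates in ascending key order, the copy loop above could be collapsed to one line — an equivalent alternative, not what the commit uses:

    List<InternalRow> internalRows = new ArrayList<>(tagToRows.values());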
PaimonSystemTableTest.scala (new file)
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase

import org.apache.spark.sql.Row

class PaimonSystemTableTest extends PaimonSparkTestBase {

test("system table: sort tags table") {
spark.sql(s"""
|CREATE TABLE T (id STRING, name STRING)
|USING PAIMON
|""".stripMargin)

spark.sql(s"INSERT INTO T VALUES(1, 'a')")

spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-02')")
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-01')")
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-04')")
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-03')")

checkAnswer(
spark.sql("select tag_name from `T$tags`"),
Row("2024-10-01") :: Row("2024-10-02") :: Row("2024-10-03") :: Row("2024-10-04") :: Nil)
}

test("system table: sort partitions table") {
spark.sql(s"""
|CREATE TABLE T (a INT, b STRING, dt STRING, hh STRING)
|PARTITIONED BY (dt, hh)
|TBLPROPERTIES ('primary-key'='a,dt,hh', 'bucket' = '3')
|""".stripMargin)

spark.sql("INSERT INTO T VALUES(1, 'a', '2024-10-10', '01')")
spark.sql("INSERT INTO T VALUES(3, 'c', '2024-10-10', '23')")
spark.sql("INSERT INTO T VALUES(2, 'b', '2024-10-10', '12')")
spark.sql("INSERT INTO T VALUES(5, 'f', '2024-10-09', '02')")
spark.sql("INSERT INTO T VALUES(4, 'd', '2024-10-09', '01')")

checkAnswer(spark.sql("select count(*) from `T$partitions`"), Row(5) :: Nil)
checkAnswer(
spark.sql("select partition from `T$partitions`"),
Row("[2024-10-09, 01]") :: Row("[2024-10-09, 02]") :: Row("[2024-10-10, 01]") :: Row(
"[2024-10-10, 12]") :: Row("[2024-10-10, 23]") :: Nil
)
}

test("system table: sort buckets table") {
spark.sql(s"""
|CREATE TABLE T (a INT, b STRING, dt STRING, hh STRING)
|PARTITIONED BY (dt, hh)
|TBLPROPERTIES ('primary-key'='a,dt,hh', 'bucket' = '3')
|""".stripMargin)

spark.sql("INSERT INTO T VALUES(1, 'a', '2024-10-10', '01')")
spark.sql("INSERT INTO T VALUES(2, 'b', '2024-10-10', '01')")
spark.sql("INSERT INTO T VALUES(3, 'c', '2024-10-10', '01')")
spark.sql("INSERT INTO T VALUES(4, 'd', '2024-10-10', '01')")
spark.sql("INSERT INTO T VALUES(5, 'f', '2024-10-10', '01')")

checkAnswer(spark.sql("select count(*) from `T$partitions`"), Row(1) :: Nil)
checkAnswer(
spark.sql("select partition,bucket from `T$buckets`"),
Row("[2024-10-10, 01]", 0) :: Row("[2024-10-10, 01]", 1) :: Row("[2024-10-10, 01]", 2) :: Nil)
}
}
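The partition strings asserted in these tests ("[2024-10-09, 01]", "[2024-10-10, 01]", ...) are exactly the Arrays.toString rendering of the converted partition values that the system tables sort on. A quick illustration:

    import java.util.Arrays;

    public class PartitionStringSketch {
        public static void main(String[] args) {
            Object[] partition = {"2024-10-10", "01"};
            System.out.println(Arrays.toString(partition)); // prints [2024-10-10, 01]
        }
    }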
