From ff64c8ca46c1e8593857eb15ef07c93045959b30 Mon Sep 17 00:00:00 2001
From: Kerwin <37063904+zhuangchong@users.noreply.github.com>
Date: Thu, 11 Jul 2024 09:53:27 +0800
Subject: [PATCH] [hotfix] Improve spark procedures document format and remove
 duplicated code calls (#3716)
---
docs/content/spark/procedures.md | 16 ++++-----
.../org/apache/paimon/stats/Statistics.java | 34 +++++++++----------
.../apache/paimon/stats/StatsFileHandler.java | 2 +-
.../paimon/utils/FileStorePathFactory.java | 10 +++---
.../paimon/utils/ScanParallelExecutor.java | 2 +-
5 files changed, 30 insertions(+), 34 deletions(-)
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 43b82cb6c36d..d464881d6058 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -49,8 +49,8 @@ This section introduces all available spark procedures about paimon.
max_concurrent_jobs: when sort compact is used, files in one partition are grouped and submitted as a single spark compact job. This parameter controls the maximum number of jobs that can be submitted simultaneously. The default value is 15.
- SET spark.sql.shuffle.partitions=10; --set the compact parallelism
- CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b')
+ SET spark.sql.shuffle.partitions=10; --set the compact parallelism
+ CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b')
CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b')
|
@@ -87,7 +87,7 @@ This section introduces all available spark procedures about paimon.
-- based on snapshot 10 with 1d
- CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot => 10, time_retained => '1 d')
+ CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot => 10, time_retained => '1 d')
-- based on the latest snapshot
CALL sys.create_tag(table => 'default.T', tag => 'my_tag')
|
@@ -109,7 +109,7 @@ This section introduces all available spark procedures about paimon.
version: id of the snapshot or name of tag that will roll back to.
- CALL sys.rollback(table => 'default.T', version => 'my_tag')
+ CALL sys.rollback(table => 'default.T', version => 'my_tag')
CALL sys.rollback(table => 'default.T', version => 10)
|
@@ -146,9 +146,7 @@ This section introduces all available spark procedures about paimon.
database_or_table: empty or the target database name or the target table identifier, if you specify multiple tags, delimiter is ','
- CALL sys.repair('test_db.T')
- |
-
+ CALL sys.repair('test_db.T')
CALL sys.repair('test_db.T,test_db01,test_db.T2')
|
@@ -162,8 +160,8 @@ This section introduces all available spark procedures about paimon.
snapshot(Long): id of the snapshot which the new tag is based on.
- CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch')
- CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag')
+ CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch')
+ CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag')
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', snapshot => 10)
|
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/Statistics.java b/paimon-core/src/main/java/org/apache/paimon/stats/Statistics.java
index 5d94030443d7..32c3699c49f8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/stats/Statistics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/Statistics.java
@@ -22,6 +22,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.OptionalUtils;
@@ -38,6 +39,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
+import java.util.stream.Collectors;
/**
* Global stats, supports the following stats.
@@ -119,18 +121,16 @@ public Map<String, ColStats<?>> colStats() {
public void serializeFieldsToString(TableSchema schema) {
try {
if (colStats != null) {
+ Map<String, DataType> fields =
+ schema.fields().stream()
+ .collect(Collectors.toMap(DataField::name, DataField::type));
for (Map.Entry<String, ColStats<?>> entry : colStats.entrySet()) {
String colName = entry.getKey();
ColStats<?> colStats = entry.getValue();
- DataType type =
- schema.fields().stream()
- .filter(field -> field.name().equals(colName))
- .findFirst()
- .orElseThrow(
- () ->
- new IllegalStateException(
- "Unable to obtain the latest schema"))
- .type();
+ DataType type = fields.get(colName);
+ if (type == null) {
+ throw new IllegalStateException("Unable to obtain the latest schema");
+ }
colStats.serializeFieldsToString(type);
}
}
@@ -142,18 +142,16 @@ public void serializeFieldsToString(TableSchema schema) {
public void deserializeFieldsFromString(TableSchema schema) {
try {
if (colStats != null) {
+ Map<String, DataType> fields =
+ schema.fields().stream()
+ .collect(Collectors.toMap(DataField::name, DataField::type));
for (Map.Entry<String, ColStats<?>> entry : colStats.entrySet()) {
String colName = entry.getKey();
ColStats<?> colStats = entry.getValue();
- DataType type =
- schema.fields().stream()
- .filter(field -> field.name().equals(colName))
- .findFirst()
- .orElseThrow(
- () ->
- new IllegalStateException(
- "Unable to obtain the latest schema"))
- .type();
+ DataType type = fields.get(colName);
+ if (type == null) {
+ throw new IllegalStateException("Unable to obtain the latest schema");
+ }
colStats.deserializeFieldsFromString(type);
}
}
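The two hunks above replace a per-column stream over schema.fields() with a single name-to-type lookup map built up front. A minimal standalone sketch of that pattern, assuming a simplified Field record in place of Paimon's DataField/DataType (the class and helper names here are illustrative, not from the patch):

    // Build the name -> type lookup once, then resolve each column in O(1)
    // instead of re-streaming schema.fields() for every column.
    // Field is a hypothetical stand-in for Paimon's DataField.
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class LookupMapSketch {
        record Field(String name, String type) {}

        public static void main(String[] args) {
            List<Field> schemaFields =
                    List.of(new Field("id", "INT"), new Field("name", "STRING"));

            // One pass over the schema; field names are unique, so toMap is safe.
            Map<String, String> fields =
                    schemaFields.stream().collect(Collectors.toMap(Field::name, Field::type));

            for (String colName : List.of("id", "name")) {
                String type = fields.get(colName);
                if (type == null) {
                    throw new IllegalStateException("Unable to obtain the latest schema");
                }
                System.out.println(colName + " -> " + type);
            }
        }
    }

The map costs one pass over the schema instead of one pass per column, and the null check keeps the original "Unable to obtain the latest schema" failure for unknown columns.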
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
index 0ca23fad5d96..f9e057c7cbb3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
@@ -56,7 +56,7 @@ public String writeStats(Statistics stats) {
public Optional<Statistics> readStats() {
Long latestSnapshotId = snapshotManager.latestSnapshotId();
if (latestSnapshotId == null) {
- throw new IllegalStateException("Unable to obtain the latest schema");
+ throw new IllegalStateException("Unable to obtain the latest snapshot");
}
return readStats(latestSnapshotId);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 7696f9adaa46..a49e6dbc41c0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -106,11 +106,11 @@ public Path bucketPath(BinaryRow partition, int bucket) {
public Path relativePartitionAndBucketPath(BinaryRow partition, int bucket) {
String partitionPath = getPartitionString(partition);
- if (partitionPath.isEmpty()) {
- return new Path(BUCKET_PATH_PREFIX + bucket);
- } else {
- return new Path(getPartitionString(partition) + "/" + BUCKET_PATH_PREFIX + bucket);
- }
+ String fullPath =
+ partitionPath.isEmpty()
+ ? BUCKET_PATH_PREFIX + bucket
+ : partitionPath + "/" + BUCKET_PATH_PREFIX + bucket;
+ return new Path(fullPath);
}
/** IMPORTANT: This method is NOT THREAD SAFE. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java b/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
index 969dbbf9d224..6bf6b351793e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
@@ -45,7 +45,7 @@ public static <T> Iterable<T> parallelismBatchIterable(
if (queueSize == null) {
queueSize = COMMON_IO_FORK_JOIN_POOL.getParallelism();
} else if (queueSize <= 0) {
- throw new NegativeArraySizeException("queue size should not be negetive");
+ throw new NegativeArraySizeException("queue size should not be negative");
}
final Queue<List<T>> stack = new ArrayDeque<>(Lists.partition(input, queueSize));
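
The surrounding method builds its work queue by partitioning the input list into fixed-size batches. A minimal sketch of that setup, assuming Guava is on the classpath; the real parallelismBatchIterable then processes each batch through its fork-join pool, which this sketch omits:

    // Split the input into batches of queueSize with Guava's Lists.partition
    // and drain them in order. Parallel processing of the batches is omitted.
    import com.google.common.collect.Lists;

    import java.util.ArrayDeque;
    import java.util.List;
    import java.util.Queue;

    public class BatchQueueSketch {
        public static void main(String[] args) {
            List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7);
            int queueSize = 3; // must be positive, as the corrected message above notes

            Queue<List<Integer>> stack = new ArrayDeque<>(Lists.partition(input, queueSize));
            List<Integer> batch;
            while ((batch = stack.poll()) != null) {
                System.out.println("processing batch " + batch);
            }
        }
    }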