[hotfix] Improve spark procedures document format and duplicate code calls (#3716)
zhuangchong authored Jul 11, 2024
1 parent 049232b commit ff64c8c
Showing 5 changed files with 30 additions and 34 deletions.
16 changes: 7 additions & 9 deletions docs/content/spark/procedures.md
@@ -49,8 +49,8 @@ This section introduce all available spark procedures about paimon.
 <li>max_concurrent_jobs: when sort compact is used, files in one partition are grouped and submitted as a single spark compact job. This parameter controls the maximum number of jobs that can be submitted simultaneously. The default value is 15.</li>
 </td>
 <td>
-SET spark.sql.shuffle.partitions=10; --set the compact parallelism <br/>
-CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b') <br/>
+SET spark.sql.shuffle.partitions=10; --set the compact parallelism <br/><br/>
+CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b') <br/><br/>
 CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b')
 </td>
 </tr>
@@ -87,7 +87,7 @@ This section introduce all available spark procedures about paimon.
 </td>
 <td>
 -- based on snapshot 10 with 1d <br/>
-CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot => 10, time_retained => '1 d') <br/>
+CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot => 10, time_retained => '1 d') <br/><br/>
 -- based on the latest snapshot <br/>
 CALL sys.create_tag(table => 'default.T', tag => 'my_tag')
 </td>
@@ -109,7 +109,7 @@ This section introduce all available spark procedures about paimon.
 <li>version: id of the snapshot or name of tag that will roll back to.</li>
 </td>
 <td>
-CALL sys.rollback(table => 'default.T', version => 'my_tag')<br/>
+CALL sys.rollback(table => 'default.T', version => 'my_tag')<br/><br/>
 CALL sys.rollback(table => 'default.T', version => 10)
 </td>
 </tr>
@@ -146,9 +146,7 @@ This section introduce all available spark procedures about paimon.
 <li>database_or_table: empty or the target database name or the target table identifier, if you specify multiple tags, delimiter is ','</li>
 </td>
 <td>
-CALL sys.repair('test_db.T')
-</td>
-<td>
+CALL sys.repair('test_db.T')<br/><br/>
 CALL sys.repair('test_db.T,test_db01,test_db.T2')
 </td>
 </tr>
Expand All @@ -162,8 +160,8 @@ This section introduce all available spark procedures about paimon.
<li>snapshot(Long): id of the snapshot which the new tag is based on.</li>
</td>
<td>
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch')<br/>
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag')<br/>
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch')<br/><br/>
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag')<br/><br/>
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', snapshot => 10)
</td>
</tr>
Expand Down
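
The rows above show only the raw CALL statements; for orientation, here is a minimal Java sketch of issuing the compact procedure through a SparkSession. The catalog name, catalog class, warehouse path, and table name are illustrative assumptions, not taken from this commit.

import org.apache.spark.sql.SparkSession;

public class CompactExample {
    public static void main(String[] args) {
        // Assumes a Paimon catalog registered under the name "paimon";
        // the warehouse path below is a placeholder.
        SparkSession spark =
                SparkSession.builder()
                        .appName("paimon-compact")
                        .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
                        .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon")
                        .getOrCreate();

        spark.sql("USE paimon.default");
        // Cap the sort-compact parallelism, then z-order two partitions,
        // matching the first example in the doc table above.
        spark.sql("SET spark.sql.shuffle.partitions=10");
        spark.sql(
                "CALL sys.compact(table => 'T', partitions => 'p=0;p=1', "
                        + "order_strategy => 'zorder', order_by => 'a,b')");
        spark.stop();
    }
}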
34 changes: 16 additions & 18 deletions paimon-core/src/main/java/org/apache/paimon/stats/Statistics.java
@@ -22,6 +22,7 @@
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.OptionalUtils;
@@ -38,6 +39,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalLong;
+import java.util.stream.Collectors;

 /**
  * Global stats, supports the following stats.
@@ -119,18 +121,16 @@ public Map<String, ColStats<?>> colStats() {
     public void serializeFieldsToString(TableSchema schema) {
         try {
             if (colStats != null) {
+                Map<String, DataType> fields =
+                        schema.fields().stream()
+                                .collect(Collectors.toMap(DataField::name, DataField::type));
                 for (Map.Entry<String, ColStats<?>> entry : colStats.entrySet()) {
                     String colName = entry.getKey();
                     ColStats<?> colStats = entry.getValue();
-                    DataType type =
-                            schema.fields().stream()
-                                    .filter(field -> field.name().equals(colName))
-                                    .findFirst()
-                                    .orElseThrow(
-                                            () ->
-                                                    new IllegalStateException(
-                                                            "Unable to obtain the latest schema"))
-                                    .type();
+                    DataType type = fields.get(colName);
+                    if (type == null) {
+                        throw new IllegalStateException("Unable to obtain the latest schema");
+                    }
                     colStats.serializeFieldsToString(type);
                 }
             }
@@ -142,18 +142,16 @@ public void serializeFieldsToString(TableSchema schema) {
     public void deserializeFieldsFromString(TableSchema schema) {
         try {
             if (colStats != null) {
+                Map<String, DataType> fields =
+                        schema.fields().stream()
+                                .collect(Collectors.toMap(DataField::name, DataField::type));
                 for (Map.Entry<String, ColStats<?>> entry : colStats.entrySet()) {
                     String colName = entry.getKey();
                     ColStats<?> colStats = entry.getValue();
-                    DataType type =
-                            schema.fields().stream()
-                                    .filter(field -> field.name().equals(colName))
-                                    .findFirst()
-                                    .orElseThrow(
-                                            () ->
-                                                    new IllegalStateException(
-                                                            "Unable to obtain the latest schema"))
-                                    .type();
+                    DataType type = fields.get(colName);
+                    if (type == null) {
+                        throw new IllegalStateException("Unable to obtain the latest schema");
+                    }
                     colStats.deserializeFieldsFromString(type);
                 }
             }
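
Both hunks apply the same refactor: instead of re-scanning schema.fields() with a filtered stream for every column (quadratic in the number of fields), the column-name-to-type map is built once up front and each column becomes a constant-time lookup. A self-contained sketch of the pattern, using a plain stand-in class instead of Paimon's DataField:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FieldLookupSketch {
    // Stand-in for Paimon's DataField (name + type); not the real class.
    static final class Field {
        final String name;
        final String type;
        Field(String name, String type) { this.name = name; this.type = type; }
    }

    public static void main(String[] args) {
        List<Field> schemaFields =
                List.of(new Field("id", "BIGINT"), new Field("name", "STRING"));

        // Build the name -> type map once (O(fields)), instead of re-scanning
        // schemaFields with a filtered stream per column (O(columns * fields)).
        Map<String, String> fields =
                schemaFields.stream().collect(Collectors.toMap(f -> f.name, f -> f.type));

        for (String colName : List.of("id", "name")) {
            String type = fields.get(colName);
            if (type == null) {
                // Same failure the original code signalled via orElseThrow.
                throw new IllegalStateException("Unable to obtain the latest schema");
            }
            System.out.println(colName + ": " + type);
        }
    }
}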
@@ -56,7 +56,7 @@ public String writeStats(Statistics stats) {
     public Optional<Statistics> readStats() {
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
         if (latestSnapshotId == null) {
-            throw new IllegalStateException("Unable to obtain the latest schema");
+            throw new IllegalStateException("Unable to obtain the latest snapshot");
         }
         return readStats(latestSnapshotId);
     }
@@ -106,11 +106,11 @@ public Path bucketPath(BinaryRow partition, int bucket) {

     public Path relativePartitionAndBucketPath(BinaryRow partition, int bucket) {
         String partitionPath = getPartitionString(partition);
-        if (partitionPath.isEmpty()) {
-            return new Path(BUCKET_PATH_PREFIX + bucket);
-        } else {
-            return new Path(getPartitionString(partition) + "/" + BUCKET_PATH_PREFIX + bucket);
-        }
+        String fullPath =
+                partitionPath.isEmpty()
+                        ? BUCKET_PATH_PREFIX + bucket
+                        : partitionPath + "/" + BUCKET_PATH_PREFIX + bucket;
+        return new Path(fullPath);
     }

     /** IMPORTANT: This method is NOT THREAD SAFE. */
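
Beyond the restructuring, this hunk removes one of the duplicate calls named in the commit title: the old else branch invoked getPartitionString(partition) a second time even though its result was already held in partitionPath. A plain-string sketch of the resulting logic; the "bucket-" prefix is an assumption standing in for the class's BUCKET_PATH_PREFIX constant:

public class BucketPathSketch {
    static final String BUCKET_PATH_PREFIX = "bucket-"; // assumed value, for illustration

    static String relativePartitionAndBucketPath(String partitionPath, int bucket) {
        // Compute the partition string once and branch with a ternary,
        // mirroring the refactor: no second getPartitionString(...) call.
        return partitionPath.isEmpty()
                ? BUCKET_PATH_PREFIX + bucket
                : partitionPath + "/" + BUCKET_PATH_PREFIX + bucket;
    }

    public static void main(String[] args) {
        System.out.println(relativePartitionAndBucketPath("", 0));              // bucket-0
        System.out.println(relativePartitionAndBucketPath("dt=2024-07-11", 3)); // dt=2024-07-11/bucket-3
    }
}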
@@ -45,7 +45,7 @@ public static <T, U> Iterable<T> parallelismBatchIterable(
         if (queueSize == null) {
             queueSize = COMMON_IO_FORK_JOIN_POOL.getParallelism();
         } else if (queueSize <= 0) {
-            throw new NegativeArraySizeException("queue size should not be negetive");
+            throw new NegativeArraySizeException("queue size should not be negative");
         }

         final Queue<List<U>> stack = new ArrayDeque<>(Lists.partition(input, queueSize));
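
Only the message typo changes here, but the surrounding contract is worth seeing whole: a null queue size falls back to the pool's parallelism, a non-positive one is rejected, and the input is then split into fixed-size batches. A self-contained sketch under stated assumptions: ForkJoinPool.commonPool() and a hand-rolled partition loop stand in for the class's COMMON_IO_FORK_JOIN_POOL and Guava's Lists.partition, and IllegalArgumentException is swapped in for the original NegativeArraySizeException.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ForkJoinPool;

public class BatchQueueSketch {
    static <U> Queue<List<U>> toBatches(List<U> input, Integer queueSize) {
        if (queueSize == null) {
            // Default the batch size to the pool's parallelism, as the real code does.
            queueSize = ForkJoinPool.commonPool().getParallelism();
        } else if (queueSize <= 0) {
            // The original throws NegativeArraySizeException; this is the conventional choice.
            throw new IllegalArgumentException("queue size should not be negative");
        }
        // Hand-rolled equivalent of Guava's Lists.partition(input, queueSize).
        Queue<List<U>> batches = new ArrayDeque<>();
        for (int i = 0; i < input.size(); i += queueSize) {
            batches.add(new ArrayList<>(input.subList(i, Math.min(i + queueSize, input.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(toBatches(List.of(1, 2, 3, 4, 5), 2)); // [[1, 2], [3, 4], [5]]
    }
}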
