
Commit

Merge branch 'apache:master' into master
hzjhjjyy authored Jul 19, 2024
2 parents fbf2611 + b7e11d7 commit b8dd28d
Showing 95 changed files with 3,156 additions and 248 deletions.
2 changes: 1 addition & 1 deletion docs/content/flink/procedures.md
@@ -258,7 +258,7 @@ All available procedures are listed below.
<li>table: the target table identifier. Cannot be empty.</li>
<li>expiration_time: the expiration interval of a partition. A partition will be expired if its lifetime exceeds this value. Partition time is extracted from the partition value.</li>
<li>timestamp_formatter: the formatter to format timestamp from string.</li>
<li>expire_strategy: the expiration strategy for partition expiration.</li>
<li>expire_strategy: specifies the expiration strategy for partition expiration. Possible values are 'values-time' and 'update-time'; the default is 'values-time'.</li>
</td>
<td>
-- for Flink 1.18<br/><br/>
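A minimal sketch of calling the procedure with the new `expire_strategy` argument, assuming the positional argument order follows the parameter list above; the table identifier and values are placeholders rather than part of this commit:

```sql
-- positional form as in the Flink 1.18 example; 'default.T' is a placeholder
CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', 'values-time');
```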
4 changes: 2 additions & 2 deletions docs/content/maintenance/manage-branches.md
@@ -117,10 +117,10 @@ You can read or write with branch as below.
```sql
-- read from branch 'branch1'
SELECT * FROM t /*+ OPTIONS('branch' = 'branch1') */;
SELECT * FROM `t$branch_branch1`;

-- write to branch 'branch1'
INSERT INTO t /*+ OPTIONS('branch' = 'branch1') */ SELECT ...
INSERT INTO `t$branch_branch1` SELECT ...
```
{{< /tab >}}
18 changes: 18 additions & 0 deletions docs/content/maintenance/system-tables.md
@@ -208,6 +208,24 @@ SELECT * FROM my_table$tags;
*/
```

### Branches Table

You can query the branches of the table.

```sql
SELECT * FROM my_table$branches;

/*
+----------------------+---------------------------+--------------------------+-------------------------+
| branch_name | created_from_tag | created_from_snapshot | create_time |
+----------------------+---------------------------+--------------------------+-------------------------+
| branch1 | tag1 | 2 | 2024-07-18 20:31:39.084 |
| branch2 | tag2 | 5 | 2024-07-18 21:11:14.373 |
+----------------------+---------------------------+--------------------------+-------------------------+
2 rows in set
*/
```

### Consumers Table

You can query all consumers that contain the next snapshot.
5 changes: 1 addition & 4 deletions docs/content/primary-key-table/compaction.md
@@ -59,15 +59,12 @@ You can use the following strategies for your table:
```shell
num-sorted-run.stop-trigger = 2147483647
sort-spill-threshold = 10
changelog-producer.lookup-wait = false
lookup-wait = false
```

This configuration will generate more files during peak write periods and gradually merge them during low write periods, eventually reaching optimal read performance.
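One way to apply these settings for a single job is through Flink dynamic table options (the `/*+ OPTIONS(...) */` hint); this is only a sketch — the table name is a placeholder and the keys mirror the shell snippet above, including the renamed 'lookup-wait' key:

```sql
INSERT INTO t /*+ OPTIONS(
    'num-sorted-run.stop-trigger' = '2147483647',
    'sort-spill-threshold' = '10',
    'lookup-wait' = 'false') */
SELECT ...
```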

In the case of `'changelog-producer' = 'lookup'`, by default, the lookup will be completed at checkpointing, which
will block the checkpoint. So if you want an asynchronous lookup, you should also set `'changelog-producer.lookup-wait' = 'false'`.

## Dedicated compaction job

In general, if you expect multiple jobs to be written to the same table, you need to separate the compaction. You can
@@ -49,6 +49,7 @@ but only returns input records.)
By default, partial-update cannot accept delete records. You can choose one of the following solutions:

- Configure 'ignore-delete' to ignore delete records.
- Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.
- Configure 'sequence-group's to retract partial columns.
{{< /hint >}}
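A minimal sketch of the new option in use, assuming a hypothetical Flink SQL primary-key table with the partial-update merge engine; the table name, columns, and values are illustrative only:

```sql
CREATE TABLE t (
    k INT,
    a INT,
    b INT,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update',
    'partial-update.remove-record-on-delete' = 'true'
);
```

With this option enabled, a -D record for key `k` removes the whole row instead of retracting individual columns.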

3 changes: 2 additions & 1 deletion docs/content/spark/procedures.md
@@ -73,8 +73,9 @@ This section introduce all available spark procedures about paimon.
<li>table: the target table identifier. Cannot be empty.</li>
<li>expiration_time: the expiration interval of a partition. A partition will be expired if its lifetime exceeds this value. Partition time is extracted from the partition value.</li>
<li>timestamp_formatter: the formatter to format timestamp from string.</li>
<li>expire_strategy: specifies the expiration strategy for partition expiration. Possible values are 'values-time' and 'update-time'; the default is 'values-time'.</li>
</td>
<td>CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')</td>
<td>CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time')</td>
</tr>
<tr>
<td>create_tag</td>
15 changes: 14 additions & 1 deletion docs/content/spark/quick-start.md
@@ -310,7 +310,12 @@ All Spark's data types are available in package `org.apache.spark.sql.types`.
</tr>
<tr>
<td><code>TimestampType</code></td>
<td><code>TimestampType</code>, <code>LocalZonedTimestamp</code></td>
<td><code>LocalZonedTimestamp</code></td>
<td>true</td>
</tr>
<tr>
<td><code>TimestampNTZType(Spark3.4+)</code></td>
<td><code>TimestampType</code></td>
<td>true</td>
</tr>
<tr>
@@ -325,3 +330,11 @@ All Spark's data types are available in package `org.apache.spark.sql.types`.
</tr>
</tbody>
</table>

{{< hint warning >}}
Due to the previous design, in Spark 3.3 and below, Paimon maps both Paimon's TimestampType and LocalZonedTimestamp to Spark's TimestampType, and only TimestampType is handled correctly.

Therefore, when using Spark 3.3 and below to read a Paimon table with a LocalZonedTimestamp column written by other engines such as Flink, the query results for the LocalZonedTimestamp column will have a time zone offset, which needs to be adjusted manually.

When using Spark 3.4 and above, all timestamp types can be parsed correctly.
{{< /hint >}}
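To make the mapping concrete, a hypothetical Spark 3.4+ table can declare both Spark timestamp types; per the table above, `TIMESTAMP` maps to Paimon's LocalZonedTimestamp and `TIMESTAMP_NTZ` maps to Paimon's TimestampType. Table and column names are illustrative:

```sql
-- Spark 3.4+ only; TIMESTAMP_NTZ is not available in Spark 3.3 and below
CREATE TABLE ts_demo (
    ts     TIMESTAMP,
    ts_ntz TIMESTAMP_NTZ
) USING paimon;
```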
30 changes: 24 additions & 6 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -62,12 +62,6 @@
<td><p>Enum</p></td>
<td>Whether to double write to a changelog file. This changelog file keeps the details of data changes, it can be read directly during stream reads. This can be applied to tables with primary keys. <br /><br />Possible values:<ul><li>"none": No changelog file.</li><li>"input": Double write to a changelog file when flushing memory table, the changelog is from input.</li><li>"full-compaction": Generate changelog files with each full compaction.</li><li>"lookup": Generate changelog files through 'lookup' before committing the data writing.</li></ul></td>
</tr>
<tr>
<td><h5>changelog-producer.lookup-wait</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>When changelog-producer is set to LOOKUP, commit will wait for changelog generation by lookup.</td>
</tr>
<tr>
<td><h5>changelog-producer.row-deduplicate</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -194,6 +188,12 @@
<td>Duration</td>
<td>The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication.</td>
</tr>
<tr>
<td><h5>delete-file.thread-num</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The maximum number of files to delete concurrently. By default, it is the number of processors available to the Java virtual machine.</td>
</tr>
<tr>
<td><h5>delete.force-produce-changelog</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -339,6 +339,12 @@
<td>Integer</td>
<td>The maximal fan-in for external merge sort. It limits the number of file handles. If it is too small, may cause intermediate merging. But if it is too large, it will cause too many files opened at the same time, consume memory and lead to random reading.</td>
</tr>
<tr>
<td><h5>lookup-wait</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>When a lookup is needed, commit will wait for compaction via lookup.</td>
</tr>
<tr>
<td><h5>lookup.cache-file-retention</h5></td>
<td style="word-wrap: break-word;">1 h</td>
@@ -381,6 +387,12 @@
<td>Float</td>
<td>The index load factor for lookup.</td>
</tr>
<tr>
<td><h5>lookup.local-file-type</h5></td>
<td style="word-wrap: break-word;">hash</td>
<td><p>Enum</p></td>
<td>The local file type for lookup.<br /><br />Possible values:<ul><li>"sort": Construct a sorted file for lookup.</li><li>"hash": Construct a hash file for lookup.</li></ul></td>
</tr>
<tr>
<td><h5>manifest.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
@@ -479,6 +491,12 @@
<td>Integer</td>
<td>Turn off the dictionary encoding for all fields in parquet.</td>
</tr>
<tr>
<td><h5>partial-update.remove-record-on-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to remove the whole row in the partial-update engine when -D records are received.</td>
</tr>
<tr>
<td><h5>partition</h5></td>
<td style="word-wrap: break-word;">(none)</td>
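As an illustration of the new table options above, they could be set on an existing table via Flink SQL's `ALTER TABLE ... SET`; the table name and the chosen values are placeholders:

```sql
ALTER TABLE my_table SET (
    'lookup.local-file-type' = 'sort',
    'delete-file.thread-num' = '8',
    'lookup-wait' = 'false'
);
```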
75 changes: 66 additions & 9 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -553,6 +553,14 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for primary key table,"
+ " the sequence number determines which data is the most recent.");

@Immutable
public static final ConfigOption<Boolean> PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE =
key("partial-update.remove-record-on-delete")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to remove the whole row in partial-update engine when -D records are received.");

@Immutable
public static final ConfigOption<String> ROWKIND_FIELD =
key("rowkind.field")
@@ -801,6 +809,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"Define partition by table options, cannot define partition on DDL and table options at the same time.");

public static final ConfigOption<LookupLocalFileType> LOOKUP_LOCAL_FILE_TYPE =
key("lookup.local-file-type")
.enumType(LookupLocalFileType.class)
.defaultValue(LookupLocalFileType.HASH)
.withDescription("The local file type for lookup.");

public static final ConfigOption<Float> LOOKUP_HASH_LOAD_FACTOR =
key("lookup.hash-load-factor")
.floatType()
@@ -1265,16 +1279,13 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("Specifies the commit user prefix.");

public static final ConfigOption<Boolean> CHANGELOG_PRODUCER_LOOKUP_WAIT =
key("changelog-producer.lookup-wait")
public static final ConfigOption<Boolean> LOOKUP_WAIT =
key("lookup-wait")
.booleanType()
.defaultValue(true)
.withFallbackKeys("changelog-producer.lookup-wait")
.withDescription(
"When "
+ CoreOptions.CHANGELOG_PRODUCER.key()
+ " is set to "
+ ChangelogProducer.LOOKUP.name()
+ ", commit will wait for changelog generation by lookup.");
"When need to lookup, commit will wait for compaction by lookup.");

public static final ConfigOption<Boolean> METADATA_ICEBERG_COMPATIBLE =
key("metadata.iceberg-compatible")
@@ -1284,6 +1295,14 @@
"When set to true, produce Iceberg metadata after a snapshot is committed, "
+ "so that Iceberg readers can read Paimon's raw files.");

public static final ConfigOption<Integer> DELETE_FILE_THREAD_NUM =
key("delete-file.thread-num")
.intType()
.noDefaultValue()
.withDescription(
"The maximum number of concurrent deleting files. "
+ "By default is the number of processors available to the Java virtual machine.");

private final Options options;

public CoreOptions(Map<String, String> options) {
@@ -1508,6 +1527,11 @@ public boolean cleanEmptyDirectories() {
return options.get(SNAPSHOT_CLEAN_EMPTY_DIRECTORIES);
}

public int deleteFileThreadNum() {
return options.getOptional(DELETE_FILE_THREAD_NUM)
.orElseGet(() -> Runtime.getRuntime().availableProcessors());
}

public ExpireConfig expireConfig() {
return ExpireConfig.builder()
.snapshotRetainMax(snapshotNumRetainMax())
@@ -1603,6 +1627,10 @@ public int cachePageSize() {
return (int) options.get(CACHE_PAGE_SIZE).getBytes();
}

public LookupLocalFileType lookupLocalFileType() {
return options.get(LOOKUP_LOCAL_FILE_TYPE);
}

public MemorySize lookupCacheMaxMemory() {
return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
}
@@ -2005,8 +2033,11 @@ public String recordLevelTimeField() {
}

public boolean prepareCommitWaitCompaction() {
return changelogProducer() == ChangelogProducer.LOOKUP
&& options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
if (!needLookup()) {
return false;
}

return options.get(LOOKUP_WAIT);
}

public boolean metadataIcebergCompatible() {
@@ -2595,4 +2626,30 @@ public InlineElement getDescription() {
return text(description);
}
}

/** Specifies the local file type for lookup. */
public enum LookupLocalFileType implements DescribedEnum {
SORT("sort", "Construct a sorted file for lookup."),

HASH("hash", "Construct a hash file for lookup.");

private final String value;

private final String description;

LookupLocalFileType(String value, String description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return text(description);
}
}
}
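A small usage sketch of the options added in this file, relying only on what the diff shows (the `CoreOptions(Map<String, String>)` constructor and the new accessors); the option values are arbitrary examples:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.CoreOptions;

public class CoreOptionsSketch {
    public static void main(String[] args) {
        // Arbitrary example values for the options added in this commit.
        Map<String, String> conf = new HashMap<>();
        conf.put("lookup.local-file-type", "sort");
        conf.put("lookup-wait", "false"); // the old key 'changelog-producer.lookup-wait' remains usable as a fallback key
        conf.put("delete-file.thread-num", "8");

        CoreOptions options = new CoreOptions(conf);
        System.out.println(options.lookupLocalFileType());         // prints 'sort' (LookupLocalFileType.SORT)
        System.out.println(options.deleteFileThreadNum());         // 8; defaults to available processors when unset
        System.out.println(options.prepareCommitWaitCompaction()); // false unless a lookup is needed and 'lookup-wait' is true
    }
}
```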
@@ -24,14 +24,24 @@
/** Implementation of {@link BlockCompressionFactory} for airlift compressors. */
public class AirCompressorFactory implements BlockCompressionFactory {

private final BlockCompressionType type;
private final Compressor internalCompressor;
private final Decompressor internalDecompressor;

public AirCompressorFactory(Compressor internalCompressor, Decompressor internalDecompressor) {
public AirCompressorFactory(
BlockCompressionType type,
Compressor internalCompressor,
Decompressor internalDecompressor) {
this.type = type;
this.internalCompressor = internalCompressor;
this.internalDecompressor = internalDecompressor;
}

@Override
public BlockCompressionType getCompressionType() {
return type;
}

@Override
public BlockCompressor getCompressor() {
return new AirBlockCompressor(internalCompressor);
@@ -29,6 +29,8 @@
*/
public interface BlockCompressionFactory {

BlockCompressionType getCompressionType();

BlockCompressor getCompressor();

BlockDecompressor getDecompressor();
@@ -39,12 +41,31 @@ static BlockCompressionFactory create(String compression) {
switch (compression.toUpperCase()) {
case "NONE":
return null;
case "ZSTD":
return new ZstdBlockCompressionFactory();
case "LZ4":
return new Lz4BlockCompressionFactory();
case "LZO":
return new AirCompressorFactory(new LzoCompressor(), new LzoDecompressor());
case "ZSTD":
return new AirCompressorFactory(
BlockCompressionType.LZO, new LzoCompressor(), new LzoDecompressor());
default:
throw new IllegalStateException("Unknown CompressionMethod " + compression);
}
}

/** Creates {@link BlockCompressionFactory} according to the {@link BlockCompressionType}. */
@Nullable
static BlockCompressionFactory create(BlockCompressionType compression) {
switch (compression) {
case NONE:
return null;
case ZSTD:
return new ZstdBlockCompressionFactory();
case LZ4:
return new Lz4BlockCompressionFactory();
case LZO:
return new AirCompressorFactory(
BlockCompressionType.LZO, new LzoCompressor(), new LzoDecompressor());
default:
throw new IllegalStateException("Unknown CompressionMethod " + compression);
}
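A brief sketch of exercising the extended factory interface, using only the methods visible in this diff (`create(String)`, `getCompressionType()`, `getCompressor()`, `getDecompressor()`); the package names in the imports are assumptions, since they are not shown here:

```java
// Package names below are assumptions; only the interface methods appear in this diff.
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockCompressionType;
import org.apache.paimon.compression.BlockCompressor;
import org.apache.paimon.compression.BlockDecompressor;

public class CompressionSketch {
    public static void main(String[] args) {
        // Resolve a factory by its string name, as create(String) above does.
        BlockCompressionFactory factory = BlockCompressionFactory.create("LZ4");
        if (factory != null) {
            BlockCompressionType type = factory.getCompressionType(); // accessor added in this commit
            BlockCompressor compressor = factory.getCompressor();
            BlockDecompressor decompressor = factory.getDecompressor();
            System.out.println("Block compression in use: " + type + ", " + compressor + ", " + decompressor);
        }
    }
}
```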