Merge branch 'apache:master' into master
hzjhjjyy authored Jun 20, 2024
2 parents 104b7c9 + 737c343 commit bf260f0
Showing 108 changed files with 2,546 additions and 402 deletions.
2 changes: 1 addition & 1 deletion docs/content/concepts/specification.md
@@ -57,7 +57,7 @@ CREATE TABLE my_table (
f1 STRING
);

INSERT INTO t VALUES (1, 11, '111');
INSERT INTO my_table VALUES (1, 11, '111');
```

Take a look at the disk:
18 changes: 18 additions & 0 deletions docs/content/flink/procedures.md
@@ -238,6 +238,24 @@ All available procedures are listed below.
CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', max_deletes => 10)<br/><br/>
</td>
</tr>
<tr>
<td>expire_partitions</td>
<td>
CALL sys.expire_partitions(table, expiration_time, timestamp_formatter)<br/><br/>
</td>
<td>
To expire partitions. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>expiration_time: the expiration interval of a partition. A partition will expire if its lifetime exceeds this value. Partition time is extracted from the partition value.</li>
<li>timestamp_formatter: the formatter used to parse timestamps from the partition value string.</li>
</td>
<td>
-- for Flink 1.18<br/><br/>
CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd')<br/><br/>
-- for Flink 1.19 and later<br/><br/>
CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')<br/><br/>
</td>
</tr>
<tr>
<td>repair</td>
<td>
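For completeness, a minimal Java sketch of invoking this procedure through Flink's `TableEnvironment`; the catalog name `paimon` and the table `default.T` are assumptions for illustration:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExpirePartitionsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Assumption: a Paimon catalog named 'paimon' is already registered.
        tEnv.executeSql("USE CATALOG paimon");
        // Expire partitions older than one day; partition values are parsed
        // with the 'yyyy-MM-dd' formatter (named arguments need Flink 1.19+).
        tEnv.executeSql(
                "CALL sys.expire_partitions("
                        + "`table` => 'default.T', "
                        + "expiration_time => '1 d', "
                        + "timestamp_formatter => 'yyyy-MM-dd')");
    }
}
```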
4 changes: 4 additions & 0 deletions docs/content/flink/sql-query.md
@@ -51,6 +51,10 @@ SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
-- read the snapshot from specified timestamp in unix milliseconds
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

-- read the snapshot from a specified timestamp string; it will be automatically converted to a timestamp in unix milliseconds
-- supported formats include: yyyy-MM-dd, yyyy-MM-dd HH:mm:ss, and yyyy-MM-dd HH:mm:ss.SSS, using the default local time zone
SELECT * FROM t /*+ OPTIONS('scan.timestamp' = '2023-12-09 23:09:12') */;

-- read tag 'my-tag'
SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;

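As a rough illustration of the conversion described above, here is a minimal `java.time` sketch (an approximation, not Paimon's actual `DateTimeUtils` implementation) that parses the supported formats in the default local time zone:

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class ScanTimestampSketch {
    // Parse a 'scan.timestamp' string to epoch milliseconds in the local zone.
    static long toEpochMillis(String ts) {
        LocalDateTime dateTime;
        if (ts.length() == 10) { // yyyy-MM-dd
            dateTime = LocalDate.parse(ts).atStartOfDay();
        } else if (ts.length() == 19) { // yyyy-MM-dd HH:mm:ss
            dateTime = LocalDateTime.parse(
                    ts, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        } else { // yyyy-MM-dd HH:mm:ss.SSS
            dateTime = LocalDateTime.parse(
                    ts, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
        }
        return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // The printed value depends on the machine's time zone.
        System.out.println(toEpochMillis("2023-12-09 23:09:12"));
    }
}
```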
1 change: 0 additions & 1 deletion docs/content/primary-key-table/deletion-vectors.md
@@ -45,6 +45,5 @@ By specifying `'deletion-vectors.enabled' = 'true'`, the Deletion Vectors mode c
## Limitation

- `changelog-producer` needs to be `none` or `lookup`.
- `changelog-producer.lookup-wait` can't be `false`.
- `merge-engine` can't be `first-row`, because first-row reads already involve no merging, so deletion vectors are not needed.
- This mode will filter the data in level-0, so when using time travel to read an `APPEND` snapshot, there will be a data delay.
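To make the remaining constraints concrete, here is a hypothetical Flink SQL table definition that satisfies them, assuming a Paimon catalog is in use (table and column names are made up):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DeletionVectorsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Deletion vectors with a 'lookup' changelog producer and the default
        // 'deduplicate' merge engine (not 'first-row'), per the limitations above.
        tEnv.executeSql(
                "CREATE TABLE dv_table ("
                        + "pk INT PRIMARY KEY NOT ENFORCED, v STRING) WITH ("
                        + "'deletion-vectors.enabled' = 'true', "
                        + "'changelog-producer' = 'lookup')");
    }
}
```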
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -62,6 +62,12 @@
<td><p>Enum</p></td>
<td>Whether to double write to a changelog file. This changelog file keeps the details of data changes, it can be read directly during stream reads. This can be applied to tables with primary keys. <br /><br />Possible values:<ul><li>"none": No changelog file.</li><li>"input": Double write to a changelog file when flushing memory table, the changelog is from input.</li><li>"full-compaction": Generate changelog files with each full compaction.</li><li>"lookup": Generate changelog files through 'lookup' before committing the data writing.</li></ul></td>
</tr>
<tr>
<td><h5>changelog-producer.lookup-wait</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>When changelog-producer is set to LOOKUP, commit will wait for changelog generation by lookup.</td>
</tr>
<tr>
<td><h5>changelog-producer.row-deduplicate</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -581,6 +587,12 @@
<td>String</td>
<td>Optional tag name used in case of "from-snapshot" scan mode.</td>
</tr>
<tr>
<td><h5>scan.timestamp</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Optional timestamp used in case of "from-timestamp" scan mode; it will be automatically converted to a timestamp in unix milliseconds, using the local time zone.</td>
</tr>
<tr>
<td><h5>scan.timestamp-millis</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -26,12 +26,6 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>changelog-producer.lookup-wait</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>When changelog-producer is set to LOOKUP, commit will wait for changelog generation by lookup.</td>
</tr>
<tr>
<td><h5>end-input.watermark</h5></td>
<td style="word-wrap: break-word;">(none)</td>
44 changes: 42 additions & 2 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -33,6 +33,7 @@
import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.Description;
import org.apache.paimon.options.description.InlineElement;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.MathUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StringUtils;
@@ -51,6 +52,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.stream.Collectors;

@@ -553,6 +555,13 @@ public class CoreOptions implements Serializable {
.withDeprecatedKeys("log.scan")
.withDescription("Specify the scanning behavior of the source.");

public static final ConfigOption<String> SCAN_TIMESTAMP =
key("scan.timestamp")
.stringType()
.noDefaultValue()
.withDescription(
"Optional timestamp used in case of \"from-timestamp\" scan mode, it will be automatically converted to timestamp in unix milliseconds, use local time zone");

public static final ConfigOption<Long> SCAN_TIMESTAMP_MILLIS =
key("scan.timestamp-millis")
.longType()
@@ -1194,6 +1203,17 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("Specifies the commit user prefix.");

public static final ConfigOption<Boolean> CHANGELOG_PRODUCER_LOOKUP_WAIT =
key("changelog-producer.lookup-wait")
.booleanType()
.defaultValue(true)
.withDescription(
"When "
+ CoreOptions.CHANGELOG_PRODUCER.key()
+ " is set to "
+ ChangelogProducer.LOOKUP.name()
+ ", commit will wait for changelog generation by lookup.");

private final Options options;

public CoreOptions(Map<String, String> options) {
@@ -1609,7 +1629,8 @@ public StartupMode startupMode() {
public static StartupMode startupMode(Options options) {
StartupMode mode = options.get(SCAN_MODE);
if (mode == StartupMode.DEFAULT) {
if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()) {
if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()
|| options.getOptional(SCAN_TIMESTAMP).isPresent()) {
return StartupMode.FROM_TIMESTAMP;
} else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()
|| options.getOptional(SCAN_TAG_NAME).isPresent()
@@ -1632,7 +1653,17 @@ public static StartupMode startupMode(Options options) {
}

public Long scanTimestampMills() {
return options.get(SCAN_TIMESTAMP_MILLIS);
String timestampStr = scanTimestamp();
Long timestampMillis = options.get(SCAN_TIMESTAMP_MILLIS);
if (timestampMillis == null && timestampStr != null) {
return DateTimeUtils.parseTimestampData(timestampStr, 3, TimeZone.getDefault())
.getMillisecond();
}
return timestampMillis;
}

public String scanTimestamp() {
return options.get(SCAN_TIMESTAMP);
}

public Long scanWatermark() {
@@ -1893,6 +1924,11 @@ public String recordLevelTimeField() {
return options.get(RECORD_LEVEL_TIME_FIELD);
}

public boolean prepareCommitWaitCompaction() {
return changelogProducer() == ChangelogProducer.LOOKUP
&& options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
@@ -2227,6 +2263,10 @@ public static void setDefaultValues(Options options) {
options.set(SCAN_MODE, StartupMode.FROM_TIMESTAMP);
}

if (options.contains(SCAN_TIMESTAMP) && !options.contains(SCAN_MODE)) {
options.set(SCAN_MODE, StartupMode.FROM_TIMESTAMP);
}

if (options.contains(SCAN_FILE_CREATION_TIME_MILLIS) && !options.contains(SCAN_MODE)) {
options.set(SCAN_MODE, StartupMode.FROM_FILE_CREATION_TIME);
}
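A small sketch exercising the two additions above (`scan.timestamp` with its millisecond fallback, and `changelog-producer.lookup-wait` via `prepareCommitWaitCompaction()`), assuming `paimon-common` is on the classpath; expected outputs are noted in comments:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.CoreOptions;

public class CoreOptionsAdditionsDemo {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Only the string form is set, so scanTimestampMills() falls back to
        // parsing it in the default local time zone.
        conf.put("scan.timestamp", "2023-12-09 23:09:12");
        // With a 'lookup' changelog producer, commits wait for changelog
        // generation unless 'changelog-producer.lookup-wait' is false.
        conf.put("changelog-producer", "lookup");
        conf.put("changelog-producer.lookup-wait", "false");

        CoreOptions options = new CoreOptions(conf);
        System.out.println(options.scanTimestampMills()); // epoch millis, zone-dependent
        System.out.println(options.prepareCommitWaitCompaction()); // false
    }
}
```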
@@ -309,7 +309,10 @@ private FileIndexReader getFileIndexReader(
indexType,
FileIndexCommon.getFieldType(fields, columnName),
new Options())
.createReader(getBytesWithStartAndLength(startAndLength));
.createReader(
seekableInputStream,
startAndLength.getLeft(),
startAndLength.getRight());
}

private byte[] getBytesWithStartAndLength(Pair<Integer, Integer> startAndLength) {
@@ -30,26 +30,33 @@
import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.types.RowType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.fileindex.FileIndexResult.REMAIN;

/** Utils to check secondary index (e.g. bloom filter) predicate. */
public class FileIndexPredicate implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(FileIndexPredicate.class);

private final FileIndexFormat.Reader reader;
private final Map<String, FileIndexFieldPredicate> fieldPredicates = new HashMap<>();

@Nullable private Path path;

public FileIndexPredicate(Path path, FileIO fileIO, RowType fileRowType) throws IOException {
this(fileIO.newInputStream(path), fileRowType);
this.path = path;
}

public FileIndexPredicate(byte[] serializedBytes, RowType fileRowType) {
@@ -67,22 +74,13 @@ public boolean testPredicate(@Nullable Predicate filePredicate) {

Set<String> requredFieldNames = getRequiredNames(filePredicate);

List<FileIndexFieldPredicate> testWorkers =
requredFieldNames.stream()
.map(
cname ->
fieldPredicates.computeIfAbsent(
cname,
k ->
new FileIndexFieldPredicate(
cname,
reader.readColumnIndex(cname))))
.collect(Collectors.toList());

for (FileIndexFieldPredicate testWorker : testWorkers) {
if (!testWorker.test(filePredicate)) {
return false;
}
Map<String, Collection<FileIndexReader>> indexReaders = new HashMap<>();
requredFieldNames.forEach(name -> indexReaders.put(name, reader.readColumnIndex(name)));
if (!new FileIndexPredicateTest(indexReaders).test(filePredicate).remain()) {
LOG.debug(
"One file has been filtered: "
+ (path == null ? "in scan stage" : path.toString()));
return false;
}
return true;
}
@@ -114,55 +112,62 @@ public void close() throws IOException {
}

/** Predicate test worker. */
private static class FileIndexFieldPredicate implements PredicateVisitor<Boolean> {
private static class FileIndexPredicateTest implements PredicateVisitor<FileIndexResult> {

private final String columnName;
private final Collection<FileIndexReader> fileIndexReaders;
private final Map<String, Collection<FileIndexReader>> columnIndexReaders;

public FileIndexFieldPredicate(
String columnName, Collection<FileIndexReader> fileIndexReaders) {
this.columnName = columnName;
this.fileIndexReaders = fileIndexReaders;
public FileIndexPredicateTest(Map<String, Collection<FileIndexReader>> fileIndexReaders) {
this.columnIndexReaders = fileIndexReaders;
}

public Boolean test(Predicate predicate) {
public FileIndexResult test(Predicate predicate) {
return predicate.visit(this);
}

@Override
public Boolean visit(LeafPredicate predicate) {
if (columnName.equals(predicate.fieldName())) {
FieldRef fieldRef =
new FieldRef(predicate.index(), predicate.fieldName(), predicate.type());
for (FileIndexReader fileIndexReader : fileIndexReaders) {
if (!predicate
.function()
.visit(fileIndexReader, fieldRef, predicate.literals())) {
return false;
}
public FileIndexResult visit(LeafPredicate predicate) {
FileIndexResult compoundResult = REMAIN;
FieldRef fieldRef =
new FieldRef(predicate.index(), predicate.fieldName(), predicate.type());
for (FileIndexReader fileIndexReader : columnIndexReaders.get(predicate.fieldName())) {
compoundResult =
compoundResult.and(
predicate
.function()
.visit(fileIndexReader, fieldRef, predicate.literals()));

if (!compoundResult.remain()) {
return compoundResult;
}
}
return true;
return compoundResult;
}

@Override
public Boolean visit(CompoundPredicate predicate) {

public FileIndexResult visit(CompoundPredicate predicate) {
if (predicate.function() instanceof Or) {
FileIndexResult compoundResult = null;
for (Predicate predicate1 : predicate.children()) {
if (predicate1.visit(this)) {
return true;
}
compoundResult =
compoundResult == null
? predicate1.visit(this)
: compoundResult.or(predicate1.visit(this));
}
return false;
return compoundResult == null ? REMAIN : compoundResult;

} else {
FileIndexResult compoundResult = null;
for (Predicate predicate1 : predicate.children()) {
if (!predicate1.visit(this)) {
return false;
compoundResult =
compoundResult == null
? predicate1.visit(this)
: compoundResult.and(predicate1.visit(this));
// if not remain, no need to test anymore
if (!compoundResult.remain()) {
return compoundResult;
}
}
return true;
return compoundResult == null ? REMAIN : compoundResult;
}
}
}
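The refactor above replaces a per-column Boolean visitor with a `FileIndexResult` that composes per-reader results with `and`/`or`. A simplified model of that combination logic (a stand-in type, not the real `FileIndexResult`):

```java
/** Simplified stand-in for FileIndexResult: REMAIN keeps the file as a scan candidate. */
enum SimpleResult {
    REMAIN,
    SKIP;

    boolean remain() {
        return this == REMAIN;
    }

    SimpleResult and(SimpleResult other) {
        return remain() && other.remain() ? REMAIN : SKIP;
    }

    SimpleResult or(SimpleResult other) {
        return remain() || other.remain() ? REMAIN : SKIP;
    }
}

public class FileIndexResultDemo {
    public static void main(String[] args) {
        // An AND predicate short-circuits on the first SKIP,
        // while an OR predicate keeps the file if any child REMAINs.
        System.out.println(SimpleResult.REMAIN.and(SimpleResult.SKIP)); // SKIP
        System.out.println(SimpleResult.SKIP.or(SimpleResult.REMAIN)); // REMAIN
    }
}
```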