
[core] fix bug of watermark override with append only table #3865

Closed
2 changes: 1 addition & 1 deletion docs/content/concepts/spec/snapshot.md
@@ -61,5 +61,5 @@ Snapshot File is JSON, it includes:
12. totalRecordCount: record count of all changes occurred in this snapshot.
13. deltaRecordCount: record count of all new changes occurred in this snapshot.
14. changelogRecordCount: record count of all changelog produced in this snapshot.
15. watermark: watermark for input records, from Flink watermark mechanism, null if there is no watermark.
15. watermark: watermark for input records, from Flink watermark mechanism, Long.MIN_VALUE if there is no watermark.
16. statistics: stats file name for statistics of this table.
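As an editorial illustration of the sentinel documented above (not part of this PR; the helper name is made up), a reader of the snapshot metadata can map the stored value back to an optional watermark:

```java
import java.util.OptionalLong;

// The spec above stores Long.MIN_VALUE instead of null when a snapshot has no
// watermark, so a reader can translate the sentinel back to "absent".
public class WatermarkSentinelExample {
    static OptionalLong watermarkOf(long stored) {
        return stored == Long.MIN_VALUE ? OptionalLong.empty() : OptionalLong.of(stored);
    }

    public static void main(String[] args) {
        System.out.println(watermarkOf(Long.MIN_VALUE)); // OptionalLong.empty
        System.out.println(watermarkOf(1717200000000L)); // OptionalLong[1717200000000]
    }
}
```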
6 changes: 3 additions & 3 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -42,7 +42,7 @@
<td><h5>bucket</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
<td>Bucket number for file store.<br />It should either be equal to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).</td>
<td>Bucket number for file store.<br />It should either be equal to -1 (dynamic bucket mode or unaware bucket mode), or it must be greater than 0 (fixed bucket mode).</td>
</tr>
<tr>
<td><h5>bucket-key</h5></td>
@@ -742,7 +742,7 @@
<td><h5>source.split.target-size</h5></td>
<td style="word-wrap: break-word;">128 mb</td>
<td>MemorySize</td>
<td>Target size of a source split when scanning a bucket.</td>
<td>Target size of a source split when scanning a partition (unaware bucket mode) or bucket (non-unaware bucket mode).</td>
</tr>
<tr>
<td><h5>spill-compression</h5></td>
@@ -856,7 +856,7 @@
<td><h5>write-max-writers-to-spill</h5></td>
<td style="word-wrap: break-word;">5</td>
<td>Integer</td>
<td>When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory. </td>
<td>When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory.</td>
</tr>
<tr>
<td><h5>write-only</h5></td>
@@ -93,7 +93,7 @@ public class CoreOptions implements Serializable {
.text("Bucket number for file store.")
.linebreak()
.text(
"It should either be equal to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).")
"It should either be equal to -1 (dynamic bucket mode or unaware bucket mode), or it must be greater than 0 (fixed bucket mode).")
.build());

@Immutable
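The rule in the updated description is easy to state in code; the following is a small sketch of that rule only, not Paimon's actual option validation:

```java
// bucket == -1 selects dynamic or unaware bucket mode, any positive value selects
// fixed bucket mode; everything else is rejected.
public class BucketModeCheck {
    static String describe(int bucket) {
        if (bucket == -1) {
            return "dynamic or unaware bucket mode";
        }
        if (bucket > 0) {
            return "fixed bucket mode (" + bucket + " buckets)";
        }
        throw new IllegalArgumentException("bucket must be -1 or > 0, got " + bucket);
    }

    public static void main(String[] args) {
        System.out.println(describe(-1)); // dynamic or unaware bucket mode
        System.out.println(describe(8));  // fixed bucket mode (8 buckets)
    }
}
```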
@@ -368,7 +368,8 @@ public class CoreOptions implements Serializable {
key("source.split.target-size")
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(128))
.withDescription("Target size of a source split when scanning a bucket.");
.withDescription(
"Target size of a source split when scanning a partition (unaware bucket mode) or bucket (non unaware bucket mode).");

public static final ConfigOption<MemorySize> SOURCE_SPLIT_OPEN_FILE_COST =
key("source.split.open-file-cost")
@@ -412,7 +413,7 @@ public class CoreOptions implements Serializable {
.intType()
.defaultValue(5)
.withDescription(
"When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory. ");
"When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory.");

public static final ConfigOption<MemorySize> WRITE_MANIFEST_CACHE =
key("write-manifest-cache")
@@ -30,10 +30,10 @@ public interface Filter<T> {
Filter<?> ALWAYS_TRUE = t -> true;

/**
* Evaluates this predicate on the given argument.
* Evaluates this filter on the given argument.
*
* @param t the input argument
* @return {@code true} if the input argument matches the predicate, otherwise {@code false}
* @return {@code true} if the input argument matches the filter, otherwise {@code false}
*/
boolean test(T t);
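A tiny usage sketch of the contract described by this Javadoc (a self-contained stand-in, not the real interface):

```java
public class FilterUsageExample {
    // Minimal stand-in mirroring the Filter#test contract documented above.
    interface Filter<T> {
        boolean test(T t);
    }

    public static void main(String[] args) {
        Filter<String> nonEmpty = s -> !s.isEmpty();
        System.out.println(nonEmpty.test("paimon")); // true: the argument matches the filter
        System.out.println(nonEmpty.test(""));       // false: it does not
    }
}
```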

@@ -108,7 +108,6 @@ public void testSetAndGet() throws IOException, ClassNotFoundException {

@Test
public void testWriter() {

int arity = 13;
BinaryRow row = new BinaryRow(arity);
BinaryRowWriter writer = new BinaryRowWriter(row, 20);
@@ -107,7 +107,7 @@ public AppendOnlyFileStoreWrite newWrite(
}

private AppendOnlyFileStoreScan newScan(boolean forWrite) {
ScanBucketFilter bucketFilter =
ScanBucketFilter bucketKeyFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
public void pushdown(Predicate predicate) {
@@ -132,7 +132,7 @@ public void pushdown(Predicate predicate) {

return new AppendOnlyFileStoreScan(
partitionType,
bucketFilter,
bucketKeyFilter,
snapshotManager(),
schemaManager,
schema,
@@ -204,7 +204,7 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
}

private KeyValueFileStoreScan newScan(boolean forWrite) {
ScanBucketFilter bucketFilter =
ScanBucketFilter bucketKeyFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
public void pushdown(Predicate keyFilter) {
@@ -224,7 +224,7 @@ public void pushdown(Predicate keyFilter) {
};
return new KeyValueFileStoreScan(
partitionType,
bucketFilter,
bucketKeyFilter,
snapshotManager(),
schemaManager,
schema,
@@ -83,7 +83,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final FileIndexOptions fileIndexOptions;

private MemorySegmentPool memorySegmentPool;
private MemorySize maxDiskSize;
private final MemorySize maxDiskSize;

public AppendOnlyWriter(
FileIO fileIO,
@@ -69,7 +69,7 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds());
}

public RecordLevelExpire(int timeField, int expireTime) {
private RecordLevelExpire(int timeField, int expireTime) {
this.timeField = timeField;
this.expireTime = expireTime;
}
@@ -78,7 +78,7 @@ public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactor
return file -> wrap(readerFactory.createRecordReader(file));
}

public RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
int currentTime = (int) (System.currentTimeMillis() / 1000);
return reader.filter(
kv -> {
@@ -145,6 +145,7 @@ public String toString() {
*/
public static Filter<InternalRow> createCacheRowFilter(
@Nullable ManifestCacheFilter manifestCacheFilter, int numOfBuckets) {
// manifestCacheFilter is non-null only when writing
if (manifestCacheFilter == null) {
return Filter.alwaysTrue();
}
@@ -246,7 +246,6 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
@Nullable Integer manifestReadParallelism)
throws Exception {
// 1. should trigger full compaction

List<ManifestFileMeta> base = new ArrayList<>();
long totalManifestSize = 0;
int i = 0;
@@ -276,14 +275,12 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
}

// 2. do full compaction

LOG.info(
"Start Manifest File Full Compaction, pick the number of delete file: {}, total manifest file size: {}",
deltaDeleteFileNum,
totalManifestSize);

// 2.1. try to skip base files by partition filter

Map<Identifier, ManifestEntry> deltaMerged = new LinkedHashMap<>();
FileEntry.mergeEntries(manifestFile, delta, deltaMerged, manifestReadParallelism);

@@ -317,7 +314,6 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
}

// 2.2. try to skip base files by reading entries

Set<Identifier> deleteEntries = new HashSet<>();
deltaMerged.forEach(
(k, v) -> {
@@ -349,7 +345,6 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
}

// 2.3. merge

RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
manifestFile.createRollingWriter();
Exception exception = null;
@@ -298,6 +298,7 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan() {
List<ManifestEntry> files = new ArrayList<>();
long skippedByPartitionAndStats = startDataFiles - mergedEntries.size();
for (ManifestEntry file : mergedEntries) {
// checkNumOfBuckets is true only when writing
if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
String partInfo =
partitionType.getFieldCount() > 0
@@ -487,6 +488,7 @@ private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
.read(
manifest.fileName(),
manifest.fileSize(),
// ManifestEntry#createCacheRowFilter always returns true when reading
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
ManifestEntry.createEntryRowFilter(
partitionFilter, bucketFilter, fileNameFilter, numOfBuckets));
@@ -502,6 +504,7 @@ private List<SimpleFileEntry> readSimpleEntries(ManifestFileMeta manifest) {
// use filter for ManifestEntry
// currently, projection is not pushed down to file format
// see SimpleFileEntrySerializer
// ManifestEntry#createCacheRowFilter always returns true when reading
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
ManifestEntry.createEntryRowFilter(
partitionFilter, bucketFilter, fileNameFilter, numOfBuckets));
@@ -53,7 +53,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {

public AppendOnlyFileStoreScan(
RowType partitionType,
ScanBucketFilter bucketFilter,
ScanBucketFilter bucketKeyFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
TableSchema schema,
@@ -65,7 +65,7 @@ public AppendOnlyFileStoreScan(
boolean fileIndexReadEnabled) {
super(
partitionType,
bucketFilter,
bucketKeyFilter,
snapshotManager,
schemaManager,
schema,
@@ -52,7 +52,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {

public KeyValueFileStoreScan(
RowType partitionType,
ScanBucketFilter bucketFilter,
ScanBucketFilter bucketKeyFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
TableSchema schema,
@@ -66,7 +66,7 @@
MergeEngine mergeEngine) {
super(
partitionType,
bucketFilter,
bucketKeyFilter,
snapshotManager,
schemaManager,
schema,
@@ -152,8 +152,8 @@ private List<ManifestEntry> filterWholeBucketPerFile(List<ManifestEntry> entries
}

private List<ManifestEntry> filterWholeBucketAllFiles(List<ManifestEntry> entries) {
// entries come from the same bucket, if any of it doesn't meet the request, we could
// filter the bucket.
// entries come from the same bucket; if any of them meets the request, we keep
// the whole bucket, otherwise the whole bucket can be filtered out.
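// e.g. with entry values [2, 5, 9] and a value filter "value = 5", entry 5 matches,
// so all three entries are kept; with "value = 7" nothing matches and the whole
// bucket is skipped (illustrative values only).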
for (ManifestEntry entry : entries) {
if (filterByValueFilter(entry)) {
return entries;
@@ -129,7 +129,7 @@ protected BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer() {
// filter: value = 1
// if we perform filter push down on values, data file 1 will be chosen, but data
// file 2 will be ignored, and the final result will be key = a, value = 1 while the
// correct result is an empty set
// correct result is an empty set.
List<Predicate> keyFilters =
pickTransformFieldMapping(
splitAnd(predicate),
@@ -123,6 +123,7 @@ private List<T> readWithIOException(
Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter)
throws IOException {
// cache is always null when reading
if (cache != null) {
return cache.read(fileName, fileSize, loadFilter, readFilter);
}
@@ -133,7 +133,7 @@ protected ManifestFile createManifestFile(String pathStr) {
path,
getPartitionType(),
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString()),
CoreOptions.FILE_FORMAT.defaultValue()),
Long.MAX_VALUE,
null)
.create();
@@ -166,7 +166,7 @@ protected void assertSameContent(

protected List<ManifestFileMeta> createBaseManifestFileMetas(boolean hasPartition) {
List<ManifestFileMeta> input = new ArrayList<>();
// base with 3 partition ,16 entry each parition
// base with 3 partitions, 16 entries per partition
for (int j = 0; j < 3; j++) {
List<ManifestEntry> entrys = new ArrayList<>();
for (int i = 0; i < 16; i++) {
Expand Up @@ -185,7 +185,6 @@ private void buildForDividedMode() {
}

private void buildForCombinedMode() {

ReadableConfig conf = env.getConfiguration();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
@@ -224,7 +223,6 @@ private void buildForTraditionalCompaction(
String fullName,
FileStoreTable table,
boolean isStreaming) {

CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(fullName, table);
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);

@@ -42,10 +42,10 @@
*
* @param <T> the result of scanning file :
* <ol>
* <li>Tuple2<{@link Split},String> for the table with multi buckets, such as dynamic or fixed
* bucket table.
* <li>{@link MultiTableAppendOnlyCompactionTask} for the table witch fixed single bucket
* ,such as unaware bucket table.
* <li>Tuple2<{@link Split}, String> for the table with multi buckets, such as dynamic or
* fixed bucket table.
<li>{@link MultiTableAppendOnlyCompactionTask} for the table with fixed single bucket,
such as unaware bucket table.
* </ol>
*/
public abstract class MultiTableScanBase<T> implements AutoCloseable {
@@ -71,14 +71,14 @@ public void withPartitionPredicate(Predicate predicate) {

public void build() {
// build source from UnawareSourceFunction
DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
DataStreamSource<AppendOnlyCompactionTask> source = buildSource(false);

// from source, construct the full flink job
sinkFromSource(source);
}

public DataStream<Committable> fetchUncommitted(String commitUser) {
DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
DataStreamSource<AppendOnlyCompactionTask> source = buildSource(true);

// rebalance input to default or assigned parallelism
DataStream<AppendOnlyCompactionTask> rebalanced = rebalanceInput(source);
@@ -87,11 +87,11 @@ public DataStream<Committable> fetchUncommitted(String commitUser) {
.doWrite(rebalanced, commitUser, rebalanced.getParallelism());
}

private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
private DataStreamSource<AppendOnlyCompactionTask> buildSource(boolean emitMaxWatermark) {
long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
BucketUnawareCompactSource source =
new BucketUnawareCompactSource(
table, isContinuous, scanInterval, partitionPredicate);
table, isContinuous, scanInterval, partitionPredicate, emitMaxWatermark);

return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
}
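My reading of why build() passes false while fetchUncommitted() passes true (a sketch under that assumption, not code from this PR): downstream watermarks from multiple inputs are combined with min(), so a compaction source that has no real watermark of its own must advertise Long.MAX_VALUE to stay neutral; otherwise its missing watermark overrides the watermark produced by the append-only table's write path.

```java
// Neutral vs. overriding input when watermarks are min-combined downstream.
public class WatermarkOverrideSketch {
    static long combine(long a, long b) {
        return Math.min(a, b);
    }

    public static void main(String[] args) {
        long writePathWatermark = 1_717_200_000_000L;
        // emitMaxWatermark = true: the compaction input never wins the min().
        System.out.println(combine(writePathWatermark, Long.MAX_VALUE)); // 1717200000000
        // An "absent" (MIN_VALUE) watermark would override the real one instead.
        System.out.println(combine(writePathWatermark, Long.MIN_VALUE)); // Long.MIN_VALUE
    }
}
```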
@@ -51,7 +51,7 @@
* A dedicated operator for manual triggered compaction.
*
* <p>In-coming records are generated by sources built from {@link
* org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder}. The records will contain
* org.apache.paimon.flink.source.operator.MultiTablesReadOperator}. The records will contain
* partition keys, bucket number, table name and database name.
*/
public class MultiTablesStoreCompactOperator
@@ -173,7 +173,6 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
@Override
protected List<MultiTableCommittable> prepareCommit(boolean waitCompaction, long checkpointId)
throws IOException {

List<MultiTableCommittable> committables = new LinkedList<>();
for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
Identifier key = entry.getKey();