[core] fix bug of watermark override with append only table
hongli.wwj committed Aug 1, 2024
1 parent 5bd3c6f commit 425a360
Showing 26 changed files with 84 additions and 49 deletions.
2 changes: 1 addition & 1 deletion docs/content/concepts/spec/snapshot.md
@@ -61,5 +61,5 @@ Snapshot File is JSON, it includes:
12. totalRecordCount: record count of all changes occurred in this snapshot.
13. deltaRecordCount: record count of all new changes occurred in this snapshot.
14. changelogRecordCount: record count of all changelog produced in this snapshot.
- 15. watermark: watermark for input records, from Flink watermark mechanism, null if there is no watermark.
+ 15. watermark: watermark for input records, from Flink watermark mechanism, Long.MIN_VALUE if there is no watermark.
16. statistics: stats file name for statistics of this table.
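Consumers of the spec must now treat Long.MIN_VALUE, not null, as "no watermark". A minimal Java sketch of that check, assuming a Snapshot#watermark() accessor:

    // Minimal sketch: normalize the sentinel when reading a snapshot.
    Long watermark = snapshot.watermark();
    boolean hasWatermark = watermark != null && watermark != Long.MIN_VALUE;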
6 changes: 3 additions & 3 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -42,7 +42,7 @@
<td><h5>bucket</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
- <td>Bucket number for file store.<br />It should either be equal to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).</td>
+ <td>Bucket number for file store.<br />It must be either -1 (dynamic bucket mode or unaware bucket mode) or greater than 0 (fixed bucket mode).</td>
</tr>
<tr>
<td><h5>bucket-key</h5></td>
@@ -742,7 +742,7 @@
<td><h5>source.split.target-size</h5></td>
<td style="word-wrap: break-word;">128 mb</td>
<td>MemorySize</td>
- <td>Target size of a source split when scanning a bucket.</td>
+ <td>Target size of a source split when scanning a partition (unaware bucket mode) or a bucket (all other bucket modes).</td>
</tr>
<tr>
<td><h5>spill-compression</h5></td>
@@ -856,7 +856,7 @@
<td><h5>write-max-writers-to-spill</h5></td>
<td style="word-wrap: break-word;">5</td>
<td>Integer</td>
- <td>When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory. </td>
+ <td>When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory.</td>
</tr>
<tr>
<td><h5>write-only</h5></td>
CoreOptions.java
@@ -93,7 +93,7 @@ public class CoreOptions implements Serializable {
.text("Bucket number for file store.")
.linebreak()
.text(
"It should either be equal to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).")
"It should either be equal to -1 (dynamic bucket mode or unaware bucket mode), or it must be greater than 0 (fixed bucket mode).")
.build());
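As an aside, a hedged sketch of how the documented value ranges map onto bucket modes (reading the option through Paimon's Options; the mode names come from the description above):

    // Sketch only; not code from this commit.
    int bucket = options.get(CoreOptions.BUCKET);
    if (bucket == -1) {
        // dynamic bucket mode (primary-key tables) or unaware bucket mode (append-only tables)
    } else if (bucket > 0) {
        // fixed bucket mode
    } else {
        throw new IllegalArgumentException("bucket must be -1 or greater than 0");
    }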

@Immutable
@@ -368,7 +368,8 @@ public class CoreOptions implements Serializable {
key("source.split.target-size")
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(128))
- .withDescription("Target size of a source split when scanning a bucket.");
+ .withDescription(
+ "Target size of a source split when scanning a partition (unaware bucket mode) or a bucket (all other bucket modes).");

public static final ConfigOption<MemorySize> SOURCE_SPLIT_OPEN_FILE_COST =
key("source.split.open-file-cost")
@@ -412,7 +413,7 @@ public class CoreOptions implements Serializable {
.intType()
.defaultValue(5)
.withDescription(
"When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory. ");
"When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory.");

public static final ConfigOption<MemorySize> WRITE_MANIFEST_CACHE =
key("write-manifest-cache")
Filter.java
@@ -30,10 +30,10 @@ public interface Filter<T> {
Filter<?> ALWAYS_TRUE = t -> true;

/**
- * Evaluates this predicate on the given argument.
+ * Evaluates this filter on the given argument.
*
* @param t the input argument
- * @return {@code true} if the input argument matches the predicate, otherwise {@code false}
+ * @return {@code true} if the input argument matches the filter, otherwise {@code false}
*/
boolean test(T t);

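Since Filter is a single-method interface, a lambda is enough; a quick usage sketch:

    // Usage sketch of the interface above.
    Filter<String> nonEmpty = s -> !s.isEmpty();
    boolean kept = nonEmpty.test("a"); // true
    boolean dropped = nonEmpty.test(""); // false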
BinaryRowTest.java
@@ -108,7 +108,6 @@ public void testSetAndGet() throws IOException, ClassNotFoundException {

@Test
public void testWriter() {
-
int arity = 13;
BinaryRow row = new BinaryRow(arity);
BinaryRowWriter writer = new BinaryRowWriter(row, 20);
AppendOnlyFileStore.java
@@ -107,7 +107,7 @@ public AppendOnlyFileStoreWrite newWrite(
}

private AppendOnlyFileStoreScan newScan(boolean forWrite) {
- ScanBucketFilter bucketFilter =
+ ScanBucketFilter bucketKeyFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
public void pushdown(Predicate predicate) {
@@ -132,7 +132,7 @@ public void pushdown(Predicate predicate)

return new AppendOnlyFileStoreScan(
partitionType,
- bucketFilter,
+ bucketKeyFilter,
snapshotManager(),
schemaManager,
schema,
KeyValueFileStore.java
@@ -204,7 +204,7 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
}

private KeyValueFileStoreScan newScan(boolean forWrite) {
- ScanBucketFilter bucketFilter =
+ ScanBucketFilter bucketKeyFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
public void pushdown(Predicate keyFilter) {
@@ -224,7 +224,7 @@ public void pushdown(Predicate keyFilter) {
};
return new KeyValueFileStoreScan(
partitionType,
- bucketFilter,
+ bucketKeyFilter,
snapshotManager(),
schemaManager,
schema,
AppendOnlyWriter.java
@@ -83,7 +83,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final FileIndexOptions fileIndexOptions;

private MemorySegmentPool memorySegmentPool;
- private MemorySize maxDiskSize;
+ private final MemorySize maxDiskSize;

public AppendOnlyWriter(
FileIO fileIO,
RecordLevelExpire.java
@@ -69,7 +69,7 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds());
}

- public RecordLevelExpire(int timeField, int expireTime) {
+ private RecordLevelExpire(int timeField, int expireTime) {
this.timeField = timeField;
this.expireTime = expireTime;
}
@@ -78,7 +78,7 @@ public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactor
return file -> wrap(readerFactory.createRecordReader(file));
}

- public RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
+ private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
int currentTime = (int) (System.currentTimeMillis() / 1000);
return reader.filter(
kv -> {
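The hunk is cut off inside the lambda passed to reader.filter. A plausible completion of the expiry predicate, assuming timeField holds epoch seconds, would be:

    // Assumed sketch of the truncated filter body: keep a KeyValue only
    // while it has not outlived expireTime.
    int recordTime = kv.value().getInt(timeField);
    return currentTime <= recordTime + expireTime;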
ManifestEntry.java
@@ -145,6 +145,7 @@ public String toString() {
*/
public static Filter<InternalRow> createCacheRowFilter(
@Nullable ManifestCacheFilter manifestCacheFilter, int numOfBuckets) {
+ // manifestCacheFilter is not null only when writing
if (manifestCacheFilter == null) {
return Filter.alwaysTrue();
}
@@ -246,7 +246,6 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
@Nullable Integer manifestReadParallelism)
throws Exception {
// 1. should trigger full compaction
-
List<ManifestFileMeta> base = new ArrayList<>();
long totalManifestSize = 0;
int i = 0;
@@ -276,14 +275,12 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
}

// 2. do full compaction
-
LOG.info(
"Start Manifest File Full Compaction, pick the number of delete file: {}, total manifest file size: {}",
deltaDeleteFileNum,
totalManifestSize);

// 2.1. try to skip base files by partition filter
-
Map<Identifier, ManifestEntry> deltaMerged = new LinkedHashMap<>();
FileEntry.mergeEntries(manifestFile, delta, deltaMerged, manifestReadParallelism);

@@ -317,7 +314,6 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
}

// 2.2. try to skip base files by reading entries
-
Set<Identifier> deleteEntries = new HashSet<>();
deltaMerged.forEach(
(k, v) -> {
@@ -349,7 +345,6 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
}

// 2.3. merge
-
RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
manifestFile.createRollingWriter();
Exception exception = null;
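As a self-contained toy of the step-1 trigger above (the threshold parameters are stand-ins, not real Paimon options):

    // Toy trigger: full-compact once enough small manifest files pile up.
    static boolean shouldFullCompact(
            List<Long> manifestSizesInBytes, long smallFileSize, int smallFileCountTrigger) {
        int small = 0;
        for (long size : manifestSizesInBytes) {
            if (size < smallFileSize) {
                small++;
            }
        }
        return small >= smallFileCountTrigger;
    }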
AbstractFileStoreScan.java
@@ -298,6 +298,7 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan() {
List<ManifestEntry> files = new ArrayList<>();
long skippedByPartitionAndStats = startDataFiles - mergedEntries.size();
for (ManifestEntry file : mergedEntries) {
+ // checkNumOfBuckets is true only when writing
if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
String partInfo =
partitionType.getFieldCount() > 0
@@ -487,6 +488,7 @@ private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
.read(
manifest.fileName(),
manifest.fileSize(),
+ // ManifestEntry#createCacheRowFilter always returns true when reading
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
ManifestEntry.createEntryRowFilter(
partitionFilter, bucketFilter, fileNameFilter, numOfBuckets));
@@ -502,6 +504,7 @@ private List<SimpleFileEntry> readSimpleEntries(ManifestFileMeta manifest) {
// use filter for ManifestEntry
// currently, projection is not pushed down to file format
// see SimpleFileEntrySerializer
+ // ManifestEntry#createCacheRowFilter always returns true when reading
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
ManifestEntry.createEntryRowFilter(
partitionFilter, bucketFilter, fileNameFilter, numOfBuckets));
AppendOnlyFileStoreScan.java
@@ -53,7 +53,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {

public AppendOnlyFileStoreScan(
RowType partitionType,
- ScanBucketFilter bucketFilter,
+ ScanBucketFilter bucketKeyFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
TableSchema schema,
@@ -65,7 +65,7 @@ public AppendOnlyFileStoreScan(
boolean fileIndexReadEnabled) {
super(
partitionType,
- bucketFilter,
+ bucketKeyFilter,
snapshotManager,
schemaManager,
schema,
KeyValueFileStoreScan.java
@@ -52,7 +52,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {

public KeyValueFileStoreScan(
RowType partitionType,
- ScanBucketFilter bucketFilter,
+ ScanBucketFilter bucketKeyFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
TableSchema schema,
@@ -66,7 +66,7 @@
MergeEngine mergeEngine) {
super(
partitionType,
- bucketFilter,
+ bucketKeyFilter,
snapshotManager,
schemaManager,
schema,
@@ -152,8 +152,8 @@ private List<ManifestEntry> filterWholeBucketPerFile(List<ManifestEntry> entries
}

private List<ManifestEntry> filterWholeBucketAllFiles(List<ManifestEntry> entries) {
- // entries come from the same bucket, if any of it doesn't meet the request, we could
- // filter the bucket.
+ // entries come from the same bucket; if none of them meets the request, we can
+ // filter out the whole bucket.
for (ManifestEntry entry : entries) {
if (filterByValueFilter(entry)) {
return entries;
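The hunk ends mid-loop; a plausible completion of the method (assumed, using java.util.Collections) is:

    // Assumed completion: if no entry in the bucket passes the value
    // filter, the whole bucket can be skipped.
    return Collections.emptyList();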
PrimaryKeyFileStoreTable.java
@@ -129,7 +129,7 @@ protected BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer() {
// filter: value = 1
// if we perform filter push down on values, data file 1 will be chosen, but data
// file 2 will be ignored, and the final result will be key = a, value = 1 while the
- // correct result is an empty set
+ // correct result is an empty set.
List<Predicate> keyFilters =
pickTransformFieldMapping(
splitAnd(predicate),
ObjectsFile.java
@@ -123,6 +123,7 @@ private List<T> readWithIOException(
Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter)
throws IOException {
+ // cache is always null when reading
if (cache != null) {
return cache.read(fileName, fileSize, loadFilter, readFilter);
}
ManifestFileMetaTestBase.java
@@ -133,7 +133,7 @@ protected ManifestFile createManifestFile(String pathStr) {
path,
getPartitionType(),
"default",
- CoreOptions.FILE_FORMAT.defaultValue().toString()),
+ CoreOptions.FILE_FORMAT.defaultValue()),
Long.MAX_VALUE,
null)
.create();
@@ -166,7 +166,7 @@ protected void assertSameContent(

protected List<ManifestFileMeta> createBaseManifestFileMetas(boolean hasPartition) {
List<ManifestFileMeta> input = new ArrayList<>();
- // base with 3 partition ,16 entry each parition
+ // base with 3 partitions, 16 entries per partition
for (int j = 0; j < 3; j++) {
List<ManifestEntry> entrys = new ArrayList<>();
for (int i = 0; i < 16; i++) {
CompactDatabaseAction.java
Expand Up @@ -185,7 +185,6 @@ private void buildForDividedMode() {
}

private void buildForCombinedMode() {
-
ReadableConfig conf = env.getConfiguration();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
@@ -224,7 +223,6 @@ private void buildForTraditionalCompaction(
String fullName,
FileStoreTable table,
boolean isStreaming) {
-
CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(fullName, table);
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);

MultiTableScanBase.java
@@ -42,10 +42,10 @@
*
* @param <T> the result of scanning file :
* <ol>
- * <li>Tuple2<{@link Split},String> for the table with multi buckets, such as dynamic or fixed
- * bucket table.
- * <li>{@link MultiTableAppendOnlyCompactionTask} for the table witch fixed single bucket
- * ,such as unaware bucket table.
+ * <li>Tuple2<{@link Split}, String> for a table with multiple buckets, such as a dynamic or
+ * fixed bucket table.
+ * <li>{@link MultiTableAppendOnlyCompactionTask} for a table with a fixed single bucket,
+ * such as an unaware bucket table.
* </ol>
*/
public abstract class MultiTableScanBase<T> implements AutoCloseable {
UnawareBucketCompactionTopoBuilder.java
@@ -71,14 +71,14 @@ public void withPartitionPredicate(Predicate predicate) {

public void build() {
// build source from UnawareSourceFunction
- DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
+ DataStreamSource<AppendOnlyCompactionTask> source = buildSource(false);

// from source, construct the full flink job
sinkFromSource(source);
}

public DataStream<Committable> fetchUncommitted(String commitUser) {
- DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
+ DataStreamSource<AppendOnlyCompactionTask> source = buildSource(true);

// rebalance input to default or assigned parallelism
DataStream<AppendOnlyCompactionTask> rebalanced = rebalanceInput(source);
@@ -87,11 +87,11 @@ public DataStream<Committable> fetchUncommitted(String commitUser) {
.doWrite(rebalanced, commitUser, rebalanced.getParallelism());
}

- private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
+ private DataStreamSource<AppendOnlyCompactionTask> buildSource(boolean emitMaxWatermark) {
long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
BucketUnawareCompactSource source =
new BucketUnawareCompactSource(
- table, isContinuous, scanInterval, partitionPredicate);
+ table, isContinuous, scanInterval, partitionPredicate, emitMaxWatermark);

return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
}
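This new flag carries the actual fix named in the commit title: a dedicated compaction source emits no watermarks of its own, so its committed snapshots could override the table's watermark. A sketch of what BucketUnawareCompactSource presumably does with the flag (its body is not part of the shown hunks), using Flink's Watermark.MAX_WATERMARK:

    // Assumed sketch inside the compaction source:
    if (emitMaxWatermark) {
        // MAX_WATERMARK never wins a min() against the table's own
        // watermark, so compaction commits cannot drag it backwards.
        ctx.emitWatermark(Watermark.MAX_WATERMARK);
    }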
MultiTablesStoreCompactOperator.java
@@ -51,7 +51,7 @@
* A dedicated operator for manual triggered compaction.
*
* <p>In-coming records are generated by sources built from {@link
- * org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder}. The records will contain
+ * org.apache.paimon.flink.source.operator.MultiTablesReadOperator}. The records will contain
* partition keys, bucket number, table name and database name.
*/
public class MultiTablesStoreCompactOperator
@@ -173,7 +173,6 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
@Override
protected List<MultiTableCommittable> prepareCommit(boolean waitCompaction, long checkpointId)
throws IOException {
-
List<MultiTableCommittable> committables = new LinkedList<>();
for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
Identifier key = entry.getKey();
(diffs for the remaining changed files were not loaded)