From 4dbf7b92866537183c5dae67b2363b0fa07077c9 Mon Sep 17 00:00:00 2001 From: "hongli.wwj" Date: Wed, 31 Jul 2024 23:07:05 +0800 Subject: [PATCH] [core] fix bug of watermark override with append only table --- docs/content/concepts/spec/snapshot.md | 2 +- .../generated/core_configuration.html | 6 ++-- .../java/org/apache/paimon/CoreOptions.java | 7 +++-- .../java/org/apache/paimon/utils/Filter.java | 4 +-- .../org/apache/paimon/data/BinaryRowTest.java | 1 - .../apache/paimon/AppendOnlyFileStore.java | 4 +-- .../org/apache/paimon/KeyValueFileStore.java | 4 +-- .../paimon/append/AppendOnlyWriter.java | 2 +- .../apache/paimon/io/RecordLevelExpire.java | 4 +-- .../apache/paimon/manifest/ManifestEntry.java | 1 + .../paimon/manifest/ManifestFileMeta.java | 5 ---- .../operation/AbstractFileStoreScan.java | 3 ++ .../operation/AppendOnlyFileStoreScan.java | 4 +-- .../operation/KeyValueFileStoreScan.java | 8 ++--- .../table/PrimaryKeyFileStoreTable.java | 2 +- .../org/apache/paimon/utils/ObjectsFile.java | 1 + .../manifest/ManifestFileMetaTestBase.java | 4 +-- .../flink/action/CompactDatabaseAction.java | 2 -- .../flink/compact/MultiTableScanBase.java | 8 ++--- .../UnawareBucketCompactionTopoBuilder.java | 8 ++--- .../sink/MultiTablesStoreCompactOperator.java | 3 +- .../source/BucketUnawareCompactSource.java | 10 ++++++- .../CombinedTableCompactorSourceBuilder.java | 5 ++-- .../CombinedAwareBatchSourceFunction.java | 4 +-- .../CombinedAwareStreamingSourceFunction.java | 1 - .../UnawareBucketAppendOnlyTableITCase.java | 30 +++++++++++++++++++ 26 files changed, 84 insertions(+), 49 deletions(-) diff --git a/docs/content/concepts/spec/snapshot.md b/docs/content/concepts/spec/snapshot.md index 5c0f58ac44b0..f710fb35b02b 100644 --- a/docs/content/concepts/spec/snapshot.md +++ b/docs/content/concepts/spec/snapshot.md @@ -61,5 +61,5 @@ Snapshot File is JSON, it includes: 12. totalRecordCount: record count of all changes occurred in this snapshot. 13. deltaRecordCount: record count of all new changes occurred in this snapshot. 14. changelogRecordCount: record count of all changelog produced in this snapshot. -15. watermark: watermark for input records, from Flink watermark mechanism, null if there is no watermark. +15. watermark: watermark for input records, from Flink watermark mechanism, Long.MIN_VALUE if there is no watermark. 16. statistics: stats file name for statistics of this table. diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 446931acacda..f0a33b7ec4c4 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -42,7 +42,7 @@
bucket
-1 Integer - Bucket number for file store.
It should either be equal to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket mode). + Bucket number for file store.
It should either be equal to -1 (dynamic bucket mode or unaware bucket mode), or it must be greater than 0 (fixed bucket mode).
bucket-key
@@ -742,7 +742,7 @@
source.split.target-size
128 mb MemorySize - Target size of a source split when scanning a bucket. + Target size of a source split when scanning a partition (unaware bucket mode) or bucket (non unaware bucket mode).
spill-compression
@@ -856,7 +856,7 @@
write-max-writers-to-spill
5 Integer - When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory. + When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory.
write-only
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 17bff3653229..67faa060da28 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -93,7 +93,7 @@ public class CoreOptions implements Serializable { .text("Bucket number for file store.") .linebreak() .text( - "It should either be equal to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).") + "It should either be equal to -1 (dynamic bucket mode or unaware bucket mode), or it must be greater than 0 (fixed bucket mode).") .build()); @Immutable @@ -368,7 +368,8 @@ public class CoreOptions implements Serializable { key("source.split.target-size") .memoryType() .defaultValue(MemorySize.ofMebiBytes(128)) - .withDescription("Target size of a source split when scanning a bucket."); + .withDescription( + "Target size of a source split when scanning a partition (unaware bucket mode) or bucket (non unaware bucket mode)."); public static final ConfigOption SOURCE_SPLIT_OPEN_FILE_COST = key("source.split.open-file-cost") @@ -412,7 +413,7 @@ public class CoreOptions implements Serializable { .intType() .defaultValue(5) .withDescription( - "When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory. "); + "When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory."); public static final ConfigOption WRITE_MANIFEST_CACHE = key("write-manifest-cache") diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java b/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java index 2764bc77363a..92d1a5413066 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java @@ -30,10 +30,10 @@ public interface Filter { Filter ALWAYS_TRUE = t -> true; /** - * Evaluates this predicate on the given argument. + * Evaluates this filter on the given argument. * * @param t the input argument - * @return {@code true} if the input argument matches the predicate, otherwise {@code false} + * @return {@code true} if the input argument matches the filter, otherwise {@code false} */ boolean test(T t); diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java index f7f64a536c20..0212e548b2ce 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java @@ -108,7 +108,6 @@ public void testSetAndGet() throws IOException, ClassNotFoundException { @Test public void testWriter() { - int arity = 13; BinaryRow row = new BinaryRow(arity); BinaryRowWriter writer = new BinaryRowWriter(row, 20); diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 3cd7bb3b6959..edb5b552beac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -107,7 +107,7 @@ public AppendOnlyFileStoreWrite newWrite( } private AppendOnlyFileStoreScan newScan(boolean forWrite) { - ScanBucketFilter bucketFilter = + ScanBucketFilter bucketKeyFilter = new ScanBucketFilter(bucketKeyType) { @Override public void pushdown(Predicate predicate) { @@ -132,7 +132,7 @@ public void pushdown(Predicate predicate) { return new AppendOnlyFileStoreScan( partitionType, - bucketFilter, + bucketKeyFilter, snapshotManager(), schemaManager, schema, diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 26341d045c35..22a9a739bc9e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -204,7 +204,7 @@ private Map format2PathFactory() { } private KeyValueFileStoreScan newScan(boolean forWrite) { - ScanBucketFilter bucketFilter = + ScanBucketFilter bucketKeyFilter = new ScanBucketFilter(bucketKeyType) { @Override public void pushdown(Predicate keyFilter) { @@ -224,7 +224,7 @@ public void pushdown(Predicate keyFilter) { }; return new KeyValueFileStoreScan( partitionType, - bucketFilter, + bucketKeyFilter, snapshotManager(), schemaManager, schema, diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 897674b3b347..b5344a7f7ec7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -83,7 +83,7 @@ public class AppendOnlyWriter implements RecordWriter, MemoryOwner private final FileIndexOptions fileIndexOptions; private MemorySegmentPool memorySegmentPool; - private MemorySize maxDiskSize; + private final MemorySize maxDiskSize; public AppendOnlyWriter( FileIO fileIO, diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java index a49e31fc91bd..4a61b66a798a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java @@ -69,7 +69,7 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) { return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds()); } - public RecordLevelExpire(int timeField, int expireTime) { + private RecordLevelExpire(int timeField, int expireTime) { this.timeField = timeField; this.expireTime = expireTime; } @@ -78,7 +78,7 @@ public FileReaderFactory wrap(FileReaderFactory readerFactor return file -> wrap(readerFactory.createRecordReader(file)); } - public RecordReader wrap(RecordReader reader) { + private RecordReader wrap(RecordReader reader) { int currentTime = (int) (System.currentTimeMillis() / 1000); return reader.filter( kv -> { diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index af1740b00b09..d629e3b0943d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -145,6 +145,7 @@ public String toString() { */ public static Filter createCacheRowFilter( @Nullable ManifestCacheFilter manifestCacheFilter, int numOfBuckets) { + // manifestCacheFilter is not null only when write if (manifestCacheFilter == null) { return Filter.alwaysTrue(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java index 32ecc23b5956..4e6a1310c20d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java @@ -246,7 +246,6 @@ public static Optional> tryFullCompaction( @Nullable Integer manifestReadParallelism) throws Exception { // 1. should trigger full compaction - List base = new ArrayList<>(); long totalManifestSize = 0; int i = 0; @@ -276,14 +275,12 @@ public static Optional> tryFullCompaction( } // 2. do full compaction - LOG.info( "Start Manifest File Full Compaction, pick the number of delete file: {}, total manifest file size: {}", deltaDeleteFileNum, totalManifestSize); // 2.1. try to skip base files by partition filter - Map deltaMerged = new LinkedHashMap<>(); FileEntry.mergeEntries(manifestFile, delta, deltaMerged, manifestReadParallelism); @@ -317,7 +314,6 @@ public static Optional> tryFullCompaction( } // 2.2. try to skip base files by reading entries - Set deleteEntries = new HashSet<>(); deltaMerged.forEach( (k, v) -> { @@ -349,7 +345,6 @@ public static Optional> tryFullCompaction( } // 2.3. merge - RollingFileWriter writer = manifestFile.createRollingWriter(); Exception exception = null; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 5e6f914fe4c6..c9e42ad23e09 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -298,6 +298,7 @@ private Pair> doPlan() { List files = new ArrayList<>(); long skippedByPartitionAndStats = startDataFiles - mergedEntries.size(); for (ManifestEntry file : mergedEntries) { + // checkNumOfBuckets is true only when write if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) { String partInfo = partitionType.getFieldCount() > 0 @@ -487,6 +488,7 @@ private List readManifestFileMeta(ManifestFileMeta manifest) { .read( manifest.fileName(), manifest.fileSize(), + // ManifestEntry#createCacheRowFilter alway return true when read ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets), ManifestEntry.createEntryRowFilter( partitionFilter, bucketFilter, fileNameFilter, numOfBuckets)); @@ -502,6 +504,7 @@ private List readSimpleEntries(ManifestFileMeta manifest) { // use filter for ManifestEntry // currently, projection is not pushed down to file format // see SimpleFileEntrySerializer + // ManifestEntry#createCacheRowFilter alway return true when read ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets), ManifestEntry.createEntryRowFilter( partitionFilter, bucketFilter, fileNameFilter, numOfBuckets)); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 056c82e82e59..2305aa3c84ea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -53,7 +53,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan { public AppendOnlyFileStoreScan( RowType partitionType, - ScanBucketFilter bucketFilter, + ScanBucketFilter bucketKeyFilter, SnapshotManager snapshotManager, SchemaManager schemaManager, TableSchema schema, @@ -65,7 +65,7 @@ public AppendOnlyFileStoreScan( boolean fileIndexReadEnabled) { super( partitionType, - bucketFilter, + bucketKeyFilter, snapshotManager, schemaManager, schema, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index ab19cc49f8da..01ae98c844b4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -52,7 +52,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan { public KeyValueFileStoreScan( RowType partitionType, - ScanBucketFilter bucketFilter, + ScanBucketFilter bucketKeyFilter, SnapshotManager snapshotManager, SchemaManager schemaManager, TableSchema schema, @@ -66,7 +66,7 @@ public KeyValueFileStoreScan( MergeEngine mergeEngine) { super( partitionType, - bucketFilter, + bucketKeyFilter, snapshotManager, schemaManager, schema, @@ -152,8 +152,8 @@ private List filterWholeBucketPerFile(List entries } private List filterWholeBucketAllFiles(List entries) { - // entries come from the same bucket, if any of it doesn't meet the request, we could - // filter the bucket. + // entries come from the same bucket, if any of it meet the request, we could + // filter the whole bucket. for (ManifestEntry entry : entries) { if (filterByValueFilter(entry)) { return entries; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index b1e5b5366c3d..042971fc87ce 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -129,7 +129,7 @@ protected BiConsumer nonPartitionFilterConsumer() { // filter: value = 1 // if we perform filter push down on values, data file 1 will be chosen, but data // file 2 will be ignored, and the final result will be key = a, value = 1 while the - // correct result is an empty set + // correct result is an empty set. List keyFilters = pickTransformFieldMapping( splitAnd(predicate), diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java index a457a868aa0b..91d1e86ed79c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java @@ -123,6 +123,7 @@ private List readWithIOException( Filter loadFilter, Filter readFilter) throws IOException { + // cache is alway null when read if (cache != null) { return cache.read(fileName, fileSize, loadFilter, readFilter); } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java index f4b3c69ba8b1..fdb3cb7ff462 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java @@ -133,7 +133,7 @@ protected ManifestFile createManifestFile(String pathStr) { path, getPartitionType(), "default", - CoreOptions.FILE_FORMAT.defaultValue().toString()), + CoreOptions.FILE_FORMAT.defaultValue()), Long.MAX_VALUE, null) .create(); @@ -166,7 +166,7 @@ protected void assertSameContent( protected List createBaseManifestFileMetas(boolean hasPartition) { List input = new ArrayList<>(); - // base with 3 partition ,16 entry each parition + // base with 3 partition, 16 entry each partition for (int j = 0; j < 3; j++) { List entrys = new ArrayList<>(); for (int i = 0; i < 16; i++) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java index 051b2baab410..68a26b97cbe5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java @@ -185,7 +185,6 @@ private void buildForDividedMode() { } private void buildForCombinedMode() { - ReadableConfig conf = env.getConfiguration(); boolean isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; @@ -224,7 +223,6 @@ private void buildForTraditionalCompaction( String fullName, FileStoreTable table, boolean isStreaming) { - CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(fullName, table); CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java index 52995efacf92..32a24bebd164 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java @@ -42,10 +42,10 @@ * * @param the result of scanning file : *
    - *
  1. Tuple2<{@link Split},String> for the table with multi buckets, such as dynamic or fixed - * bucket table. - *
  2. {@link MultiTableAppendOnlyCompactionTask} for the table witch fixed single bucket - * ,such as unaware bucket table. + *
  3. Tuple2<{@link Split}, String> for the table with multi buckets, such as dynamic or + * fixed bucket table. + *
  4. {@link MultiTableAppendOnlyCompactionTask} for the table with fixed single bucket , + * such as unaware bucket table. *
*/ public abstract class MultiTableScanBase implements AutoCloseable { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java index d10c5c11cb6a..ee44a72d4114 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java @@ -71,14 +71,14 @@ public void withPartitionPredicate(Predicate predicate) { public void build() { // build source from UnawareSourceFunction - DataStreamSource source = buildSource(); + DataStreamSource source = buildSource(false); // from source, construct the full flink job sinkFromSource(source); } public DataStream fetchUncommitted(String commitUser) { - DataStreamSource source = buildSource(); + DataStreamSource source = buildSource(true); // rebalance input to default or assigned parallelism DataStream rebalanced = rebalanceInput(source); @@ -87,11 +87,11 @@ public DataStream fetchUncommitted(String commitUser) { .doWrite(rebalanced, commitUser, rebalanced.getParallelism()); } - private DataStreamSource buildSource() { + private DataStreamSource buildSource(boolean emitMaxWatermark) { long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis(); BucketUnawareCompactSource source = new BucketUnawareCompactSource( - table, isContinuous, scanInterval, partitionPredicate); + table, isContinuous, scanInterval, partitionPredicate, emitMaxWatermark); return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index f253a3bf8e79..f12f7cbd88f8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -51,7 +51,7 @@ * A dedicated operator for manual triggered compaction. * *

In-coming records are generated by sources built from {@link - * org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder}. The records will contain + * org.apache.paimon.flink.source.operator.MultiTablesReadOperator}. The records will contain * partition keys, bucket number, table name and database name. */ public class MultiTablesStoreCompactOperator @@ -173,7 +173,6 @@ public void processElement(StreamRecord element) throws Exception { @Override protected List prepareCommit(boolean waitCompaction, long checkpointId) throws IOException { - List committables = new LinkedList<>(); for (Map.Entry entry : writes.entrySet()) { Identifier key = entry.getKey(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java index 7926fa60a566..936b2cd2c70c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.watermark.Watermark; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,16 +61,19 @@ public class BucketUnawareCompactSource extends RichSourceFunction ctx; private volatile boolean isRunning = true; + private final boolean emitMaxWatermark; public BucketUnawareCompactSource( FileStoreTable table, boolean isStreaming, long scanInterval, - @Nullable Predicate filter) { + @Nullable Predicate filter, + boolean emitMaxWatermark) { this.table = table; this.streaming = isStreaming; this.scanInterval = scanInterval; this.filter = filter; + this.emitMaxWatermark = emitMaxWatermark; } @Override @@ -94,6 +98,10 @@ public void run(SourceContext sourceContext) throws Ex List tasks = compactionCoordinator.run(); isEmpty = tasks.isEmpty(); tasks.forEach(ctx::collect); + + if (emitMaxWatermark) { + ctx.emitWatermark(Watermark.MAX_WATERMARK); + } } catch (EndOfScanException esf) { LOG.info("Catching EndOfStreamException, the stream is finished."); return; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java index 6ee9d849af89..e5cb7d971f76 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java @@ -37,10 +37,11 @@ import java.util.regex.Pattern; /** - * source builder to build a Flink compactor source for multi-tables. This is for dedicated - * compactor jobs in combined mode. + * Source builder to build a Flink compactor source for multi-tables. This is for dedicated + * compactor jobs with combined mode. */ public class CombinedTableCompactorSourceBuilder { + private final Catalog.Loader catalogLoader; private final Pattern includingPattern; private final Pattern excludingPattern; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java index 22ef330291ff..e8a91b5415ca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java @@ -84,9 +84,9 @@ void scanTable() throws Exception { if (scanResult == IS_EMPTY) { // Currently, in the combined mode, there are two scan tasks for the table of two // different bucket type (multi bucket & unaware bucket) running concurrently. - // There will be a situation that there is only one task compaction , therefore this + // There will be a situation that there is only one task compaction, therefore this // should not be thrown exception here. - LOGGER.info("No file were collected for the table of aware-bucket"); + LOGGER.info("No file were collected for the table of aware-bucket."); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java index bff690ea30c2..8d4107a0da8e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java @@ -95,7 +95,6 @@ public static DataStream buildSource( Pattern excludingPattern, Pattern databasePattern, long monitorInterval) { - CombinedAwareStreamingSourceFunction function = new CombinedAwareStreamingSourceFunction( catalogLoader, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java index 85bee4bb55a4..55bd89b0189c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.io.File; @@ -208,6 +209,35 @@ public void testCompactionInStreamingMode() throws Exception { assertThat(rows.size()).isEqualTo(10); } + @Test + public void testCompactionInStreamingModeWithMaxWatermark() throws Exception { + batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')"); + batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')"); + + sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(500)); + sEnv.executeSql( + "CREATE TEMPORARY TABLE Orders_in (\n" + + " f0 INT,\n" + + " f1 STRING,\n" + + " ts TIMESTAMP(3),\n" + + "WATERMARK FOR ts AS ts - INTERVAL '0' SECOND" + + ") WITH (\n" + + " 'connector' = 'datagen',\n" + + " 'rows-per-second' = '1',\n" + + " 'number-of-rows' = '10'\n" + + ")"); + + assertStreamingHasCompact("INSERT INTO append_table SELECT f0, f1 FROM Orders_in", 60000); + // ensure data gen finished + Thread.sleep(5000); + + Snapshot snapshot = findLatestSnapshot("append_table"); + Assertions.assertNotNull(snapshot); + Long watermark = snapshot.watermark(); + Assertions.assertNotNull(watermark); + Assertions.assertTrue(watermark > Long.MIN_VALUE); + } + @Test public void testRejectDelete() { testRejectChanges(RowKind.DELETE);