From f0d5cfe5d8be94ebe64ac7b5a47d3473f542c54f Mon Sep 17 00:00:00 2001
From: "hongli.wwj"
Date: Sat, 10 Aug 2024 23:17:58 +0800
Subject: [PATCH] [core] Support checking partition expiration at end of input
 in batch mode or bounded streams

---
 .../generated/core_configuration.html         |  18 +-
 .../java/org/apache/paimon/CoreOptions.java   |  11 ++
 .../org/apache/paimon/AbstractFileStore.java  |   3 +-
 .../paimon/operation/PartitionExpire.java     |  21 ++-
 .../paimon/table/AbstractFileStoreTable.java  |   4 +-
 .../FlinkEndInputPartitionExpireITCase.java   | 165 ++++++++++++++++++
 .../flink/FlinkEndInputWatermarkITCase.java   |   8 +-
 7 files changed, 210 insertions(+), 20 deletions(-)
 create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputPartitionExpireITCase.java

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index b0497b3450da..ee36a01de8f6 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -242,6 +242,12 @@
             <td>Boolean</td>
             <td>Whether only overwrite dynamic partition when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys.</td>
         </tr>
+        <tr>
+            <td><h5>end-input.check-partition-expire</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to check partition expiration at end of input, for batch mode or bounded streams.</td>
+        </tr>
         <tr>
             <td><h5>fields.default-aggregate-function</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -302,6 +308,12 @@
             <td>Map</td>
             <td>Define different file format for different level, you can add the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file format for level is not provided, the default format which set by `file.format` will be used.</td>
         </tr>
+        <tr>
+            <td><h5>force-lookup</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to force the use of lookup for compaction.</td>
+        </tr>
         <tr>
             <td><h5>full-compaction.delta-commits</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -345,12 +357,6 @@
             <td>Integer</td>
             <td>The maximal fan-in for external merge sort. It limits the number of file handles. If it is too small, may cause intermediate merging. But if it is too large, it will cause too many files opened at the same time, consume memory and lead to random reading.</td>
         </tr>
-        <tr>
-            <td><h5>force-lookup</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to force the use of lookup for compaction.</td>
-        </tr>
         <tr>
             <td><h5>lookup-wait</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index aa4bcb6858ea..b9cab19b5d88 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -962,6 +962,13 @@ public class CoreOptions implements Serializable {
                             "Read incremental changes between start timestamp (exclusive) and end timestamp, "
                                     + "for example, 't1,t2' means changes between timestamp t1 and timestamp t2.");
 
+    public static final ConfigOption<Boolean> END_INPUT_CHECK_PARTITION_EXPIRE =
+            key("end-input.check-partition-expire")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to check partition expiration at end of input, for batch mode or bounded streams.");
+
     public static final String STATS_MODE_SUFFIX = "stats-mode";
 
     public static final ConfigOption<String> METADATA_STATS_MODE =
@@ -1533,6 +1540,10 @@ public int deleteFileThreadNum() {
                 .orElseGet(() -> Runtime.getRuntime().availableProcessors());
     }
 
+    public boolean endInputCheckPartitionExpire() {
+        return options.get(END_INPUT_CHECK_PARTITION_EXPIRE);
+    }
+
     public ExpireConfig expireConfig() {
         return ExpireConfig.builder()
                 .snapshotRetainMax(snapshotNumRetainMax())
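
Note: the new option is meant to be combined with the existing
partition-expiration settings. A minimal sketch of enabling it
programmatically (every key below already appears in this patch's test;
nothing here is invented):

    import org.apache.paimon.options.Options;

    import java.time.Duration;

    import static org.apache.paimon.CoreOptions.END_INPUT_CHECK_PARTITION_EXPIRE;
    import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
    import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
    import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER;

    Options options = new Options();
    // Partitions whose 'yyyyMMdd' value is older than 2 days become expirable.
    options.set(PARTITION_EXPIRATION_TIME, Duration.ofDays(2));
    options.set(PARTITION_EXPIRATION_CHECK_INTERVAL, Duration.ofHours(1));
    options.set(PARTITION_TIMESTAMP_FORMATTER, "yyyyMMdd");
    // New in this patch: also run the expiration check once when a bounded
    // (batch) job reaches end of input.
    options.set(END_INPUT_CHECK_PARTITION_EXPIRE, true);
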
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 5120db295b69..21fb87562d2d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -288,7 +288,8 @@ public PartitionExpire newPartitionExpire(String commitUser) {
                 PartitionExpireStrategy.createPartitionExpireStrategy(options, partitionType()),
                 newScan(),
                 newCommit(commitUser),
-                metastoreClient);
+                metastoreClient,
+                options.endInputCheckPartitionExpire());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index b769fc8990dc..2f5ca780c82d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -45,10 +45,9 @@ public class PartitionExpire {
     private final FileStoreScan scan;
     private final FileStoreCommit commit;
     private final MetastoreClient metastoreClient;
-
     private LocalDateTime lastCheck;
-
     private final PartitionExpireStrategy strategy;
+    private final boolean endInputCheckPartitionExpire;
 
     public PartitionExpire(
             Duration expirationTime,
@@ -56,7 +55,8 @@
             PartitionExpireStrategy strategy,
             FileStoreScan scan,
             FileStoreCommit commit,
-            @Nullable MetastoreClient metastoreClient) {
+            @Nullable MetastoreClient metastoreClient,
+            boolean endInputCheckPartitionExpire) {
         this.expirationTime = expirationTime;
         this.checkInterval = checkInterval;
         this.strategy = strategy;
@@ -64,6 +64,17 @@
         this.commit = commit;
         this.metastoreClient = metastoreClient;
         this.lastCheck = LocalDateTime.now();
+        this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
+    }
+
+    public PartitionExpire(
+            Duration expirationTime,
+            Duration checkInterval,
+            PartitionExpireStrategy strategy,
+            FileStoreScan scan,
+            FileStoreCommit commit,
+            @Nullable MetastoreClient metastoreClient) {
+        this(expirationTime, checkInterval, strategy, scan, commit, metastoreClient, false);
     }
 
     public PartitionExpire withLock(Lock lock) {
@@ -82,7 +93,9 @@ void setLastCheck(LocalDateTime time) {
 
     @VisibleForTesting
     List<Map<String, String>> expire(LocalDateTime now, long commitIdentifier) {
-        if (checkInterval.isZero() || now.isAfter(lastCheck.plus(checkInterval))) {
+        if (checkInterval.isZero()
+                || now.isAfter(lastCheck.plus(checkInterval))
+                || (endInputCheckPartitionExpire && Long.MAX_VALUE == commitIdentifier)) {
             List<Map<String, String>> expired =
                     doExpire(now.minus(expirationTime), commitIdentifier);
             lastCheck = now;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index d30bd11efda6..f109c76fb70f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -386,9 +386,9 @@ public TableCommitImpl newCommit(String commitUser) {
                 catalogEnvironment.lockFactory().create(),
                 CoreOptions.fromMap(options()).consumerExpireTime(),
                 new ConsumerManager(fileIO, path, snapshotManager().branch()),
-                coreOptions().snapshotExpireExecutionMode(),
+                options.snapshotExpireExecutionMode(),
                 name(),
-                coreOptions().forceCreatingSnapshot());
+                options.forceCreatingSnapshot());
     }
 
     private List<CommitCallback> createCommitCallbacks(String commitUser) {
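
Note on the trigger above: the new branch compares the commit identifier
against Long.MAX_VALUE, relying on the convention that Paimon's Flink
committer emits the final commit of a bounded (batch or bounded-stream) job
with that sentinel identifier. The check therefore fires exactly once at end
of input, even when 'partition.expiration-check-interval' has not elapsed
since the last check. A self-contained sketch of the decision logic (the
helper name 'shouldCheck' is invented here for illustration; the real
condition is inlined in PartitionExpire#expire):

    import java.time.Duration;
    import java.time.LocalDateTime;

    static boolean shouldCheck(
            Duration checkInterval,
            LocalDateTime lastCheck,
            LocalDateTime now,
            boolean endInputCheckPartitionExpire,
            long commitIdentifier) {
        // Same three-way condition as the patched PartitionExpire#expire:
        // a zero interval always checks, an elapsed interval checks, and the
        // end-input sentinel checks when the new option is enabled.
        return checkInterval.isZero()
                || now.isAfter(lastCheck.plus(checkInterval))
                || (endInputCheckPartitionExpire && commitIdentifier == Long.MAX_VALUE);
    }
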
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputPartitionExpireITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputPartitionExpireITCase.java
new file mode 100644
index 000000000000..63eebcd7d1f2
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputPartitionExpireITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.CoreOptions.END_INPUT_CHECK_PARTITION_EXPIRE;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
+import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
+import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
+import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
+
+/** ITCase for partition expiration at end of input. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class FlinkEndInputPartitionExpireITCase extends CatalogITCaseBase {
+
+    private static final RowType TABLE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new RowType.RowField("v", new IntType()),
+                            new RowType.RowField("p", new VarCharType(10)),
+                            // rename key
+                            new RowType.RowField("_k", new IntType())));
+
+    private static final List<SerializableRowData> SOURCE_DATA =
+            Arrays.asList(
+                    wrap(GenericRowData.of(0, StringData.fromString("20240101"), 1)),
+                    wrap(GenericRowData.of(0, StringData.fromString("20240101"), 2)),
+                    wrap(GenericRowData.of(0, StringData.fromString("20240103"), 1)),
+                    wrap(GenericRowData.of(5, StringData.fromString("20240103"), 1)),
+                    wrap(GenericRowData.of(6, StringData.fromString("20240105"), 1)));
+
+    private static SerializableRowData wrap(RowData row) {
+        return new SerializableRowData(row, InternalSerializers.create(TABLE_TYPE));
+    }
+
+    private final StreamExecutionEnvironment env;
+
+    public FlinkEndInputPartitionExpireITCase() {
+        this.env = streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
+    }
+
+    @Parameters(name = "isBatch-{0}")
+    public static List<Boolean> getVarSeg() {
+        return Arrays.asList(true, false);
+    }
+
+    @TestTemplate
+    public void testEndInputPartitionExpire() throws Exception {
+        FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2});
+
+        // write
+        DataStreamSource<RowData> source =
+                env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(TABLE_TYPE));
+        DataStream<Row> input =
+                source.map(
+                                (MapFunction<RowData, Row>)
+                                        r ->
+                                                Row.of(
+                                                        r.getInt(0),
+                                                        r.getString(1).toString(),
+                                                        r.getInt(2)))
+                        .setParallelism(source.getParallelism());
+        DataType inputType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("v", DataTypes.INT()),
+                        DataTypes.FIELD("p", DataTypes.STRING()),
+                        DataTypes.FIELD("_k", DataTypes.INT()));
+        new FlinkSinkBuilder(table).forRow(input, inputType).build();
+        env.execute();
+
+        Assertions.assertEquals(2, table.snapshotManager().snapshotCount());
+        Assertions.assertEquals(
+                Snapshot.CommitKind.OVERWRITE, table.snapshotManager().snapshot(2).commitKind());
+    }
+
+    private FileStoreTable buildFileStoreTable(int[] partitions, int[] primaryKey)
+            throws Exception {
+        Options options = new Options();
+        options.set(BUCKET, 3);
+        options.set(PATH, getTempDirPath());
+        options.set(FILE_FORMAT, CoreOptions.FILE_FORMAT_AVRO);
+        options.set(PARTITION_EXPIRATION_TIME, Duration.ofDays(2));
+        options.set(PARTITION_EXPIRATION_CHECK_INTERVAL, Duration.ofHours(1));
+        options.set(PARTITION_TIMESTAMP_FORMATTER, "yyyyMMdd");
+        options.set(END_INPUT_CHECK_PARTITION_EXPIRE, true);
+
+        Path tablePath = new CoreOptions(options.toMap()).path();
+        if (primaryKey.length == 0) {
+            options.set(BUCKET_KEY, "_k");
+        }
+        Schema schema =
+                new Schema(
+                        toDataType(TABLE_TYPE).getFields(),
+                        Arrays.stream(partitions)
+                                .mapToObj(i -> TABLE_TYPE.getFieldNames().get(i))
+                                .collect(Collectors.toList()),
+                        Arrays.stream(primaryKey)
+                                .mapToObj(i -> TABLE_TYPE.getFieldNames().get(i))
+                                .collect(Collectors.toList()),
+                        options.toMap(),
+                        "");
+        return retryArtificialException(
+                () -> {
+                    new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema);
+                    return FileStoreTableFactory.create(LocalFileIO.create(), options);
+                });
+    }
+}
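
For context on the assertions in this test: the bounded batch write creates
snapshot 1 (an APPEND commit); because 'end-input.check-partition-expire' is
enabled, the end-input commit also runs the expiration check, and dropping
the expired partitions produces snapshot 2 as an OVERWRITE commit. A
hypothetical extra assertion (not part of this patch) that would pin down
the first snapshot's kind as well:

    // Hypothetical follow-up assertion: the initial batch write itself
    // commits as APPEND before the end-input expiration kicks in.
    Assertions.assertEquals(
            Snapshot.CommitKind.APPEND, table.snapshotManager().snapshot(1).commitKind());
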
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
index 19fd139d82e6..c85b570dd53a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
@@ -19,10 +19,7 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.sink.FixedBucketSink;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
-import org.apache.paimon.flink.source.ContinuousFileStoreSource;
-import org.apache.paimon.flink.source.StaticFileStoreSource;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
@@ -64,10 +61,7 @@
 import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
 import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
 
-/**
- * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource} and {@link
- * FixedBucketSink}.
- */
+/** ITCase for user-defined watermarks at end of input. */
 @ExtendWith(ParameterizedTestExtension.class)
 public class FlinkEndInputWatermarkITCase extends CatalogITCaseBase {