[core] support check partition expire when batch mode or bounded stream
hongli.wwj committed Aug 10, 2024
1 parent 265937e commit f0d5cfe
Showing 7 changed files with 210 additions and 20 deletions.
18 changes: 12 additions & 6 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -242,6 +242,12 @@
<td>Boolean</td>
<td>Whether only overwrite dynamic partition when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys.</td>
</tr>
<tr>
<td><h5>end-input.check-partition-expire</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to check partition expiration at the end of input, for batch mode or bounded streams.</td>
</tr>
<tr>
<td><h5>fields.default-aggregate-function</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -302,6 +308,12 @@
<td>Map</td>
<td>Define different file format for different level, you can add the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file format for level is not provided, the default format which set by `file.format` will be used.</td>
</tr>
<tr>
<td><h5>force-lookup</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to force the use of lookup for compaction.</td>
</tr>
<tr>
<td><h5>full-compaction.delta-commits</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -345,12 +357,6 @@
<td>Integer</td>
<td>The maximal fan-in for external merge sort. It limits the number of file handles. If it is too small, may cause intermediate merging. But if it is too large, it will cause too many files opened at the same time, consume memory and lead to random reading.</td>
</tr>
<tr>
<td><h5>force-lookup</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to force the use of lookup for compaction.</td>
</tr>
<tr>
<td><h5>lookup-wait</h5></td>
<td style="word-wrap: break-word;">true</td>
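For context, the new 'end-input.check-partition-expire' option is intended to be used together with the existing partition-expiration options. Below is a minimal sketch of the relevant table configuration, modeled on the buildFileStoreTable helper in the new ITCase further down; the class and method names are illustrative, and the duration and formatter values are only examples.

import org.apache.paimon.options.Options;

import java.time.Duration;

import static org.apache.paimon.CoreOptions.END_INPUT_CHECK_PARTITION_EXPIRE;
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER;

/** Illustrative helper building table options that expire partitions at end of input. */
public class EndInputExpireOptionsExample {

    public static Options partitionExpireOptions() {
        Options options = new Options();
        // partitions older than 2 days become candidates for expiration
        options.set(PARTITION_EXPIRATION_TIME, Duration.ofDays(2));
        // periodic check interval used by long-running streaming writes
        options.set(PARTITION_EXPIRATION_CHECK_INTERVAL, Duration.ofHours(1));
        // how partition values are parsed into timestamps
        options.set(PARTITION_TIMESTAMP_FORMATTER, "yyyyMMdd");
        // new in this commit: also run the expiration check on the
        // end-of-input commit of a batch or bounded streaming job
        options.set(END_INPUT_CHECK_PARTITION_EXPIRE, true);
        return options;
    }
}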
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -962,6 +962,13 @@ public class CoreOptions implements Serializable {
"Read incremental changes between start timestamp (exclusive) and end timestamp, "
+ "for example, 't1,t2' means changes between timestamp t1 and timestamp t2.");

public static final ConfigOption<Boolean> END_INPUT_CHECK_PARTITION_EXPIRE =
key("end-input.check-partition-expire")
.booleanType()
.defaultValue(false)
.withDescription(
"Optional endInput check partition expire used in case of batch mode or bounded stream.");

public static final String STATS_MODE_SUFFIX = "stats-mode";

public static final ConfigOption<String> METADATA_STATS_MODE =
@@ -1533,6 +1540,10 @@ public int deleteFileThreadNum() {
.orElseGet(() -> Runtime.getRuntime().availableProcessors());
}

public boolean endInputCheckPartitionExpire() {
return options.get(END_INPUT_CHECK_PARTITION_EXPIRE);
}

public ExpireConfig expireConfig() {
return ExpireConfig.builder()
.snapshotRetainMax(snapshotNumRetainMax())
@@ -288,7 +288,8 @@ public PartitionExpire newPartitionExpire(String commitUser) {
PartitionExpireStrategy.createPartitionExpireStrategy(options, partitionType()),
newScan(),
newCommit(commitUser),
metastoreClient);
metastoreClient,
options.endInputCheckPartitionExpire());
}

@Override
@@ -45,25 +45,36 @@ public class PartitionExpire {
private final FileStoreScan scan;
private final FileStoreCommit commit;
private final MetastoreClient metastoreClient;

private LocalDateTime lastCheck;

private final PartitionExpireStrategy strategy;
private final boolean endInputCheckPartitionExpire;

public PartitionExpire(
Duration expirationTime,
Duration checkInterval,
PartitionExpireStrategy strategy,
FileStoreScan scan,
FileStoreCommit commit,
@Nullable MetastoreClient metastoreClient) {
@Nullable MetastoreClient metastoreClient,
boolean endInputCheckPartitionExpire) {
this.expirationTime = expirationTime;
this.checkInterval = checkInterval;
this.strategy = strategy;
this.scan = scan;
this.commit = commit;
this.metastoreClient = metastoreClient;
this.lastCheck = LocalDateTime.now();
this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
}

public PartitionExpire(
Duration expirationTime,
Duration checkInterval,
PartitionExpireStrategy strategy,
FileStoreScan scan,
FileStoreCommit commit,
@Nullable MetastoreClient metastoreClient) {
this(expirationTime, checkInterval, strategy, scan, commit, metastoreClient, false);
}

public PartitionExpire withLock(Lock lock) {
@@ -82,7 +93,9 @@ void setLastCheck(LocalDateTime time) {

@VisibleForTesting
List<Map<String, String>> expire(LocalDateTime now, long commitIdentifier) {
if (checkInterval.isZero() || now.isAfter(lastCheck.plus(checkInterval))) {
if (checkInterval.isZero()
|| now.isAfter(lastCheck.plus(checkInterval))
|| (endInputCheckPartitionExpire && Long.MAX_VALUE == commitIdentifier)) {
List<Map<String, String>> expired =
doExpire(now.minus(expirationTime), commitIdentifier);
lastCheck = now;
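The key behavioral change above is in expire(): besides the two existing triggers (a zero check interval, or the check interval having elapsed since lastCheck), expiration now also runs when the new option is enabled and the commit identifier equals Long.MAX_VALUE, the identifier Paimon uses for the end-of-input commit of a batch or bounded job. Restated as a hypothetical helper over the fields shown in this class (a sketch for readability, not part of the patch):

    private boolean shouldExpire(LocalDateTime now, long commitIdentifier) {
        // trigger 1: no interval configured, so check on every commit
        boolean intervalDisabled = checkInterval.isZero();
        // trigger 2: periodic check for long-running streaming writes
        boolean intervalElapsed = now.isAfter(lastCheck.plus(checkInterval));
        // trigger 3 (new): the bounded job has reached end of input
        boolean endOfInput =
                endInputCheckPartitionExpire && commitIdentifier == Long.MAX_VALUE;
        return intervalDisabled || intervalElapsed || endOfInput;
    }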
@@ -386,9 +386,9 @@ public TableCommitImpl newCommit(String commitUser) {
catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path, snapshotManager().branch()),
coreOptions().snapshotExpireExecutionMode(),
options.snapshotExpireExecutionMode(),
name(),
coreOptions().forceCreatingSnapshot());
options.forceCreatingSnapshot());
}

private List<CommitCallback> createCommitCallbacks(String commitUser) {
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.END_INPUT_CHECK_PARTITION_EXPIRE;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;

/** ITCase for partition expiration at end of input. */
@ExtendWith(ParameterizedTestExtension.class)
public class FlinkEndInputPartitionExpireITCase extends CatalogITCaseBase {

private static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
new RowType.RowField("v", new IntType()),
new RowType.RowField("p", new VarCharType(10)),
// rename key
new RowType.RowField("_k", new IntType())));

private static final List<RowData> SOURCE_DATA =
Arrays.asList(
wrap(GenericRowData.of(0, StringData.fromString("20240101"), 1)),
wrap(GenericRowData.of(0, StringData.fromString("20240101"), 2)),
wrap(GenericRowData.of(0, StringData.fromString("20240103"), 1)),
wrap(GenericRowData.of(5, StringData.fromString("20240103"), 1)),
wrap(GenericRowData.of(6, StringData.fromString("20240105"), 1)));

private static SerializableRowData wrap(RowData row) {
return new SerializableRowData(row, InternalSerializers.create(TABLE_TYPE));
}

private final StreamExecutionEnvironment env;

public FlinkEndInputPartitionExpireITCase() {
this.env = streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
}

@Parameters(name = "isBatch-{0}")
public static List<Boolean> getVarSeg() {
return Arrays.asList(true, false);
}

@TestTemplate
public void testEndInputPartitionExpire() throws Exception {
FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] {1, 2});

// write
DataStreamSource<RowData> source =
env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(TABLE_TYPE));
DataStream<Row> input =
source.map(
(MapFunction<RowData, Row>)
r ->
Row.of(
r.getInt(0),
r.getString(1).toString(),
r.getInt(2)))
.setParallelism(source.getParallelism());
DataType inputType =
DataTypes.ROW(
DataTypes.FIELD("v", DataTypes.INT()),
DataTypes.FIELD("p", DataTypes.STRING()),
DataTypes.FIELD("_k", DataTypes.INT()));
new FlinkSinkBuilder(table).forRow(input, inputType).build();
env.execute();

Assertions.assertEquals(2, table.snapshotManager().snapshotCount());
Assertions.assertEquals(
Snapshot.CommitKind.OVERWRITE, table.snapshotManager().snapshot(2).commitKind());
}

private FileStoreTable buildFileStoreTable(int[] partitions, int[] primaryKey)
throws Exception {
Options options = new Options();
options.set(BUCKET, 3);
options.set(PATH, getTempDirPath());
options.set(FILE_FORMAT, CoreOptions.FILE_FORMAT_AVRO);
options.set(PARTITION_EXPIRATION_TIME, Duration.ofDays(2));
options.set(PARTITION_EXPIRATION_CHECK_INTERVAL, Duration.ofHours(1));
options.set(PARTITION_TIMESTAMP_FORMATTER, "yyyyMMdd");
options.set(END_INPUT_CHECK_PARTITION_EXPIRE, true);

Path tablePath = new CoreOptions(options.toMap()).path();
if (primaryKey.length == 0) {
options.set(BUCKET_KEY, "_k");
}
Schema schema =
new Schema(
toDataType(TABLE_TYPE).getFields(),
Arrays.stream(partitions)
.mapToObj(i -> TABLE_TYPE.getFieldNames().get(i))
.collect(Collectors.toList()),
Arrays.stream(primaryKey)
.mapToObj(i -> TABLE_TYPE.getFieldNames().get(i))
.collect(Collectors.toList()),
options.toMap(),
"");
return retryArtificialException(
() -> {
new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema);
return FileStoreTableFactory.create(LocalFileIO.create(), options);
});
}
}
@@ -19,10 +19,7 @@
package org.apache.paimon.flink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.sink.FixedBucketSink;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.source.ContinuousFileStoreSource;
import org.apache.paimon.flink.source.StaticFileStoreSource;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
@@ -64,10 +61,7 @@
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;

/**
* ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource} and {@link
* FixedBucketSink}.
*/
/** ITCase for user-defined watermark at end of input. */
@ExtendWith(ParameterizedTestExtension.class)
public class FlinkEndInputWatermarkITCase extends CatalogITCaseBase {
