diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md index 76afb7747a20..865e68b5e5b2 100644 --- a/docs/content/maintenance/manage-tags.md +++ b/docs/content/maintenance/manage-tags.md @@ -42,6 +42,7 @@ Paimon supports automatic creation of tags in writing job. You can set `'tag.automatic-creation'` to `process-time` or `watermark`: - `process-time`: Create TAG based on the time of the machine. - `watermark`: Create TAG based on the watermark of the Sink input. +- `batch`: In a batch processing scenario, a tag is generated after the current task is completed. {{< hint info >}} If you choose Watermark, you may need to specify the time zone of watermark, if watermark is not in the diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 4274ea8d2b9e..6b7d688379f7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1960,8 +1960,10 @@ public enum TagCreationMode implements DescribedEnum { "Based on the time of the machine, create TAG once the processing time passes period time plus delay."), WATERMARK( "watermark", - "Based on the watermark of the input, create TAG once the watermark passes period time plus delay."); - + "Based on the watermark of the input, create TAG once the watermark passes period time plus delay."), + BATCH( + "batch", + "In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed."); private final String value; private final String description; diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java index 671e8841dd57..cd1a3fda1b06 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java @@ -47,7 +47,8 @@ private TagPreview(CoreOptions options) { } public static TagPreview create(CoreOptions options) { - if (options.tagToPartitionPreview() != CoreOptions.TagCreationMode.NONE) { + if (options.tagToPartitionPreview() != CoreOptions.TagCreationMode.NONE + && options.tagToPartitionPreview() != CoreOptions.TagCreationMode.BATCH) { return new TagPreview(options); } return null; diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java index dadfde3a752d..e017b215121c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java @@ -79,6 +79,7 @@ static TagTimeExtractor createForTagPreview(CoreOptions options) { static TagTimeExtractor create(CoreOptions.TagCreationMode mode, CoreOptions options) { switch (mode) { case NONE: + case BATCH: return null; case PROCESS_TIME: return new ProcessTimeExtractor(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java new file mode 100644 index 000000000000..dff75ff29b33 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.operation.TagDeletion; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import org.apache.flink.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.SetupableStreamOperator; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.SortedMap; + +/** + * Commit {@link Committable} for snapshot using the {@link CommitterOperator}. When the task is + * completed, the corresponding tag is generated. + */ +public class BatchWriteGeneratorTagOperator + implements OneInputStreamOperator, + SetupableStreamOperator, + BoundedOneInput { + + private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-"; + + private static final long serialVersionUID = 1L; + + private final CommitterOperator commitOperator; + + protected final FileStoreTable table; + + public BatchWriteGeneratorTagOperator( + CommitterOperator commitOperator, FileStoreTable table) { + this.table = table; + this.commitOperator = commitOperator; + } + + @Override + public void initializeState(StreamTaskStateInitializer streamTaskStateManager) + throws Exception { + commitOperator.initializeState(streamTaskStateManager); + } + + @Override + public OperatorSnapshotFutures snapshotState( + long checkpointId, + long timestamp, + CheckpointOptions checkpointOptions, + CheckpointStreamFactory storageLocation) + throws Exception { + return commitOperator.snapshotState( + checkpointId, timestamp, checkpointOptions, storageLocation); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + commitOperator.notifyCheckpointComplete(checkpointId); + } + + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + commitOperator.notifyCheckpointAborted(checkpointId); + } + + private void createTag() { + SnapshotManager snapshotManager = table.snapshotManager(); + Snapshot snapshot = snapshotManager.latestSnapshot(); + if (snapshot == null) { + return; + } + TagManager tagManager = table.tagManager(); + TagDeletion tagDeletion = table.store().newTagDeletion(); + Instant instant = Instant.ofEpochMilli(snapshot.timeMillis()); + LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); + String tagName = + BATCH_WRITE_TAG_PREFIX + + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); + try { + // If the tag already exists, delete the tag + if (tagManager.tagExists(tagName)) { + tagManager.deleteTag(tagName, tagDeletion, snapshotManager); + } + // Create a new tag + tagManager.createTag(snapshot, tagName); + // Expire the tag + expireTag(); + } catch (Exception e) { + if (tagManager.tagExists(tagName)) { + tagManager.deleteTag(tagName, tagDeletion, snapshotManager); + } + } + } + + private void expireTag() { + Integer tagNumRetainedMax = table.coreOptions().tagNumRetainedMax(); + if (tagNumRetainedMax != null) { + SnapshotManager snapshotManager = table.snapshotManager(); + if (snapshotManager.latestSnapshot() == null) { + return; + } + TagManager tagManager = table.tagManager(); + TagDeletion tagDeletion = table.store().newTagDeletion(); + SortedMap tags = tagManager.tags(); + if (tags.size() > tagNumRetainedMax) { + int toDelete = tags.size() - tagNumRetainedMax; + int i = 0; + for (String tag : tags.values()) { + tagManager.deleteTag(tag, tagDeletion, snapshotManager); + i++; + if (i == toDelete) { + break; + } + } + } + } + } + + @Override + public void open() throws Exception { + commitOperator.open(); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + commitOperator.processElement(element); + } + + @Override + public void processWatermark(Watermark watermark) throws Exception { + commitOperator.processWatermark(watermark); + } + + @Override + public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception { + commitOperator.processWatermarkStatus(watermarkStatus); + } + + @Override + public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception { + commitOperator.processLatencyMarker(latencyMarker); + } + + @Override + public void finish() throws Exception { + createTag(); + commitOperator.finish(); + } + + @Override + public void close() throws Exception { + commitOperator.close(); + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + commitOperator.prepareSnapshotPreBarrier(checkpointId); + } + + @Override + public void setKeyContextElement1(StreamRecord record) throws Exception { + commitOperator.setKeyContextElement1(record); + } + + @Override + public void setKeyContextElement2(StreamRecord record) throws Exception { + commitOperator.setKeyContextElement2(record); + } + + @Override + public OperatorMetricGroup getMetricGroup() { + return commitOperator.getMetricGroup(); + } + + @Override + public OperatorID getOperatorID() { + return commitOperator.getOperatorID(); + } + + @Override + public void setCurrentKey(Object key) { + commitOperator.setCurrentKey(key); + } + + @Override + public Object getCurrentKey() { + return commitOperator.getCurrentKey(); + } + + @Override + public void setKeyContextElement(StreamRecord record) throws Exception { + commitOperator.setKeyContextElement(record); + } + + @Override + public void endInput() throws Exception { + commitOperator.endInput(); + } + + @Override + public void setup(StreamTask containingTask, StreamConfig config, Output output) { + commitOperator.setup(containingTask, config, output); + } + + @Override + public ChainingStrategy getChainingStrategy() { + return commitOperator.getChainingStrategy(); + } + + @Override + public void setChainingStrategy(ChainingStrategy strategy) { + commitOperator.setChainingStrategy(strategy); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index c9a6f3c05d74..480d41ae8d7a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink; import org.apache.paimon.CoreOptions.ChangelogProducer; +import org.apache.paimon.CoreOptions.TagCreationMode; import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.options.MemorySize; @@ -203,6 +204,13 @@ protected DataStreamSink doCommit(DataStream written, String com table::tagManager, () -> table.store().newTagDeletion()); } + if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH + && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) { + committerOperator = + new BatchWriteGeneratorTagOperator<>( + (CommitterOperator) committerOperator, + table); + } SingleOutputStreamOperator committed = written.transform( GLOBAL_COMMITTER_NAME + " : " + table.name(), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java new file mode 100644 index 000000000000..fb5ebf85fb68 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.flink.VersionedSerializerWrapper; +import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.manifest.ManifestCommittableSerializer; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Objects; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link BatchWriteGeneratorTagOperator}. */ +public class BatchWriteGeneratorTagOperatorTest extends CommitterOperatorTest { + @Test + public void testBatchWriteGeneratorTag() throws Exception { + FileStoreTable table = createFileStoreTable(); + // set tag.automatic-creation = batch + HashMap dynamicOptions = new HashMap<>(); + dynamicOptions.put("tag.automatic-creation", "batch"); + dynamicOptions.put("tag.num-retained-max", "2"); + table = table.copy(dynamicOptions); + + StreamTableWrite write = + table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite(); + + OneInputStreamOperator committerOperator = + createCommitterOperator( + table, + initialCommitUser, + new RestoreAndFailCommittableStateManager<>( + () -> + new VersionedSerializerWrapper<>( + new ManifestCommittableSerializer()))); + + TableCommitImpl tableCommit = table.newCommit(initialCommitUser); + + write.write(GenericRow.of(1, 10L)); + tableCommit.commit(write.prepareCommit(false, 1)); + + SnapshotManager snapshotManager = table.newSnapshotReader().snapshotManager(); + TagManager tagManager = table.tagManager(); + + // Generate tag name + String BATCH_WRITE_TAG_PREFIX = "batch-write-"; + Instant instant = + Instant.ofEpochMilli( + Objects.requireNonNull(snapshotManager.latestSnapshot()).timeMillis()); + LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); + String tagName = + BATCH_WRITE_TAG_PREFIX + + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); + + // No tag is generated before the finish method + assertThat(table.tagManager().tagCount()).isEqualTo(0); + committerOperator.finish(); + // After the finish method, a tag is generated + assertThat(table.tagManager().tagCount()).isEqualTo(1); + // The tag is consistent with the latest snapshot + assertThat(tagManager.taggedSnapshot(tagName)).isEqualTo(snapshotManager.latestSnapshot()); + } + + @Override + protected OneInputStreamOperator createCommitterOperator( + FileStoreTable table, + String commitUser, + CommittableStateManager committableStateManager) { + return new BatchWriteGeneratorTagOperator<>( + (CommitterOperator) + super.createCommitterOperator(table, commitUser, committableStateManager), + table); + } +}