From ed485ea61873c5029c28cb0961209d11c982ca39 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Mon, 28 Oct 2024 13:30:11 +0800 Subject: [PATCH 1/3] [flink] PartitionMarkDone enables the use of various partition trigger strategies. --- .../procedure/MarkPartitionDoneProcedure.java | 2 +- .../flink/action/MarkPartitionDoneAction.java | 2 +- .../procedure/MarkPartitionDoneProcedure.java | 2 +- .../paimon/flink/sink/StoreCommitter.java | 30 ++-- .../sink/partition/PartitionCollector.java | 71 ++++++++ .../sink/partition/PartitionMarkDone.java | 164 ------------------ .../partition/PartitionMarkDoneTrigger.java | 144 +++++++++++++-- .../sink/partition/PartitionTrigger.java | 13 ++ ...eTest.java => PartitionCollectorTest.java} | 8 +- .../PartitionMarkDoneTriggerTest.java | 41 ++++- 10 files changed, 269 insertions(+), 208 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionCollector.java delete mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java rename paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/{PartitionMarkDoneTest.java => PartitionCollectorTest.java} (96%) diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java index d70cccf6ba25..15d0c9e5d972 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java @@ -32,7 +32,7 @@ import java.io.IOException; import java.util.List; -import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone; +import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger.markDone; import static org.apache.paimon.utils.ParameterUtils.getPartitions; import static org.apache.paimon.utils.Preconditions.checkArgument; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java index 9fd906ee44dc..e49d16b2afd4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; -import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone; +import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger.markDone; /** Table partition mark done action for Flink. */ public class MarkPartitionDoneAction extends TableActionBase { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java index f0a89a0bb32b..e96e6ab3f25b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java @@ -35,7 +35,7 @@ import java.io.IOException; import java.util.List; -import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone; +import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger.markDone; import static org.apache.paimon.utils.ParameterUtils.getPartitions; import static org.apache.paimon.utils.Preconditions.checkArgument; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java index 9ae53f401325..840f84e39a12 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java @@ -20,7 +20,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; -import org.apache.paimon.flink.sink.partition.PartitionMarkDone; +import org.apache.paimon.flink.sink.partition.PartitionCollector; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.FileStoreTable; @@ -43,7 +43,7 @@ public class StoreCommitter implements Committer committables) throws IOException, InterruptedException { commit.commitMultiple(committables, false); calcNumBytesAndRecordsOut(committables); - if (partitionMarkDone != null) { - partitionMarkDone.notifyCommittable(committables); - } + partitionCollector.notifyCommittable(committables); } @Override public int filterAndCommit( List globalCommittables, boolean checkAppendFiles) { int committed = commit.filterAndCommitMultiple(globalCommittables, checkAppendFiles); - if (partitionMarkDone != null) { - partitionMarkDone.notifyCommittable(globalCommittables); - } + partitionCollector.notifyCommittable(globalCommittables); return committed; } @Override public Map> groupByCheckpoint(Collection committables) { - if (partitionMarkDone != null) { - try { - partitionMarkDone.snapshotState(); - } catch (Exception e) { - throw new RuntimeException(e); - } + try { + partitionCollector.snapshotState(); + } catch (Exception e) { + throw new RuntimeException(e); } Map> grouped = new HashMap<>(); @@ -146,9 +140,7 @@ public Map> groupByCheckpoint(Collection co @Override public void close() throws Exception { commit.close(); - if (partitionMarkDone != null) { - partitionMarkDone.close(); - } + partitionCollector.close(); } private void calcNumBytesAndRecordsOut(List committables) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionCollector.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionCollector.java new file mode 100644 index 000000000000..2e692b908a87 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionCollector.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.partition; + +import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.IOUtils; + +import org.apache.flink.api.common.state.OperatorStateStore; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** Partition collector. */ +public class PartitionCollector implements Closeable { + + private final List triggers; + + private PartitionCollector(List triggers) { + this.triggers = triggers; + } + + public void notifyCommittable(List committables) { + for (PartitionTrigger trigger : triggers) { + trigger.notifyCommittable(committables); + } + } + + public void snapshotState() throws Exception { + for (PartitionTrigger trigger : triggers) { + trigger.snapshotState(); + } + } + + @Override + public void close() throws IOException { + IOUtils.closeAllQuietly(triggers); + } + + public static PartitionCollector create( + boolean isStreaming, + boolean isRestored, + OperatorStateStore stateStore, + FileStoreTable table) + throws Exception { + List triggers = new ArrayList<>(); + PartitionMarkDoneTrigger.create( + table.coreOptions(), isStreaming, isRestored, stateStore, table) + .ifPresent(triggers::add); + + return new PartitionCollector(triggers); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java deleted file mode 100644 index d0825bcdb752..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.sink.partition; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.CoreOptions.MergeEngine; -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.manifest.ManifestCommittable; -import org.apache.paimon.options.Options; -import org.apache.paimon.partition.actions.PartitionMarkDoneAction; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.sink.CommitMessage; -import org.apache.paimon.table.sink.CommitMessageImpl; -import org.apache.paimon.utils.IOUtils; -import org.apache.paimon.utils.InternalRowPartitionComputer; -import org.apache.paimon.utils.PartitionPathUtils; - -import org.apache.flink.api.common.state.OperatorStateStore; - -import javax.annotation.Nullable; - -import java.io.Closeable; -import java.io.IOException; -import java.time.Duration; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; -import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE; - -/** Mark partition done. */ -public class PartitionMarkDone implements Closeable { - - private final InternalRowPartitionComputer partitionComputer; - private final PartitionMarkDoneTrigger trigger; - private final List actions; - private final boolean waitCompaction; - - @Nullable - public static PartitionMarkDone create( - boolean isStreaming, - boolean isRestored, - OperatorStateStore stateStore, - FileStoreTable table) - throws Exception { - CoreOptions coreOptions = table.coreOptions(); - Options options = coreOptions.toConfiguration(); - - if (disablePartitionMarkDone(isStreaming, table, options)) { - return null; - } - - InternalRowPartitionComputer partitionComputer = - new InternalRowPartitionComputer( - coreOptions.partitionDefaultName(), - table.schema().logicalPartitionType(), - table.partitionKeys().toArray(new String[0]), - coreOptions.legacyPartitionName()); - - PartitionMarkDoneTrigger trigger = - PartitionMarkDoneTrigger.create(coreOptions, isRestored, stateStore); - - List actions = - PartitionMarkDoneAction.createActions(table, coreOptions); - - // if batch read skip level 0 files, we should wait compaction to mark done - // otherwise, some data may not be readable, and there might be data delays - boolean waitCompaction = - !table.primaryKeys().isEmpty() - && (coreOptions.deletionVectorsEnabled() - || coreOptions.mergeEngine() == MergeEngine.FIRST_ROW); - - return new PartitionMarkDone(partitionComputer, trigger, actions, waitCompaction); - } - - private static boolean disablePartitionMarkDone( - boolean isStreaming, FileStoreTable table, Options options) { - boolean partitionMarkDoneWhenEndInput = options.get(PARTITION_MARK_DONE_WHEN_END_INPUT); - if (!isStreaming && !partitionMarkDoneWhenEndInput) { - return true; - } - - Duration idleToDone = options.get(PARTITION_IDLE_TIME_TO_DONE); - if (isStreaming && idleToDone == null) { - return true; - } - - return table.partitionKeys().isEmpty(); - } - - public PartitionMarkDone( - InternalRowPartitionComputer partitionComputer, - PartitionMarkDoneTrigger trigger, - List actions, - boolean waitCompaction) { - this.partitionComputer = partitionComputer; - this.trigger = trigger; - this.actions = actions; - this.waitCompaction = waitCompaction; - } - - public void notifyCommittable(List committables) { - Set partitions = new HashSet<>(); - boolean endInput = false; - for (ManifestCommittable committable : committables) { - for (CommitMessage commitMessage : committable.fileCommittables()) { - CommitMessageImpl message = (CommitMessageImpl) commitMessage; - if (waitCompaction - || !message.indexIncrement().isEmpty() - || !message.newFilesIncrement().isEmpty()) { - partitions.add(message.partition()); - } - } - if (committable.identifier() == Long.MAX_VALUE) { - endInput = true; - } - } - - partitions.stream() - .map(partitionComputer::generatePartValues) - .map(PartitionPathUtils::generatePartitionPath) - .forEach(trigger::notifyPartition); - - markDone(trigger.donePartitions(endInput), actions); - } - - public static void markDone(List partitions, List actions) { - for (String partition : partitions) { - try { - for (PartitionMarkDoneAction action : actions) { - action.markDone(partition); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - public void snapshotState() throws Exception { - trigger.snapshotState(); - } - - @Override - public void close() throws IOException { - IOUtils.closeAllQuietly(actions); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java index 0094c5511809..e3919cbb34ec 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java @@ -20,9 +20,18 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionTimeExtractor; +import org.apache.paimon.partition.actions.PartitionMarkDoneAction; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.IOUtils; +import org.apache.paimon.utils.InternalRowPartitionComputer; +import org.apache.paimon.utils.PartitionPathUtils; import org.apache.paimon.utils.StringUtils; import org.apache.flink.api.common.state.ListState; @@ -33,6 +42,7 @@ import javax.annotation.Nullable; +import java.io.IOException; import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; @@ -40,9 +50,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE; @@ -50,7 +63,7 @@ import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath; /** Trigger to mark partitions done with streaming job. */ -public class PartitionMarkDoneTrigger { +public class PartitionMarkDoneTrigger implements PartitionTrigger { private static final ListStateDescriptor> PENDING_PARTITIONS_STATE_DESC = new ListStateDescriptor<>( @@ -65,13 +78,19 @@ public class PartitionMarkDoneTrigger { @Nullable private final Long idleTime; private final boolean markDoneWhenEndInput; private final Map pendingPartitions; + private final List actions; + private final boolean waitCompaction; + private final InternalRowPartitionComputer partitionComputer; public PartitionMarkDoneTrigger( State state, PartitionTimeExtractor timeExtractor, @Nullable Duration timeInterval, @Nullable Duration idleTime, - boolean markDoneWhenEndInput) + boolean markDoneWhenEndInput, + List actions, + boolean waitCompaction, + InternalRowPartitionComputer partitionComputer) throws Exception { this( state, @@ -79,7 +98,10 @@ public PartitionMarkDoneTrigger( timeInterval, idleTime, System.currentTimeMillis(), - markDoneWhenEndInput); + markDoneWhenEndInput, + actions, + waitCompaction, + partitionComputer); } public PartitionMarkDoneTrigger( @@ -88,7 +110,10 @@ public PartitionMarkDoneTrigger( @Nullable Duration timeInterval, @Nullable Duration idleTime, long currentTimeMillis, - boolean markDoneWhenEndInput) + boolean markDoneWhenEndInput, + List actions, + boolean waitCompaction, + InternalRowPartitionComputer partitionComputer) throws Exception { this.pendingPartitions = new HashMap<>(); this.state = state; @@ -96,9 +121,37 @@ public PartitionMarkDoneTrigger( this.timeInterval = timeInterval == null ? null : timeInterval.toMillis(); this.idleTime = idleTime == null ? null : idleTime.toMillis(); this.markDoneWhenEndInput = markDoneWhenEndInput; + this.actions = actions; + this.waitCompaction = waitCompaction; + this.partitionComputer = partitionComputer; state.restore().forEach(p -> pendingPartitions.put(p, currentTimeMillis)); } + @Override + public void notifyCommittable(List committables) { + Set partitions = new HashSet<>(); + boolean endInput = false; + for (ManifestCommittable committable : committables) { + for (CommitMessage commitMessage : committable.fileCommittables()) { + CommitMessageImpl message = (CommitMessageImpl) commitMessage; + if (waitCompaction + || !message.indexIncrement().isEmpty() + || !message.newFilesIncrement().isEmpty()) { + partitions.add(message.partition()); + } + } + if (committable.identifier() == Long.MAX_VALUE) { + endInput = true; + } + } + + partitions.stream() + .map(partitionComputer::generatePartValues) + .map(PartitionPathUtils::generatePartitionPath) + .forEach(this::notifyPartition); + markDone(donePartitions(endInput), actions); + } + public void notifyPartition(String partition) { notifyPartition(partition, System.currentTimeMillis()); } @@ -160,6 +213,23 @@ public void snapshotState() throws Exception { state.update(new ArrayList<>(pendingPartitions.keySet())); } + public static void markDone(List partitions, List actions) { + for (String partition : partitions) { + try { + for (PartitionMarkDoneAction action : actions) { + action.markDone(partition); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void close() throws IOException { + IOUtils.closeAllQuietly(actions); + } + /** State to store partitions. */ public interface State { List restore() throws Exception; @@ -197,17 +267,63 @@ public void update(List partitions) throws Exception { } } - public static PartitionMarkDoneTrigger create( - CoreOptions coreOptions, boolean isRestored, OperatorStateStore stateStore) + private static boolean disablePartitionMarkDone( + boolean isStreaming, FileStoreTable table, Options options) { + boolean partitionMarkDoneWhenEndInput = options.get(PARTITION_MARK_DONE_WHEN_END_INPUT); + if (!isStreaming && !partitionMarkDoneWhenEndInput) { + return true; + } + + Duration idleToDone = options.get(PARTITION_IDLE_TIME_TO_DONE); + if (isStreaming && idleToDone == null) { + return true; + } + + return table.partitionKeys().isEmpty(); + } + + public static Optional create( + CoreOptions coreOptions, + boolean isStreaming, + boolean isRestored, + OperatorStateStore stateStore, + FileStoreTable table) throws Exception { + + if (disablePartitionMarkDone(isStreaming, table, coreOptions.toConfiguration())) { + return Optional.empty(); + } + + List actions = + PartitionMarkDoneAction.createActions(table, coreOptions); + + // if batch read skip level 0 files, we should wait compaction to mark done + // otherwise, some data may not be readable, and there might be data delays + boolean waitCompaction = + !table.primaryKeys().isEmpty() + && (coreOptions.deletionVectorsEnabled() + || coreOptions.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW); + + InternalRowPartitionComputer partitionComputer = + new InternalRowPartitionComputer( + coreOptions.partitionDefaultName(), + table.schema().logicalPartitionType(), + table.partitionKeys().toArray(new String[0]), + coreOptions.legacyPartitionName()); + Options options = coreOptions.toConfiguration(); - return new PartitionMarkDoneTrigger( - new PartitionMarkDoneTrigger.PartitionMarkDoneTriggerState(isRestored, stateStore), - new PartitionTimeExtractor( - coreOptions.partitionTimestampPattern(), - coreOptions.partitionTimestampFormatter()), - options.get(PARTITION_TIME_INTERVAL), - options.get(PARTITION_IDLE_TIME_TO_DONE), - options.get(PARTITION_MARK_DONE_WHEN_END_INPUT)); + return Optional.of( + new PartitionMarkDoneTrigger( + new PartitionMarkDoneTrigger.PartitionMarkDoneTriggerState( + isRestored, stateStore), + new PartitionTimeExtractor( + coreOptions.partitionTimestampPattern(), + coreOptions.partitionTimestampFormatter()), + options.get(PARTITION_TIME_INTERVAL), + options.get(PARTITION_IDLE_TIME_TO_DONE), + options.get(PARTITION_MARK_DONE_WHEN_END_INPUT), + actions, + waitCompaction, + partitionComputer)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java new file mode 100644 index 000000000000..5a8b0753629d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java @@ -0,0 +1,13 @@ +package org.apache.paimon.flink.sink.partition; + +import org.apache.paimon.manifest.ManifestCommittable; + +import java.io.Closeable; +import java.util.List; + +public interface PartitionTrigger extends Closeable { + + void notifyCommittable(List committables); + + void snapshotState() throws Exception; +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionCollectorTest.java similarity index 96% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionCollectorTest.java index ecef6d337a6b..4e69c55bdf37 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionCollectorTest.java @@ -54,7 +54,7 @@ import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; import static org.assertj.core.api.Assertions.assertThat; -class PartitionMarkDoneTest extends TableTestBase { +class PartitionCollectorTest extends TableTestBase { @Test public void testTriggerByCompaction() throws Exception { @@ -85,8 +85,8 @@ private void innerTest(boolean deletionVectors) throws Exception { FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); Path location = catalog.getTableLocation(identifier); Path successFile = new Path(location, "a=0/_SUCCESS"); - PartitionMarkDone markDone = - PartitionMarkDone.create(false, false, new MockOperatorStateStore(), table); + PartitionCollector markDone = + PartitionCollector.create(false, false, new MockOperatorStateStore(), table); notifyCommits(markDone, true); assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors); @@ -97,7 +97,7 @@ private void innerTest(boolean deletionVectors) throws Exception { } } - private void notifyCommits(PartitionMarkDone markDone, boolean isCompact) { + private void notifyCommits(PartitionCollector markDone, boolean isCompact) { ManifestCommittable committable = new ManifestCommittable(Long.MAX_VALUE); DataFileMeta file = DataFileTestUtils.newFile(); CommitMessageImpl compactMessage; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java index b00906d1c175..092a08e6fe30 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java @@ -20,6 +20,9 @@ import org.apache.paimon.partition.PartitionTimeExtractor; import org.apache.paimon.testutils.assertj.PaimonAssertions; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.InternalRowPartitionComputer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -30,11 +33,13 @@ import java.time.LocalTime; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +/** Test for {@link PartitionMarkDoneTrigger}. */ class PartitionMarkDoneTriggerTest { private static final Duration timeInterval = Duration.ofDays(1); @@ -72,7 +77,14 @@ public void testWithoutEndInput() throws Exception { timeInterval, idleTime, toEpochMillis("2024-02-01"), - false); + false, + Collections.emptyList(), + false, + new InternalRowPartitionComputer( + "", + RowType.builder().field("p1", DataTypes.STRING()).build(), + new String[] {"p1"}, + false)); // test not reach partition end + idle time trigger.notifyPartition("dt=2024-02-02", toEpochMillis("2024-02-01")); @@ -118,7 +130,14 @@ public void testWithoutEndInput() throws Exception { timeInterval, idleTime, toEpochMillis("2024-02-06"), - false); + false, + Collections.emptyList(), + false, + new InternalRowPartitionComputer( + "", + RowType.builder().field("p1", DataTypes.STRING()).build(), + new String[] {"p1"}, + false)); partitions = trigger.donePartitions(false, toEpochMillis("2024-02-06")); assertThat(partitions).isEmpty(); partitions = @@ -136,7 +155,14 @@ public void testWithEndInput() throws Exception { timeInterval, idleTime, toEpochMillis("2024-02-01"), - true); + true, + Collections.emptyList(), + false, + new InternalRowPartitionComputer( + "", + RowType.builder().field("p1", DataTypes.STRING()).build(), + new String[] {"p1"}, + false)); // test not reach partition end + idle time trigger.notifyPartition("dt=2024-02-02", toEpochMillis("2024-02-01")); @@ -153,7 +179,14 @@ public void testParseNonDateFormattedPartition() throws Exception { timeInterval, idleTime, toEpochMillis("2024-02-01"), - true); + true, + Collections.emptyList(), + false, + new InternalRowPartitionComputer( + "", + RowType.builder().field("p1", DataTypes.STRING()).build(), + new String[] {"p1"}, + false)); assertThatThrownBy(() -> trigger.extractDateTime("unknown")) .satisfies( From 91d225bcc401a108196ad5bad65ac1cd68ca44e5 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Mon, 28 Oct 2024 13:38:27 +0800 Subject: [PATCH 2/3] format --- .../sink/partition/PartitionTrigger.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java index 5a8b0753629d..4bb9097042fc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.paimon.flink.sink.partition; import org.apache.paimon.manifest.ManifestCommittable; @@ -5,6 +23,7 @@ import java.io.Closeable; import java.util.List; +/** The partition trigger. */ public interface PartitionTrigger extends Closeable { void notifyCommittable(List committables); From edcd647e249f2da5679f81ed89c609f6df005e6c Mon Sep 17 00:00:00 2001 From: Aitozi Date: Tue, 29 Oct 2024 15:12:50 +0800 Subject: [PATCH 3/3] rename --- .../procedure/MarkPartitionDoneProcedure.java | 2 +- .../flink/action/MarkPartitionDoneAction.java | 2 +- .../procedure/MarkPartitionDoneProcedure.java | 2 +- .../paimon/flink/sink/StoreCommitter.java | 16 ++++++------- ...ionTrigger.java => PartitionListener.java} | 4 ++-- ...Collector.java => PartitionListeners.java} | 24 +++++++++---------- ...er.java => PartitionMarkDoneListener.java} | 12 +++++----- ...rTest.java => PartitionListenersTest.java} | 8 +++---- .../PartitionMarkDoneTriggerTest.java | 20 ++++++++-------- 9 files changed, 45 insertions(+), 45 deletions(-) rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/{PartitionTrigger.java => PartitionListener.java} (92%) rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/{PartitionCollector.java => PartitionListeners.java} (74%) rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/{PartitionMarkDoneTrigger.java => PartitionMarkDoneListener.java} (97%) rename paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/{PartitionCollectorTest.java => PartitionListenersTest.java} (96%) diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java index 15d0c9e5d972..eab06ebfbfd5 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java @@ -32,7 +32,7 @@ import java.io.IOException; import java.util.List; -import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger.markDone; +import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone; import static org.apache.paimon.utils.ParameterUtils.getPartitions; import static org.apache.paimon.utils.Preconditions.checkArgument; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java index e49d16b2afd4..9a943b829dab 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; -import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger.markDone; +import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone; /** Table partition mark done action for Flink. */ public class MarkPartitionDoneAction extends TableActionBase { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java index e96e6ab3f25b..066e2b9845d6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java @@ -35,7 +35,7 @@ import java.io.IOException; import java.util.List; -import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger.markDone; +import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone; import static org.apache.paimon.utils.ParameterUtils.getPartitions; import static org.apache.paimon.utils.Preconditions.checkArgument; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java index 840f84e39a12..aa831ff4385f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java @@ -20,7 +20,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; -import org.apache.paimon.flink.sink.partition.PartitionCollector; +import org.apache.paimon.flink.sink.partition.PartitionListeners; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.FileStoreTable; @@ -43,7 +43,7 @@ public class StoreCommitter implements Committer committables) throws IOException, InterruptedException { commit.commitMultiple(committables, false); calcNumBytesAndRecordsOut(committables); - partitionCollector.notifyCommittable(committables); + partitionListeners.notifyCommittable(committables); } @Override public int filterAndCommit( List globalCommittables, boolean checkAppendFiles) { int committed = commit.filterAndCommitMultiple(globalCommittables, checkAppendFiles); - partitionCollector.notifyCommittable(globalCommittables); + partitionListeners.notifyCommittable(globalCommittables); return committed; } @Override public Map> groupByCheckpoint(Collection committables) { try { - partitionCollector.snapshotState(); + partitionListeners.snapshotState(); } catch (Exception e) { throw new RuntimeException(e); } @@ -140,7 +140,7 @@ public Map> groupByCheckpoint(Collection co @Override public void close() throws Exception { commit.close(); - partitionCollector.close(); + partitionListeners.close(); } private void calcNumBytesAndRecordsOut(List committables) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java similarity index 92% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java index 4bb9097042fc..65d25fbc0271 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java @@ -23,8 +23,8 @@ import java.io.Closeable; import java.util.List; -/** The partition trigger. */ -public interface PartitionTrigger extends Closeable { +/** The partition listener. */ +public interface PartitionListener extends Closeable { void notifyCommittable(List committables); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionCollector.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java similarity index 74% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionCollector.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java index 2e692b908a87..0c68b5dce9f7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionCollector.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java @@ -30,42 +30,42 @@ import java.util.List; /** Partition collector. */ -public class PartitionCollector implements Closeable { +public class PartitionListeners implements Closeable { - private final List triggers; + private final List listeners; - private PartitionCollector(List triggers) { - this.triggers = triggers; + private PartitionListeners(List listeners) { + this.listeners = listeners; } public void notifyCommittable(List committables) { - for (PartitionTrigger trigger : triggers) { + for (PartitionListener trigger : listeners) { trigger.notifyCommittable(committables); } } public void snapshotState() throws Exception { - for (PartitionTrigger trigger : triggers) { + for (PartitionListener trigger : listeners) { trigger.snapshotState(); } } @Override public void close() throws IOException { - IOUtils.closeAllQuietly(triggers); + IOUtils.closeAllQuietly(listeners); } - public static PartitionCollector create( + public static PartitionListeners create( boolean isStreaming, boolean isRestored, OperatorStateStore stateStore, FileStoreTable table) throws Exception { - List triggers = new ArrayList<>(); - PartitionMarkDoneTrigger.create( + List listeners = new ArrayList<>(); + PartitionMarkDoneListener.create( table.coreOptions(), isStreaming, isRestored, stateStore, table) - .ifPresent(triggers::add); + .ifPresent(listeners::add); - return new PartitionCollector(triggers); + return new PartitionListeners(listeners); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java similarity index 97% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java index e3919cbb34ec..4707a2613d35 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java @@ -63,7 +63,7 @@ import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath; /** Trigger to mark partitions done with streaming job. */ -public class PartitionMarkDoneTrigger implements PartitionTrigger { +public class PartitionMarkDoneListener implements PartitionListener { private static final ListStateDescriptor> PENDING_PARTITIONS_STATE_DESC = new ListStateDescriptor<>( @@ -82,7 +82,7 @@ public class PartitionMarkDoneTrigger implements PartitionTrigger { private final boolean waitCompaction; private final InternalRowPartitionComputer partitionComputer; - public PartitionMarkDoneTrigger( + public PartitionMarkDoneListener( State state, PartitionTimeExtractor timeExtractor, @Nullable Duration timeInterval, @@ -104,7 +104,7 @@ public PartitionMarkDoneTrigger( partitionComputer); } - public PartitionMarkDoneTrigger( + public PartitionMarkDoneListener( State state, PartitionTimeExtractor timeExtractor, @Nullable Duration timeInterval, @@ -282,7 +282,7 @@ private static boolean disablePartitionMarkDone( return table.partitionKeys().isEmpty(); } - public static Optional create( + public static Optional create( CoreOptions coreOptions, boolean isStreaming, boolean isRestored, @@ -313,8 +313,8 @@ public static Optional create( Options options = coreOptions.toConfiguration(); return Optional.of( - new PartitionMarkDoneTrigger( - new PartitionMarkDoneTrigger.PartitionMarkDoneTriggerState( + new PartitionMarkDoneListener( + new PartitionMarkDoneListener.PartitionMarkDoneTriggerState( isRestored, stateStore), new PartitionTimeExtractor( coreOptions.partitionTimestampPattern(), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionCollectorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionListenersTest.java similarity index 96% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionCollectorTest.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionListenersTest.java index 4e69c55bdf37..0f3991a8ae1e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionCollectorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionListenersTest.java @@ -54,7 +54,7 @@ import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; import static org.assertj.core.api.Assertions.assertThat; -class PartitionCollectorTest extends TableTestBase { +class PartitionListenersTest extends TableTestBase { @Test public void testTriggerByCompaction() throws Exception { @@ -85,8 +85,8 @@ private void innerTest(boolean deletionVectors) throws Exception { FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); Path location = catalog.getTableLocation(identifier); Path successFile = new Path(location, "a=0/_SUCCESS"); - PartitionCollector markDone = - PartitionCollector.create(false, false, new MockOperatorStateStore(), table); + PartitionListeners markDone = + PartitionListeners.create(false, false, new MockOperatorStateStore(), table); notifyCommits(markDone, true); assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors); @@ -97,7 +97,7 @@ private void innerTest(boolean deletionVectors) throws Exception { } } - private void notifyCommits(PartitionCollector markDone, boolean isCompact) { + private void notifyCommits(PartitionListeners markDone, boolean isCompact) { ManifestCommittable committable = new ManifestCommittable(Long.MAX_VALUE); DataFileMeta file = DataFileTestUtils.newFile(); CommitMessageImpl compactMessage; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java index 092a08e6fe30..bbf555dc38fc 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java @@ -39,21 +39,21 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Test for {@link PartitionMarkDoneTrigger}. */ +/** Test for {@link PartitionMarkDoneListener}. */ class PartitionMarkDoneTriggerTest { private static final Duration timeInterval = Duration.ofDays(1); private static final Duration idleTime = Duration.ofMinutes(15); private List pendingPartitions; - private PartitionMarkDoneTrigger.State state; + private PartitionMarkDoneListener.State state; private PartitionTimeExtractor extractor; @BeforeEach public void before() throws Exception { this.pendingPartitions = new ArrayList<>(); this.state = - new PartitionMarkDoneTrigger.State() { + new PartitionMarkDoneListener.State() { @Override public List restore() { return new ArrayList<>(pendingPartitions); @@ -70,8 +70,8 @@ public void update(List partitions) { @Test public void testWithoutEndInput() throws Exception { - PartitionMarkDoneTrigger trigger = - new PartitionMarkDoneTrigger( + PartitionMarkDoneListener trigger = + new PartitionMarkDoneListener( state, extractor, timeInterval, @@ -124,7 +124,7 @@ public void testWithoutEndInput() throws Exception { // test restore pendingPartitions.add("dt=2024-02-04"); trigger = - new PartitionMarkDoneTrigger( + new PartitionMarkDoneListener( state, extractor, timeInterval, @@ -148,8 +148,8 @@ public void testWithoutEndInput() throws Exception { @Test public void testWithEndInput() throws Exception { - PartitionMarkDoneTrigger trigger = - new PartitionMarkDoneTrigger( + PartitionMarkDoneListener trigger = + new PartitionMarkDoneListener( state, extractor, timeInterval, @@ -172,8 +172,8 @@ public void testWithEndInput() throws Exception { @Test public void testParseNonDateFormattedPartition() throws Exception { - PartitionMarkDoneTrigger trigger = - new PartitionMarkDoneTrigger( + PartitionMarkDoneListener trigger = + new PartitionMarkDoneListener( state, extractor, timeInterval,