diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java index d70cccf6ba25..eab06ebfbfd5 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java @@ -32,7 +32,7 @@ import java.io.IOException; import java.util.List; -import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone; +import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone; import static org.apache.paimon.utils.ParameterUtils.getPartitions; import static org.apache.paimon.utils.Preconditions.checkArgument; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java index 9fd906ee44dc..9a943b829dab 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; -import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone; +import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone; /** Table partition mark done action for Flink. */ public class MarkPartitionDoneAction extends TableActionBase { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java index f0a89a0bb32b..066e2b9845d6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java @@ -35,7 +35,7 @@ import java.io.IOException; import java.util.List; -import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone; +import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone; import static org.apache.paimon.utils.ParameterUtils.getPartitions; import static org.apache.paimon.utils.Preconditions.checkArgument; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java index 9ae53f401325..aa831ff4385f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java @@ -20,7 +20,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; -import org.apache.paimon.flink.sink.partition.PartitionMarkDone; +import org.apache.paimon.flink.sink.partition.PartitionListeners; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.FileStoreTable; @@ -43,7 +43,7 @@ public class StoreCommitter implements Committer committables) throws IOException, InterruptedException { commit.commitMultiple(committables, false); calcNumBytesAndRecordsOut(committables); - if (partitionMarkDone != null) { - partitionMarkDone.notifyCommittable(committables); - } + partitionListeners.notifyCommittable(committables); } @Override public int filterAndCommit( List globalCommittables, boolean checkAppendFiles) { int committed = commit.filterAndCommitMultiple(globalCommittables, checkAppendFiles); - if (partitionMarkDone != null) { - partitionMarkDone.notifyCommittable(globalCommittables); - } + partitionListeners.notifyCommittable(globalCommittables); return committed; } @Override public Map> groupByCheckpoint(Collection committables) { - if (partitionMarkDone != null) { - try { - partitionMarkDone.snapshotState(); - } catch (Exception e) { - throw new RuntimeException(e); - } + try { + partitionListeners.snapshotState(); + } catch (Exception e) { + throw new RuntimeException(e); } Map> grouped = new HashMap<>(); @@ -146,9 +140,7 @@ public Map> groupByCheckpoint(Collection co @Override public void close() throws Exception { commit.close(); - if (partitionMarkDone != null) { - partitionMarkDone.close(); - } + partitionListeners.close(); } private void calcNumBytesAndRecordsOut(List committables) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java new file mode 100644 index 000000000000..65d25fbc0271 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.partition; + +import org.apache.paimon.manifest.ManifestCommittable; + +import java.io.Closeable; +import java.util.List; + +/** The partition listener. */ +public interface PartitionListener extends Closeable { + + void notifyCommittable(List committables); + + void snapshotState() throws Exception; +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java new file mode 100644 index 000000000000..0c68b5dce9f7 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.partition; + +import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.IOUtils; + +import org.apache.flink.api.common.state.OperatorStateStore; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** Partition collector. */ +public class PartitionListeners implements Closeable { + + private final List listeners; + + private PartitionListeners(List listeners) { + this.listeners = listeners; + } + + public void notifyCommittable(List committables) { + for (PartitionListener trigger : listeners) { + trigger.notifyCommittable(committables); + } + } + + public void snapshotState() throws Exception { + for (PartitionListener trigger : listeners) { + trigger.snapshotState(); + } + } + + @Override + public void close() throws IOException { + IOUtils.closeAllQuietly(listeners); + } + + public static PartitionListeners create( + boolean isStreaming, + boolean isRestored, + OperatorStateStore stateStore, + FileStoreTable table) + throws Exception { + List listeners = new ArrayList<>(); + PartitionMarkDoneListener.create( + table.coreOptions(), isStreaming, isRestored, stateStore, table) + .ifPresent(listeners::add); + + return new PartitionListeners(listeners); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java deleted file mode 100644 index d0825bcdb752..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.sink.partition; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.CoreOptions.MergeEngine; -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.manifest.ManifestCommittable; -import org.apache.paimon.options.Options; -import org.apache.paimon.partition.actions.PartitionMarkDoneAction; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.sink.CommitMessage; -import org.apache.paimon.table.sink.CommitMessageImpl; -import org.apache.paimon.utils.IOUtils; -import org.apache.paimon.utils.InternalRowPartitionComputer; -import org.apache.paimon.utils.PartitionPathUtils; - -import org.apache.flink.api.common.state.OperatorStateStore; - -import javax.annotation.Nullable; - -import java.io.Closeable; -import java.io.IOException; -import java.time.Duration; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; -import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE; - -/** Mark partition done. */ -public class PartitionMarkDone implements Closeable { - - private final InternalRowPartitionComputer partitionComputer; - private final PartitionMarkDoneTrigger trigger; - private final List actions; - private final boolean waitCompaction; - - @Nullable - public static PartitionMarkDone create( - boolean isStreaming, - boolean isRestored, - OperatorStateStore stateStore, - FileStoreTable table) - throws Exception { - CoreOptions coreOptions = table.coreOptions(); - Options options = coreOptions.toConfiguration(); - - if (disablePartitionMarkDone(isStreaming, table, options)) { - return null; - } - - InternalRowPartitionComputer partitionComputer = - new InternalRowPartitionComputer( - coreOptions.partitionDefaultName(), - table.schema().logicalPartitionType(), - table.partitionKeys().toArray(new String[0]), - coreOptions.legacyPartitionName()); - - PartitionMarkDoneTrigger trigger = - PartitionMarkDoneTrigger.create(coreOptions, isRestored, stateStore); - - List actions = - PartitionMarkDoneAction.createActions(table, coreOptions); - - // if batch read skip level 0 files, we should wait compaction to mark done - // otherwise, some data may not be readable, and there might be data delays - boolean waitCompaction = - !table.primaryKeys().isEmpty() - && (coreOptions.deletionVectorsEnabled() - || coreOptions.mergeEngine() == MergeEngine.FIRST_ROW); - - return new PartitionMarkDone(partitionComputer, trigger, actions, waitCompaction); - } - - private static boolean disablePartitionMarkDone( - boolean isStreaming, FileStoreTable table, Options options) { - boolean partitionMarkDoneWhenEndInput = options.get(PARTITION_MARK_DONE_WHEN_END_INPUT); - if (!isStreaming && !partitionMarkDoneWhenEndInput) { - return true; - } - - Duration idleToDone = options.get(PARTITION_IDLE_TIME_TO_DONE); - if (isStreaming && idleToDone == null) { - return true; - } - - return table.partitionKeys().isEmpty(); - } - - public PartitionMarkDone( - InternalRowPartitionComputer partitionComputer, - PartitionMarkDoneTrigger trigger, - List actions, - boolean waitCompaction) { - this.partitionComputer = partitionComputer; - this.trigger = trigger; - this.actions = actions; - this.waitCompaction = waitCompaction; - } - - public void notifyCommittable(List committables) { - Set partitions = new HashSet<>(); - boolean endInput = false; - for (ManifestCommittable committable : committables) { - for (CommitMessage commitMessage : committable.fileCommittables()) { - CommitMessageImpl message = (CommitMessageImpl) commitMessage; - if (waitCompaction - || !message.indexIncrement().isEmpty() - || !message.newFilesIncrement().isEmpty()) { - partitions.add(message.partition()); - } - } - if (committable.identifier() == Long.MAX_VALUE) { - endInput = true; - } - } - - partitions.stream() - .map(partitionComputer::generatePartValues) - .map(PartitionPathUtils::generatePartitionPath) - .forEach(trigger::notifyPartition); - - markDone(trigger.donePartitions(endInput), actions); - } - - public static void markDone(List partitions, List actions) { - for (String partition : partitions) { - try { - for (PartitionMarkDoneAction action : actions) { - action.markDone(partition); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - public void snapshotState() throws Exception { - trigger.snapshotState(); - } - - @Override - public void close() throws IOException { - IOUtils.closeAllQuietly(actions); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java similarity index 57% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java index 0094c5511809..4707a2613d35 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java @@ -20,9 +20,18 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionTimeExtractor; +import org.apache.paimon.partition.actions.PartitionMarkDoneAction; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.IOUtils; +import org.apache.paimon.utils.InternalRowPartitionComputer; +import org.apache.paimon.utils.PartitionPathUtils; import org.apache.paimon.utils.StringUtils; import org.apache.flink.api.common.state.ListState; @@ -33,6 +42,7 @@ import javax.annotation.Nullable; +import java.io.IOException; import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; @@ -40,9 +50,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE; @@ -50,7 +63,7 @@ import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath; /** Trigger to mark partitions done with streaming job. */ -public class PartitionMarkDoneTrigger { +public class PartitionMarkDoneListener implements PartitionListener { private static final ListStateDescriptor> PENDING_PARTITIONS_STATE_DESC = new ListStateDescriptor<>( @@ -65,13 +78,19 @@ public class PartitionMarkDoneTrigger { @Nullable private final Long idleTime; private final boolean markDoneWhenEndInput; private final Map pendingPartitions; + private final List actions; + private final boolean waitCompaction; + private final InternalRowPartitionComputer partitionComputer; - public PartitionMarkDoneTrigger( + public PartitionMarkDoneListener( State state, PartitionTimeExtractor timeExtractor, @Nullable Duration timeInterval, @Nullable Duration idleTime, - boolean markDoneWhenEndInput) + boolean markDoneWhenEndInput, + List actions, + boolean waitCompaction, + InternalRowPartitionComputer partitionComputer) throws Exception { this( state, @@ -79,16 +98,22 @@ public PartitionMarkDoneTrigger( timeInterval, idleTime, System.currentTimeMillis(), - markDoneWhenEndInput); + markDoneWhenEndInput, + actions, + waitCompaction, + partitionComputer); } - public PartitionMarkDoneTrigger( + public PartitionMarkDoneListener( State state, PartitionTimeExtractor timeExtractor, @Nullable Duration timeInterval, @Nullable Duration idleTime, long currentTimeMillis, - boolean markDoneWhenEndInput) + boolean markDoneWhenEndInput, + List actions, + boolean waitCompaction, + InternalRowPartitionComputer partitionComputer) throws Exception { this.pendingPartitions = new HashMap<>(); this.state = state; @@ -96,9 +121,37 @@ public PartitionMarkDoneTrigger( this.timeInterval = timeInterval == null ? null : timeInterval.toMillis(); this.idleTime = idleTime == null ? null : idleTime.toMillis(); this.markDoneWhenEndInput = markDoneWhenEndInput; + this.actions = actions; + this.waitCompaction = waitCompaction; + this.partitionComputer = partitionComputer; state.restore().forEach(p -> pendingPartitions.put(p, currentTimeMillis)); } + @Override + public void notifyCommittable(List committables) { + Set partitions = new HashSet<>(); + boolean endInput = false; + for (ManifestCommittable committable : committables) { + for (CommitMessage commitMessage : committable.fileCommittables()) { + CommitMessageImpl message = (CommitMessageImpl) commitMessage; + if (waitCompaction + || !message.indexIncrement().isEmpty() + || !message.newFilesIncrement().isEmpty()) { + partitions.add(message.partition()); + } + } + if (committable.identifier() == Long.MAX_VALUE) { + endInput = true; + } + } + + partitions.stream() + .map(partitionComputer::generatePartValues) + .map(PartitionPathUtils::generatePartitionPath) + .forEach(this::notifyPartition); + markDone(donePartitions(endInput), actions); + } + public void notifyPartition(String partition) { notifyPartition(partition, System.currentTimeMillis()); } @@ -160,6 +213,23 @@ public void snapshotState() throws Exception { state.update(new ArrayList<>(pendingPartitions.keySet())); } + public static void markDone(List partitions, List actions) { + for (String partition : partitions) { + try { + for (PartitionMarkDoneAction action : actions) { + action.markDone(partition); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void close() throws IOException { + IOUtils.closeAllQuietly(actions); + } + /** State to store partitions. */ public interface State { List restore() throws Exception; @@ -197,17 +267,63 @@ public void update(List partitions) throws Exception { } } - public static PartitionMarkDoneTrigger create( - CoreOptions coreOptions, boolean isRestored, OperatorStateStore stateStore) + private static boolean disablePartitionMarkDone( + boolean isStreaming, FileStoreTable table, Options options) { + boolean partitionMarkDoneWhenEndInput = options.get(PARTITION_MARK_DONE_WHEN_END_INPUT); + if (!isStreaming && !partitionMarkDoneWhenEndInput) { + return true; + } + + Duration idleToDone = options.get(PARTITION_IDLE_TIME_TO_DONE); + if (isStreaming && idleToDone == null) { + return true; + } + + return table.partitionKeys().isEmpty(); + } + + public static Optional create( + CoreOptions coreOptions, + boolean isStreaming, + boolean isRestored, + OperatorStateStore stateStore, + FileStoreTable table) throws Exception { + + if (disablePartitionMarkDone(isStreaming, table, coreOptions.toConfiguration())) { + return Optional.empty(); + } + + List actions = + PartitionMarkDoneAction.createActions(table, coreOptions); + + // if batch read skip level 0 files, we should wait compaction to mark done + // otherwise, some data may not be readable, and there might be data delays + boolean waitCompaction = + !table.primaryKeys().isEmpty() + && (coreOptions.deletionVectorsEnabled() + || coreOptions.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW); + + InternalRowPartitionComputer partitionComputer = + new InternalRowPartitionComputer( + coreOptions.partitionDefaultName(), + table.schema().logicalPartitionType(), + table.partitionKeys().toArray(new String[0]), + coreOptions.legacyPartitionName()); + Options options = coreOptions.toConfiguration(); - return new PartitionMarkDoneTrigger( - new PartitionMarkDoneTrigger.PartitionMarkDoneTriggerState(isRestored, stateStore), - new PartitionTimeExtractor( - coreOptions.partitionTimestampPattern(), - coreOptions.partitionTimestampFormatter()), - options.get(PARTITION_TIME_INTERVAL), - options.get(PARTITION_IDLE_TIME_TO_DONE), - options.get(PARTITION_MARK_DONE_WHEN_END_INPUT)); + return Optional.of( + new PartitionMarkDoneListener( + new PartitionMarkDoneListener.PartitionMarkDoneTriggerState( + isRestored, stateStore), + new PartitionTimeExtractor( + coreOptions.partitionTimestampPattern(), + coreOptions.partitionTimestampFormatter()), + options.get(PARTITION_TIME_INTERVAL), + options.get(PARTITION_IDLE_TIME_TO_DONE), + options.get(PARTITION_MARK_DONE_WHEN_END_INPUT), + actions, + waitCompaction, + partitionComputer)); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionListenersTest.java similarity index 96% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionListenersTest.java index ecef6d337a6b..0f3991a8ae1e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionListenersTest.java @@ -54,7 +54,7 @@ import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; import static org.assertj.core.api.Assertions.assertThat; -class PartitionMarkDoneTest extends TableTestBase { +class PartitionListenersTest extends TableTestBase { @Test public void testTriggerByCompaction() throws Exception { @@ -85,8 +85,8 @@ private void innerTest(boolean deletionVectors) throws Exception { FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); Path location = catalog.getTableLocation(identifier); Path successFile = new Path(location, "a=0/_SUCCESS"); - PartitionMarkDone markDone = - PartitionMarkDone.create(false, false, new MockOperatorStateStore(), table); + PartitionListeners markDone = + PartitionListeners.create(false, false, new MockOperatorStateStore(), table); notifyCommits(markDone, true); assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors); @@ -97,7 +97,7 @@ private void innerTest(boolean deletionVectors) throws Exception { } } - private void notifyCommits(PartitionMarkDone markDone, boolean isCompact) { + private void notifyCommits(PartitionListeners markDone, boolean isCompact) { ManifestCommittable committable = new ManifestCommittable(Long.MAX_VALUE); DataFileMeta file = DataFileTestUtils.newFile(); CommitMessageImpl compactMessage; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java index b00906d1c175..bbf555dc38fc 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java @@ -20,6 +20,9 @@ import org.apache.paimon.partition.PartitionTimeExtractor; import org.apache.paimon.testutils.assertj.PaimonAssertions; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.InternalRowPartitionComputer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -30,25 +33,27 @@ import java.time.LocalTime; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +/** Test for {@link PartitionMarkDoneListener}. */ class PartitionMarkDoneTriggerTest { private static final Duration timeInterval = Duration.ofDays(1); private static final Duration idleTime = Duration.ofMinutes(15); private List pendingPartitions; - private PartitionMarkDoneTrigger.State state; + private PartitionMarkDoneListener.State state; private PartitionTimeExtractor extractor; @BeforeEach public void before() throws Exception { this.pendingPartitions = new ArrayList<>(); this.state = - new PartitionMarkDoneTrigger.State() { + new PartitionMarkDoneListener.State() { @Override public List restore() { return new ArrayList<>(pendingPartitions); @@ -65,14 +70,21 @@ public void update(List partitions) { @Test public void testWithoutEndInput() throws Exception { - PartitionMarkDoneTrigger trigger = - new PartitionMarkDoneTrigger( + PartitionMarkDoneListener trigger = + new PartitionMarkDoneListener( state, extractor, timeInterval, idleTime, toEpochMillis("2024-02-01"), - false); + false, + Collections.emptyList(), + false, + new InternalRowPartitionComputer( + "", + RowType.builder().field("p1", DataTypes.STRING()).build(), + new String[] {"p1"}, + false)); // test not reach partition end + idle time trigger.notifyPartition("dt=2024-02-02", toEpochMillis("2024-02-01")); @@ -112,13 +124,20 @@ public void testWithoutEndInput() throws Exception { // test restore pendingPartitions.add("dt=2024-02-04"); trigger = - new PartitionMarkDoneTrigger( + new PartitionMarkDoneListener( state, extractor, timeInterval, idleTime, toEpochMillis("2024-02-06"), - false); + false, + Collections.emptyList(), + false, + new InternalRowPartitionComputer( + "", + RowType.builder().field("p1", DataTypes.STRING()).build(), + new String[] {"p1"}, + false)); partitions = trigger.donePartitions(false, toEpochMillis("2024-02-06")); assertThat(partitions).isEmpty(); partitions = @@ -129,14 +148,21 @@ public void testWithoutEndInput() throws Exception { @Test public void testWithEndInput() throws Exception { - PartitionMarkDoneTrigger trigger = - new PartitionMarkDoneTrigger( + PartitionMarkDoneListener trigger = + new PartitionMarkDoneListener( state, extractor, timeInterval, idleTime, toEpochMillis("2024-02-01"), - true); + true, + Collections.emptyList(), + false, + new InternalRowPartitionComputer( + "", + RowType.builder().field("p1", DataTypes.STRING()).build(), + new String[] {"p1"}, + false)); // test not reach partition end + idle time trigger.notifyPartition("dt=2024-02-02", toEpochMillis("2024-02-01")); @@ -146,14 +172,21 @@ public void testWithEndInput() throws Exception { @Test public void testParseNonDateFormattedPartition() throws Exception { - PartitionMarkDoneTrigger trigger = - new PartitionMarkDoneTrigger( + PartitionMarkDoneListener trigger = + new PartitionMarkDoneListener( state, extractor, timeInterval, idleTime, toEpochMillis("2024-02-01"), - true); + true, + Collections.emptyList(), + false, + new InternalRowPartitionComputer( + "", + RowType.builder().field("p1", DataTypes.STRING()).build(), + new String[] {"p1"}, + false)); assertThatThrownBy(() -> trigger.extractDateTime("unknown")) .satisfies(