diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java index 15d0c9e5d972..eab06ebfbfd5 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java @@ -32,7 +32,7 @@ import java.io.IOException; import java.util.List; -import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger.markDone; +import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone; import static org.apache.paimon.utils.ParameterUtils.getPartitions; import static org.apache.paimon.utils.Preconditions.checkArgument; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java index e49d16b2afd4..9a943b829dab 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; -import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger.markDone; +import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone; /** Table partition mark done action for Flink. */ public class MarkPartitionDoneAction extends TableActionBase { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java index e96e6ab3f25b..066e2b9845d6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java @@ -35,7 +35,7 @@ import java.io.IOException; import java.util.List; -import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger.markDone; +import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone; import static org.apache.paimon.utils.ParameterUtils.getPartitions; import static org.apache.paimon.utils.Preconditions.checkArgument; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java index 840f84e39a12..aa831ff4385f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java @@ -20,7 +20,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; -import org.apache.paimon.flink.sink.partition.PartitionCollector; +import org.apache.paimon.flink.sink.partition.PartitionListeners; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.FileStoreTable; @@ -43,7 +43,7 @@ public class StoreCommitter implements Committer committables) throws IOException, InterruptedException { commit.commitMultiple(committables, false); calcNumBytesAndRecordsOut(committables); - partitionCollector.notifyCommittable(committables); + partitionListeners.notifyCommittable(committables); } @Override public int filterAndCommit( List globalCommittables, boolean checkAppendFiles) { int committed = commit.filterAndCommitMultiple(globalCommittables, checkAppendFiles); - partitionCollector.notifyCommittable(globalCommittables); + partitionListeners.notifyCommittable(globalCommittables); return committed; } @Override public Map> groupByCheckpoint(Collection committables) { try { - partitionCollector.snapshotState(); + partitionListeners.snapshotState(); } catch (Exception e) { throw new RuntimeException(e); } @@ -140,7 +140,7 @@ public Map> groupByCheckpoint(Collection co @Override public void close() throws Exception { commit.close(); - partitionCollector.close(); + partitionListeners.close(); } private void calcNumBytesAndRecordsOut(List committables) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java similarity index 92% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java index 4bb9097042fc..65d25fbc0271 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionTrigger.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java @@ -23,8 +23,8 @@ import java.io.Closeable; import java.util.List; -/** The partition trigger. */ -public interface PartitionTrigger extends Closeable { +/** The partition listener. */ +public interface PartitionListener extends Closeable { void notifyCommittable(List committables); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionCollector.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java similarity index 74% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionCollector.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java index 2e692b908a87..0c68b5dce9f7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionCollector.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java @@ -30,42 +30,42 @@ import java.util.List; /** Partition collector. */ -public class PartitionCollector implements Closeable { +public class PartitionListeners implements Closeable { - private final List triggers; + private final List listeners; - private PartitionCollector(List triggers) { - this.triggers = triggers; + private PartitionListeners(List listeners) { + this.listeners = listeners; } public void notifyCommittable(List committables) { - for (PartitionTrigger trigger : triggers) { + for (PartitionListener trigger : listeners) { trigger.notifyCommittable(committables); } } public void snapshotState() throws Exception { - for (PartitionTrigger trigger : triggers) { + for (PartitionListener trigger : listeners) { trigger.snapshotState(); } } @Override public void close() throws IOException { - IOUtils.closeAllQuietly(triggers); + IOUtils.closeAllQuietly(listeners); } - public static PartitionCollector create( + public static PartitionListeners create( boolean isStreaming, boolean isRestored, OperatorStateStore stateStore, FileStoreTable table) throws Exception { - List triggers = new ArrayList<>(); - PartitionMarkDoneTrigger.create( + List listeners = new ArrayList<>(); + PartitionMarkDoneListener.create( table.coreOptions(), isStreaming, isRestored, stateStore, table) - .ifPresent(triggers::add); + .ifPresent(listeners::add); - return new PartitionCollector(triggers); + return new PartitionListeners(listeners); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java similarity index 97% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java index e3919cbb34ec..4707a2613d35 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java @@ -63,7 +63,7 @@ import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath; /** Trigger to mark partitions done with streaming job. */ -public class PartitionMarkDoneTrigger implements PartitionTrigger { +public class PartitionMarkDoneListener implements PartitionListener { private static final ListStateDescriptor> PENDING_PARTITIONS_STATE_DESC = new ListStateDescriptor<>( @@ -82,7 +82,7 @@ public class PartitionMarkDoneTrigger implements PartitionTrigger { private final boolean waitCompaction; private final InternalRowPartitionComputer partitionComputer; - public PartitionMarkDoneTrigger( + public PartitionMarkDoneListener( State state, PartitionTimeExtractor timeExtractor, @Nullable Duration timeInterval, @@ -104,7 +104,7 @@ public PartitionMarkDoneTrigger( partitionComputer); } - public PartitionMarkDoneTrigger( + public PartitionMarkDoneListener( State state, PartitionTimeExtractor timeExtractor, @Nullable Duration timeInterval, @@ -282,7 +282,7 @@ private static boolean disablePartitionMarkDone( return table.partitionKeys().isEmpty(); } - public static Optional create( + public static Optional create( CoreOptions coreOptions, boolean isStreaming, boolean isRestored, @@ -313,8 +313,8 @@ public static Optional create( Options options = coreOptions.toConfiguration(); return Optional.of( - new PartitionMarkDoneTrigger( - new PartitionMarkDoneTrigger.PartitionMarkDoneTriggerState( + new PartitionMarkDoneListener( + new PartitionMarkDoneListener.PartitionMarkDoneTriggerState( isRestored, stateStore), new PartitionTimeExtractor( coreOptions.partitionTimestampPattern(), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionCollectorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionListenersTest.java similarity index 96% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionCollectorTest.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionListenersTest.java index 4e69c55bdf37..0f3991a8ae1e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionCollectorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionListenersTest.java @@ -54,7 +54,7 @@ import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; import static org.assertj.core.api.Assertions.assertThat; -class PartitionCollectorTest extends TableTestBase { +class PartitionListenersTest extends TableTestBase { @Test public void testTriggerByCompaction() throws Exception { @@ -85,8 +85,8 @@ private void innerTest(boolean deletionVectors) throws Exception { FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); Path location = catalog.getTableLocation(identifier); Path successFile = new Path(location, "a=0/_SUCCESS"); - PartitionCollector markDone = - PartitionCollector.create(false, false, new MockOperatorStateStore(), table); + PartitionListeners markDone = + PartitionListeners.create(false, false, new MockOperatorStateStore(), table); notifyCommits(markDone, true); assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors); @@ -97,7 +97,7 @@ private void innerTest(boolean deletionVectors) throws Exception { } } - private void notifyCommits(PartitionCollector markDone, boolean isCompact) { + private void notifyCommits(PartitionListeners markDone, boolean isCompact) { ManifestCommittable committable = new ManifestCommittable(Long.MAX_VALUE); DataFileMeta file = DataFileTestUtils.newFile(); CommitMessageImpl compactMessage; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java index 092a08e6fe30..bbf555dc38fc 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java @@ -39,21 +39,21 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Test for {@link PartitionMarkDoneTrigger}. */ +/** Test for {@link PartitionMarkDoneListener}. */ class PartitionMarkDoneTriggerTest { private static final Duration timeInterval = Duration.ofDays(1); private static final Duration idleTime = Duration.ofMinutes(15); private List pendingPartitions; - private PartitionMarkDoneTrigger.State state; + private PartitionMarkDoneListener.State state; private PartitionTimeExtractor extractor; @BeforeEach public void before() throws Exception { this.pendingPartitions = new ArrayList<>(); this.state = - new PartitionMarkDoneTrigger.State() { + new PartitionMarkDoneListener.State() { @Override public List restore() { return new ArrayList<>(pendingPartitions); @@ -70,8 +70,8 @@ public void update(List partitions) { @Test public void testWithoutEndInput() throws Exception { - PartitionMarkDoneTrigger trigger = - new PartitionMarkDoneTrigger( + PartitionMarkDoneListener trigger = + new PartitionMarkDoneListener( state, extractor, timeInterval, @@ -124,7 +124,7 @@ public void testWithoutEndInput() throws Exception { // test restore pendingPartitions.add("dt=2024-02-04"); trigger = - new PartitionMarkDoneTrigger( + new PartitionMarkDoneListener( state, extractor, timeInterval, @@ -148,8 +148,8 @@ public void testWithoutEndInput() throws Exception { @Test public void testWithEndInput() throws Exception { - PartitionMarkDoneTrigger trigger = - new PartitionMarkDoneTrigger( + PartitionMarkDoneListener trigger = + new PartitionMarkDoneListener( state, extractor, timeInterval, @@ -172,8 +172,8 @@ public void testWithEndInput() throws Exception { @Test public void testParseNonDateFormattedPartition() throws Exception { - PartitionMarkDoneTrigger trigger = - new PartitionMarkDoneTrigger( + PartitionMarkDoneListener trigger = + new PartitionMarkDoneListener( state, extractor, timeInterval,