Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Oct 29, 2024
1 parent 91d225b commit edcd647
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.io.IOException;
import java.util.List;

import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger.markDone;
import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
import static org.apache.paimon.utils.ParameterUtils.getPartitions;
import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.List;
import java.util.Map;

import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger.markDone;
import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;

/** Table partition mark done action for Flink. */
public class MarkPartitionDoneAction extends TableActionBase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.io.IOException;
import java.util.List;

import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger.markDone;
import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
import static org.apache.paimon.utils.ParameterUtils.getPartitions;
import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.flink.sink.partition.PartitionCollector;
import org.apache.paimon.flink.sink.partition.PartitionListeners;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -43,7 +43,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl

private final TableCommitImpl commit;
@Nullable private final CommitterMetrics committerMetrics;
private final PartitionCollector partitionCollector;
private final PartitionListeners partitionListeners;

public StoreCommitter(FileStoreTable table, TableCommit commit, Context context) {
this.commit = (TableCommitImpl) commit;
Expand All @@ -56,8 +56,8 @@ public StoreCommitter(FileStoreTable table, TableCommit commit, Context context)
}

try {
this.partitionCollector =
PartitionCollector.create(
this.partitionListeners =
PartitionListeners.create(
context.streamingCheckpointEnabled(),
context.isRestored(),
context.stateStore(),
Expand Down Expand Up @@ -111,21 +111,21 @@ public void commit(List<ManifestCommittable> committables)
throws IOException, InterruptedException {
commit.commitMultiple(committables, false);
calcNumBytesAndRecordsOut(committables);
partitionCollector.notifyCommittable(committables);
partitionListeners.notifyCommittable(committables);
}

@Override
public int filterAndCommit(
List<ManifestCommittable> globalCommittables, boolean checkAppendFiles) {
int committed = commit.filterAndCommitMultiple(globalCommittables, checkAppendFiles);
partitionCollector.notifyCommittable(globalCommittables);
partitionListeners.notifyCommittable(globalCommittables);
return committed;
}

@Override
public Map<Long, List<Committable>> groupByCheckpoint(Collection<Committable> committables) {
try {
partitionCollector.snapshotState();
partitionListeners.snapshotState();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -140,7 +140,7 @@ public Map<Long, List<Committable>> groupByCheckpoint(Collection<Committable> co
@Override
public void close() throws Exception {
commit.close();
partitionCollector.close();
partitionListeners.close();
}

private void calcNumBytesAndRecordsOut(List<ManifestCommittable> committables) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import java.io.Closeable;
import java.util.List;

/** The partition trigger. */
public interface PartitionTrigger extends Closeable {
/** The partition listener. */
public interface PartitionListener extends Closeable {

void notifyCommittable(List<ManifestCommittable> committables);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,42 +30,42 @@
import java.util.List;

/** Partition collector. */
public class PartitionCollector implements Closeable {
public class PartitionListeners implements Closeable {

private final List<PartitionTrigger> triggers;
private final List<PartitionListener> listeners;

private PartitionCollector(List<PartitionTrigger> triggers) {
this.triggers = triggers;
private PartitionListeners(List<PartitionListener> listeners) {
this.listeners = listeners;
}

public void notifyCommittable(List<ManifestCommittable> committables) {
for (PartitionTrigger trigger : triggers) {
for (PartitionListener trigger : listeners) {
trigger.notifyCommittable(committables);
}
}

public void snapshotState() throws Exception {
for (PartitionTrigger trigger : triggers) {
for (PartitionListener trigger : listeners) {
trigger.snapshotState();
}
}

@Override
public void close() throws IOException {
IOUtils.closeAllQuietly(triggers);
IOUtils.closeAllQuietly(listeners);
}

public static PartitionCollector create(
public static PartitionListeners create(
boolean isStreaming,
boolean isRestored,
OperatorStateStore stateStore,
FileStoreTable table)
throws Exception {
List<PartitionTrigger> triggers = new ArrayList<>();
PartitionMarkDoneTrigger.create(
List<PartitionListener> listeners = new ArrayList<>();
PartitionMarkDoneListener.create(
table.coreOptions(), isStreaming, isRestored, stateStore, table)
.ifPresent(triggers::add);
.ifPresent(listeners::add);

return new PartitionCollector(triggers);
return new PartitionListeners(listeners);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;

/** Trigger to mark partitions done with streaming job. */
public class PartitionMarkDoneTrigger implements PartitionTrigger {
public class PartitionMarkDoneListener implements PartitionListener {

private static final ListStateDescriptor<List<String>> PENDING_PARTITIONS_STATE_DESC =
new ListStateDescriptor<>(
Expand All @@ -82,7 +82,7 @@ public class PartitionMarkDoneTrigger implements PartitionTrigger {
private final boolean waitCompaction;
private final InternalRowPartitionComputer partitionComputer;

public PartitionMarkDoneTrigger(
public PartitionMarkDoneListener(
State state,
PartitionTimeExtractor timeExtractor,
@Nullable Duration timeInterval,
Expand All @@ -104,7 +104,7 @@ public PartitionMarkDoneTrigger(
partitionComputer);
}

public PartitionMarkDoneTrigger(
public PartitionMarkDoneListener(
State state,
PartitionTimeExtractor timeExtractor,
@Nullable Duration timeInterval,
Expand Down Expand Up @@ -282,7 +282,7 @@ private static boolean disablePartitionMarkDone(
return table.partitionKeys().isEmpty();
}

public static Optional<PartitionTrigger> create(
public static Optional<PartitionListener> create(
CoreOptions coreOptions,
boolean isStreaming,
boolean isRestored,
Expand Down Expand Up @@ -313,8 +313,8 @@ public static Optional<PartitionTrigger> create(

Options options = coreOptions.toConfiguration();
return Optional.of(
new PartitionMarkDoneTrigger(
new PartitionMarkDoneTrigger.PartitionMarkDoneTriggerState(
new PartitionMarkDoneListener(
new PartitionMarkDoneListener.PartitionMarkDoneTriggerState(
isRestored, stateStore),
new PartitionTimeExtractor(
coreOptions.partitionTimestampPattern(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
import static org.assertj.core.api.Assertions.assertThat;

class PartitionCollectorTest extends TableTestBase {
class PartitionListenersTest extends TableTestBase {

@Test
public void testTriggerByCompaction() throws Exception {
Expand Down Expand Up @@ -85,8 +85,8 @@ private void innerTest(boolean deletionVectors) throws Exception {
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
Path location = catalog.getTableLocation(identifier);
Path successFile = new Path(location, "a=0/_SUCCESS");
PartitionCollector markDone =
PartitionCollector.create(false, false, new MockOperatorStateStore(), table);
PartitionListeners markDone =
PartitionListeners.create(false, false, new MockOperatorStateStore(), table);

notifyCommits(markDone, true);
assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
Expand All @@ -97,7 +97,7 @@ private void innerTest(boolean deletionVectors) throws Exception {
}
}

private void notifyCommits(PartitionCollector markDone, boolean isCompact) {
private void notifyCommits(PartitionListeners markDone, boolean isCompact) {
ManifestCommittable committable = new ManifestCommittable(Long.MAX_VALUE);
DataFileMeta file = DataFileTestUtils.newFile();
CommitMessageImpl compactMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link PartitionMarkDoneTrigger}. */
/** Test for {@link PartitionMarkDoneListener}. */
class PartitionMarkDoneTriggerTest {

private static final Duration timeInterval = Duration.ofDays(1);
private static final Duration idleTime = Duration.ofMinutes(15);

private List<String> pendingPartitions;
private PartitionMarkDoneTrigger.State state;
private PartitionMarkDoneListener.State state;
private PartitionTimeExtractor extractor;

@BeforeEach
public void before() throws Exception {
this.pendingPartitions = new ArrayList<>();
this.state =
new PartitionMarkDoneTrigger.State() {
new PartitionMarkDoneListener.State() {
@Override
public List<String> restore() {
return new ArrayList<>(pendingPartitions);
Expand All @@ -70,8 +70,8 @@ public void update(List<String> partitions) {

@Test
public void testWithoutEndInput() throws Exception {
PartitionMarkDoneTrigger trigger =
new PartitionMarkDoneTrigger(
PartitionMarkDoneListener trigger =
new PartitionMarkDoneListener(
state,
extractor,
timeInterval,
Expand Down Expand Up @@ -124,7 +124,7 @@ public void testWithoutEndInput() throws Exception {
// test restore
pendingPartitions.add("dt=2024-02-04");
trigger =
new PartitionMarkDoneTrigger(
new PartitionMarkDoneListener(
state,
extractor,
timeInterval,
Expand All @@ -148,8 +148,8 @@ public void testWithoutEndInput() throws Exception {

@Test
public void testWithEndInput() throws Exception {
PartitionMarkDoneTrigger trigger =
new PartitionMarkDoneTrigger(
PartitionMarkDoneListener trigger =
new PartitionMarkDoneListener(
state,
extractor,
timeInterval,
Expand All @@ -172,8 +172,8 @@ public void testWithEndInput() throws Exception {

@Test
public void testParseNonDateFormattedPartition() throws Exception {
PartitionMarkDoneTrigger trigger =
new PartitionMarkDoneTrigger(
PartitionMarkDoneListener trigger =
new PartitionMarkDoneListener(
state,
extractor,
timeInterval,
Expand Down

0 comments on commit edcd647

Please sign in to comment.