Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] rename NewFilesIncrement to DataIncrement #3120

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
Expand Down Expand Up @@ -68,7 +68,7 @@ public CommitMessage doCompact(AppendOnlyFileStoreWrite write) throws Exception
partition,
0, // bucket 0 is bucket for unaware-bucket table for compatibility with the old
// design
NewFilesIncrement.emptyIncrement(),
DataIncrement.emptyIncrement(),
compactIncrement);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
Expand Down Expand Up @@ -235,8 +235,8 @@ private void trySyncLatestCompaction(boolean blocking)
}

private CommitIncrement drainIncrement() {
NewFilesIncrement newFilesIncrement =
new NewFilesIncrement(
DataIncrement dataIncrement =
new DataIncrement(
new ArrayList<>(newFiles),
new ArrayList<>(deletedFiles),
Collections.emptyList());
Expand All @@ -251,7 +251,7 @@ private CommitIncrement drainIncrement() {
compactBefore.clear();
compactAfter.clear();

return new CommitIncrement(newFilesIncrement, compactIncrement);
return new CommitIncrement(dataIncrement, compactIncrement);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public int level() {
* <li>Paimon 0.2
* <ul>
* <li>Stores changelog files for {@link CoreOptions.ChangelogProducer#INPUT}. Changelog
* files are moved to {@link NewFilesIncrement} since Paimon 0.3.
* files are moved to {@link DataIncrement} since Paimon 0.3.
* </ul>
* </ul>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import java.util.stream.Collectors;

/** Newly created data files and changelog files. */
public class NewFilesIncrement {
public class DataIncrement {

private final List<DataFileMeta> newFiles;
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> changelogFiles;

public NewFilesIncrement(
public DataIncrement(
List<DataFileMeta> newFiles,
List<DataFileMeta> deletedFiles,
List<DataFileMeta> changelogFiles) {
Expand All @@ -39,8 +39,8 @@ public NewFilesIncrement(
this.changelogFiles = changelogFiles;
}

public static NewFilesIncrement emptyIncrement() {
return new NewFilesIncrement(
public static DataIncrement emptyIncrement() {
return new DataIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
}

Expand Down Expand Up @@ -69,7 +69,7 @@ public boolean equals(Object o) {
return false;
}

NewFilesIncrement that = (NewFilesIncrement) o;
DataIncrement that = (DataIncrement) o;
return Objects.equals(newFiles, that.newFiles)
&& Objects.equals(changelogFiles, that.changelogFiles);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
Expand Down Expand Up @@ -254,8 +254,8 @@ public void sync() throws Exception {
}

private CommitIncrement drainIncrement() {
NewFilesIncrement newFilesIncrement =
new NewFilesIncrement(
DataIncrement dataIncrement =
new DataIncrement(
new ArrayList<>(newFiles),
new ArrayList<>(deletedFiles),
new ArrayList<>(newFilesChangelog));
Expand All @@ -272,7 +272,7 @@ private CommitIncrement drainIncrement() {
compactAfter.clear();
compactChangelog.clear();

return new CommitIncrement(newFilesIncrement, compactIncrement);
return new CommitIncrement(dataIncrement, compactIncrement);
}

private void updateCompactResult(CompactResult result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
Expand Down Expand Up @@ -86,8 +86,7 @@ public static CommitMessage commitFile(BinaryRow partition, List<DataFileMeta> d
return new CommitMessageImpl(
partition,
0,
new NewFilesIncrement(
dataFileMetas, Collections.emptyList(), Collections.emptyList()),
new DataIncrement(dataFileMetas, Collections.emptyList(), Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.io.NewFilesIncrement;

import java.io.IOException;
import java.io.ObjectInputStream;
Expand All @@ -45,33 +45,33 @@ public class CommitMessageImpl implements CommitMessage {

private transient BinaryRow partition;
private transient int bucket;
private transient NewFilesIncrement newFilesIncrement;
private transient DataIncrement dataIncrement;
private transient CompactIncrement compactIncrement;
private transient IndexIncrement indexIncrement;

@VisibleForTesting
public CommitMessageImpl(
BinaryRow partition,
int bucket,
NewFilesIncrement newFilesIncrement,
DataIncrement dataIncrement,
CompactIncrement compactIncrement) {
this(
partition,
bucket,
newFilesIncrement,
dataIncrement,
compactIncrement,
new IndexIncrement(Collections.emptyList()));
}

public CommitMessageImpl(
BinaryRow partition,
int bucket,
NewFilesIncrement newFilesIncrement,
DataIncrement dataIncrement,
CompactIncrement compactIncrement,
IndexIncrement indexIncrement) {
this.partition = partition;
this.bucket = bucket;
this.newFilesIncrement = newFilesIncrement;
this.dataIncrement = dataIncrement;
this.compactIncrement = compactIncrement;
this.indexIncrement = indexIncrement;
}
Expand All @@ -86,8 +86,8 @@ public int bucket() {
return bucket;
}

public NewFilesIncrement newFilesIncrement() {
return newFilesIncrement;
public DataIncrement newFilesIncrement() {
return dataIncrement;
}

public CompactIncrement compactIncrement() {
Expand All @@ -99,9 +99,7 @@ public IndexIncrement indexIncrement() {
}

public boolean isEmpty() {
return newFilesIncrement.isEmpty()
&& compactIncrement.isEmpty()
&& indexIncrement.isEmpty();
return dataIncrement.isEmpty() && compactIncrement.isEmpty() && indexIncrement.isEmpty();
}

private void writeObject(ObjectOutputStream out) throws IOException {
Expand All @@ -118,7 +116,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
CommitMessageImpl message = (CommitMessageImpl) CACHE.get().deserialize(version, bytes);
this.partition = message.partition;
this.bucket = message.bucket;
this.newFilesIncrement = message.newFilesIncrement;
this.dataIncrement = message.dataIncrement;
this.compactIncrement = message.compactIncrement;
this.indexIncrement = message.indexIncrement;
}
Expand All @@ -135,14 +133,14 @@ public boolean equals(Object o) {
CommitMessageImpl that = (CommitMessageImpl) o;
return bucket == that.bucket
&& Objects.equals(partition, that.partition)
&& Objects.equals(newFilesIncrement, that.newFilesIncrement)
&& Objects.equals(dataIncrement, that.dataIncrement)
&& Objects.equals(compactIncrement, that.compactIncrement)
&& Objects.equals(indexIncrement, that.indexIncrement);
}

@Override
public int hashCode() {
return Objects.hash(partition, bucket, newFilesIncrement, compactIncrement, indexIncrement);
return Objects.hash(partition, bucket, dataIncrement, compactIncrement, indexIncrement);
}

@Override
Expand All @@ -154,6 +152,6 @@ public String toString() {
+ "newFilesIncrement = %s, "
+ "compactIncrement = %s, "
+ "indexIncrement = %s}",
partition, bucket, newFilesIncrement, compactIncrement, indexIncrement);
partition, bucket, dataIncrement, compactIncrement, indexIncrement);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.apache.paimon.index.IndexFileMetaSerializer;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.io.NewFilesIncrement;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -116,7 +116,7 @@ private CommitMessage deserialize(DataInputView view) throws IOException {
return new CommitMessageImpl(
deserializeBinaryRow(view),
view.readInt(),
new NewFilesIncrement(
new DataIncrement(
dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@
package org.apache.paimon.utils;

import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.DataIncrement;

/** Changes to commit. */
public class CommitIncrement {

private final NewFilesIncrement newFilesIncrement;
private final DataIncrement dataIncrement;
private final CompactIncrement compactIncrement;

public CommitIncrement(NewFilesIncrement newFilesIncrement, CompactIncrement compactIncrement) {
this.newFilesIncrement = newFilesIncrement;
public CommitIncrement(DataIncrement dataIncrement, CompactIncrement compactIncrement) {
this.dataIncrement = dataIncrement;
this.compactIncrement = compactIncrement;
}

public NewFilesIncrement newFilesIncrement() {
return newFilesIncrement;
public DataIncrement newFilesIncrement() {
return dataIncrement;
}

public CompactIncrement compactIncrement() {
Expand All @@ -42,6 +42,6 @@ public CompactIncrement compactIncrement() {

@Override
public String toString() {
return newFilesIncrement.toString() + "\n" + compactIncrement;
return dataIncrement.toString() + "\n" + compactIncrement;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableCommit;
Expand Down Expand Up @@ -111,7 +111,7 @@ private CommitMessage createCommitMessage(BinaryRow partition, int bucket, Index
return new CommitMessageImpl(
partition,
bucket,
new NewFilesIncrement(
new DataIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;

Expand Down Expand Up @@ -73,10 +73,10 @@ private static void addFileCommittables(
List<CommitMessage> commitMessages = new ArrayList<>();
int length = ThreadLocalRandom.current().nextInt(10) + 1;
for (int i = 0; i < length; i++) {
NewFilesIncrement newFilesIncrement = randomNewFilesIncrement();
DataIncrement dataIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
CommitMessage commitMessage =
new CommitMessageImpl(partition, bucket, newFilesIncrement, compactIncrement);
new CommitMessageImpl(partition, bucket, dataIncrement, compactIncrement);
commitMessages.add(commitMessage);
committable.addFileCommittable(commitMessage);
}
Expand All @@ -88,8 +88,8 @@ private static void addFileCommittables(
}
}

public static NewFilesIncrement randomNewFilesIncrement() {
return new NewFilesIncrement(
public static DataIncrement randomNewFilesIncrement() {
return new DataIncrement(
Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.paimon.table.sink;

import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.io.NewFilesIncrement;

import org.junit.jupiter.api.Test;

Expand All @@ -39,13 +39,12 @@ public class CommitMessageSerializerTest {
@Test
public void test() throws IOException {
CommitMessageSerializer serializer = new CommitMessageSerializer();
NewFilesIncrement newFilesIncrement = randomNewFilesIncrement();
DataIncrement dataIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
IndexIncrement indexIncrement =
new IndexIncrement(Arrays.asList(randomIndexFile(), randomIndexFile()));
CommitMessageImpl committable =
new CommitMessageImpl(
row(0), 1, newFilesIncrement, compactIncrement, indexIncrement);
new CommitMessageImpl(row(0), 1, dataIncrement, compactIncrement, indexIncrement);
CommitMessageImpl newCommittable =
(CommitMessageImpl) serializer.deserialize(2, serializer.serialize(committable));
assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement());
Expand Down
Loading
Loading