Skip to content

Commit

Permalink
[core] rename NewFilesIncrement to DataIncrement (#3120)
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron authored Mar 29, 2024
1 parent f477c31 commit 0d91a85
Show file tree
Hide file tree
Showing 16 changed files with 66 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
Expand Down Expand Up @@ -68,7 +68,7 @@ public CommitMessage doCompact(AppendOnlyFileStoreWrite write) throws Exception
partition,
0, // bucket 0 is bucket for unaware-bucket table for compatibility with the old
// design
NewFilesIncrement.emptyIncrement(),
DataIncrement.emptyIncrement(),
compactIncrement);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
Expand Down Expand Up @@ -235,8 +235,8 @@ private void trySyncLatestCompaction(boolean blocking)
}

private CommitIncrement drainIncrement() {
NewFilesIncrement newFilesIncrement =
new NewFilesIncrement(
DataIncrement dataIncrement =
new DataIncrement(
new ArrayList<>(newFiles),
new ArrayList<>(deletedFiles),
Collections.emptyList());
Expand All @@ -251,7 +251,7 @@ private CommitIncrement drainIncrement() {
compactBefore.clear();
compactAfter.clear();

return new CommitIncrement(newFilesIncrement, compactIncrement);
return new CommitIncrement(dataIncrement, compactIncrement);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public int level() {
* <li>Paimon 0.2
* <ul>
* <li>Stores changelog files for {@link CoreOptions.ChangelogProducer#INPUT}. Changelog
* files are moved to {@link NewFilesIncrement} since Paimon 0.3.
* files are moved to {@link DataIncrement} since Paimon 0.3.
* </ul>
* </ul>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import java.util.stream.Collectors;

/** Newly created data files and changelog files. */
public class NewFilesIncrement {
public class DataIncrement {

private final List<DataFileMeta> newFiles;
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> changelogFiles;

public NewFilesIncrement(
public DataIncrement(
List<DataFileMeta> newFiles,
List<DataFileMeta> deletedFiles,
List<DataFileMeta> changelogFiles) {
Expand All @@ -39,8 +39,8 @@ public NewFilesIncrement(
this.changelogFiles = changelogFiles;
}

public static NewFilesIncrement emptyIncrement() {
return new NewFilesIncrement(
public static DataIncrement emptyIncrement() {
return new DataIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
}

Expand Down Expand Up @@ -69,7 +69,7 @@ public boolean equals(Object o) {
return false;
}

NewFilesIncrement that = (NewFilesIncrement) o;
DataIncrement that = (DataIncrement) o;
return Objects.equals(newFiles, that.newFiles)
&& Objects.equals(changelogFiles, that.changelogFiles);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
Expand Down Expand Up @@ -254,8 +254,8 @@ public void sync() throws Exception {
}

private CommitIncrement drainIncrement() {
NewFilesIncrement newFilesIncrement =
new NewFilesIncrement(
DataIncrement dataIncrement =
new DataIncrement(
new ArrayList<>(newFiles),
new ArrayList<>(deletedFiles),
new ArrayList<>(newFilesChangelog));
Expand All @@ -272,7 +272,7 @@ private CommitIncrement drainIncrement() {
compactAfter.clear();
compactChangelog.clear();

return new CommitIncrement(newFilesIncrement, compactIncrement);
return new CommitIncrement(dataIncrement, compactIncrement);
}

private void updateCompactResult(CompactResult result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
Expand Down Expand Up @@ -86,8 +86,7 @@ public static CommitMessage commitFile(BinaryRow partition, List<DataFileMeta> d
return new CommitMessageImpl(
partition,
0,
new NewFilesIncrement(
dataFileMetas, Collections.emptyList(), Collections.emptyList()),
new DataIncrement(dataFileMetas, Collections.emptyList(), Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.io.NewFilesIncrement;

import java.io.IOException;
import java.io.ObjectInputStream;
Expand All @@ -45,33 +45,33 @@ public class CommitMessageImpl implements CommitMessage {

private transient BinaryRow partition;
private transient int bucket;
private transient NewFilesIncrement newFilesIncrement;
private transient DataIncrement dataIncrement;
private transient CompactIncrement compactIncrement;
private transient IndexIncrement indexIncrement;

@VisibleForTesting
public CommitMessageImpl(
BinaryRow partition,
int bucket,
NewFilesIncrement newFilesIncrement,
DataIncrement dataIncrement,
CompactIncrement compactIncrement) {
this(
partition,
bucket,
newFilesIncrement,
dataIncrement,
compactIncrement,
new IndexIncrement(Collections.emptyList()));
}

public CommitMessageImpl(
BinaryRow partition,
int bucket,
NewFilesIncrement newFilesIncrement,
DataIncrement dataIncrement,
CompactIncrement compactIncrement,
IndexIncrement indexIncrement) {
this.partition = partition;
this.bucket = bucket;
this.newFilesIncrement = newFilesIncrement;
this.dataIncrement = dataIncrement;
this.compactIncrement = compactIncrement;
this.indexIncrement = indexIncrement;
}
Expand All @@ -86,8 +86,8 @@ public int bucket() {
return bucket;
}

public NewFilesIncrement newFilesIncrement() {
return newFilesIncrement;
public DataIncrement newFilesIncrement() {
return dataIncrement;
}

public CompactIncrement compactIncrement() {
Expand All @@ -99,9 +99,7 @@ public IndexIncrement indexIncrement() {
}

public boolean isEmpty() {
return newFilesIncrement.isEmpty()
&& compactIncrement.isEmpty()
&& indexIncrement.isEmpty();
return dataIncrement.isEmpty() && compactIncrement.isEmpty() && indexIncrement.isEmpty();
}

private void writeObject(ObjectOutputStream out) throws IOException {
Expand All @@ -118,7 +116,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
CommitMessageImpl message = (CommitMessageImpl) CACHE.get().deserialize(version, bytes);
this.partition = message.partition;
this.bucket = message.bucket;
this.newFilesIncrement = message.newFilesIncrement;
this.dataIncrement = message.dataIncrement;
this.compactIncrement = message.compactIncrement;
this.indexIncrement = message.indexIncrement;
}
Expand All @@ -135,14 +133,14 @@ public boolean equals(Object o) {
CommitMessageImpl that = (CommitMessageImpl) o;
return bucket == that.bucket
&& Objects.equals(partition, that.partition)
&& Objects.equals(newFilesIncrement, that.newFilesIncrement)
&& Objects.equals(dataIncrement, that.dataIncrement)
&& Objects.equals(compactIncrement, that.compactIncrement)
&& Objects.equals(indexIncrement, that.indexIncrement);
}

@Override
public int hashCode() {
return Objects.hash(partition, bucket, newFilesIncrement, compactIncrement, indexIncrement);
return Objects.hash(partition, bucket, dataIncrement, compactIncrement, indexIncrement);
}

@Override
Expand All @@ -154,6 +152,6 @@ public String toString() {
+ "newFilesIncrement = %s, "
+ "compactIncrement = %s, "
+ "indexIncrement = %s}",
partition, bucket, newFilesIncrement, compactIncrement, indexIncrement);
partition, bucket, dataIncrement, compactIncrement, indexIncrement);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.apache.paimon.index.IndexFileMetaSerializer;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.io.NewFilesIncrement;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -116,7 +116,7 @@ private CommitMessage deserialize(DataInputView view) throws IOException {
return new CommitMessageImpl(
deserializeBinaryRow(view),
view.readInt(),
new NewFilesIncrement(
new DataIncrement(
dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@
package org.apache.paimon.utils;

import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.DataIncrement;

/** Changes to commit. */
public class CommitIncrement {

private final NewFilesIncrement newFilesIncrement;
private final DataIncrement dataIncrement;
private final CompactIncrement compactIncrement;

public CommitIncrement(NewFilesIncrement newFilesIncrement, CompactIncrement compactIncrement) {
this.newFilesIncrement = newFilesIncrement;
public CommitIncrement(DataIncrement dataIncrement, CompactIncrement compactIncrement) {
this.dataIncrement = dataIncrement;
this.compactIncrement = compactIncrement;
}

public NewFilesIncrement newFilesIncrement() {
return newFilesIncrement;
public DataIncrement newFilesIncrement() {
return dataIncrement;
}

public CompactIncrement compactIncrement() {
Expand All @@ -42,6 +42,6 @@ public CompactIncrement compactIncrement() {

@Override
public String toString() {
return newFilesIncrement.toString() + "\n" + compactIncrement;
return dataIncrement.toString() + "\n" + compactIncrement;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableCommit;
Expand Down Expand Up @@ -111,7 +111,7 @@ private CommitMessage createCommitMessage(BinaryRow partition, int bucket, Index
return new CommitMessageImpl(
partition,
bucket,
new NewFilesIncrement(
new DataIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;

Expand Down Expand Up @@ -73,10 +73,10 @@ private static void addFileCommittables(
List<CommitMessage> commitMessages = new ArrayList<>();
int length = ThreadLocalRandom.current().nextInt(10) + 1;
for (int i = 0; i < length; i++) {
NewFilesIncrement newFilesIncrement = randomNewFilesIncrement();
DataIncrement dataIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
CommitMessage commitMessage =
new CommitMessageImpl(partition, bucket, newFilesIncrement, compactIncrement);
new CommitMessageImpl(partition, bucket, dataIncrement, compactIncrement);
commitMessages.add(commitMessage);
committable.addFileCommittable(commitMessage);
}
Expand All @@ -88,8 +88,8 @@ private static void addFileCommittables(
}
}

public static NewFilesIncrement randomNewFilesIncrement() {
return new NewFilesIncrement(
public static DataIncrement randomNewFilesIncrement() {
return new DataIncrement(
Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.paimon.table.sink;

import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.io.NewFilesIncrement;

import org.junit.jupiter.api.Test;

Expand All @@ -39,13 +39,12 @@ public class CommitMessageSerializerTest {
@Test
public void test() throws IOException {
CommitMessageSerializer serializer = new CommitMessageSerializer();
NewFilesIncrement newFilesIncrement = randomNewFilesIncrement();
DataIncrement dataIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
IndexIncrement indexIncrement =
new IndexIncrement(Arrays.asList(randomIndexFile(), randomIndexFile()));
CommitMessageImpl committable =
new CommitMessageImpl(
row(0), 1, newFilesIncrement, compactIncrement, indexIncrement);
new CommitMessageImpl(row(0), 1, dataIncrement, compactIncrement, indexIncrement);
CommitMessageImpl newCommittable =
(CommitMessageImpl) serializer.deserialize(2, serializer.serialize(committable));
assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement());
Expand Down
Loading

0 comments on commit 0d91a85

Please sign in to comment.