Skip to content

Commit

Permalink
[core] Generate changelog by copying data when records are insert-only
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Jun 21, 2024
1 parent c46a256 commit ea06094
Show file tree
Hide file tree
Showing 39 changed files with 767 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,19 @@ public boolean rename(Path src, Path dst) throws IOException {
}
}

/**
 * Copies {@code sourcePath} to {@code targetPath} on the local file system, keeping the file
 * attributes of the source, and never overwriting an existing target.
 *
 * @param sourcePath file to copy from
 * @param targetPath file to create
 * @return {@code true} if the copy was made, {@code false} if the target already exists
 * @throws IOException if the copy fails for any reason other than an existing target
 */
@Override
public boolean copyFileUtf8(Path sourcePath, Path targetPath) throws IOException {
    java.nio.file.Path target = toPath(targetPath);
    // Fast path: an existing target is reported without touching the source at all.
    if (Files.exists(target)) {
        return false;
    }
    try {
        Files.copy(toPath(sourcePath), target, StandardCopyOption.COPY_ATTRIBUTES);
    } catch (java.nio.file.FileAlreadyExistsException e) {
        // The target appeared between the existence check and the copy. Report it the same
        // way as the fast path instead of surfacing the race as an exception.
        return false;
    }
    return true;
}

/** Converts the given {@link Path} to a {@code java.nio.file.Path} by way of {@code toFile}. */
private java.nio.file.Path toPath(Path path) {
    final File localFile = toFile(path);
    return localFile.toPath();
}

/**
* Converts the given Path to a File for this file system. If the path is empty, we will return
* <tt>new File(".")</tt> instead of <tt>new File("")</tt>, since the latter returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.RecordAttributeManager;

import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -87,12 +88,14 @@ public RawFileSplitRead newRead() {

/**
 * Convenience overload: delegates to the fuller {@code newWrite} overload, passing {@code null}
 * for every argument other than {@code commitUser}.
 */
@Override
public AppendOnlyFileStoreWrite newWrite(String commitUser) {
return newWrite(commitUser, null);
return newWrite(commitUser, null, null);
}

@Override
public AppendOnlyFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
String commitUser,
ManifestCacheFilter manifestFilter,
RecordAttributeManager recordAttributeManager) {
return new AppendOnlyFileStoreWrite(
fileIO,
newRead(),
Expand Down
6 changes: 5 additions & 1 deletion paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordAttributeManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

Expand Down Expand Up @@ -76,7 +77,10 @@ public interface FileStore<T> extends Serializable {

FileStoreWrite<T> newWrite(String commitUser);

FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter manifestFilter);
FileStoreWrite<T> newWrite(
String commitUser,
ManifestCacheFilter manifestFilter,
RecordAttributeManager recordAttributeManager);

FileStoreCommit newCommit(String commitUser);

Expand Down
13 changes: 10 additions & 3 deletions paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.RecordAttributeManager;
import org.apache.paimon.utils.UserDefinedSeqComparator;
import org.apache.paimon.utils.ValueEqualiserSupplier;

import javax.annotation.Nullable;

import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -152,11 +155,14 @@ public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {

/**
 * Convenience overload: delegates to the fuller {@code newWrite} overload, passing {@code null}
 * for every argument other than {@code commitUser}.
 */
@Override
public KeyValueFileStoreWrite newWrite(String commitUser) {
return newWrite(commitUser, null);
return newWrite(commitUser, null, null);
}

@Override
public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
public KeyValueFileStoreWrite newWrite(
String commitUser,
ManifestCacheFilter manifestFilter,
@Nullable RecordAttributeManager recordAttributeManager) {
IndexMaintainer.Factory<KeyValue> indexFactory = null;
if (bucketMode() == BucketMode.HASH_DYNAMIC) {
indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler());
Expand Down Expand Up @@ -185,7 +191,8 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
deletionVectorsMaintainerFactory,
options,
keyValueFieldsExtractor,
tableName);
tableName,
recordAttributeManager);
}

private Map<String, FileStorePathFactory> format2PathFactory() {
Expand Down
20 changes: 20 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,26 @@ public DataFileMeta upgrade(int newLevel) {
fileSource);
}

/**
 * Builds a copy of this meta that differs only in its file name.
 *
 * @param newFileName file name to put into the returned meta
 * @return a new {@link DataFileMeta} with {@code newFileName} and all other fields taken from
 *     this instance
 */
public DataFileMeta rename(String newFileName) {
    final DataFileMeta renamed =
            new DataFileMeta(
                    newFileName,
                    fileSize,
                    rowCount,
                    minKey,
                    maxKey,
                    keyStats,
                    valueStats,
                    minSequenceNumber,
                    maxSequenceNumber,
                    schemaId,
                    level,
                    extraFiles,
                    creationTime,
                    deleteRowCount,
                    embeddedIndex,
                    fileSource);
    return renamed;
}

public List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(fileName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -123,6 +124,21 @@ public void deleteFile(String filename, int level) {
fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename));
}

/**
 * Copies a file to another file name, resolving both names through the path factory of the
 * given level.
 *
 * @param sourceFileName name of the existing file
 * @param targetFileName name of the file to create
 * @param level level whose path factory resolves both names
 * @throws IOException if the copy fails, or if the {@link FileIO} reports that nothing was
 *     copied
 */
public void copyFile(String sourceFileName, String targetFileName, int level)
        throws IOException {
    Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName);
    Path targetPath = formatContext.pathFactory(level).toPath(targetFileName);
    // copyFileUtf8 reports "nothing copied" through its return value rather than an exception.
    // Ignoring it would let callers register a target file that was never written by this copy.
    if (!fileIO.copyFileUtf8(sourcePath, targetPath)) {
        throw new IOException(
                "Failed to copy "
                        + sourcePath
                        + " to "
                        + targetPath
                        + ": copyFileUtf8 returned false (the target may already exist).");
    }
}

/** Returns the {@link FileIO} held by this factory. */
public FileIO getFileIO() {
    return this.fileIO;
}

/**
 * Asks the path factory of the given level for a new changelog path.
 *
 * @param level level whose path factory is used
 * @return the path returned by that factory's {@code newChangelogPath()}
 */
public Path newChangelogPath(int level) {
    final Path changelogPath = formatContext.pathFactory(level).newChangelogPath();
    return changelogPath;
}

public static Builder builder(
FileIO fileIO,
long schemaId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
Expand All @@ -37,6 +39,7 @@
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BucketRecordAttributeManager;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.RecordWriter;
Expand Down Expand Up @@ -78,6 +81,8 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
private final LinkedHashMap<String, DataFileMeta> compactBefore;
private final LinkedHashSet<DataFileMeta> compactAfter;
private final LinkedHashSet<DataFileMeta> compactChangelog;
@Nullable private final BucketRecordAttributeManager recordAttributeManager;
private final boolean isCopyMethodOverridden;

@Nullable private CompactDeletionFile compactDeletionFile;

Expand All @@ -98,7 +103,8 @@ public MergeTreeWriter(
boolean commitForceCompact,
ChangelogProducer changelogProducer,
@Nullable CommitIncrement increment,
@Nullable FieldsComparator userDefinedSeqComparator) {
@Nullable FieldsComparator userDefinedSeqComparator,
@Nullable BucketRecordAttributeManager recordAttributeManager) {
this.writeBufferSpillable = writeBufferSpillable;
this.maxDiskSize = maxDiskSize;
this.sortMaxFan = sortMaxFan;
Expand All @@ -114,6 +120,7 @@ public MergeTreeWriter(
this.commitForceCompact = commitForceCompact;
this.changelogProducer = changelogProducer;
this.userDefinedSeqComparator = userDefinedSeqComparator;
this.recordAttributeManager = recordAttributeManager;

this.newFiles = new LinkedHashSet<>();
this.deletedFiles = new LinkedHashSet<>();
Expand All @@ -133,6 +140,20 @@ public MergeTreeWriter(
compactChangelog.addAll(increment.compactIncrement().changelogFiles());
updateCompactDeletionFile(increment.compactDeletionFile());
}

// TODO: Verify the performance of the default implementation of copyFileUtf8,
// and remove the following code block if it proves to cause no regression.
Class<? extends FileIO> clazz = writerFactory.getFileIO().getClass();
boolean isCopyMethodOverridden;
try {
isCopyMethodOverridden =
clazz.getMethod("copyFileUtf8", Path.class, Path.class)
.getDeclaringClass()
.equals(clazz);
} catch (NoSuchMethodException e) {
isCopyMethodOverridden = false;
}
this.isCopyMethodOverridden = isCopyMethodOverridden;
}

private long newSequenceNumber() {
Expand Down Expand Up @@ -213,7 +234,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
}

final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
changelogProducer == ChangelogProducer.INPUT
changelogProducer == ChangelogProducer.INPUT & !canChangelogOptimizedToCopy()
? writerFactory.createRollingChangelogFileWriter(0)
: null;
final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
Expand All @@ -232,22 +253,53 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
dataWriter.close();
}

List<DataFileMeta> fileMetas = dataWriter.result();
if (changelogWriter != null) {
newFilesChangelog.addAll(changelogWriter.result());
} else if (canChangelogOptimizedToCopy()) {
List<DataFileMeta> changelogMetas = getChangelogFileMetaFromDataFile(fileMetas);
for (int i = 0; i < changelogMetas.size(); i++) {
writerFactory.copyFile(
fileMetas.get(i).fileName(), changelogMetas.get(i).fileName(), 0);
}
newFilesChangelog.addAll(changelogMetas);
}

for (DataFileMeta fileMeta : dataWriter.result()) {
for (DataFileMeta fileMeta : fileMetas) {
newFiles.add(fileMeta);
compactManager.addNewFile(fileMeta);
}

writeBuffer.clear();
if (recordAttributeManager != null) {
recordAttributeManager.onFlush();
}
}

trySyncLatestCompaction(waitForLatestCompaction);
compactManager.triggerCompaction(forcedFullCompaction);
}

/**
 * Derives one changelog file meta per data file meta, in the same order.
 *
 * <p>Each result is the data file meta renamed to the name of a changelog path newly requested
 * from the writer factory at level 0.
 *
 * @param dataFileMetaList metas of the data files
 * @return the renamed metas, index-aligned with {@code dataFileMetaList}
 */
private List<DataFileMeta> getChangelogFileMetaFromDataFile(
        List<DataFileMeta> dataFileMetaList) {
    final List<DataFileMeta> renamedMetas = new ArrayList<>();
    for (final DataFileMeta source : dataFileMetaList) {
        final String changelogName = writerFactory.newChangelogPath(0).getName();
        renamedMetas.add(source.rename(changelogName));
    }
    return renamedMetas;
}

/**
 * Tells whether the changelog of the current flush can be produced by copying the data files.
 *
 * <p>All of the following must hold: the {@code copyFileUtf8} override check made in the
 * constructor succeeded, the changelog producer is {@link ChangelogProducer#INPUT}, and a
 * record attribute manager is present and reports that all records since the last flush are
 * insert-only.
 *
 * @return {@code true} if every condition above holds
 */
private boolean canChangelogOptimizedToCopy() {
    // Short-circuit chain: the attribute manager is only queried once the cheaper conditions
    // have already passed.
    return isCopyMethodOverridden
            && changelogProducer == ChangelogProducer.INPUT
            && recordAttributeManager != null
            && recordAttributeManager.areAllRecordsInsertOnlySinceLastFlush();
}

@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
flushWriteBuffer(waitCompaction, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordAttributeManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.UserDefinedSeqComparator;

Expand Down Expand Up @@ -101,6 +102,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
private final RowType keyType;
private final RowType valueType;
@Nullable private final RecordLevelExpire recordLevelExpire;
@Nullable private final RecordAttributeManager recordAttributeManager;

public KeyValueFileStoreWrite(
FileIO fileIO,
Expand All @@ -121,7 +123,8 @@ public KeyValueFileStoreWrite(
@Nullable DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory,
CoreOptions options,
KeyValueFieldsExtractor extractor,
String tableName) {
String tableName,
@Nullable RecordAttributeManager recordAttributeManager) {
super(
commitUser,
snapshotManager,
Expand All @@ -134,6 +137,7 @@ public KeyValueFileStoreWrite(
this.keyType = keyType;
this.valueType = valueType;
this.udsComparatorSupplier = udsComparatorSupplier;
this.recordAttributeManager = recordAttributeManager;
this.readerFactoryBuilder =
KeyValueFileReaderFactory.builder(
fileIO,
Expand Down Expand Up @@ -210,7 +214,11 @@ protected MergeTreeWriter createWriter(
options.commitForceCompact(),
options.changelogProducer(),
restoreIncrement,
UserDefinedSeqComparator.create(valueType, options));
UserDefinedSeqComparator.create(valueType, options),
recordAttributeManager == null
? null
: recordAttributeManager.getBucketRecordAttributeManager(
partition, bucket));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordAttributeManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

Expand Down Expand Up @@ -128,9 +129,12 @@ public FileStoreWrite<T> newWrite(String commitUser) {
}

@Override
public FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
public FileStoreWrite<T> newWrite(
String commitUser,
ManifestCacheFilter manifestFilter,
RecordAttributeManager recordAttributeManager) {
privilegeChecker.assertCanInsert(identifier);
return wrapped.newWrite(commitUser, manifestFilter);
return wrapped.newWrite(commitUser, manifestFilter, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.RecordAttributeManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

Expand Down Expand Up @@ -265,9 +266,12 @@ public TableWriteImpl<?> newWrite(String commitUser) {
}

@Override
public TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
public TableWriteImpl<?> newWrite(
String commitUser,
ManifestCacheFilter manifestFilter,
RecordAttributeManager recordAttributeManager) {
privilegeChecker.assertCanInsert(identifier);
return wrapped.newWrite(commitUser, manifestFilter);
return wrapped.newWrite(commitUser, manifestFilter, null);
}

@Override
Expand Down
Loading

0 comments on commit ea06094

Please sign in to comment.