Skip to content

Commit

Permalink
[core] cancel manifest format factory cache for while .
Browse files Browse the repository at this point in the history
  • Loading branch information
ranxianglei authored and ranxianglei.rxl committed Nov 18, 2024
1 parent 710af06 commit 669dc30
Showing 1 changed file with 2 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleStatsCollector;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.ObjectCacheManager;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.SingleFileWriter;
Expand All @@ -41,9 +40,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;

/**
* This file includes several {@link ManifestEntry}s, representing the additional changes since last
Expand Down Expand Up @@ -204,57 +201,16 @@ public ManifestFile create() {
return create(null);
}

private static class FormatReaderFactoryKey {
private final RowType entryType;
private final List<Predicate> filters;
private final String formatIdentifier;

public FormatReaderFactoryKey(
String formatIdentifier, RowType entryType, List<Predicate> filters) {
this.entryType = entryType;
this.filters = filters;
this.formatIdentifier = formatIdentifier;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
FormatReaderFactoryKey that = (FormatReaderFactoryKey) o;
return Objects.equals(entryType, that.entryType)
&& Objects.equals(filters, that.filters)
&& Objects.equals(formatIdentifier, that.formatIdentifier);
}

@Override
public int hashCode() {
return Objects.hash(entryType, filters, formatIdentifier);
}
}

private static final ObjectCacheManager<FormatReaderFactoryKey, FormatReaderFactory>
readers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000);
private static final ObjectCacheManager<FormatReaderFactoryKey, FormatWriterFactory>
writers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000);

public ManifestFile create(List<Predicate> filters) {
String formatIdentifier = this.fileFormat.getFormatIdentifier();
RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA);
FormatReaderFactoryKey formatReaderFactoryKey =
new FormatReaderFactoryKey(formatIdentifier, entryType, filters);
return new ManifestFile(
fileIO,
schemaManager,
partitionType,
ManifestEntrySerializer.getInstance(),
entryType,
readers.get(
formatReaderFactoryKey,
(ignore) -> fileFormat.createReaderFactory(entryType, filters)),
writers.get(
formatReaderFactoryKey,
(ignore) -> fileFormat.createWriterFactory(entryType)),
fileFormat.createReaderFactory(entryType, filters),
fileFormat.createWriterFactory(entryType),
compression,
pathFactory.manifestFileFactory(),
suggestedFileSize,
Expand Down

0 comments on commit 669dc30

Please sign in to comment.