diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index 3686fa20c68a..6706d5c421a1 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -68,6 +68,12 @@ Long Controls the max number for which partitions in the catalog are cached. + +
cache.snapshot.max-num-per-table
+ 20 + Integer + Controls the max number for snapshots per table in the catalog are cached. +
client-pool-size
2 diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index 6ad9f3350adf..f69af2d59910 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -123,6 +123,13 @@ public class CatalogOptions { .noDefaultValue() .withDescription("Controls the maximum memory to cache manifest content."); + public static final ConfigOption CACHE_SNAPSHOT_MAX_NUM_PER_TABLE = + key("cache.snapshot.max-num-per-table") + .intType() + .defaultValue(20) + .withDescription( + "Controls the max number for snapshots per table in the catalog are cached."); + public static final ConfigOption LINEAGE_META = key("lineage-meta") .stringType() diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 14665961a8a7..ae4552aa7150 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -54,6 +54,8 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import javax.annotation.Nullable; import java.time.Duration; @@ -79,6 +81,7 @@ abstract class AbstractFileStore implements FileStore { @Nullable private final SegmentsCache writeManifestCache; @Nullable private SegmentsCache readManifestCache; + @Nullable private Cache snapshotCache; protected AbstractFileStore( FileIO fileIO, @@ -116,7 +119,7 @@ public FileStorePathFactory pathFactory() { @Override public SnapshotManager snapshotManager() { - return new SnapshotManager(fileIO, options.path(), options.branch()); + return new SnapshotManager(fileIO, options.path(), options.branch(), snapshotCache); } @Override @@ -340,4 +343,9 @@ public ServiceManager newServiceManager() { public void setManifestCache(SegmentsCache manifestCache) { this.readManifestCache = manifestCache; } + + @Override + public void setSnapshotCache(Cache cache) { + this.snapshotCache = cache; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index f9bf4c8440bd..e50d4ada1397 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -44,6 +44,8 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import javax.annotation.Nullable; import java.util.List; @@ -107,4 +109,6 @@ public interface FileStore { List createTagCallbacks(); void setManifestCache(SegmentsCache manifestCache); + + void setSnapshotCache(Cache cache); } diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 3b8d2fa15b4b..baee7bad950e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -29,9 +29,6 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.annotation.Nullable; import java.io.FileNotFoundException; @@ -65,7 +62,6 @@ @Public @JsonIgnoreProperties(ignoreUnknown = true) public class Snapshot { - private static final Logger LOG = LoggerFactory.getLogger(Snapshot.class); public static final long FIRST_SNAPSHOT_ID = 1; @@ -355,28 +351,6 @@ public String toJson() { return JsonSerdeUtil.toJson(this); } - public static Snapshot fromJson(String json) { - return JsonSerdeUtil.fromJson(json, Snapshot.class); - } - - public static Snapshot fromPath(FileIO fileIO, Path path) { - try { - return Snapshot.fromJson(fileIO.readFileUtf8(path)); - } catch (FileNotFoundException e) { - String errorMessage = - String.format( - "Snapshot file %s does not exist. " - + "It might have been expired by other jobs operating on this table. " - + "In this case, you can avoid concurrent modification issues by configuring " - + "write-only = true and use a dedicated compaction job, or configuring " - + "different expiration thresholds for different jobs.", - path); - throw new RuntimeException(errorMessage, e); - } catch (IOException e) { - throw new RuntimeException("Fails to read snapshot from path " + path, e); - } - } - @Override public int hashCode() { return Objects.hash( @@ -437,4 +411,36 @@ public enum CommitKind { /** Collect statistics. */ ANALYZE } + + // =================== Utils for reading ========================= + + public static Snapshot fromJson(String json) { + return JsonSerdeUtil.fromJson(json, Snapshot.class); + } + + public static Snapshot fromPath(FileIO fileIO, Path path) { + try { + return tryFromPath(fileIO, path); + } catch (FileNotFoundException e) { + String errorMessage = + String.format( + "Snapshot file %s does not exist. " + + "It might have been expired by other jobs operating on this table. " + + "In this case, you can avoid concurrent modification issues by configuring " + + "write-only = true and use a dedicated compaction job, or configuring " + + "different expiration thresholds for different jobs.", + path); + throw new RuntimeException(errorMessage, e); + } + } + + public static Snapshot tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { + try { + return Snapshot.fromJson(fileIO.readFileUtf8(path)); + } catch (FileNotFoundException e) { + throw e; + } catch (IOException e) { + throw new RuntimeException("Fails to read snapshot from path " + path, e); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index 003f0edb4fa5..1912ad60623c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -55,6 +55,7 @@ import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY; import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD; import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM; +import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE; import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES; /** A {@link Catalog} to cache databases and tables and manifests. */ @@ -62,6 +63,9 @@ public class CachingCatalog extends DelegateCatalog { private static final Logger LOG = LoggerFactory.getLogger(CachingCatalog.class); + private final Duration expirationInterval; + private final int snapshotMaxNumPerTable; + protected final Cache databaseCache; protected final Cache tableCache; @Nullable protected final SegmentsCache manifestCache; @@ -75,7 +79,8 @@ public CachingCatalog(Catalog wrapped) { CACHE_EXPIRATION_INTERVAL_MS.defaultValue(), CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(), CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes(), - CACHE_PARTITION_MAX_NUM.defaultValue()); + CACHE_PARTITION_MAX_NUM.defaultValue(), + CACHE_SNAPSHOT_MAX_NUM_PER_TABLE.defaultValue()); } public CachingCatalog( @@ -83,13 +88,15 @@ public CachingCatalog( Duration expirationInterval, MemorySize manifestMaxMemory, long manifestCacheThreshold, - long cachedPartitionMaxNum) { + long cachedPartitionMaxNum, + int snapshotMaxNumPerTable) { this( wrapped, expirationInterval, manifestMaxMemory, manifestCacheThreshold, cachedPartitionMaxNum, + snapshotMaxNumPerTable, Ticker.systemTicker()); } @@ -99,6 +106,7 @@ public CachingCatalog( MemorySize manifestMaxMemory, long manifestCacheThreshold, long cachedPartitionMaxNum, + int snapshotMaxNumPerTable, Ticker ticker) { super(wrapped); if (expirationInterval.isZero() || expirationInterval.isNegative()) { @@ -106,6 +114,9 @@ public CachingCatalog( "When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled."); } + this.expirationInterval = expirationInterval; + this.snapshotMaxNumPerTable = snapshotMaxNumPerTable; + this.databaseCache = Caffeine.newBuilder() .softValues() @@ -121,6 +132,7 @@ public CachingCatalog( .expireAfterAccess(expirationInterval) .ticker(ticker) .build(); + this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold); this.partitionCache = cachedPartitionMaxNum == 0 ? null @@ -134,7 +146,6 @@ public CachingCatalog( .maximumWeight(cachedPartitionMaxNum) .ticker(ticker) .build(); - this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold); } public static Catalog tryToCreate(Catalog catalog, Options options) { @@ -155,7 +166,8 @@ public static Catalog tryToCreate(Catalog catalog, Options options) { options.get(CACHE_EXPIRATION_INTERVAL_MS), manifestMaxMemory, manifestThreshold, - options.get(CACHE_PARTITION_MAX_NUM)); + options.get(CACHE_PARTITION_MAX_NUM), + options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE)); } @Override @@ -244,9 +256,20 @@ public Table getTable(Identifier identifier) throws TableNotExistException { } private void putTableCache(Identifier identifier, Table table) { - if (manifestCache != null && table instanceof FileStoreTable) { - ((FileStoreTable) table).setManifestCache(manifestCache); + if (table instanceof FileStoreTable) { + FileStoreTable storeTable = (FileStoreTable) table; + storeTable.setSnapshotCache( + Caffeine.newBuilder() + .softValues() + .expireAfterAccess(expirationInterval) + .maximumSize(snapshotMaxNumPerTable) + .executor(Runnable::run) + .build()); + if (manifestCache != null) { + storeTable.setManifestCache(manifestCache); + } } + tableCache.put(identifier, table); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 5698908cb9b0..869100d9cfb8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -105,7 +105,8 @@ protected List validBranches() { List abnormalBranches = new ArrayList<>(); for (String branch : branches) { - if (!new SchemaManager(table.fileIO(), table.location(), branch).latest().isPresent()) { + SchemaManager schemaManager = table.schemaManager().copyWithBranch(branch); + if (!schemaManager.latest().isPresent()) { abnormalBranches.add(branch); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index 59243a53569e..3ee0d5fa9b01 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.FileStore; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; @@ -47,6 +48,8 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import javax.annotation.Nullable; import java.util.List; @@ -210,4 +213,9 @@ public List createTagCallbacks() { public void setManifestCache(SegmentsCache manifestCache) { wrapped.setManifestCache(manifestCache); } + + @Override + public void setSnapshotCache(Cache cache) { + wrapped.setSnapshotCache(cache); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index a84348810b99..d827ffd0fb66 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -45,7 +45,6 @@ import org.apache.paimon.types.ReassignFieldId; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; -import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.StringUtils; @@ -769,11 +768,7 @@ boolean commit(TableSchema newSchema) throws Exception { /** Read schema for schema id. */ public TableSchema schema(long id) { - try { - return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(toSchemaPath(id)), TableSchema.class); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + return TableSchema.fromPath(fileIO, toSchemaPath(id)); } /** Check if a schema exists. */ @@ -789,14 +784,6 @@ public boolean schemaExists(long id) { } } - public static TableSchema fromPath(FileIO fileIO, Path path) { - try { - return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(path), TableSchema.class); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - private String branchPath() { return BranchManager.branchPath(tableRoot, branch); } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java index b5bdeccf10f6..a0a149d1ae9b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -29,8 +29,10 @@ import javax.annotation.Nullable; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -296,19 +298,6 @@ public TableSchema copy(Map newOptions) { timeMillis); } - public static TableSchema fromJson(String json) { - return JsonSerdeUtil.fromJson(json, TableSchema.class); - } - - public static TableSchema fromPath(FileIO fileIO, Path path) { - try { - String json = fileIO.readFileUtf8(path); - return TableSchema.fromJson(json); - } catch (IOException e) { - throw new RuntimeException("Fails to read schema from path " + path, e); - } - } - @Override public String toString() { return JsonSerdeUtil.toJson(this); @@ -341,4 +330,28 @@ public int hashCode() { public static List newFields(RowType rowType) { return rowType.getFields(); } + + // =================== Utils for reading ========================= + + public static TableSchema fromJson(String json) { + return JsonSerdeUtil.fromJson(json, TableSchema.class); + } + + public static TableSchema fromPath(FileIO fileIO, Path path) { + try { + return tryFromPath(fileIO, path); + } catch (FileNotFoundException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public static TableSchema tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { + try { + return fromJson(fileIO.readFileUtf8(path)); + } catch (FileNotFoundException e) { + throw e; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 07c0e88645ac..4180ff11c167 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -68,6 +68,8 @@ import org.apache.paimon.utils.SnapshotNotExistException; import org.apache.paimon.utils.TagManager; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import javax.annotation.Nullable; import java.io.IOException; @@ -123,6 +125,11 @@ public void setManifestCache(SegmentsCache manifestCache) { store().setManifestCache(manifestCache); } + @Override + public void setSnapshotCache(Cache cache) { + store().setSnapshotCache(cache); + } + @Override public OptionalLong latestSnapshotId() { Long snapshot = store().snapshotManager().latestSnapshotId(); @@ -340,7 +347,8 @@ public FileStoreTable copy(TableSchema newTableSchema) { : new PrimaryKeyFileStoreTable(fileIO, path, newTableSchema, catalogEnvironment); } - protected SchemaManager schemaManager() { + @Override + public SchemaManager schemaManager() { return new SchemaManager(fileIO(), path, currentBranch()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java index e330db0e04a4..7979daccf756 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.source.DataTableScan; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.utils.BranchManager; @@ -39,6 +40,8 @@ public interface DataTable extends InnerTable { SnapshotManager snapshotManager(); + SchemaManager schemaManager(); + TagManager tagManager(); BranchManager branchManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java index 2b369e5005cc..624476b5b43e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -27,6 +27,7 @@ import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.query.LocalTableQuery; @@ -44,6 +45,8 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import java.time.Duration; import java.util.Objects; import java.util.Optional; @@ -92,6 +95,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public SchemaManager schemaManager() { + return wrapped.schemaManager(); + } + @Override public TagManager tagManager() { return wrapped.tagManager(); @@ -117,6 +125,11 @@ public void setManifestCache(SegmentsCache manifestCache) { wrapped.setManifestCache(manifestCache); } + @Override + public void setSnapshotCache(Cache cache) { + wrapped.setSnapshotCache(cache); + } + @Override public TableSchema schema() { return wrapped.schema(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index f1a60b9713f9..e3e290f06086 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -28,7 +28,6 @@ import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.DataFilePlan; import org.apache.paimon.table.source.DataSplit; @@ -103,7 +102,7 @@ public FileStoreTable switchToBranch(String branchName) { private FileStoreTable switchWrappedToBranch(String branchName) { Optional optionalSchema = - new SchemaManager(wrapped.fileIO(), wrapped.location(), branchName).latest(); + wrapped.schemaManager().copyWithBranch(branchName).latest(); Preconditions.checkArgument( optionalSchema.isPresent(), "Branch " + branchName + " does not exist"); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index 01227dd35407..d37e57e4e57e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.table; import org.apache.paimon.FileStore; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.ManifestCacheFilter; @@ -30,6 +31,8 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SegmentsCache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import java.util.List; import java.util.Map; import java.util.Optional; @@ -42,6 +45,8 @@ public interface FileStoreTable extends DataTable { void setManifestCache(SegmentsCache manifestCache); + void setSnapshotCache(Cache cache); + @Override default RowType rowType() { return schema().logicalRowType(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java index 10a046ca70b5..8c0eed4d6b8b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java @@ -18,7 +18,6 @@ package org.apache.paimon.table.system; -import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -27,7 +26,6 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; @@ -78,14 +76,12 @@ public class AggregationFieldsTable implements ReadonlyTable { private final FileIO fileIO; private final Path location; - private final String branch; private final FileStoreTable dataTable; public AggregationFieldsTable(FileStoreTable dataTable) { this.fileIO = dataTable.fileIO(); this.location = dataTable.location(); - this.branch = CoreOptions.branch(dataTable.schema().options()); this.dataTable = dataTable; } @@ -192,8 +188,7 @@ public RecordReader createReader(Split split) { if (!(split instanceof AggregationSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } - Path location = ((AggregationSplit) split).location; - TableSchema schemas = new SchemaManager(fileIO, location, branch).latest().get(); + TableSchema schemas = dataTable.schemaManager().latest().get(); Iterator rows = createInternalRowIterator(schemas); if (readType != null) { rows = diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index b0cbe0772b5e..1cb967f8d1e2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -39,6 +39,7 @@ import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.predicate.PredicateReplaceVisitor; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; @@ -187,6 +188,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public SchemaManager schemaManager() { + return wrapped.schemaManager(); + } + @Override public TagManager tagManager() { return wrapped.tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java index ff40c9502eb7..31cecbfb15c2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java @@ -33,6 +33,7 @@ import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; @@ -145,6 +146,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public SchemaManager schemaManager() { + return wrapped.schemaManager(); + } + @Override public TagManager tagManager() { return wrapped.tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java index fc1bb2a5b167..522335aaa6c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java @@ -34,6 +34,7 @@ import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; @@ -131,6 +132,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public SchemaManager schemaManager() { + return wrapped.schemaManager(); + } + @Override public TagManager tagManager() { return wrapped.tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 0232fc2d2dde..6dcbb322d6d0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -143,8 +143,7 @@ public InnerTableScan newScan() { @Override public InnerTableRead newRead() { - return new FilesRead( - new SchemaManager(storeTable.fileIO(), storeTable.location()), storeTable); + return new FilesRead(storeTable.schemaManager(), storeTable); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java index c7dec03343d0..ed20896646b2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java @@ -18,7 +18,6 @@ package org.apache.paimon.table.system; -import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -27,7 +26,6 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; import org.apache.paimon.table.Table; @@ -44,7 +42,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -70,14 +67,12 @@ public class OptionsTable implements ReadonlyTable { private final FileIO fileIO; private final Path location; - private final String branch; private final FileStoreTable dataTable; public OptionsTable(FileStoreTable dataTable) { this.fileIO = dataTable.fileIO(); this.location = dataTable.location(); - this.branch = CoreOptions.branch(dataTable.schema().options()); this.dataTable = dataTable; } @@ -178,14 +173,20 @@ public TableRead withIOManager(IOManager ioManager) { } @Override - public RecordReader createReader(Split split) throws IOException { + public RecordReader createReader(Split split) { if (!(split instanceof OptionsSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } - Path location = ((OptionsSplit) split).location; Iterator rows = Iterators.transform( - options(fileIO, location, branch).entrySet().iterator(), this::toRow); + dataTable + .schemaManager() + .latest() + .orElseThrow(() -> new RuntimeException("Table not exists.")) + .options() + .entrySet() + .iterator(), + this::toRow); if (readType != null) { rows = Iterators.transform( @@ -203,11 +204,4 @@ private InternalRow toRow(Map.Entry option) { BinaryString.fromString(option.getValue())); } } - - private static Map options(FileIO fileIO, Path location, String branchName) { - return new SchemaManager(fileIO, location, branchName) - .latest() - .orElseThrow(() -> new RuntimeException("Table not exists.")) - .options(); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index deb149791c8f..5308005053c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -26,6 +26,7 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.operation.DefaultValueAssigner; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; @@ -165,6 +166,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public SchemaManager schemaManager() { + return wrapped.schemaManager(); + } + @Override public TagManager tagManager() { return wrapped.tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java index d0df75b34f51..3cb0ff4783e9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java @@ -18,13 +18,11 @@ package org.apache.paimon.table.system; -import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.predicate.And; import org.apache.paimon.predicate.CompoundPredicate; @@ -61,8 +59,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; -import javax.annotation.Nullable; - import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -73,7 +69,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; @@ -98,16 +93,12 @@ public class SchemasTable implements ReadonlyTable { new DataField(5, "comment", SerializationUtils.newStringType(true)), new DataField(6, "update_time", new TimestampType(false, 3)))); - private final FileIO fileIO; private final Path location; - private final String branch; private final FileStoreTable dataTable; public SchemasTable(FileStoreTable dataTable) { - this.fileIO = dataTable.fileIO(); this.location = dataTable.location(); - this.branch = CoreOptions.branch(dataTable.schema().options()); this.dataTable = dataTable; } @@ -133,7 +124,7 @@ public InnerTableScan newScan() { @Override public InnerTableRead newRead() { - return new SchemasRead(fileIO); + return new SchemasRead(); } @Override @@ -141,24 +132,16 @@ public Table copy(Map dynamicOptions) { return new SchemasTable(dataTable.copy(dynamicOptions)); } - private class SchemasScan extends ReadOnceTableScan { - private @Nullable LeafPredicate schemaId; + private static class SchemasScan extends ReadOnceTableScan { @Override - public InnerTableScan withFilter(Predicate predicate) { - if (predicate == null) { - return this; - } - - Map leafPredicates = - predicate.visit(LeafPredicateExtractor.INSTANCE); - schemaId = leafPredicates.get("schema_id"); - return this; + public Plan innerPlan() { + return () -> Collections.singletonList(new SchemasSplit()); } @Override - public Plan innerPlan() { - return () -> Collections.singletonList(new SchemasSplit(location, schemaId)); + public InnerTableScan withFilter(Predicate predicate) { + return this; } } @@ -167,47 +150,29 @@ private static class SchemasSplit extends SingletonSplit { private static final long serialVersionUID = 1L; - private final Path location; - - private final @Nullable LeafPredicate schemaId; - - private SchemasSplit(Path location, @Nullable LeafPredicate schemaId) { - this.location = location; - this.schemaId = schemaId; - } - + @Override public boolean equals(Object o) { if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { - return false; - } - SchemasSplit that = (SchemasSplit) o; - return Objects.equals(location, that.location) - && Objects.equals(schemaId, that.schemaId); + return o != null && getClass() == o.getClass(); } @Override public int hashCode() { - return Objects.hash(location, schemaId); + return 0; } } /** {@link TableRead} implementation for {@link SchemasTable}. */ private class SchemasRead implements InnerTableRead { - private final FileIO fileIO; private RowType readType; private Optional optionalFilterSchemaIdMax = Optional.empty(); private Optional optionalFilterSchemaIdMin = Optional.empty(); private final List schemaIds = new ArrayList<>(); - public SchemasRead(FileIO fileIO) { - this.fileIO = fileIO; - } - @Override public InnerTableRead withFilter(Predicate predicate) { if (predicate == null) { @@ -287,9 +252,7 @@ public RecordReader createReader(Split split) { if (!(split instanceof SchemasSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } - SchemasSplit schemasSplit = (SchemasSplit) split; - Path location = schemasSplit.location; - SchemaManager manager = new SchemaManager(fileIO, location, branch); + SchemaManager manager = dataTable.schemaManager(); Collection tableSchemas; if (!schemaIds.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java index 10e5b691acc3..2af13ee937bd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java @@ -18,7 +18,6 @@ package org.apache.paimon.table.system; -import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; @@ -111,14 +110,12 @@ public class SnapshotsTable implements ReadonlyTable { private final FileIO fileIO; private final Path location; - private final String branch; private final FileStoreTable dataTable; public SnapshotsTable(FileStoreTable dataTable) { this.fileIO = dataTable.fileIO(); this.location = dataTable.location(); - this.branch = CoreOptions.branch(dataTable.schema().options()); this.dataTable = dataTable; } @@ -289,9 +286,8 @@ public RecordReader createReader(Split split) throws IOException { if (!(split instanceof SnapshotsSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } - SnapshotManager snapshotManager = - new SnapshotManager(fileIO, ((SnapshotsSplit) split).location, branch); + SnapshotManager snapshotManager = dataTable.snapshotManager(); Iterator snapshots; if (!snapshotIds.isEmpty()) { snapshots = snapshotManager.snapshotsWithId(snapshotIds); diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java index f1ac879d33a7..53641a2eb69f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java @@ -33,6 +33,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UncheckedIOException; import java.time.Duration; import java.time.LocalDateTime; import java.util.Map; @@ -113,29 +114,6 @@ public String toJson() { return JsonSerdeUtil.toJson(this); } - public static Tag fromJson(String json) { - return JsonSerdeUtil.fromJson(json, Tag.class); - } - - public static Tag fromPath(FileIO fileIO, Path path) { - try { - String json = fileIO.readFileUtf8(path); - return Tag.fromJson(json); - } catch (IOException e) { - throw new RuntimeException("Fails to read tag from path " + path, e); - } - } - - @Nullable - public static Tag safelyFromPath(FileIO fileIO, Path path) throws IOException { - try { - String json = fileIO.readFileUtf8(path); - return Tag.fromJson(json); - } catch (FileNotFoundException e) { - return null; - } - } - public static Tag fromSnapshotAndTagTtl( Snapshot snapshot, Duration tagTimeRetained, LocalDateTime tagCreateTime) { return new Tag( @@ -201,4 +179,28 @@ public boolean equals(Object o) { return Objects.equals(tagCreateTime, that.tagCreateTime) && Objects.equals(tagTimeRetained, that.tagTimeRetained); } + + // =================== Utils for reading ========================= + + public static Tag fromJson(String json) { + return JsonSerdeUtil.fromJson(json, Tag.class); + } + + public static Tag fromPath(FileIO fileIO, Path path) { + try { + return tryFromPath(fileIO, path); + } catch (FileNotFoundException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public static Tag tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { + try { + return fromJson(fileIO.readFileUtf8(path)); + } catch (FileNotFoundException e) { + throw e; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index bc353bb10d16..2ea5f542f4e5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.tag.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -178,7 +179,7 @@ public void fastForward(String branchName) { List deleteSchemaPaths = schemaManager.schemaPaths(id -> id >= earliestSchemaId); List deleteTagPaths = tagManager.tagPaths( - path -> Snapshot.fromPath(fileIO, path).id() >= earliestSnapshotId); + path -> Tag.fromPath(fileIO, path).id() >= earliestSnapshotId); List deletePaths = Stream.of(deleteSnapshotPaths, deleteSchemaPaths, deleteTagPaths) @@ -201,6 +202,7 @@ public void fastForward(String branchName) { tagManager.copyWithBranch(branchName).tagDirectory(), tagManager.tagDirectory(), true); + snapshotManager.invalidateCache(); } catch (IOException e) { throw new RuntimeException( String.format( diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 5902d4c84cf5..9a120042eaaa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -24,6 +24,8 @@ import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,16 +76,26 @@ public class SnapshotManager implements Serializable { private final FileIO fileIO; private final Path tablePath; private final String branch; + @Nullable private final Cache cache; public SnapshotManager(FileIO fileIO, Path tablePath) { this(fileIO, tablePath, DEFAULT_MAIN_BRANCH); } /** Specify the default branch for data writing. */ - public SnapshotManager(FileIO fileIO, Path tablePath, String branchName) { + public SnapshotManager(FileIO fileIO, Path tablePath, @Nullable String branchName) { + this(fileIO, tablePath, branchName, null); + } + + public SnapshotManager( + FileIO fileIO, + Path tablePath, + @Nullable String branchName, + @Nullable Cache cache) { this.fileIO = fileIO; this.tablePath = tablePath; this.branch = BranchManager.normalizeBranch(branchName); + this.cache = cache; } public SnapshotManager copyWithBranch(String branchName) { @@ -120,20 +132,34 @@ public Path snapshotDirectory() { return new Path(branchPath(tablePath, branch) + "/snapshot"); } + public void invalidateCache() { + if (cache != null) { + cache.invalidateAll(); + } + } + public Snapshot snapshot(long snapshotId) { - Path snapshotPath = snapshotPath(snapshotId); - return Snapshot.fromPath(fileIO, snapshotPath); + Path path = snapshotPath(snapshotId); + Snapshot snapshot = cache == null ? null : cache.getIfPresent(path); + if (snapshot == null) { + snapshot = Snapshot.fromPath(fileIO, path); + if (cache != null) { + cache.put(path, snapshot); + } + } + return snapshot; } public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException { - try { - Path snapshotPath = snapshotPath(snapshotId); - return Snapshot.fromJson(fileIO.readFileUtf8(snapshotPath)); - } catch (FileNotFoundException fileNotFoundException) { - throw fileNotFoundException; - } catch (IOException ioException) { - throw new RuntimeException(ioException); + Path path = snapshotPath(snapshotId); + Snapshot snapshot = cache == null ? null : cache.getIfPresent(path); + if (snapshot == null) { + snapshot = Snapshot.tryFromPath(fileIO, path); + if (cache != null) { + cache.put(path, snapshot); + } } + return snapshot; } public Changelog changelog(long snapshotId) { @@ -486,11 +512,9 @@ public List safelyGetAllSnapshots() throws IOException { collectSnapshots( path -> { try { - snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path))); - } catch (IOException e) { - if (!(e instanceof FileNotFoundException)) { - throw new RuntimeException(e); - } + // do not pollution cache + snapshots.add(Snapshot.tryFromPath(fileIO, path)); + } catch (FileNotFoundException ignored) { } }, paths); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 65963aafdf6b..1e05a100d741 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -353,7 +353,7 @@ public SortedMap> tags(Predicate filter) { // If the tag file is not found, it might be deleted by // other processes, so just skip this tag try { - Snapshot snapshot = Snapshot.fromJson(fileIO.readFileUtf8(path)); + Snapshot snapshot = Tag.tryFromPath(fileIO, path).trimToSnapshot(); tags.computeIfAbsent(snapshot, s -> new ArrayList<>()).add(tagName); } catch (FileNotFoundException ignored) { } @@ -371,9 +371,9 @@ public List> tagObjects() { List> tags = new ArrayList<>(); for (Path path : paths) { String tagName = path.getName().substring(TAG_PREFIX.length()); - Tag tag = Tag.safelyFromPath(fileIO, path); - if (tag != null) { - tags.add(Pair.of(tag, tagName)); + try { + tags.add(Pair.of(Tag.tryFromPath(fileIO, path), tagName)); + } catch (FileNotFoundException ignored) { } } return tags; diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java index 65ed5ce0b7bf..e4f0a1510b8d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.catalog; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; @@ -332,6 +333,27 @@ public static Identifier[] sysTables(Identifier tableIdent) { .toArray(Identifier[]::new); } + @Test + public void testSnapshotCache() throws Exception { + TestableCachingCatalog wrappedCatalog = + new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker); + Identifier tableIdent = new Identifier("db", "tbl"); + wrappedCatalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false); + Table table = wrappedCatalog.getTable(tableIdent); + + // write + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + try (BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit()) { + write.write(GenericRow.of(1, fromString("1"), fromString("1"))); + write.write(GenericRow.of(2, fromString("2"), fromString("2"))); + commit.commit(write.prepareCommit()); + } + + Snapshot snapshot = table.snapshot(1); + assertThat(snapshot).isSameAs(table.snapshot(1)); + } + @Test public void testManifestCache() throws Exception { innerTestManifestCache(Long.MAX_VALUE); @@ -346,7 +368,8 @@ private void innerTestManifestCache(long manifestCacheThreshold) throws Exceptio Duration.ofSeconds(10), MemorySize.ofMebiBytes(1), manifestCacheThreshold, - 0L); + 0L, + 10); Identifier tableIdent = new Identifier("db", "tbl"); catalog.dropTable(tableIdent, true); catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false); diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java index 4c70a0232c44..1d4a9b0e8a58 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java @@ -38,7 +38,14 @@ public class TestableCachingCatalog extends CachingCatalog { private final Duration cacheExpirationInterval; public TestableCachingCatalog(Catalog catalog, Duration expirationInterval, Ticker ticker) { - super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE, Long.MAX_VALUE, ticker); + super( + catalog, + expirationInterval, + MemorySize.ZERO, + Long.MAX_VALUE, + Long.MAX_VALUE, + Integer.MAX_VALUE, + ticker); this.cacheExpirationInterval = expirationInterval; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java index 93854e766198..9ce3db0b1ada 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java @@ -33,7 +33,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema return new AppendOnlyFileStoreTable( FileIOFinder.find(tablePath), tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java index b4c16cef20a7..64d0c728d10b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java @@ -37,7 +37,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas); return new AppendOnlyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java index f398d28cc524..300483a9f34b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java @@ -40,7 +40,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas); return new AppendOnlyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java index f65546af75d8..85ed80299736 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java @@ -40,7 +40,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas); return new AppendOnlyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 4d8408955d38..75e284a68c3a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1193,7 +1193,7 @@ public void testCreateBranch() throws Exception { SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath, "test-branch"); TableSchema branchSchema = - SchemaManager.fromPath(new TraceableFileIO(), schemaManager.toSchemaPath(0)); + TableSchema.fromPath(new TraceableFileIO(), schemaManager.toSchemaPath(0)); TableSchema schema0 = schemaManager.schema(0); assertThat(branchSchema.equals(schema0)).isTrue(); } @@ -1344,7 +1344,7 @@ public void testFastForward() throws Exception { // verify schema in branch1 and main branch is same SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath); TableSchema branchSchema = - SchemaManager.fromPath( + TableSchema.fromPath( new TraceableFileIO(), schemaManager.copyWithBranch(BRANCH_NAME).toSchemaPath(0)); TableSchema schema0 = schemaManager.schema(0); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java index 8ba25c6617fe..64bb5f21abbb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java @@ -91,7 +91,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas); return new PrimaryKeyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java index 1be321975466..ba9813804498 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java @@ -247,7 +247,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema return new PrimaryKeyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java index 618e8691c65d..88928fe991bc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java @@ -146,7 +146,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas); return new PrimaryKeyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java index 489c1ba05217..32a4138be564 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java @@ -54,7 +54,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas); return new PrimaryKeyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index 6b7b28263af0..26480cf411bb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -281,8 +281,8 @@ public void testLatestSnapshotOfUser() throws IOException, InterruptedException @Test public void testTraversalSnapshotsFromLatestSafely() throws IOException, InterruptedException { FileIO localFileIO = LocalFileIO.create(); - SnapshotManager snapshotManager = - new SnapshotManager(localFileIO, new Path(tempDir.toString())); + Path path = new Path(tempDir.toString()); + SnapshotManager snapshotManager = new SnapshotManager(localFileIO, path); // create 10 snapshots for (long i = 0; i < 10; i++) { Snapshot snapshot =