From 3701eddaf8b2d04a67be96061b444f107aef6416 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Thu, 21 Nov 2024 15:06:22 +0800 Subject: [PATCH 1/8] [core] Add cache for snapshots and tags and schemas --- .../main/java/org/apache/paimon/Snapshot.java | 65 +++++++++------- .../apache/paimon/schema/SchemaManager.java | 17 +---- .../org/apache/paimon/schema/TableSchema.java | 45 +++++++---- .../main/java/org/apache/paimon/tag/Tag.java | 55 ++++++++------ .../apache/paimon/utils/MetaCacheManager.java | 74 +++++++++++++++++++ .../apache/paimon/utils/SnapshotManager.java | 16 +--- .../org/apache/paimon/utils/TagManager.java | 37 +++++----- .../paimon/table/FileStoreTableTestBase.java | 4 +- 8 files changed, 204 insertions(+), 109 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 3b8d2fa15b4b..cd5891a5bbb0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -29,9 +29,6 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.annotation.Nullable; import java.io.FileNotFoundException; @@ -39,6 +36,8 @@ import java.util.Map; import java.util.Objects; +import static org.apache.paimon.utils.MetaCacheManager.SNAPSHOT_CACHE; + /** * This file is the entrance to all data committed at some specific time point. 
* @@ -65,7 +64,6 @@ @Public @JsonIgnoreProperties(ignoreUnknown = true) public class Snapshot { - private static final Logger LOG = LoggerFactory.getLogger(Snapshot.class); public static final long FIRST_SNAPSHOT_ID = 1; @@ -355,28 +353,6 @@ public String toJson() { return JsonSerdeUtil.toJson(this); } - public static Snapshot fromJson(String json) { - return JsonSerdeUtil.fromJson(json, Snapshot.class); - } - - public static Snapshot fromPath(FileIO fileIO, Path path) { - try { - return Snapshot.fromJson(fileIO.readFileUtf8(path)); - } catch (FileNotFoundException e) { - String errorMessage = - String.format( - "Snapshot file %s does not exist. " - + "It might have been expired by other jobs operating on this table. " - + "In this case, you can avoid concurrent modification issues by configuring " - + "write-only = true and use a dedicated compaction job, or configuring " - + "different expiration thresholds for different jobs.", - path); - throw new RuntimeException(errorMessage, e); - } catch (IOException e) { - throw new RuntimeException("Fails to read snapshot from path " + path, e); - } - } - @Override public int hashCode() { return Objects.hash( @@ -437,4 +413,41 @@ public enum CommitKind { /** Collect statistics. */ ANALYZE } + + // =================== Utils for reading ========================= + + public static Snapshot fromJson(String json) { + return JsonSerdeUtil.fromJson(json, Snapshot.class); + } + + public static Snapshot fromPath(FileIO fileIO, Path path) { + try { + return tryFromPath(fileIO, path); + } catch (FileNotFoundException e) { + String errorMessage = + String.format( + "Snapshot file %s does not exist. " + + "It might have been expired by other jobs operating on this table. 
" + + "In this case, you can avoid concurrent modification issues by configuring " + + "write-only = true and use a dedicated compaction job, or configuring " + + "different expiration thresholds for different jobs.", + path); + throw new RuntimeException(errorMessage, e); + } + } + + public static Snapshot tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { + try { + Snapshot snapshot = SNAPSHOT_CACHE.getIfPresent(path); + if (snapshot == null) { + snapshot = Snapshot.fromJson(fileIO.readFileUtf8(path)); + SNAPSHOT_CACHE.put(path, snapshot); + } + return snapshot; + } catch (FileNotFoundException e) { + throw e; + } catch (IOException e) { + throw new RuntimeException("Fails to read snapshot from path " + path, e); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index a84348810b99..0951dae79562 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -45,7 +45,6 @@ import org.apache.paimon.types.ReassignFieldId; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; -import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.StringUtils; @@ -80,6 +79,7 @@ import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; +import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; @@ -250,6 +250,7 @@ public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws boolean success = 
commit(newSchema); if (success) { + invalidateCacheForPrefix(tableRoot); return newSchema; } } @@ -769,11 +770,7 @@ boolean commit(TableSchema newSchema) throws Exception { /** Read schema for schema id. */ public TableSchema schema(long id) { - try { - return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(toSchemaPath(id)), TableSchema.class); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + return TableSchema.fromPath(fileIO, toSchemaPath(id)); } /** Check if a schema exists. */ @@ -789,14 +786,6 @@ public boolean schemaExists(long id) { } } - public static TableSchema fromPath(FileIO fileIO, Path path) { - try { - return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(path), TableSchema.class); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - private String branchPath() { return BranchManager.branchPath(tableRoot, branch); } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java index b5bdeccf10f6..03c3bb4cdf36 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -29,8 +29,10 @@ import javax.annotation.Nullable; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -40,6 +42,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BUCKET_KEY; +import static org.apache.paimon.utils.MetaCacheManager.SCHEMA_CACHE; /** * Schema of a table. 
Unlike schema, it has more information than {@link Schema}, including schemaId @@ -296,19 +299,6 @@ public TableSchema copy(Map newOptions) { timeMillis); } - public static TableSchema fromJson(String json) { - return JsonSerdeUtil.fromJson(json, TableSchema.class); - } - - public static TableSchema fromPath(FileIO fileIO, Path path) { - try { - String json = fileIO.readFileUtf8(path); - return TableSchema.fromJson(json); - } catch (IOException e) { - throw new RuntimeException("Fails to read schema from path " + path, e); - } - } - @Override public String toString() { return JsonSerdeUtil.toJson(this); @@ -341,4 +331,33 @@ public int hashCode() { public static List newFields(RowType rowType) { return rowType.getFields(); } + + // =================== Utils for reading ========================= + + public static TableSchema fromJson(String json) { + return JsonSerdeUtil.fromJson(json, TableSchema.class); + } + + public static TableSchema fromPath(FileIO fileIO, Path path) { + try { + return tryFromPath(fileIO, path); + } catch (FileNotFoundException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public static TableSchema tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { + try { + TableSchema schema = SCHEMA_CACHE.getIfPresent(path); + if (schema == null) { + schema = fromJson(fileIO.readFileUtf8(path)); + SCHEMA_CACHE.put(path, schema); + } + return schema; + } catch (FileNotFoundException e) { + throw e; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java index f1ac879d33a7..5a9028b0acf6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java @@ -33,11 +33,14 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UncheckedIOException; import java.time.Duration; import 
java.time.LocalDateTime; import java.util.Map; import java.util.Objects; +import static org.apache.paimon.utils.MetaCacheManager.TAG_CACHE; + /** Snapshot with tagCreateTime and tagTimeRetained. */ @JsonIgnoreProperties(ignoreUnknown = true) public class Tag extends Snapshot { @@ -113,29 +116,6 @@ public String toJson() { return JsonSerdeUtil.toJson(this); } - public static Tag fromJson(String json) { - return JsonSerdeUtil.fromJson(json, Tag.class); - } - - public static Tag fromPath(FileIO fileIO, Path path) { - try { - String json = fileIO.readFileUtf8(path); - return Tag.fromJson(json); - } catch (IOException e) { - throw new RuntimeException("Fails to read tag from path " + path, e); - } - } - - @Nullable - public static Tag safelyFromPath(FileIO fileIO, Path path) throws IOException { - try { - String json = fileIO.readFileUtf8(path); - return Tag.fromJson(json); - } catch (FileNotFoundException e) { - return null; - } - } - public static Tag fromSnapshotAndTagTtl( Snapshot snapshot, Duration tagTimeRetained, LocalDateTime tagCreateTime) { return new Tag( @@ -201,4 +181,33 @@ public boolean equals(Object o) { return Objects.equals(tagCreateTime, that.tagCreateTime) && Objects.equals(tagTimeRetained, that.tagTimeRetained); } + + // =================== Utils for reading ========================= + + public static Tag fromJson(String json) { + return JsonSerdeUtil.fromJson(json, Tag.class); + } + + public static Tag fromPath(FileIO fileIO, Path path) { + try { + return tryFromPath(fileIO, path); + } catch (FileNotFoundException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public static Tag tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { + try { + Tag tag = TAG_CACHE.getIfPresent(path); + if (tag == null) { + tag = fromJson(fileIO.readFileUtf8(path)); + TAG_CACHE.put(path, tag); + } + return tag; + } catch (FileNotFoundException e) { + throw e; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } 
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java new file mode 100644 index 000000000000..a4d37f58944b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.tag.Tag; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; + +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; + +/** Cache for {@link Snapshot} and {@link Tag} and {@link TableSchema}. 
*/ +public class MetaCacheManager { + + public static final Cache SNAPSHOT_CACHE = + Caffeine.newBuilder() + .softValues() + .expireAfterAccess(Duration.ofMinutes(10)) + .maximumSize(300) + .executor(Runnable::run) + .build(); + + public static final Cache TAG_CACHE = + Caffeine.newBuilder() + .softValues() + .expireAfterAccess(Duration.ofMinutes(10)) + .maximumSize(100) + .executor(Runnable::run) + .build(); + + public static final Cache SCHEMA_CACHE = + Caffeine.newBuilder() + .softValues() + .expireAfterAccess(Duration.ofMinutes(10)) + .maximumSize(100) + .executor(Runnable::run) + .build(); + + public static void invalidateCacheForPrefix(Path tablePath) { + String path = tablePath.toString(); + invalidateCacheForPrefix(SNAPSHOT_CACHE, path); + invalidateCacheForPrefix(TAG_CACHE, path); + invalidateCacheForPrefix(SCHEMA_CACHE, path); + } + + private static void invalidateCacheForPrefix(Cache cache, String tablePath) { + List keys = + cache.asMap().keySet().stream() + .filter(key -> key.toString().startsWith(tablePath)) + .collect(Collectors.toList()); + cache.invalidateAll(keys); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 5902d4c84cf5..f750ca44151a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -126,14 +126,7 @@ public Snapshot snapshot(long snapshotId) { } public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException { - try { - Path snapshotPath = snapshotPath(snapshotId); - return Snapshot.fromJson(fileIO.readFileUtf8(snapshotPath)); - } catch (FileNotFoundException fileNotFoundException) { - throw fileNotFoundException; - } catch (IOException ioException) { - throw new RuntimeException(ioException); - } + return Snapshot.tryFromPath(fileIO, snapshotPath(snapshotId)); } public Changelog changelog(long 
snapshotId) { @@ -486,11 +479,8 @@ public List safelyGetAllSnapshots() throws IOException { collectSnapshots( path -> { try { - snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path))); - } catch (IOException e) { - if (!(e instanceof FileNotFoundException)) { - throw new RuntimeException(e); - } + snapshots.add(Snapshot.tryFromPath(fileIO, path)); + } catch (FileNotFoundException ignored) { } }, paths); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 65963aafdf6b..ea36cd2a8207 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -341,25 +341,26 @@ public SortedMap> tags() { public SortedMap> tags(Predicate filter) { TreeMap> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id)); + List paths; try { - List paths = tagPaths(path -> true); + paths = tagPaths(path -> true); + } catch (IOException e) { + throw new RuntimeException(e); + } - for (Path path : paths) { - String tagName = path.getName().substring(TAG_PREFIX.length()); + for (Path path : paths) { + String tagName = path.getName().substring(TAG_PREFIX.length()); - if (!filter.test(tagName)) { - continue; - } - // If the tag file is not found, it might be deleted by - // other processes, so just skip this tag - try { - Snapshot snapshot = Snapshot.fromJson(fileIO.readFileUtf8(path)); - tags.computeIfAbsent(snapshot, s -> new ArrayList<>()).add(tagName); - } catch (FileNotFoundException ignored) { - } + if (!filter.test(tagName)) { + continue; + } + // If the tag file is not found, it might be deleted by + // other processes, so just skip this tag + try { + Snapshot snapshot = Snapshot.tryFromPath(fileIO, path); + tags.computeIfAbsent(snapshot, s -> new ArrayList<>()).add(tagName); + } catch (FileNotFoundException ignored) { } - } catch (IOException e) { - throw new RuntimeException(e); } return 
tags; } @@ -371,9 +372,9 @@ public List> tagObjects() { List> tags = new ArrayList<>(); for (Path path : paths) { String tagName = path.getName().substring(TAG_PREFIX.length()); - Tag tag = Tag.safelyFromPath(fileIO, path); - if (tag != null) { - tags.add(Pair.of(tag, tagName)); + try { + tags.add(Pair.of(Tag.tryFromPath(fileIO, path), tagName)); + } catch (FileNotFoundException ignored) { } } return tags; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 4d8408955d38..75e284a68c3a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1193,7 +1193,7 @@ public void testCreateBranch() throws Exception { SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath, "test-branch"); TableSchema branchSchema = - SchemaManager.fromPath(new TraceableFileIO(), schemaManager.toSchemaPath(0)); + TableSchema.fromPath(new TraceableFileIO(), schemaManager.toSchemaPath(0)); TableSchema schema0 = schemaManager.schema(0); assertThat(branchSchema.equals(schema0)).isTrue(); } @@ -1344,7 +1344,7 @@ public void testFastForward() throws Exception { // verify schema in branch1 and main branch is same SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath); TableSchema branchSchema = - SchemaManager.fromPath( + TableSchema.fromPath( new TraceableFileIO(), schemaManager.copyWithBranch(BRANCH_NAME).toSchemaPath(0)); TableSchema schema0 = schemaManager.schema(0); From 9f8ab33c017a16ebd076b81f76ea2a70389236fb Mon Sep 17 00:00:00 2001 From: Jingsong Date: Thu, 21 Nov 2024 15:33:18 +0800 Subject: [PATCH 2/8] fix --- paimon-core/src/main/java/org/apache/paimon/tag/Tag.java | 9 +-------- .../java/org/apache/paimon/utils/MetaCacheManager.java | 9 --------- 2 files changed, 1 insertion(+), 17 deletions(-) 
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java index 5a9028b0acf6..53641a2eb69f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java @@ -39,8 +39,6 @@ import java.util.Map; import java.util.Objects; -import static org.apache.paimon.utils.MetaCacheManager.TAG_CACHE; - /** Snapshot with tagCreateTime and tagTimeRetained. */ @JsonIgnoreProperties(ignoreUnknown = true) public class Tag extends Snapshot { @@ -198,12 +196,7 @@ public static Tag fromPath(FileIO fileIO, Path path) { public static Tag tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { try { - Tag tag = TAG_CACHE.getIfPresent(path); - if (tag == null) { - tag = fromJson(fileIO.readFileUtf8(path)); - TAG_CACHE.put(path, tag); - } - return tag; + return fromJson(fileIO.readFileUtf8(path)); } catch (FileNotFoundException e) { throw e; } catch (IOException e) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java index a4d37f58944b..da9712e00ac4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java @@ -41,14 +41,6 @@ public class MetaCacheManager { .executor(Runnable::run) .build(); - public static final Cache TAG_CACHE = - Caffeine.newBuilder() - .softValues() - .expireAfterAccess(Duration.ofMinutes(10)) - .maximumSize(100) - .executor(Runnable::run) - .build(); - public static final Cache SCHEMA_CACHE = Caffeine.newBuilder() .softValues() @@ -60,7 +52,6 @@ public class MetaCacheManager { public static void invalidateCacheForPrefix(Path tablePath) { String path = tablePath.toString(); invalidateCacheForPrefix(SNAPSHOT_CACHE, path); - invalidateCacheForPrefix(TAG_CACHE, path); invalidateCacheForPrefix(SCHEMA_CACHE, path); } 
From 6c24fe5afac28f02d428c80f1c6c902391a5c033 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Thu, 21 Nov 2024 16:05:23 +0800 Subject: [PATCH 3/8] fix --- .../main/java/org/apache/paimon/catalog/FileSystemCatalog.java | 2 ++ .../src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java | 2 ++ .../src/main/java/org/apache/paimon/hive/HiveCatalog.java | 2 ++ 3 files changed, 6 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 9264a54647b1..0b5c64a2e675 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -35,6 +35,7 @@ import java.util.concurrent.Callable; import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE; +import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix; /** A catalog implementation for {@link FileIO}. 
*/ public class FileSystemCatalog extends AbstractCatalog { @@ -108,6 +109,7 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis protected void dropTableImpl(Identifier identifier) { Path path = getTableLocation(identifier); uncheck(() -> fileIO.delete(path, true)); + invalidateCacheForPrefix(path); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 778bc591fe89..7d0a7cde498a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -59,6 +59,7 @@ import static org.apache.paimon.jdbc.JdbcUtils.execute; import static org.apache.paimon.jdbc.JdbcUtils.insertProperties; import static org.apache.paimon.jdbc.JdbcUtils.updateTable; +import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix; /* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. 
See the NOTICE file distributed with this work for @@ -225,6 +226,7 @@ protected void dropTableImpl(Identifier identifier) { try { if (fileIO.exists(path)) { fileIO.deleteDirectoryQuietly(path); + invalidateCacheForPrefix(path); } } catch (Exception ex) { LOG.error("Delete directory[{}] fail for table {}", path, identifier, ex); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 93e7e87ef5c7..36b327ddae3e 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -111,6 +111,7 @@ import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound; +import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; @@ -693,6 +694,7 @@ protected void dropTableImpl(Identifier identifier) { try { if (fileIO.exists(path)) { fileIO.deleteDirectoryQuietly(path); + invalidateCacheForPrefix(path); } } catch (Exception ee) { LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee); From 8a0b91e5136957ff2acb1e9142f573d8d1243862 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Thu, 21 Nov 2024 18:29:22 +0800 Subject: [PATCH 4/8] fix --- .../org/apache/paimon/utils/TagManager.java | 32 +++++++++---------- .../paimon/utils/SnapshotManagerTest.java | 6 ++-- .../org/apache/paimon/hive/HiveCatalog.java | 5 +-- 3 files changed, 23 insertions(+), 20 deletions(-) diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index ea36cd2a8207..9e662b91cde5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -341,26 +341,26 @@ public SortedMap> tags() { public SortedMap> tags(Predicate filter) { TreeMap> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id)); - List paths; try { - paths = tagPaths(path -> true); - } catch (IOException e) { - throw new RuntimeException(e); - } + List paths = tagPaths(path -> true); - for (Path path : paths) { - String tagName = path.getName().substring(TAG_PREFIX.length()); + for (Path path : paths) { + String tagName = path.getName().substring(TAG_PREFIX.length()); - if (!filter.test(tagName)) { - continue; - } - // If the tag file is not found, it might be deleted by - // other processes, so just skip this tag - try { - Snapshot snapshot = Snapshot.tryFromPath(fileIO, path); - tags.computeIfAbsent(snapshot, s -> new ArrayList<>()).add(tagName); - } catch (FileNotFoundException ignored) { + if (!filter.test(tagName)) { + continue; + } + // If the tag file is not found, it might be deleted by + // other processes, so just skip this tag + try { + // tag may be modified, so here do not use Snapshot.fromPath to bypass cache + Snapshot snapshot = Snapshot.fromJson(fileIO.readFileUtf8(path)); + tags.computeIfAbsent(snapshot, s -> new ArrayList<>()).add(tagName); + } catch (FileNotFoundException ignored) { + } } + } catch (IOException e) { + throw new RuntimeException(e); } return tags; } diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index 6b7b28263af0..14839f4fe0a0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -281,8 +282,8 @@ public void testLatestSnapshotOfUser() throws IOException, InterruptedException @Test public void testTraversalSnapshotsFromLatestSafely() throws IOException, InterruptedException { FileIO localFileIO = LocalFileIO.create(); - SnapshotManager snapshotManager = - new SnapshotManager(localFileIO, new Path(tempDir.toString())); + Path path = new Path(tempDir.toString()); + SnapshotManager snapshotManager = new SnapshotManager(localFileIO, path); // create 10 snapshots for (long i = 0; i < 10; i++) { Snapshot snapshot = @@ -366,6 +367,7 @@ public void testTraversalSnapshotsFromLatestSafely() throws IOException, Interru thread.start(); Thread.sleep(100); + invalidateCacheForPrefix(path); localFileIO.deleteQuietly(snapshotManager.snapshotPath(3)); thread.join(); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 36b327ddae3e..0c228dd1c937 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -681,6 +681,9 @@ protected void dropTableImpl(Identifier identifier) { false, true)); + Path path = getTableLocation(identifier); + invalidateCacheForPrefix(path); + // When drop a Hive external table, only the hive metadata is deleted and the data files // are not deleted. 
if (externalTable) { @@ -690,11 +693,9 @@ protected void dropTableImpl(Identifier identifier) { // Deletes table directory to avoid schema in filesystem exists after dropping hive // table successfully to keep the table consistency between which in filesystem and // which in Hive metastore. - Path path = getTableLocation(identifier); try { if (fileIO.exists(path)) { fileIO.deleteDirectoryQuietly(path); - invalidateCacheForPrefix(path); } } catch (Exception ee) { LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee); From edb4f7fd6b736a9794e878c769717d093855efc0 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Thu, 21 Nov 2024 19:18:28 +0800 Subject: [PATCH 5/8] fix --- .../src/main/java/org/apache/paimon/utils/BranchManager.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index bc353bb10d16..2405337e5675 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -34,6 +34,7 @@ import java.util.stream.Stream; import static org.apache.paimon.utils.FileUtils.listVersionedDirectories; +import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Manager for {@code Branch}. 
*/ @@ -201,6 +202,7 @@ public void fastForward(String branchName) { tagManager.copyWithBranch(branchName).tagDirectory(), tagManager.tagDirectory(), true); + invalidateCacheForPrefix(tablePath); } catch (IOException e) { throw new RuntimeException( String.format( From 526cd0703dfa77571f18d5f2da2f0955a1915a78 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Thu, 21 Nov 2024 21:29:20 +0800 Subject: [PATCH 6/8] fix --- .../main/java/org/apache/paimon/Snapshot.java | 2 +- .../apache/paimon/catalog/CachingCatalog.java | 35 ++++++++++++++++ .../paimon/catalog/FileSystemCatalog.java | 4 +- .../org/apache/paimon/jdbc/JdbcCatalog.java | 4 +- .../apache/paimon/schema/SchemaManager.java | 4 +- .../org/apache/paimon/schema/TableSchema.java | 2 +- .../apache/paimon/utils/BranchManager.java | 4 +- .../apache/paimon/utils/MetaCacheManager.java | 41 +------------------ .../paimon/utils/SnapshotManagerTest.java | 4 +- .../org/apache/paimon/hive/HiveCatalog.java | 4 +- 10 files changed, 50 insertions(+), 54 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index cd5891a5bbb0..78690abe9dbd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -36,7 +36,7 @@ import java.util.Map; import java.util.Objects; -import static org.apache.paimon.utils.MetaCacheManager.SNAPSHOT_CACHE; +import static org.apache.paimon.catalog.CachingCatalog.SNAPSHOT_CACHE; /** * This file is the entrance to all data committed at some specific time point. 
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index 003f0edb4fa5..e8422f17b011 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -18,11 +18,13 @@ package org.apache.paimon.catalog; +import org.apache.paimon.Snapshot; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.system.SystemTableLoader; @@ -47,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable; import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED; @@ -316,4 +319,36 @@ public void refreshPartitions(Identifier identifier) throws TableNotExistExcepti partitionCache.put(identifier, result); } } + + // ====================== cache for snapshot and schema files ================================ + + public static final Cache SNAPSHOT_CACHE = + Caffeine.newBuilder() + .softValues() + .expireAfterAccess(Duration.ofMinutes(10)) + .maximumSize(300) + .executor(Runnable::run) + .build(); + + public static final Cache SCHEMA_CACHE = + Caffeine.newBuilder() + .softValues() + .expireAfterAccess(Duration.ofMinutes(10)) + .maximumSize(100) + .executor(Runnable::run) + .build(); + + public static void invalidateMetaCacheForPrefix(Path tablePath) { + String path = tablePath.toString(); + invalidateMetaCacheForPrefix(SNAPSHOT_CACHE, path); + invalidateMetaCacheForPrefix(SCHEMA_CACHE, path); + } + + private static void 
invalidateMetaCacheForPrefix(Cache cache, String tablePath) { + List keys = + cache.asMap().keySet().stream() + .filter(key -> key.toString().startsWith(tablePath)) + .collect(Collectors.toList()); + cache.invalidateAll(keys); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 0b5c64a2e675..1cc1f85cd925 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -34,8 +34,8 @@ import java.util.Map; import java.util.concurrent.Callable; +import static org.apache.paimon.catalog.CachingCatalog.invalidateMetaCacheForPrefix; import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE; -import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix; /** A catalog implementation for {@link FileIO}. */ public class FileSystemCatalog extends AbstractCatalog { @@ -109,7 +109,7 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis protected void dropTableImpl(Identifier identifier) { Path path = getTableLocation(identifier); uncheck(() -> fileIO.delete(path, true)); - invalidateCacheForPrefix(path); + invalidateMetaCacheForPrefix(path); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 7d0a7cde498a..e4c028eb75b1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -54,12 +54,12 @@ import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.paimon.catalog.CachingCatalog.invalidateMetaCacheForPrefix; import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout; import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep; 
import static org.apache.paimon.jdbc.JdbcUtils.execute; import static org.apache.paimon.jdbc.JdbcUtils.insertProperties; import static org.apache.paimon.jdbc.JdbcUtils.updateTable; -import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix; /* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for @@ -226,7 +226,7 @@ protected void dropTableImpl(Identifier identifier) { try { if (fileIO.exists(path)) { fileIO.deleteDirectoryQuietly(path); - invalidateCacheForPrefix(path); + invalidateMetaCacheForPrefix(path); } } catch (Exception ex) { LOG.error("Delete directory[{}] fail for table {}", path, identifier, ex); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 0951dae79562..428750704bcf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -76,10 +76,10 @@ import static org.apache.paimon.CoreOptions.BUCKET_KEY; import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX; +import static org.apache.paimon.catalog.CachingCatalog.invalidateMetaCacheForPrefix; import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; -import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; @@ -250,7 +250,7 @@ public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws boolean success = commit(newSchema); if (success) { - 
invalidateCacheForPrefix(tableRoot); + invalidateMetaCacheForPrefix(tableRoot); return newSchema; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java index 03c3bb4cdf36..b47351a027bf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -42,7 +42,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BUCKET_KEY; -import static org.apache.paimon.utils.MetaCacheManager.SCHEMA_CACHE; +import static org.apache.paimon.catalog.CachingCatalog.SCHEMA_CACHE; /** * Schema of a table. Unlike schema, it has more information than {@link Schema}, including schemaId diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 2405337e5675..0b3526159b34 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -33,8 +33,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.paimon.catalog.CachingCatalog.invalidateMetaCacheForPrefix; import static org.apache.paimon.utils.FileUtils.listVersionedDirectories; -import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Manager for {@code Branch}. 
*/ @@ -202,7 +202,7 @@ public void fastForward(String branchName) { tagManager.copyWithBranch(branchName).tagDirectory(), tagManager.tagDirectory(), true); - invalidateCacheForPrefix(tablePath); + invalidateMetaCacheForPrefix(tablePath); } catch (IOException e) { throw new RuntimeException( String.format( diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java index da9712e00ac4..ce9f7eb8b22d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java @@ -19,47 +19,8 @@ package org.apache.paimon.utils; import org.apache.paimon.Snapshot; -import org.apache.paimon.fs.Path; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.tag.Tag; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; - -import java.time.Duration; -import java.util.List; -import java.util.stream.Collectors; - /** Cache for {@link Snapshot} and {@link Tag} and {@link TableSchema}. 
*/ -public class MetaCacheManager { - - public static final Cache SNAPSHOT_CACHE = - Caffeine.newBuilder() - .softValues() - .expireAfterAccess(Duration.ofMinutes(10)) - .maximumSize(300) - .executor(Runnable::run) - .build(); - - public static final Cache SCHEMA_CACHE = - Caffeine.newBuilder() - .softValues() - .expireAfterAccess(Duration.ofMinutes(10)) - .maximumSize(100) - .executor(Runnable::run) - .build(); - - public static void invalidateCacheForPrefix(Path tablePath) { - String path = tablePath.toString(); - invalidateCacheForPrefix(SNAPSHOT_CACHE, path); - invalidateCacheForPrefix(SCHEMA_CACHE, path); - } - - private static void invalidateCacheForPrefix(Cache cache, String tablePath) { - List keys = - cache.asMap().keySet().stream() - .filter(key -> key.toString().startsWith(tablePath)) - .collect(Collectors.toList()); - cache.invalidateAll(keys); - } -} +public class MetaCacheManager {} diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index 14839f4fe0a0..7b6480cdfa71 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix; +import static org.apache.paimon.catalog.CachingCatalog.invalidateMetaCacheForPrefix; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -367,7 +367,7 @@ public void testTraversalSnapshotsFromLatestSafely() throws IOException, Interru thread.start(); Thread.sleep(100); - invalidateCacheForPrefix(path); + invalidateMetaCacheForPrefix(path); localFileIO.deleteQuietly(snapshotManager.snapshotPath(3)); thread.join(); diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 0c228dd1c937..9012eefcbd91 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -96,6 +96,7 @@ import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME; import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.TableType.FORMAT_TABLE; +import static org.apache.paimon.catalog.CachingCatalog.invalidateMetaCacheForPrefix; import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout; import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep; import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; @@ -111,7 +112,6 @@ import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound; -import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; @@ -682,7 +682,7 @@ protected void dropTableImpl(Identifier identifier) { true)); Path path = getTableLocation(identifier); - invalidateCacheForPrefix(path); + invalidateMetaCacheForPrefix(path); // When drop a Hive external table, only the hive metadata is deleted and the data files // are not deleted. 
From c0c966daff5d278df528c2bf4c755f74fcbe848b Mon Sep 17 00:00:00 2001 From: Jingsong Date: Fri, 22 Nov 2024 16:25:13 +0800 Subject: [PATCH 7/8] fix --- .../apache/paimon/options/CatalogOptions.java | 7 ++ .../org/apache/paimon/AbstractFileStore.java | 10 ++- .../java/org/apache/paimon/FileStore.java | 4 ++ .../main/java/org/apache/paimon/Snapshot.java | 9 +-- .../apache/paimon/catalog/CachingCatalog.java | 70 ++++++++----------- .../paimon/catalog/FileSystemCatalog.java | 2 - .../org/apache/paimon/jdbc/JdbcCatalog.java | 2 - .../paimon/operation/OrphanFilesClean.java | 3 +- .../paimon/privilege/PrivilegedFileStore.java | 8 +++ .../apache/paimon/schema/SchemaManager.java | 2 - .../org/apache/paimon/schema/TableSchema.java | 8 +-- .../paimon/table/AbstractFileStoreTable.java | 10 ++- .../org/apache/paimon/table/DataTable.java | 3 + .../paimon/table/DelegatedFileStoreTable.java | 13 ++++ .../table/FallbackReadFileStoreTable.java | 3 +- .../apache/paimon/table/FileStoreTable.java | 5 ++ .../table/system/AggregationFieldsTable.java | 7 +- .../paimon/table/system/AuditLogTable.java | 6 ++ .../table/system/CompactBucketsTable.java | 6 ++ .../paimon/table/system/FileMonitorTable.java | 6 ++ .../paimon/table/system/FilesTable.java | 3 +- .../paimon/table/system/OptionsTable.java | 24 +++---- .../table/system/ReadOptimizedTable.java | 6 ++ .../paimon/table/system/SchemasTable.java | 57 +++------------ .../paimon/table/system/SnapshotsTable.java | 6 +- .../apache/paimon/utils/BranchManager.java | 6 +- .../apache/paimon/utils/MetaCacheManager.java | 26 ------- .../apache/paimon/utils/SnapshotManager.java | 42 +++++++++-- .../org/apache/paimon/utils/TagManager.java | 3 +- .../paimon/catalog/CachingCatalogTest.java | 25 ++++++- .../catalog/TestableCachingCatalog.java | 9 ++- .../table/AppendOnlyFileDataTableTest.java | 2 +- ...AppendOnlyTableColumnTypeFileDataTest.java | 2 +- ...AppendOnlyTableColumnTypeFileMetaTest.java | 2 +- .../AppendOnlyTableFileMetaFilterTest.java | 2 +- 
.../PrimaryKeyColumnTypeFileDataTest.java | 2 +- .../table/PrimaryKeyFileDataTableTest.java | 2 +- .../table/PrimaryKeyFileMetaFilterTest.java | 2 +- ...PrimaryKeyTableColumnTypeFileMetaTest.java | 2 +- .../paimon/utils/SnapshotManagerTest.java | 2 - .../org/apache/paimon/hive/HiveCatalog.java | 5 +- 41 files changed, 221 insertions(+), 193 deletions(-) delete mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index 6ad9f3350adf..f69af2d59910 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -123,6 +123,13 @@ public class CatalogOptions { .noDefaultValue() .withDescription("Controls the maximum memory to cache manifest content."); + public static final ConfigOption CACHE_SNAPSHOT_MAX_NUM_PER_TABLE = + key("cache.snapshot.max-num-per-table") + .intType() + .defaultValue(20) + .withDescription( + "Controls the max number for snapshots per table in the catalog are cached."); + public static final ConfigOption LINEAGE_META = key("lineage-meta") .stringType() diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 14665961a8a7..ae4552aa7150 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -54,6 +54,8 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import javax.annotation.Nullable; import java.time.Duration; @@ -79,6 +81,7 @@ abstract class AbstractFileStore implements FileStore { @Nullable private final SegmentsCache 
writeManifestCache; @Nullable private SegmentsCache readManifestCache; + @Nullable private Cache snapshotCache; protected AbstractFileStore( FileIO fileIO, @@ -116,7 +119,7 @@ public FileStorePathFactory pathFactory() { @Override public SnapshotManager snapshotManager() { - return new SnapshotManager(fileIO, options.path(), options.branch()); + return new SnapshotManager(fileIO, options.path(), options.branch(), snapshotCache); } @Override @@ -340,4 +343,9 @@ public ServiceManager newServiceManager() { public void setManifestCache(SegmentsCache manifestCache) { this.readManifestCache = manifestCache; } + + @Override + public void setSnapshotCache(Cache cache) { + this.snapshotCache = cache; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index f9bf4c8440bd..e50d4ada1397 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -44,6 +44,8 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import javax.annotation.Nullable; import java.util.List; @@ -107,4 +109,6 @@ public interface FileStore { List createTagCallbacks(); void setManifestCache(SegmentsCache manifestCache); + + void setSnapshotCache(Cache cache); } diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 78690abe9dbd..baee7bad950e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -36,8 +36,6 @@ import java.util.Map; import java.util.Objects; -import static org.apache.paimon.catalog.CachingCatalog.SNAPSHOT_CACHE; - /** * This file is the entrance to all data committed at some specific time point. 
* @@ -438,12 +436,7 @@ public static Snapshot fromPath(FileIO fileIO, Path path) { public static Snapshot tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { try { - Snapshot snapshot = SNAPSHOT_CACHE.getIfPresent(path); - if (snapshot == null) { - snapshot = Snapshot.fromJson(fileIO.readFileUtf8(path)); - SNAPSHOT_CACHE.put(path, snapshot); - } - return snapshot; + return Snapshot.fromJson(fileIO.readFileUtf8(path)); } catch (FileNotFoundException e) { throw e; } catch (IOException e) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index e8422f17b011..1912ad60623c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -18,13 +18,11 @@ package org.apache.paimon.catalog; -import org.apache.paimon.Snapshot; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaChange; -import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.system.SystemTableLoader; @@ -49,7 +47,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable; import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED; @@ -58,6 +55,7 @@ import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY; import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD; import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM; +import static 
org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE; import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES; /** A {@link Catalog} to cache databases and tables and manifests. */ @@ -65,6 +63,9 @@ public class CachingCatalog extends DelegateCatalog { private static final Logger LOG = LoggerFactory.getLogger(CachingCatalog.class); + private final Duration expirationInterval; + private final int snapshotMaxNumPerTable; + protected final Cache databaseCache; protected final Cache tableCache; @Nullable protected final SegmentsCache manifestCache; @@ -78,7 +79,8 @@ public CachingCatalog(Catalog wrapped) { CACHE_EXPIRATION_INTERVAL_MS.defaultValue(), CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(), CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes(), - CACHE_PARTITION_MAX_NUM.defaultValue()); + CACHE_PARTITION_MAX_NUM.defaultValue(), + CACHE_SNAPSHOT_MAX_NUM_PER_TABLE.defaultValue()); } public CachingCatalog( @@ -86,13 +88,15 @@ public CachingCatalog( Duration expirationInterval, MemorySize manifestMaxMemory, long manifestCacheThreshold, - long cachedPartitionMaxNum) { + long cachedPartitionMaxNum, + int snapshotMaxNumPerTable) { this( wrapped, expirationInterval, manifestMaxMemory, manifestCacheThreshold, cachedPartitionMaxNum, + snapshotMaxNumPerTable, Ticker.systemTicker()); } @@ -102,6 +106,7 @@ public CachingCatalog( MemorySize manifestMaxMemory, long manifestCacheThreshold, long cachedPartitionMaxNum, + int snapshotMaxNumPerTable, Ticker ticker) { super(wrapped); if (expirationInterval.isZero() || expirationInterval.isNegative()) { @@ -109,6 +114,9 @@ public CachingCatalog( "When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled."); } + this.expirationInterval = expirationInterval; + this.snapshotMaxNumPerTable = snapshotMaxNumPerTable; + this.databaseCache = Caffeine.newBuilder() .softValues() @@ -124,6 +132,7 @@ public CachingCatalog( .expireAfterAccess(expirationInterval) 
.ticker(ticker) .build(); + this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold); this.partitionCache = cachedPartitionMaxNum == 0 ? null @@ -137,7 +146,6 @@ public CachingCatalog( .maximumWeight(cachedPartitionMaxNum) .ticker(ticker) .build(); - this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold); } public static Catalog tryToCreate(Catalog catalog, Options options) { @@ -158,7 +166,8 @@ public static Catalog tryToCreate(Catalog catalog, Options options) { options.get(CACHE_EXPIRATION_INTERVAL_MS), manifestMaxMemory, manifestThreshold, - options.get(CACHE_PARTITION_MAX_NUM)); + options.get(CACHE_PARTITION_MAX_NUM), + options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE)); } @Override @@ -247,9 +256,20 @@ public Table getTable(Identifier identifier) throws TableNotExistException { } private void putTableCache(Identifier identifier, Table table) { - if (manifestCache != null && table instanceof FileStoreTable) { - ((FileStoreTable) table).setManifestCache(manifestCache); + if (table instanceof FileStoreTable) { + FileStoreTable storeTable = (FileStoreTable) table; + storeTable.setSnapshotCache( + Caffeine.newBuilder() + .softValues() + .expireAfterAccess(expirationInterval) + .maximumSize(snapshotMaxNumPerTable) + .executor(Runnable::run) + .build()); + if (manifestCache != null) { + storeTable.setManifestCache(manifestCache); + } } + tableCache.put(identifier, table); } @@ -319,36 +339,4 @@ public void refreshPartitions(Identifier identifier) throws TableNotExistExcepti partitionCache.put(identifier, result); } } - - // ====================== cache for snapshot and schema files ================================ - - public static final Cache SNAPSHOT_CACHE = - Caffeine.newBuilder() - .softValues() - .expireAfterAccess(Duration.ofMinutes(10)) - .maximumSize(300) - .executor(Runnable::run) - .build(); - - public static final Cache SCHEMA_CACHE = - Caffeine.newBuilder() - .softValues() - 
.expireAfterAccess(Duration.ofMinutes(10)) - .maximumSize(100) - .executor(Runnable::run) - .build(); - - public static void invalidateMetaCacheForPrefix(Path tablePath) { - String path = tablePath.toString(); - invalidateMetaCacheForPrefix(SNAPSHOT_CACHE, path); - invalidateMetaCacheForPrefix(SCHEMA_CACHE, path); - } - - private static void invalidateMetaCacheForPrefix(Cache cache, String tablePath) { - List keys = - cache.asMap().keySet().stream() - .filter(key -> key.toString().startsWith(tablePath)) - .collect(Collectors.toList()); - cache.invalidateAll(keys); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 1cc1f85cd925..9264a54647b1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -34,7 +34,6 @@ import java.util.Map; import java.util.concurrent.Callable; -import static org.apache.paimon.catalog.CachingCatalog.invalidateMetaCacheForPrefix; import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE; /** A catalog implementation for {@link FileIO}. 
*/ @@ -109,7 +108,6 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis protected void dropTableImpl(Identifier identifier) { Path path = getTableLocation(identifier); uncheck(() -> fileIO.delete(path, true)); - invalidateMetaCacheForPrefix(path); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index e4c028eb75b1..778bc591fe89 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -54,7 +54,6 @@ import java.util.Optional; import java.util.stream.Collectors; -import static org.apache.paimon.catalog.CachingCatalog.invalidateMetaCacheForPrefix; import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout; import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep; import static org.apache.paimon.jdbc.JdbcUtils.execute; @@ -226,7 +225,6 @@ protected void dropTableImpl(Identifier identifier) { try { if (fileIO.exists(path)) { fileIO.deleteDirectoryQuietly(path); - invalidateMetaCacheForPrefix(path); } } catch (Exception ex) { LOG.error("Delete directory[{}] fail for table {}", path, identifier, ex); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 5698908cb9b0..869100d9cfb8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -105,7 +105,8 @@ protected List validBranches() { List abnormalBranches = new ArrayList<>(); for (String branch : branches) { - if (!new SchemaManager(table.fileIO(), table.location(), branch).latest().isPresent()) { + SchemaManager schemaManager = table.schemaManager().copyWithBranch(branch); + if (!schemaManager.latest().isPresent()) { 
abnormalBranches.add(branch); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index 59243a53569e..3ee0d5fa9b01 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.FileStore; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; @@ -47,6 +48,8 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import javax.annotation.Nullable; import java.util.List; @@ -210,4 +213,9 @@ public List createTagCallbacks() { public void setManifestCache(SegmentsCache manifestCache) { wrapped.setManifestCache(manifestCache); } + + @Override + public void setSnapshotCache(Cache cache) { + wrapped.setSnapshotCache(cache); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 428750704bcf..d827ffd0fb66 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -76,7 +76,6 @@ import static org.apache.paimon.CoreOptions.BUCKET_KEY; import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX; -import static org.apache.paimon.catalog.CachingCatalog.invalidateMetaCacheForPrefix; import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; @@ -250,7 +249,6 @@ public 
TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws boolean success = commit(newSchema); if (success) { - invalidateMetaCacheForPrefix(tableRoot); return newSchema; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java index b47351a027bf..a0a149d1ae9b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -42,7 +42,6 @@ import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BUCKET_KEY; -import static org.apache.paimon.catalog.CachingCatalog.SCHEMA_CACHE; /** * Schema of a table. Unlike schema, it has more information than {@link Schema}, including schemaId @@ -348,12 +347,7 @@ public static TableSchema fromPath(FileIO fileIO, Path path) { public static TableSchema tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { try { - TableSchema schema = SCHEMA_CACHE.getIfPresent(path); - if (schema == null) { - schema = fromJson(fileIO.readFileUtf8(path)); - SCHEMA_CACHE.put(path, schema); - } - return schema; + return fromJson(fileIO.readFileUtf8(path)); } catch (FileNotFoundException e) { throw e; } catch (IOException e) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 07c0e88645ac..4180ff11c167 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -68,6 +68,8 @@ import org.apache.paimon.utils.SnapshotNotExistException; import org.apache.paimon.utils.TagManager; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import javax.annotation.Nullable; import java.io.IOException; @@ -123,6 +125,11 @@ public void 
setManifestCache(SegmentsCache manifestCache) { store().setManifestCache(manifestCache); } + @Override + public void setSnapshotCache(Cache cache) { + store().setSnapshotCache(cache); + } + @Override public OptionalLong latestSnapshotId() { Long snapshot = store().snapshotManager().latestSnapshotId(); @@ -340,7 +347,8 @@ public FileStoreTable copy(TableSchema newTableSchema) { : new PrimaryKeyFileStoreTable(fileIO, path, newTableSchema, catalogEnvironment); } - protected SchemaManager schemaManager() { + @Override + public SchemaManager schemaManager() { return new SchemaManager(fileIO(), path, currentBranch()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java index e330db0e04a4..7979daccf756 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.source.DataTableScan; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.utils.BranchManager; @@ -39,6 +40,8 @@ public interface DataTable extends InnerTable { SnapshotManager snapshotManager(); + SchemaManager schemaManager(); + TagManager tagManager(); BranchManager branchManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java index 2b369e5005cc..624476b5b43e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -27,6 +27,7 @@ import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.manifest.ManifestEntry; import 
org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.query.LocalTableQuery; @@ -44,6 +45,8 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import java.time.Duration; import java.util.Objects; import java.util.Optional; @@ -92,6 +95,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public SchemaManager schemaManager() { + return wrapped.schemaManager(); + } + @Override public TagManager tagManager() { return wrapped.tagManager(); @@ -117,6 +125,11 @@ public void setManifestCache(SegmentsCache manifestCache) { wrapped.setManifestCache(manifestCache); } + @Override + public void setSnapshotCache(Cache cache) { + wrapped.setSnapshotCache(cache); + } + @Override public TableSchema schema() { return wrapped.schema(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index f1a60b9713f9..e3e290f06086 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -28,7 +28,6 @@ import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.DataFilePlan; import org.apache.paimon.table.source.DataSplit; @@ -103,7 +102,7 @@ public FileStoreTable switchToBranch(String branchName) { private FileStoreTable switchWrappedToBranch(String branchName) { Optional optionalSchema = - new 
SchemaManager(wrapped.fileIO(), wrapped.location(), branchName).latest(); + wrapped.schemaManager().copyWithBranch(branchName).latest(); Preconditions.checkArgument( optionalSchema.isPresent(), "Branch " + branchName + " does not exist"); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index 01227dd35407..d37e57e4e57e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.table; import org.apache.paimon.FileStore; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.ManifestCacheFilter; @@ -30,6 +31,8 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SegmentsCache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import java.util.List; import java.util.Map; import java.util.Optional; @@ -42,6 +45,8 @@ public interface FileStoreTable extends DataTable { void setManifestCache(SegmentsCache manifestCache); + void setSnapshotCache(Cache cache); + @Override default RowType rowType() { return schema().logicalRowType(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java index 10a046ca70b5..8c0eed4d6b8b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java @@ -18,7 +18,6 @@ package org.apache.paimon.table.system; -import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -27,7 +26,6 @@ import 
org.apache.paimon.fs.Path; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; @@ -78,14 +76,12 @@ public class AggregationFieldsTable implements ReadonlyTable { private final FileIO fileIO; private final Path location; - private final String branch; private final FileStoreTable dataTable; public AggregationFieldsTable(FileStoreTable dataTable) { this.fileIO = dataTable.fileIO(); this.location = dataTable.location(); - this.branch = CoreOptions.branch(dataTable.schema().options()); this.dataTable = dataTable; } @@ -192,8 +188,7 @@ public RecordReader createReader(Split split) { if (!(split instanceof AggregationSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } - Path location = ((AggregationSplit) split).location; - TableSchema schemas = new SchemaManager(fileIO, location, branch).latest().get(); + TableSchema schemas = dataTable.schemaManager().latest().get(); Iterator rows = createInternalRowIterator(schemas); if (readType != null) { rows = diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index b0cbe0772b5e..1cb967f8d1e2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -39,6 +39,7 @@ import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.predicate.PredicateReplaceVisitor; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; @@ -187,6 +188,11 @@ public SnapshotManager 
snapshotManager() { return wrapped.snapshotManager(); } + @Override + public SchemaManager schemaManager() { + return wrapped.schemaManager(); + } + @Override public TagManager tagManager() { return wrapped.tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java index ff40c9502eb7..31cecbfb15c2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java @@ -33,6 +33,7 @@ import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; @@ -145,6 +146,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public SchemaManager schemaManager() { + return wrapped.schemaManager(); + } + @Override public TagManager tagManager() { return wrapped.tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java index fc1bb2a5b167..522335aaa6c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java @@ -34,6 +34,7 @@ import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; @@ -131,6 +132,11 @@ public SnapshotManager 
snapshotManager() { return wrapped.snapshotManager(); } + @Override + public SchemaManager schemaManager() { + return wrapped.schemaManager(); + } + @Override public TagManager tagManager() { return wrapped.tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 0232fc2d2dde..6dcbb322d6d0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -143,8 +143,7 @@ public InnerTableScan newScan() { @Override public InnerTableRead newRead() { - return new FilesRead( - new SchemaManager(storeTable.fileIO(), storeTable.location()), storeTable); + return new FilesRead(storeTable.schemaManager(), storeTable); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java index c7dec03343d0..ed20896646b2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java @@ -18,7 +18,6 @@ package org.apache.paimon.table.system; -import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -27,7 +26,6 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; import org.apache.paimon.table.Table; @@ -44,7 +42,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -70,14 +67,12 @@ 
public class OptionsTable implements ReadonlyTable { private final FileIO fileIO; private final Path location; - private final String branch; private final FileStoreTable dataTable; public OptionsTable(FileStoreTable dataTable) { this.fileIO = dataTable.fileIO(); this.location = dataTable.location(); - this.branch = CoreOptions.branch(dataTable.schema().options()); this.dataTable = dataTable; } @@ -178,14 +173,20 @@ public TableRead withIOManager(IOManager ioManager) { } @Override - public RecordReader createReader(Split split) throws IOException { + public RecordReader createReader(Split split) { if (!(split instanceof OptionsSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } - Path location = ((OptionsSplit) split).location; Iterator rows = Iterators.transform( - options(fileIO, location, branch).entrySet().iterator(), this::toRow); + dataTable + .schemaManager() + .latest() + .orElseThrow(() -> new RuntimeException("Table not exists.")) + .options() + .entrySet() + .iterator(), + this::toRow); if (readType != null) { rows = Iterators.transform( @@ -203,11 +204,4 @@ private InternalRow toRow(Map.Entry option) { BinaryString.fromString(option.getValue())); } } - - private static Map options(FileIO fileIO, Path location, String branchName) { - return new SchemaManager(fileIO, location, branchName) - .latest() - .orElseThrow(() -> new RuntimeException("Table not exists.")) - .options(); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index deb149791c8f..5308005053c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -26,6 +26,7 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; import 
org.apache.paimon.operation.DefaultValueAssigner; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; @@ -165,6 +166,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public SchemaManager schemaManager() { + return wrapped.schemaManager(); + } + @Override public TagManager tagManager() { return wrapped.tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java index d0df75b34f51..3cb0ff4783e9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java @@ -18,13 +18,11 @@ package org.apache.paimon.table.system; -import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.predicate.And; import org.apache.paimon.predicate.CompoundPredicate; @@ -61,8 +59,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; -import javax.annotation.Nullable; - import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -73,7 +69,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; @@ -98,16 +93,12 @@ public class SchemasTable implements ReadonlyTable { new DataField(5, "comment", SerializationUtils.newStringType(true)), new DataField(6, "update_time", new TimestampType(false, 3)))); - private final FileIO 
fileIO; private final Path location; - private final String branch; private final FileStoreTable dataTable; public SchemasTable(FileStoreTable dataTable) { - this.fileIO = dataTable.fileIO(); this.location = dataTable.location(); - this.branch = CoreOptions.branch(dataTable.schema().options()); this.dataTable = dataTable; } @@ -133,7 +124,7 @@ public InnerTableScan newScan() { @Override public InnerTableRead newRead() { - return new SchemasRead(fileIO); + return new SchemasRead(); } @Override @@ -141,24 +132,16 @@ public Table copy(Map dynamicOptions) { return new SchemasTable(dataTable.copy(dynamicOptions)); } - private class SchemasScan extends ReadOnceTableScan { - private @Nullable LeafPredicate schemaId; + private static class SchemasScan extends ReadOnceTableScan { @Override - public InnerTableScan withFilter(Predicate predicate) { - if (predicate == null) { - return this; - } - - Map leafPredicates = - predicate.visit(LeafPredicateExtractor.INSTANCE); - schemaId = leafPredicates.get("schema_id"); - return this; + public Plan innerPlan() { + return () -> Collections.singletonList(new SchemasSplit()); } @Override - public Plan innerPlan() { - return () -> Collections.singletonList(new SchemasSplit(location, schemaId)); + public InnerTableScan withFilter(Predicate predicate) { + return this; } } @@ -167,47 +150,29 @@ private static class SchemasSplit extends SingletonSplit { private static final long serialVersionUID = 1L; - private final Path location; - - private final @Nullable LeafPredicate schemaId; - - private SchemasSplit(Path location, @Nullable LeafPredicate schemaId) { - this.location = location; - this.schemaId = schemaId; - } - + @Override public boolean equals(Object o) { if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { - return false; - } - SchemasSplit that = (SchemasSplit) o; - return Objects.equals(location, that.location) - && Objects.equals(schemaId, that.schemaId); + return o != null && getClass() == 
o.getClass(); } @Override public int hashCode() { - return Objects.hash(location, schemaId); + return 0; } } /** {@link TableRead} implementation for {@link SchemasTable}. */ private class SchemasRead implements InnerTableRead { - private final FileIO fileIO; private RowType readType; private Optional optionalFilterSchemaIdMax = Optional.empty(); private Optional optionalFilterSchemaIdMin = Optional.empty(); private final List schemaIds = new ArrayList<>(); - public SchemasRead(FileIO fileIO) { - this.fileIO = fileIO; - } - @Override public InnerTableRead withFilter(Predicate predicate) { if (predicate == null) { @@ -287,9 +252,7 @@ public RecordReader createReader(Split split) { if (!(split instanceof SchemasSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } - SchemasSplit schemasSplit = (SchemasSplit) split; - Path location = schemasSplit.location; - SchemaManager manager = new SchemaManager(fileIO, location, branch); + SchemaManager manager = dataTable.schemaManager(); Collection tableSchemas; if (!schemaIds.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java index 10e5b691acc3..2af13ee937bd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java @@ -18,7 +18,6 @@ package org.apache.paimon.table.system; -import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; @@ -111,14 +110,12 @@ public class SnapshotsTable implements ReadonlyTable { private final FileIO fileIO; private final Path location; - private final String branch; private final FileStoreTable dataTable; public SnapshotsTable(FileStoreTable dataTable) { this.fileIO = dataTable.fileIO(); this.location = dataTable.location(); 
- this.branch = CoreOptions.branch(dataTable.schema().options()); this.dataTable = dataTable; } @@ -289,9 +286,8 @@ public RecordReader createReader(Split split) throws IOException { if (!(split instanceof SnapshotsSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } - SnapshotManager snapshotManager = - new SnapshotManager(fileIO, ((SnapshotsSplit) split).location, branch); + SnapshotManager snapshotManager = dataTable.snapshotManager(); Iterator snapshots; if (!snapshotIds.isEmpty()) { snapshots = snapshotManager.snapshotsWithId(snapshotIds); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 0b3526159b34..2ea5f542f4e5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.tag.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +34,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.paimon.catalog.CachingCatalog.invalidateMetaCacheForPrefix; import static org.apache.paimon.utils.FileUtils.listVersionedDirectories; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -179,7 +179,7 @@ public void fastForward(String branchName) { List deleteSchemaPaths = schemaManager.schemaPaths(id -> id >= earliestSchemaId); List deleteTagPaths = tagManager.tagPaths( - path -> Snapshot.fromPath(fileIO, path).id() >= earliestSnapshotId); + path -> Tag.fromPath(fileIO, path).id() >= earliestSnapshotId); List deletePaths = Stream.of(deleteSnapshotPaths, deleteSchemaPaths, deleteTagPaths) @@ -202,7 +202,7 @@ public void fastForward(String branchName) { 
tagManager.copyWithBranch(branchName).tagDirectory(), tagManager.tagDirectory(), true); - invalidateMetaCacheForPrefix(tablePath); + snapshotManager.invalidateCache(); } catch (IOException e) { throw new RuntimeException( String.format( diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java deleted file mode 100644 index ce9f7eb8b22d..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.utils; - -import org.apache.paimon.Snapshot; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.tag.Tag; - -/** Cache for {@link Snapshot} and {@link Tag} and {@link TableSchema}. 
*/ -public class MetaCacheManager {} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index f750ca44151a..9a120042eaaa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -24,6 +24,8 @@ import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,16 +76,26 @@ public class SnapshotManager implements Serializable { private final FileIO fileIO; private final Path tablePath; private final String branch; + @Nullable private final Cache cache; public SnapshotManager(FileIO fileIO, Path tablePath) { this(fileIO, tablePath, DEFAULT_MAIN_BRANCH); } /** Specify the default branch for data writing. */ - public SnapshotManager(FileIO fileIO, Path tablePath, String branchName) { + public SnapshotManager(FileIO fileIO, Path tablePath, @Nullable String branchName) { + this(fileIO, tablePath, branchName, null); + } + + public SnapshotManager( + FileIO fileIO, + Path tablePath, + @Nullable String branchName, + @Nullable Cache cache) { this.fileIO = fileIO; this.tablePath = tablePath; this.branch = BranchManager.normalizeBranch(branchName); + this.cache = cache; } public SnapshotManager copyWithBranch(String branchName) { @@ -120,13 +132,34 @@ public Path snapshotDirectory() { return new Path(branchPath(tablePath, branch) + "/snapshot"); } + public void invalidateCache() { + if (cache != null) { + cache.invalidateAll(); + } + } + public Snapshot snapshot(long snapshotId) { - Path snapshotPath = snapshotPath(snapshotId); - return Snapshot.fromPath(fileIO, snapshotPath); + Path path = snapshotPath(snapshotId); + Snapshot snapshot = cache == null ? 
null : cache.getIfPresent(path); + if (snapshot == null) { + snapshot = Snapshot.fromPath(fileIO, path); + if (cache != null) { + cache.put(path, snapshot); + } + } + return snapshot; } public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException { - return Snapshot.tryFromPath(fileIO, snapshotPath(snapshotId)); + Path path = snapshotPath(snapshotId); + Snapshot snapshot = cache == null ? null : cache.getIfPresent(path); + if (snapshot == null) { + snapshot = Snapshot.tryFromPath(fileIO, path); + if (cache != null) { + cache.put(path, snapshot); + } + } + return snapshot; } public Changelog changelog(long snapshotId) { @@ -479,6 +512,7 @@ public List safelyGetAllSnapshots() throws IOException { collectSnapshots( path -> { try { + // do not pollute the cache snapshots.add(Snapshot.tryFromPath(fileIO, path)); } catch (FileNotFoundException ignored) { } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 9e662b91cde5..1e05a100d741 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -353,8 +353,7 @@ public SortedMap> tags(Predicate filter) { // If the tag file is not found, it might be deleted by // other processes, so just skip this tag try { - // tag may be modified, so here do not use Snapshot.fromPath to bypass cache - Snapshot snapshot = Snapshot.fromJson(fileIO.readFileUtf8(path)); + Snapshot snapshot = Tag.tryFromPath(fileIO, path).trimToSnapshot(); tags.computeIfAbsent(snapshot, s -> new ArrayList<>()).add(tagName); } catch (FileNotFoundException ignored) { } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java index 65ed5ce0b7bf..e4f0a1510b8d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java +++
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.catalog; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; @@ -332,6 +333,27 @@ public static Identifier[] sysTables(Identifier tableIdent) { .toArray(Identifier[]::new); } + @Test + public void testSnapshotCache() throws Exception { + TestableCachingCatalog wrappedCatalog = + new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker); + Identifier tableIdent = new Identifier("db", "tbl"); + wrappedCatalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false); + Table table = wrappedCatalog.getTable(tableIdent); + + // write + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + try (BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit()) { + write.write(GenericRow.of(1, fromString("1"), fromString("1"))); + write.write(GenericRow.of(2, fromString("2"), fromString("2"))); + commit.commit(write.prepareCommit()); + } + + Snapshot snapshot = table.snapshot(1); + assertThat(snapshot).isSameAs(table.snapshot(1)); + } + @Test public void testManifestCache() throws Exception { innerTestManifestCache(Long.MAX_VALUE); @@ -346,7 +368,8 @@ private void innerTestManifestCache(long manifestCacheThreshold) throws Exceptio Duration.ofSeconds(10), MemorySize.ofMebiBytes(1), manifestCacheThreshold, - 0L); + 0L, + 10); Identifier tableIdent = new Identifier("db", "tbl"); catalog.dropTable(tableIdent, true); catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false); diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java index 4c70a0232c44..1d4a9b0e8a58 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java +++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java @@ -38,7 +38,14 @@ public class TestableCachingCatalog extends CachingCatalog { private final Duration cacheExpirationInterval; public TestableCachingCatalog(Catalog catalog, Duration expirationInterval, Ticker ticker) { - super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE, Long.MAX_VALUE, ticker); + super( + catalog, + expirationInterval, + MemorySize.ZERO, + Long.MAX_VALUE, + Long.MAX_VALUE, + Integer.MAX_VALUE, + ticker); this.cacheExpirationInterval = expirationInterval; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java index 93854e766198..9ce3db0b1ada 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java @@ -33,7 +33,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema return new AppendOnlyFileStoreTable( FileIOFinder.find(tablePath), tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java index b4c16cef20a7..64d0c728d10b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java @@ -37,7 +37,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas); return new AppendOnlyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected 
SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java index f398d28cc524..300483a9f34b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java @@ -40,7 +40,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas); return new AppendOnlyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java index f65546af75d8..85ed80299736 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java @@ -40,7 +40,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas); return new AppendOnlyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java index 8ba25c6617fe..64bb5f21abbb 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java @@ -91,7 +91,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas); return new PrimaryKeyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java index 1be321975466..ba9813804498 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java @@ -247,7 +247,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema return new PrimaryKeyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java index 618e8691c65d..88928fe991bc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java @@ -146,7 +146,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas); return new PrimaryKeyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return 
schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java index 489c1ba05217..32a4138be564 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java @@ -54,7 +54,7 @@ protected FileStoreTable createFileStoreTable(Map tableSchema SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas); return new PrimaryKeyFileStoreTable(fileIO, tablePath, schemaManager.latest().get()) { @Override - protected SchemaManager schemaManager() { + public SchemaManager schemaManager() { return schemaManager; } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index 7b6480cdfa71..26480cf411bb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.apache.paimon.catalog.CachingCatalog.invalidateMetaCacheForPrefix; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -367,7 +366,6 @@ public void testTraversalSnapshotsFromLatestSafely() throws IOException, Interru thread.start(); Thread.sleep(100); - invalidateMetaCacheForPrefix(path); localFileIO.deleteQuietly(snapshotManager.snapshotPath(3)); thread.join(); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 9012eefcbd91..93e7e87ef5c7 100644 --- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -96,7 +96,6 @@ import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME; import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.TableType.FORMAT_TABLE; -import static org.apache.paimon.catalog.CachingCatalog.invalidateMetaCacheForPrefix; import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout; import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep; import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; @@ -681,9 +680,6 @@ protected void dropTableImpl(Identifier identifier) { false, true)); - Path path = getTableLocation(identifier); - invalidateMetaCacheForPrefix(path); - // When drop a Hive external table, only the hive metadata is deleted and the data files // are not deleted. if (externalTable) { @@ -693,6 +689,7 @@ protected void dropTableImpl(Identifier identifier) { // Deletes table directory to avoid schema in filesystem exists after dropping hive // table successfully to keep the table consistency between which in filesystem and // which in Hive metastore. 
+ Path path = getTableLocation(identifier); try { if (fileIO.exists(path)) { fileIO.deleteDirectoryQuietly(path); From a57ea78ab77500e68fb05709e80e2ae76c9a9ee0 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Fri, 22 Nov 2024 17:46:57 +0800 Subject: [PATCH 8/8] fix --- .../layouts/shortcodes/generated/catalog_configuration.html | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index 3686fa20c68a..6706d5c421a1 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -68,6 +68,12 @@ Long Controls the max number for which partitions in the catalog are cached. + +
cache.snapshot.max-num-per-table
+ 20 + Integer + Controls the max number of snapshots per table that are cached in the catalog. +
client-pool-size
2