From 3701eddaf8b2d04a67be96061b444f107aef6416 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Thu, 21 Nov 2024 15:06:22 +0800 Subject: [PATCH] [core] Add cache for snapshots and tags and schemas --- .../main/java/org/apache/paimon/Snapshot.java | 65 +++++++++------- .../apache/paimon/schema/SchemaManager.java | 17 +---- .../org/apache/paimon/schema/TableSchema.java | 45 +++++++---- .../main/java/org/apache/paimon/tag/Tag.java | 55 ++++++++------ .../apache/paimon/utils/MetaCacheManager.java | 74 +++++++++++++++++++ .../apache/paimon/utils/SnapshotManager.java | 16 +--- .../org/apache/paimon/utils/TagManager.java | 37 +++++----- .../paimon/table/FileStoreTableTestBase.java | 4 +- 8 files changed, 204 insertions(+), 109 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 3b8d2fa15b4b..cd5891a5bbb0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -29,9 +29,6 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.annotation.Nullable; import java.io.FileNotFoundException; @@ -39,6 +36,8 @@ import java.util.Map; import java.util.Objects; +import static org.apache.paimon.utils.MetaCacheManager.SNAPSHOT_CACHE; + /** * This file is the entrance to all data committed at some specific time point. * @@ -65,7 +64,6 @@ @Public @JsonIgnoreProperties(ignoreUnknown = true) public class Snapshot { - private static final Logger LOG = LoggerFactory.getLogger(Snapshot.class); public static final long FIRST_SNAPSHOT_ID = 1; @@ -355,28 +353,6 @@ public String toJson() { return JsonSerdeUtil.toJson(this); } - public static Snapshot fromJson(String json) { - return JsonSerdeUtil.fromJson(json, Snapshot.class); - } - - public static Snapshot fromPath(FileIO fileIO, Path path) { - try { - return Snapshot.fromJson(fileIO.readFileUtf8(path)); - } catch (FileNotFoundException e) { - String errorMessage = - String.format( - "Snapshot file %s does not exist. " - + "It might have been expired by other jobs operating on this table. " - + "In this case, you can avoid concurrent modification issues by configuring " - + "write-only = true and use a dedicated compaction job, or configuring " - + "different expiration thresholds for different jobs.", - path); - throw new RuntimeException(errorMessage, e); - } catch (IOException e) { - throw new RuntimeException("Fails to read snapshot from path " + path, e); - } - } - @Override public int hashCode() { return Objects.hash( @@ -437,4 +413,41 @@ public enum CommitKind { /** Collect statistics. */ ANALYZE } + + // =================== Utils for reading ========================= + + public static Snapshot fromJson(String json) { + return JsonSerdeUtil.fromJson(json, Snapshot.class); + } + + public static Snapshot fromPath(FileIO fileIO, Path path) { + try { + return tryFromPath(fileIO, path); + } catch (FileNotFoundException e) { + String errorMessage = + String.format( + "Snapshot file %s does not exist. " + + "It might have been expired by other jobs operating on this table. " + + "In this case, you can avoid concurrent modification issues by configuring " + + "write-only = true and use a dedicated compaction job, or configuring " + + "different expiration thresholds for different jobs.", + path); + throw new RuntimeException(errorMessage, e); + } + } + + public static Snapshot tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { + try { + Snapshot snapshot = SNAPSHOT_CACHE.getIfPresent(path); + if (snapshot == null) { + snapshot = Snapshot.fromJson(fileIO.readFileUtf8(path)); + SNAPSHOT_CACHE.put(path, snapshot); + } + return snapshot; + } catch (FileNotFoundException e) { + throw e; + } catch (IOException e) { + throw new RuntimeException("Fails to read snapshot from path " + path, e); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index a84348810b99..0951dae79562 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -45,7 +45,6 @@ import org.apache.paimon.types.ReassignFieldId; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; -import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.StringUtils; @@ -80,6 +79,7 @@ import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; +import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; @@ -250,6 +250,7 @@ public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws boolean success = commit(newSchema); if (success) { + invalidateCacheForPrefix(tableRoot); return newSchema; } } @@ -769,11 +770,7 @@ boolean commit(TableSchema newSchema) throws Exception { /** Read schema for schema id. */ public TableSchema schema(long id) { - try { - return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(toSchemaPath(id)), TableSchema.class); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + return TableSchema.fromPath(fileIO, toSchemaPath(id)); } /** Check if a schema exists. */ @@ -789,14 +786,6 @@ public boolean schemaExists(long id) { } } - public static TableSchema fromPath(FileIO fileIO, Path path) { - try { - return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(path), TableSchema.class); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - private String branchPath() { return BranchManager.branchPath(tableRoot, branch); } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java index b5bdeccf10f6..03c3bb4cdf36 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -29,8 +29,10 @@ import javax.annotation.Nullable; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -40,6 +42,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BUCKET_KEY; +import static org.apache.paimon.utils.MetaCacheManager.SCHEMA_CACHE; /** * Schema of a table. Unlike schema, it has more information than {@link Schema}, including schemaId @@ -296,19 +299,6 @@ public TableSchema copy(Map newOptions) { timeMillis); } - public static TableSchema fromJson(String json) { - return JsonSerdeUtil.fromJson(json, TableSchema.class); - } - - public static TableSchema fromPath(FileIO fileIO, Path path) { - try { - String json = fileIO.readFileUtf8(path); - return TableSchema.fromJson(json); - } catch (IOException e) { - throw new RuntimeException("Fails to read schema from path " + path, e); - } - } - @Override public String toString() { return JsonSerdeUtil.toJson(this); @@ -341,4 +331,33 @@ public int hashCode() { public static List newFields(RowType rowType) { return rowType.getFields(); } + + // =================== Utils for reading ========================= + + public static TableSchema fromJson(String json) { + return JsonSerdeUtil.fromJson(json, TableSchema.class); + } + + public static TableSchema fromPath(FileIO fileIO, Path path) { + try { + return tryFromPath(fileIO, path); + } catch (FileNotFoundException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public static TableSchema tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { + try { + TableSchema schema = SCHEMA_CACHE.getIfPresent(path); + if (schema == null) { + schema = fromJson(fileIO.readFileUtf8(path)); + SCHEMA_CACHE.put(path, schema); + } + return schema; + } catch (FileNotFoundException e) { + throw e; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java index f1ac879d33a7..5a9028b0acf6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java @@ -33,11 +33,14 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UncheckedIOException; import java.time.Duration; import java.time.LocalDateTime; import java.util.Map; import java.util.Objects; +import static org.apache.paimon.utils.MetaCacheManager.TAG_CACHE; + /** Snapshot with tagCreateTime and tagTimeRetained. */ @JsonIgnoreProperties(ignoreUnknown = true) public class Tag extends Snapshot { @@ -113,29 +116,6 @@ public String toJson() { return JsonSerdeUtil.toJson(this); } - public static Tag fromJson(String json) { - return JsonSerdeUtil.fromJson(json, Tag.class); - } - - public static Tag fromPath(FileIO fileIO, Path path) { - try { - String json = fileIO.readFileUtf8(path); - return Tag.fromJson(json); - } catch (IOException e) { - throw new RuntimeException("Fails to read tag from path " + path, e); - } - } - - @Nullable - public static Tag safelyFromPath(FileIO fileIO, Path path) throws IOException { - try { - String json = fileIO.readFileUtf8(path); - return Tag.fromJson(json); - } catch (FileNotFoundException e) { - return null; - } - } - public static Tag fromSnapshotAndTagTtl( Snapshot snapshot, Duration tagTimeRetained, LocalDateTime tagCreateTime) { return new Tag( @@ -201,4 +181,33 @@ public boolean equals(Object o) { return Objects.equals(tagCreateTime, that.tagCreateTime) && Objects.equals(tagTimeRetained, that.tagTimeRetained); } + + // =================== Utils for reading ========================= + + public static Tag fromJson(String json) { + return JsonSerdeUtil.fromJson(json, Tag.class); + } + + public static Tag fromPath(FileIO fileIO, Path path) { + try { + return tryFromPath(fileIO, path); + } catch (FileNotFoundException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public static Tag tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { + try { + Tag tag = TAG_CACHE.getIfPresent(path); + if (tag == null) { + tag = fromJson(fileIO.readFileUtf8(path)); + TAG_CACHE.put(path, tag); + } + return tag; + } catch (FileNotFoundException e) { + throw e; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java new file mode 100644 index 000000000000..a4d37f58944b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/MetaCacheManager.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.tag.Tag; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; + +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; + +/** Cache for {@link Snapshot} and {@link Tag} and {@link TableSchema}. */ +public class MetaCacheManager { + + public static final Cache SNAPSHOT_CACHE = + Caffeine.newBuilder() + .softValues() + .expireAfterAccess(Duration.ofMinutes(10)) + .maximumSize(300) + .executor(Runnable::run) + .build(); + + public static final Cache TAG_CACHE = + Caffeine.newBuilder() + .softValues() + .expireAfterAccess(Duration.ofMinutes(10)) + .maximumSize(100) + .executor(Runnable::run) + .build(); + + public static final Cache SCHEMA_CACHE = + Caffeine.newBuilder() + .softValues() + .expireAfterAccess(Duration.ofMinutes(10)) + .maximumSize(100) + .executor(Runnable::run) + .build(); + + public static void invalidateCacheForPrefix(Path tablePath) { + String path = tablePath.toString(); + invalidateCacheForPrefix(SNAPSHOT_CACHE, path); + invalidateCacheForPrefix(TAG_CACHE, path); + invalidateCacheForPrefix(SCHEMA_CACHE, path); + } + + private static void invalidateCacheForPrefix(Cache cache, String tablePath) { + List keys = + cache.asMap().keySet().stream() + .filter(key -> key.toString().startsWith(tablePath)) + .collect(Collectors.toList()); + cache.invalidateAll(keys); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 5902d4c84cf5..f750ca44151a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -126,14 +126,7 @@ public Snapshot snapshot(long snapshotId) { } public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException { - try { - Path snapshotPath = snapshotPath(snapshotId); - return Snapshot.fromJson(fileIO.readFileUtf8(snapshotPath)); - } catch (FileNotFoundException fileNotFoundException) { - throw fileNotFoundException; - } catch (IOException ioException) { - throw new RuntimeException(ioException); - } + return Snapshot.tryFromPath(fileIO, snapshotPath(snapshotId)); } public Changelog changelog(long snapshotId) { @@ -486,11 +479,8 @@ public List safelyGetAllSnapshots() throws IOException { collectSnapshots( path -> { try { - snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path))); - } catch (IOException e) { - if (!(e instanceof FileNotFoundException)) { - throw new RuntimeException(e); - } + snapshots.add(Snapshot.tryFromPath(fileIO, path)); + } catch (FileNotFoundException ignored) { } }, paths); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 65963aafdf6b..ea36cd2a8207 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -341,25 +341,26 @@ public SortedMap> tags() { public SortedMap> tags(Predicate filter) { TreeMap> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id)); + List paths; try { - List paths = tagPaths(path -> true); + paths = tagPaths(path -> true); + } catch (IOException e) { + throw new RuntimeException(e); + } - for (Path path : paths) { - String tagName = path.getName().substring(TAG_PREFIX.length()); + for (Path path : paths) { + String tagName = path.getName().substring(TAG_PREFIX.length()); - if (!filter.test(tagName)) { - continue; - } - // If the tag file is not found, it might be deleted by - // other processes, so just skip this tag - try { - Snapshot snapshot = Snapshot.fromJson(fileIO.readFileUtf8(path)); - tags.computeIfAbsent(snapshot, s -> new ArrayList<>()).add(tagName); - } catch (FileNotFoundException ignored) { - } + if (!filter.test(tagName)) { + continue; + } + // If the tag file is not found, it might be deleted by + // other processes, so just skip this tag + try { + Snapshot snapshot = Snapshot.tryFromPath(fileIO, path); + tags.computeIfAbsent(snapshot, s -> new ArrayList<>()).add(tagName); + } catch (FileNotFoundException ignored) { } - } catch (IOException e) { - throw new RuntimeException(e); } return tags; } @@ -371,9 +372,9 @@ public List> tagObjects() { List> tags = new ArrayList<>(); for (Path path : paths) { String tagName = path.getName().substring(TAG_PREFIX.length()); - Tag tag = Tag.safelyFromPath(fileIO, path); - if (tag != null) { - tags.add(Pair.of(tag, tagName)); + try { + tags.add(Pair.of(Tag.tryFromPath(fileIO, path), tagName)); + } catch (FileNotFoundException ignored) { } } return tags; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 4d8408955d38..75e284a68c3a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1193,7 +1193,7 @@ public void testCreateBranch() throws Exception { SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath, "test-branch"); TableSchema branchSchema = - SchemaManager.fromPath(new TraceableFileIO(), schemaManager.toSchemaPath(0)); + TableSchema.fromPath(new TraceableFileIO(), schemaManager.toSchemaPath(0)); TableSchema schema0 = schemaManager.schema(0); assertThat(branchSchema.equals(schema0)).isTrue(); } @@ -1344,7 +1344,7 @@ public void testFastForward() throws Exception { // verify schema in branch1 and main branch is same SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath); TableSchema branchSchema = - SchemaManager.fromPath( + TableSchema.fromPath( new TraceableFileIO(), schemaManager.copyWithBranch(BRANCH_NAME).toSchemaPath(0)); TableSchema schema0 = schemaManager.schema(0);