Skip to content

Commit

Permalink
[core] Add cache for snapshots and tags and schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Nov 21, 2024
1 parent 187825a commit 3701edd
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 109 deletions.
65 changes: 39 additions & 26 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,15 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;

import static org.apache.paimon.utils.MetaCacheManager.SNAPSHOT_CACHE;

/**
* This file is the entrance to all data committed at some specific time point.
*
Expand All @@ -65,7 +64,6 @@
@Public
@JsonIgnoreProperties(ignoreUnknown = true)
public class Snapshot {
private static final Logger LOG = LoggerFactory.getLogger(Snapshot.class);

public static final long FIRST_SNAPSHOT_ID = 1;

Expand Down Expand Up @@ -355,28 +353,6 @@ public String toJson() {
return JsonSerdeUtil.toJson(this);
}

/**
 * Deserializes a {@link Snapshot} from its JSON text using {@link JsonSerdeUtil}.
 *
 * @param json JSON text of a snapshot
 * @return the deserialized snapshot
 */
public static Snapshot fromJson(String json) {
    Snapshot parsed = JsonSerdeUtil.fromJson(json, Snapshot.class);
    return parsed;
}

/**
 * Reads the file at {@code path} through {@code fileIO} and deserializes it as a snapshot.
 *
 * @param fileIO file access used to read the snapshot file
 * @param path location of the snapshot file
 * @return the deserialized snapshot
 * @throws RuntimeException if the file does not exist (with a hint about concurrent expiration)
 *     or if reading fails with any other {@link IOException}
 */
public static Snapshot fromPath(FileIO fileIO, Path path) {
    try {
        String content = fileIO.readFileUtf8(path);
        return Snapshot.fromJson(content);
    } catch (FileNotFoundException notFound) {
        // A missing snapshot file gets a dedicated, actionable message.
        throw new RuntimeException(
                String.format(
                        "Snapshot file %s does not exist. "
                                + "It might have been expired by other jobs operating on this table. "
                                + "In this case, you can avoid concurrent modification issues by configuring "
                                + "write-only = true and use a dedicated compaction job, or configuring "
                                + "different expiration thresholds for different jobs.",
                        path),
                notFound);
    } catch (IOException ioFailure) {
        throw new RuntimeException("Fails to read snapshot from path " + path, ioFailure);
    }
}

@Override
public int hashCode() {
return Objects.hash(
Expand Down Expand Up @@ -437,4 +413,41 @@ public enum CommitKind {
/** Collect statistics. */
ANALYZE
}

// =================== Utils for reading =========================

/**
 * Builds a {@link Snapshot} from its JSON representation via {@link JsonSerdeUtil}.
 *
 * @param json JSON text of a snapshot
 * @return the deserialized snapshot
 */
public static Snapshot fromJson(String json) {
    Snapshot result = JsonSerdeUtil.fromJson(json, Snapshot.class);
    return result;
}

/**
 * Loads the snapshot stored at {@code path}, delegating to {@link #tryFromPath(FileIO, Path)}.
 *
 * @param fileIO file access used to read the snapshot file
 * @param path location of the snapshot file
 * @return the snapshot for {@code path}
 * @throws RuntimeException if the snapshot file does not exist; the message hints at concurrent
 *     expiration by other jobs and the original exception is kept as the cause
 */
public static Snapshot fromPath(FileIO fileIO, Path path) {
    try {
        return tryFromPath(fileIO, path);
    } catch (FileNotFoundException notFound) {
        // Translate the checked "missing file" signal into an unchecked, actionable error.
        throw new RuntimeException(
                String.format(
                        "Snapshot file %s does not exist. "
                                + "It might have been expired by other jobs operating on this table. "
                                + "In this case, you can avoid concurrent modification issues by configuring "
                                + "write-only = true and use a dedicated compaction job, or configuring "
                                + "different expiration thresholds for different jobs.",
                        path),
                notFound);
    }
}

/**
 * Returns the snapshot for {@code path}, consulting {@code SNAPSHOT_CACHE} first and reading and
 * caching the file only on a cache miss.
 *
 * @param fileIO file access used to read the snapshot file on a cache miss
 * @param path location of the snapshot file; also the cache key
 * @return the cached or freshly read snapshot
 * @throws FileNotFoundException propagated unchanged when the read reports a missing file
 * @throws RuntimeException if reading fails with any other {@link IOException}
 */
public static Snapshot tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException {
    Snapshot cached = SNAPSHOT_CACHE.getIfPresent(path);
    if (cached != null) {
        return cached;
    }
    try {
        Snapshot loaded = Snapshot.fromJson(fileIO.readFileUtf8(path));
        SNAPSHOT_CACHE.put(path, loaded);
        return loaded;
    } catch (IOException e) {
        // Missing files stay a checked exception so callers can treat them as "absent".
        if (e instanceof FileNotFoundException) {
            throw (FileNotFoundException) e;
        }
        throw new RuntimeException("Fails to read snapshot from path " + path, e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.paimon.types.ReassignFieldId;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;
Expand Down Expand Up @@ -80,6 +79,7 @@
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.MetaCacheManager.invalidateCacheForPrefix;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;

Expand Down Expand Up @@ -250,6 +250,7 @@ public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws

boolean success = commit(newSchema);
if (success) {
invalidateCacheForPrefix(tableRoot);
return newSchema;
}
}
Expand Down Expand Up @@ -769,11 +770,7 @@ boolean commit(TableSchema newSchema) throws Exception {

/**
 * Read schema for schema id.
 *
 * <p>Delegates to {@link TableSchema#fromPath(FileIO, Path)} with the path produced by {@code
 * toSchemaPath(id)}, so there is a single schema-reading code path.
 *
 * @param id schema id to read
 * @return the schema stored for {@code id}
 */
public TableSchema schema(long id) {
    return TableSchema.fromPath(fileIO, toSchemaPath(id));
}

/** Check if a schema exists. */
Expand All @@ -789,14 +786,6 @@ public boolean schemaExists(long id) {
}
}

/**
 * Reads the file at {@code path} and deserializes it as a {@link TableSchema}.
 *
 * @param fileIO file access used to read the schema file
 * @param path location of the schema file
 * @return the deserialized schema
 * @throws UncheckedIOException if reading the file fails with an {@link IOException}
 */
public static TableSchema fromPath(FileIO fileIO, Path path) {
    try {
        String content = fileIO.readFileUtf8(path);
        return JsonSerdeUtil.fromJson(content, TableSchema.class);
    } catch (IOException ioFailure) {
        throw new UncheckedIOException(ioFailure);
    }
}

private String branchPath() {
return BranchManager.branchPath(tableRoot, branch);
}
Expand Down
45 changes: 32 additions & 13 deletions paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@

import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -40,6 +42,7 @@
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.utils.MetaCacheManager.SCHEMA_CACHE;

/**
* Schema of a table. Unlike schema, it has more information than {@link Schema}, including schemaId
Expand Down Expand Up @@ -296,19 +299,6 @@ public TableSchema copy(Map<String, String> newOptions) {
timeMillis);
}

/**
 * Deserializes a {@link TableSchema} from its JSON text using {@link JsonSerdeUtil}.
 *
 * @param json JSON text of a table schema
 * @return the deserialized schema
 */
public static TableSchema fromJson(String json) {
    TableSchema parsed = JsonSerdeUtil.fromJson(json, TableSchema.class);
    return parsed;
}

/**
 * Reads the file at {@code path} and deserializes it as a {@link TableSchema}.
 *
 * @param fileIO file access used to read the schema file
 * @param path location of the schema file
 * @return the deserialized schema
 * @throws RuntimeException if reading the file fails with an {@link IOException}
 */
public static TableSchema fromPath(FileIO fileIO, Path path) {
    try {
        return TableSchema.fromJson(fileIO.readFileUtf8(path));
    } catch (IOException ioFailure) {
        throw new RuntimeException("Fails to read schema from path " + path, ioFailure);
    }
}

@Override
public String toString() {
return JsonSerdeUtil.toJson(this);
Expand Down Expand Up @@ -341,4 +331,33 @@ public int hashCode() {
/**
 * Returns the fields of the given row type, as reported by {@code rowType.getFields()}.
 *
 * @param rowType row type whose fields are returned
 * @return the list returned by {@code rowType.getFields()}
 */
public static List<DataField> newFields(RowType rowType) {
    List<DataField> fields = rowType.getFields();
    return fields;
}

// =================== Utils for reading =========================

/**
 * Builds a {@link TableSchema} from its JSON representation via {@link JsonSerdeUtil}.
 *
 * @param json JSON text of a table schema
 * @return the deserialized schema
 */
public static TableSchema fromJson(String json) {
    TableSchema result = JsonSerdeUtil.fromJson(json, TableSchema.class);
    return result;
}

/**
 * Loads the schema stored at {@code path}, delegating to {@link #tryFromPath(FileIO, Path)}.
 *
 * @param fileIO file access used to read the schema file
 * @param path location of the schema file
 * @return the schema for {@code path}
 * @throws RuntimeException if the schema file does not exist; the message names the path and
 *     the original exception is kept as the cause
 */
public static TableSchema fromPath(FileIO fileIO, Path path) {
    try {
        return tryFromPath(fileIO, path);
    } catch (FileNotFoundException e) {
        // Always say which schema file was being read; the cause's own message may be null.
        // The cause's message is kept as a suffix so existing substring checks still match.
        String detail = e.getMessage();
        String message = "Fails to read schema from path " + path;
        if (detail != null) {
            message = message + ": " + detail;
        }
        throw new RuntimeException(message, e);
    }
}

/**
 * Returns the schema for {@code path}, consulting {@code SCHEMA_CACHE} first and reading and
 * caching the file only on a cache miss.
 *
 * @param fileIO file access used to read the schema file on a cache miss
 * @param path location of the schema file; also the cache key
 * @return the cached or freshly read schema
 * @throws FileNotFoundException propagated unchanged when the read reports a missing file
 * @throws UncheckedIOException if reading fails with any other {@link IOException}
 */
public static TableSchema tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException {
    TableSchema cached = SCHEMA_CACHE.getIfPresent(path);
    if (cached != null) {
        return cached;
    }
    try {
        TableSchema loaded = fromJson(fileIO.readFileUtf8(path));
        SCHEMA_CACHE.put(path, loaded);
        return loaded;
    } catch (IOException e) {
        // Missing files stay a checked exception so callers can treat them as "absent".
        if (e instanceof FileNotFoundException) {
            throw (FileNotFoundException) e;
        }
        throw new UncheckedIOException(e);
    }
}
}
55 changes: 32 additions & 23 deletions paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Objects;

import static org.apache.paimon.utils.MetaCacheManager.TAG_CACHE;

/** Snapshot with tagCreateTime and tagTimeRetained. */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Tag extends Snapshot {
Expand Down Expand Up @@ -113,29 +116,6 @@ public String toJson() {
return JsonSerdeUtil.toJson(this);
}

/**
 * Deserializes a {@link Tag} from its JSON text using {@link JsonSerdeUtil}.
 *
 * @param json JSON text of a tag
 * @return the deserialized tag
 */
public static Tag fromJson(String json) {
    Tag parsed = JsonSerdeUtil.fromJson(json, Tag.class);
    return parsed;
}

/**
 * Reads the file at {@code path} and deserializes it as a {@link Tag}.
 *
 * @param fileIO file access used to read the tag file
 * @param path location of the tag file
 * @return the deserialized tag
 * @throws RuntimeException if reading the file fails with an {@link IOException}
 */
public static Tag fromPath(FileIO fileIO, Path path) {
    try {
        return Tag.fromJson(fileIO.readFileUtf8(path));
    } catch (IOException ioFailure) {
        throw new RuntimeException("Fails to read tag from path " + path, ioFailure);
    }
}

/**
 * Reads the tag at {@code path}, mapping a missing file to {@code null} instead of an error.
 *
 * @param fileIO file access used to read the tag file
 * @param path location of the tag file
 * @return the deserialized tag, or {@code null} if the read reports a missing file
 * @throws IOException if reading fails for any reason other than a missing file
 */
@Nullable
public static Tag safelyFromPath(FileIO fileIO, Path path) throws IOException {
    Tag tag;
    try {
        tag = Tag.fromJson(fileIO.readFileUtf8(path));
    } catch (FileNotFoundException ignored) {
        // A missing tag file is an expected outcome here, reported as null.
        tag = null;
    }
    return tag;
}

public static Tag fromSnapshotAndTagTtl(
Snapshot snapshot, Duration tagTimeRetained, LocalDateTime tagCreateTime) {
return new Tag(
Expand Down Expand Up @@ -201,4 +181,33 @@ public boolean equals(Object o) {
return Objects.equals(tagCreateTime, that.tagCreateTime)
&& Objects.equals(tagTimeRetained, that.tagTimeRetained);
}

// =================== Utils for reading =========================

/**
 * Builds a {@link Tag} from its JSON representation via {@link JsonSerdeUtil}.
 *
 * @param json JSON text of a tag
 * @return the deserialized tag
 */
public static Tag fromJson(String json) {
    Tag result = JsonSerdeUtil.fromJson(json, Tag.class);
    return result;
}

/**
 * Loads the tag stored at {@code path}, delegating to {@link #tryFromPath(FileIO, Path)}.
 *
 * @param fileIO file access used to read the tag file
 * @param path location of the tag file
 * @return the tag for {@code path}
 * @throws RuntimeException if the tag file does not exist; the message names the path and the
 *     original exception is kept as the cause
 */
public static Tag fromPath(FileIO fileIO, Path path) {
    try {
        return tryFromPath(fileIO, path);
    } catch (FileNotFoundException e) {
        // Always say which tag file was being read; the cause's own message may be null.
        // The cause's message is kept as a suffix so existing substring checks still match.
        String detail = e.getMessage();
        String message = "Fails to read tag from path " + path;
        if (detail != null) {
            message = message + ": " + detail;
        }
        throw new RuntimeException(message, e);
    }
}

/**
 * Returns the tag for {@code path}, consulting {@code TAG_CACHE} first and reading and caching
 * the file only on a cache miss.
 *
 * <p>NOTE(review): entries are keyed by path only; presumably code that deletes or rewrites a
 * tag file is expected to invalidate the entry - confirm against callers.
 *
 * @param fileIO file access used to read the tag file on a cache miss
 * @param path location of the tag file; also the cache key
 * @return the cached or freshly read tag
 * @throws FileNotFoundException propagated unchanged when the read reports a missing file
 * @throws UncheckedIOException if reading fails with any other {@link IOException}
 */
public static Tag tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException {
    Tag cached = TAG_CACHE.getIfPresent(path);
    if (cached != null) {
        return cached;
    }
    try {
        Tag loaded = fromJson(fileIO.readFileUtf8(path));
        TAG_CACHE.put(path, loaded);
        return loaded;
    } catch (IOException e) {
        // Missing files stay a checked exception so callers can treat them as "absent".
        if (e instanceof FileNotFoundException) {
            throw (FileNotFoundException) e;
        }
        throw new UncheckedIOException(e);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.tag.Tag;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Static caches for {@link Snapshot}, {@link Tag} and {@link TableSchema} objects, keyed by the
 * {@link Path} of the file each object was read from.
 *
 * <p>All three caches use soft-referenced values, expire entries 10 minutes after their last
 * access, are bounded in size, and run cache maintenance on the calling thread ({@code
 * Runnable::run} as executor).
 */
public class MetaCacheManager {

    /** Cache of snapshots by snapshot file path; at most 300 entries. */
    public static final Cache<Path, Snapshot> SNAPSHOT_CACHE =
            Caffeine.newBuilder()
                    .softValues()
                    .expireAfterAccess(Duration.ofMinutes(10))
                    .maximumSize(300)
                    .executor(Runnable::run)
                    .build();

    /** Cache of tags by tag file path; at most 100 entries. */
    public static final Cache<Path, Tag> TAG_CACHE =
            Caffeine.newBuilder()
                    .softValues()
                    .expireAfterAccess(Duration.ofMinutes(10))
                    .maximumSize(100)
                    .executor(Runnable::run)
                    .build();

    /** Cache of table schemas by schema file path; at most 100 entries. */
    public static final Cache<Path, TableSchema> SCHEMA_CACHE =
            Caffeine.newBuilder()
                    .softValues()
                    .expireAfterAccess(Duration.ofMinutes(10))
                    .maximumSize(100)
                    .executor(Runnable::run)
                    .build();

    /**
     * Drops every cached snapshot, tag and schema whose file lives at or below {@code tablePath}.
     *
     * @param tablePath root directory of the table whose cached metadata should be discarded
     */
    public static void invalidateCacheForPrefix(Path tablePath) {
        String path = tablePath.toString();
        invalidateCacheForPrefix(SNAPSHOT_CACHE, path);
        invalidateCacheForPrefix(TAG_CACHE, path);
        invalidateCacheForPrefix(SCHEMA_CACHE, path);
    }

    /**
     * Invalidates the entries of {@code cache} whose key is {@code tablePath} itself or a path
     * underneath it.
     *
     * <p>The match is made on a directory boundary rather than on a raw string prefix, so that
     * invalidating {@code /wh/db/t1} does not also evict entries of a sibling such as {@code
     * /wh/db/t10}.
     */
    private static void invalidateCacheForPrefix(Cache<Path, ?> cache, String tablePath) {
        String directoryPrefix = tablePath.endsWith("/") ? tablePath : tablePath + "/";
        // Collect the keys first; the invalidation then runs on that snapshot of matches.
        List<Path> keys =
                cache.asMap().keySet().stream()
                        .filter(
                                key -> {
                                    String keyPath = key.toString();
                                    return keyPath.equals(tablePath)
                                            || keyPath.startsWith(directoryPrefix);
                                })
                        .collect(Collectors.toList());
        cache.invalidateAll(keys);
    }
}
Loading

0 comments on commit 3701edd

Please sign in to comment.