Skip to content

Commit

Permalink
[core] Introduce CachingCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Jul 30, 2024
1 parent 76d2520 commit a3e19b4
Show file tree
Hide file tree
Showing 37 changed files with 822 additions and 269 deletions.
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@
<td>Boolean</td>
<td>Indicates whether this catalog allow upper case, its default value depends on the implementation of the specific catalog.</td>
</tr>
<tr>
<td><h5>cache-enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Controls whether the catalog will cache table entries upon load.</td>
</tr>
<tr>
<td><h5>cache.expiration-interval</h5></td>
<td style="word-wrap: break-word;">30 s</td>
<td>Duration</td>
<td>Controls the duration for which entries in the catalog are cached.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ public class CatalogOptions {
.defaultValue(2)
.withDescription("Configure the size of the connection pool.");

public static final ConfigOption<Boolean> CACHE_ENABLED =
key("cache-enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Controls whether the catalog will cache table entries upon load.");

public static final ConfigOption<Duration> CACHE_EXPIRATION_INTERVAL_MS =
key("cache.expiration-interval")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"Controls the duration for which entries in the catalog are cached.");

public static final ConfigOption<String> LINEAGE_META =
key("lineage-meta")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -58,17 +57,12 @@
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Common implementation of {@link Catalog}. */
public abstract class AbstractCatalog implements Catalog {

public static final String DB_SUFFIX = ".db";
protected static final String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
protected static final String DB_LOCATION_PROP = "location";

protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
protected final Options catalogOptions;
Expand All @@ -86,8 +80,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.fileIO = fileIO;
this.lineageMetaFactory =
findAndCreateLineageMeta(options, AbstractCatalog.class.getClassLoader());
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
this.tableDefaultOptions = Catalog.tableDefaultOptions(options.toMap());
this.catalogOptions = options;
}

Expand Down Expand Up @@ -445,12 +438,12 @@ protected void assertMainBranch(String branchName) {
}
}

private static boolean isSpecifiedSystemTable(Identifier identifier) {
public static boolean isSpecifiedSystemTable(Identifier identifier) {
return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER)
&& !getOriginalIdentifierAndBranch(identifier).isPresent();
}

protected boolean isSystemTable(Identifier identifier) {
protected static boolean isSystemTable(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier);
}

Expand All @@ -463,11 +456,11 @@ protected void checkNotSystemTable(Identifier identifier, String method) {
}
}

public void copyTableDefaultOptions(Map<String, String> options) {
private void copyTableDefaultOptions(Map<String, String> options) {
tableDefaultOptions.forEach(options::putIfAbsent);
}

private String[] tableAndSystemName(Identifier identifier) {
public static String[] tableAndSystemName(Identifier identifier) {
String[] splits = StringUtils.split(identifier.getObjectName(), SYSTEM_TABLE_SPLITTER);
if (splits.length != 2) {
throw new IllegalArgumentException(
Expand All @@ -493,7 +486,7 @@ public static Path newDatabasePath(String warehouse, String database) {
return new Path(warehouse, database + DB_SUFFIX);
}

private boolean isSystemDatabase(String database) {
public static boolean isSystemDatabase(String database) {
return SYSTEM_DATABASE_NAME.equals(database);
}

Expand All @@ -504,30 +497,9 @@ protected void checkNotSystemDatabase(String database) {
}
}

/** Validate database, table and field names must be lowercase when not case-sensitive. */
public static void validateCaseInsensitive(
boolean caseSensitive, String type, String... names) {
validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
}

/** Validate database, table and field names must be lowercase when not case-sensitive. */
public static void validateCaseInsensitive(
boolean caseSensitive, String type, List<String> names) {
if (caseSensitive) {
return;
}
List<String> illegalNames =
names.stream().filter(f -> !f.equals(f.toLowerCase())).collect(Collectors.toList());
checkArgument(
illegalNames.isEmpty(),
String.format(
"%s name %s cannot contain upper case in the catalog.",
type, illegalNames));
}

protected void validateIdentifierNameCaseInsensitive(Identifier identifier) {
validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName());
validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName());
Catalog.validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName());
Catalog.validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName());
}

private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> changes) {
Expand All @@ -545,7 +517,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> c
}

protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
Catalog.validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
}

private void validateAutoCreateClose(Map<String, String> options) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.SystemTableLoader;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalListener;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;

import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
import static org.apache.paimon.catalog.AbstractCatalog.tableAndSystemName;
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;

/** A {@link Catalog} to cache databases and tables and manifests. */
public class CachingCatalog extends DelegateCatalog {

private static final Logger LOG = LoggerFactory.getLogger(CachingCatalog.class);

protected final Cache<String, Map<String, String>> databaseCache;
protected final Cache<Identifier, Table> tableCache;

public CachingCatalog(Catalog wrapped) {
this(wrapped, CACHE_EXPIRATION_INTERVAL_MS.defaultValue());
}

public CachingCatalog(Catalog wrapped, Duration expirationInterval) {
this(wrapped, expirationInterval, Ticker.systemTicker());
}

public CachingCatalog(Catalog wrapped, Duration expirationInterval, Ticker ticker) {
super(wrapped);
if (expirationInterval.isZero() || expirationInterval.isNegative()) {
throw new IllegalArgumentException(
"When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled.");
}

this.databaseCache =
Caffeine.newBuilder()
.softValues()
.executor(Runnable::run)
.expireAfterAccess(expirationInterval)
.ticker(ticker)
.build();
this.tableCache =
Caffeine.newBuilder()
.softValues()
.removalListener(new TableInvalidatingRemovalListener())
.executor(Runnable::run)
.expireAfterAccess(expirationInterval)
.ticker(ticker)
.build();
}

@Override
public Map<String, String> loadDatabaseProperties(String databaseName)
throws DatabaseNotExistException {
Map<String, String> properties = databaseCache.getIfPresent(databaseName);
if (properties != null) {
return properties;
}

properties = super.loadDatabaseProperties(databaseName);
databaseCache.put(databaseName, properties);
return properties;
}

@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
super.dropDatabase(name, ignoreIfNotExists, cascade);
databaseCache.invalidate(name);
if (cascade) {
List<Identifier> tables = new ArrayList<>();
for (Identifier identifier : tableCache.asMap().keySet()) {
if (identifier.getDatabaseName().equals(name)) {
tables.add(identifier);
}
}
tables.forEach(tableCache::invalidate);
}
}

@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
super.dropTable(identifier, ignoreIfNotExists);
invalidateTable(identifier);
}

@Override
public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException {
super.renameTable(fromTable, toTable, ignoreIfNotExists);
invalidateTable(fromTable);
}

@Override
public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
super.alterTable(identifier, changes, ignoreIfNotExists);
invalidateTable(identifier);
}

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
Table table = tableCache.getIfPresent(identifier);
if (table != null) {
return table;
}

if (isSpecifiedSystemTable(identifier)) {
String[] splits = tableAndSystemName(identifier);
String tableName = splits[0];
String type = splits[1];

Identifier originIdentifier =
Identifier.create(identifier.getDatabaseName(), tableName);
Table originTable = tableCache.getIfPresent(originIdentifier);
if (originTable == null) {
originTable = wrapped.getTable(originIdentifier);
tableCache.put(originIdentifier, originTable);
}
table = SystemTableLoader.load(type, (FileStoreTable) originTable);
if (table == null) {
throw new TableNotExistException(identifier);
}
tableCache.put(identifier, table);
return table;
}

table = wrapped.getTable(identifier);
tableCache.put(identifier, table);
return table;
}

private class TableInvalidatingRemovalListener implements RemovalListener<Identifier, Table> {
@Override
public void onRemoval(
Identifier tableIdentifier, Table table, @NonNull RemovalCause cause) {
LOG.debug("Evicted {} from the table cache ({})", tableIdentifier, cause);
if (RemovalCause.EXPIRED.equals(cause)) {
if (!isSpecifiedSystemTable(tableIdentifier)) {
tableCache.invalidateAll(allSystemTables(tableIdentifier));
}
}
}
}

private void invalidateTable(Identifier identifier) {
tableCache.invalidate(identifier);
tableCache.invalidateAll(allSystemTables(identifier));
}

private static Iterable<Identifier> allSystemTables(Identifier ident) {
List<Identifier> tables = new ArrayList<>();
for (String type : SYSTEM_TABLES) {
tables.add(Identifier.fromString(ident.getFullName() + SYSTEM_TABLE_SPLITTER + type));
}
return tables;
}
}
Loading

0 comments on commit a3e19b4

Please sign in to comment.