Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce CachingCatalog #3829

Merged
merged 4 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@
<td>Boolean</td>
<td>Indicates whether this catalog allow upper case, its default value depends on the implementation of the specific catalog.</td>
</tr>
<tr>
<td><h5>cache-enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Controls whether the catalog will cache table entries upon load.</td>
</tr>
<tr>
<td><h5>cache.expiration-interval</h5></td>
<td style="word-wrap: break-word;">1 min</td>
<td>Duration</td>
<td>Controls the duration for which entries in the catalog are cached.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ public class CatalogOptions {
.defaultValue(2)
.withDescription("Configure the size of the connection pool.");

public static final ConfigOption<Boolean> CACHE_ENABLED =
key("cache-enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Controls whether the catalog will cache table entries upon load.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entries should be more detailed. CacheCatalog caches tables and databases.


public static final ConfigOption<Duration> CACHE_EXPIRATION_INTERVAL_MS =
key("cache.expiration-interval")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription(
"Controls the duration for which entries in the catalog are cached.");

public static final ConfigOption<String> LINEAGE_META =
key("lineage-meta")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -58,17 +57,12 @@
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Common implementation of {@link Catalog}. */
public abstract class AbstractCatalog implements Catalog {

public static final String DB_SUFFIX = ".db";
protected static final String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
protected static final String DB_LOCATION_PROP = "location";

protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
protected final Options catalogOptions;
Expand All @@ -86,8 +80,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.fileIO = fileIO;
this.lineageMetaFactory =
findAndCreateLineageMeta(options, AbstractCatalog.class.getClassLoader());
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
this.tableDefaultOptions = Catalog.tableDefaultOptions(options.toMap());
this.catalogOptions = options;
}

Expand Down Expand Up @@ -445,12 +438,12 @@ protected void assertMainBranch(String branchName) {
}
}

private static boolean isSpecifiedSystemTable(Identifier identifier) {
public static boolean isSpecifiedSystemTable(Identifier identifier) {
return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER)
&& !getOriginalIdentifierAndBranch(identifier).isPresent();
}

protected boolean isSystemTable(Identifier identifier) {
protected static boolean isSystemTable(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier);
}

Expand All @@ -463,11 +456,11 @@ protected void checkNotSystemTable(Identifier identifier, String method) {
}
}

public void copyTableDefaultOptions(Map<String, String> options) {
private void copyTableDefaultOptions(Map<String, String> options) {
tableDefaultOptions.forEach(options::putIfAbsent);
}

private String[] tableAndSystemName(Identifier identifier) {
public static String[] tableAndSystemName(Identifier identifier) {
String[] splits = StringUtils.split(identifier.getObjectName(), SYSTEM_TABLE_SPLITTER);
if (splits.length != 2) {
throw new IllegalArgumentException(
Expand All @@ -493,7 +486,7 @@ public static Path newDatabasePath(String warehouse, String database) {
return new Path(warehouse, database + DB_SUFFIX);
}

private boolean isSystemDatabase(String database) {
public static boolean isSystemDatabase(String database) {
return SYSTEM_DATABASE_NAME.equals(database);
}

Expand All @@ -504,30 +497,9 @@ protected void checkNotSystemDatabase(String database) {
}
}

/** Validate database, table and field names must be lowercase when not case-sensitive. */
public static void validateCaseInsensitive(
boolean caseSensitive, String type, String... names) {
validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
}

/** Validate database, table and field names must be lowercase when not case-sensitive. */
public static void validateCaseInsensitive(
boolean caseSensitive, String type, List<String> names) {
if (caseSensitive) {
return;
}
List<String> illegalNames =
names.stream().filter(f -> !f.equals(f.toLowerCase())).collect(Collectors.toList());
checkArgument(
illegalNames.isEmpty(),
String.format(
"%s name %s cannot contain upper case in the catalog.",
type, illegalNames));
}

protected void validateIdentifierNameCaseInsensitive(Identifier identifier) {
validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName());
validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName());
Catalog.validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName());
Catalog.validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName());
}

private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> changes) {
Expand All @@ -545,7 +517,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> c
}

protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
Catalog.validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
}

private void validateAutoCreateClose(Map<String, String> options) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.SystemTableLoader;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalListener;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;

import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
import static org.apache.paimon.catalog.AbstractCatalog.tableAndSystemName;
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;

/** A {@link Catalog} to cache databases and tables and manifests. */
public class CachingCatalog extends DelegateCatalog {

private static final Logger LOG = LoggerFactory.getLogger(CachingCatalog.class);

protected final Cache<String, Map<String, String>> databaseCache;
protected final Cache<Identifier, Table> tableCache;

public CachingCatalog(Catalog wrapped) {
this(wrapped, CACHE_EXPIRATION_INTERVAL_MS.defaultValue());
}

public CachingCatalog(Catalog wrapped, Duration expirationInterval) {
this(wrapped, expirationInterval, Ticker.systemTicker());
}

public CachingCatalog(Catalog wrapped, Duration expirationInterval, Ticker ticker) {
super(wrapped);
if (expirationInterval.isZero() || expirationInterval.isNegative()) {
throw new IllegalArgumentException(
"When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled.");
}

this.databaseCache =
Caffeine.newBuilder()
.softValues()
.executor(Runnable::run)
.expireAfterAccess(expirationInterval)
.ticker(ticker)
.build();
this.tableCache =
Caffeine.newBuilder()
.softValues()
.removalListener(new TableInvalidatingRemovalListener())
.executor(Runnable::run)
.expireAfterAccess(expirationInterval)
.ticker(ticker)
.build();
}

@Override
public Map<String, String> loadDatabaseProperties(String databaseName)
throws DatabaseNotExistException {
Map<String, String> properties = databaseCache.getIfPresent(databaseName);
if (properties != null) {
return properties;
}

properties = super.loadDatabaseProperties(databaseName);
databaseCache.put(databaseName, properties);
return properties;
}

@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
super.dropDatabase(name, ignoreIfNotExists, cascade);
databaseCache.invalidate(name);
if (cascade) {
List<Identifier> tables = new ArrayList<>();
for (Identifier identifier : tableCache.asMap().keySet()) {
if (identifier.getDatabaseName().equals(name)) {
tables.add(identifier);
}
}
tables.forEach(tableCache::invalidate);
}
}

@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
super.dropTable(identifier, ignoreIfNotExists);
invalidateTable(identifier);
}

@Override
public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException {
super.renameTable(fromTable, toTable, ignoreIfNotExists);
invalidateTable(fromTable);
}

@Override
public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
super.alterTable(identifier, changes, ignoreIfNotExists);
invalidateTable(identifier);
}

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
Table table = tableCache.getIfPresent(identifier);
if (table != null) {
return table;
}

if (isSpecifiedSystemTable(identifier)) {
String[] splits = tableAndSystemName(identifier);
String tableName = splits[0];
String type = splits[1];

Identifier originIdentifier =
Identifier.create(identifier.getDatabaseName(), tableName);
Table originTable = tableCache.getIfPresent(originIdentifier);
if (originTable == null) {
originTable = wrapped.getTable(originIdentifier);
tableCache.put(originIdentifier, originTable);
}
table = SystemTableLoader.load(type, (FileStoreTable) originTable);
if (table == null) {
throw new TableNotExistException(identifier);
}
tableCache.put(identifier, table);
return table;
}

table = wrapped.getTable(identifier);
tableCache.put(identifier, table);
return table;
}

private class TableInvalidatingRemovalListener implements RemovalListener<Identifier, Table> {
@Override
public void onRemoval(Identifier identifier, Table table, @NonNull RemovalCause cause) {
LOG.debug("Evicted {} from the table cache ({})", identifier, cause);
if (RemovalCause.EXPIRED.equals(cause)) {
tryInvalidateSysTables(identifier);
}
}
}

@Override
public void invalidateTable(Identifier identifier) {
tableCache.invalidate(identifier);
tryInvalidateSysTables(identifier);
}

private void tryInvalidateSysTables(Identifier identifier) {
if (!isSpecifiedSystemTable(identifier)) {
tableCache.invalidateAll(allSystemTables(identifier));
}
}

private static Iterable<Identifier> allSystemTables(Identifier ident) {
List<Identifier> tables = new ArrayList<>();
for (String type : SYSTEM_TABLES) {
tables.add(Identifier.fromString(ident.getFullName() + SYSTEM_TABLE_SPLITTER + type));
}
return tables;
}
}
Loading
Loading