Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Aug 1, 2024
1 parent bba6681 commit c70a514
Show file tree
Hide file tree
Showing 15 changed files with 204 additions and 93 deletions.
16 changes: 14 additions & 2 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,23 @@
<td>Controls the duration for which databases and tables in the catalog are cached.</td>
</tr>
<tr>
<td><h5>cache.manifest-max-memory</h5></td>
<td style="word-wrap: break-word;">0 bytes</td>
<td><h5>cache.manifest.max-memory</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>MemorySize</td>
<td>Controls the maximum memory to cache manifest content.</td>
</tr>
<tr>
<td><h5>cache.manifest.small-file-memory</h5></td>
<td style="word-wrap: break-word;">128 mb</td>
<td>MemorySize</td>
<td>Controls the cache memory to cache small manifest files.</td>
</tr>
<tr>
<td><h5>cache.manifest.small-file-threshold</h5></td>
<td style="word-wrap: break-word;">512 kb</td>
<td>MemorySize</td>
<td>Controls the threshold of small manifest file.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,22 @@ public class CatalogOptions {
.withDescription(
"Controls the duration for which databases and tables in the catalog are cached.");

public static final ConfigOption<MemorySize> CACHE_MANIFEST_SMALL_FILE_MEMORY =
key("cache.manifest.small-file-memory")
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(128))
.withDescription("Controls the cache memory to cache small manifest files.");

public static final ConfigOption<MemorySize> CACHE_MANIFEST_SMALL_FILE_THRESHOLD =
key("cache.manifest.small-file-threshold")
.memoryType()
.defaultValue(MemorySize.ofKibiBytes(512))
.withDescription("Controls the threshold of small manifest file.");

public static final ConfigOption<MemorySize> CACHE_MANIFEST_MAX_MEMORY =
key("cache.manifest-max-memory")
key("cache.manifest.max-memory")
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(0))
.noDefaultValue()
.withDescription("Controls the maximum memory to cache manifest content.");

public static final ConfigOption<String> LINEAGE_META =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ protected AbstractFileStore(
this.partitionType = partitionType;
this.catalogEnvironment = catalogEnvironment;
this.writeManifestCache =
SegmentsCache.create(options.pageSize(), options.writeManifestCache());
SegmentsCache.create(
options.pageSize(), options.writeManifestCache(), Long.MAX_VALUE);
}

@Override
Expand Down Expand Up @@ -142,7 +143,11 @@ protected ManifestList.Factory manifestListFactory(boolean forWrite) {

protected IndexManifestFile.Factory indexManifestFileFactory() {
return new IndexManifestFile.Factory(
fileIO, options.manifestFormat(), options.manifestCompression(), pathFactory());
fileIO,
options.manifestFormat(),
options.manifestCompression(),
pathFactory(),
readManifestCache);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.fs.Path;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
Expand All @@ -42,11 +43,15 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
import static org.apache.paimon.catalog.AbstractCatalog.tableAndSystemName;
import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;

/** A {@link Catalog} to cache databases and tables and manifests. */
Expand All @@ -57,25 +62,37 @@ public class CachingCatalog extends DelegateCatalog {
protected final Cache<String, Map<String, String>> databaseCache;
protected final Cache<Identifier, Table> tableCache;
@Nullable protected final SegmentsCache<Path> manifestCache;
protected final long manifestCacheThreshold;

public CachingCatalog(Catalog wrapped) {
this(
wrapped,
CACHE_EXPIRATION_INTERVAL_MS.defaultValue(),
CACHE_MANIFEST_MAX_MEMORY.defaultValue());
CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(),
CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
}

public CachingCatalog(
Catalog wrapped, Duration expirationInterval, MemorySize manifestMaxMemory) {
this(wrapped, expirationInterval, manifestMaxMemory, Ticker.systemTicker());
Catalog wrapped,
Duration expirationInterval,
MemorySize manifestMaxMemory,
long manifestCacheThreshold) {
this(
wrapped,
expirationInterval,
manifestMaxMemory,
manifestCacheThreshold,
Ticker.systemTicker());
}

public CachingCatalog(
Catalog wrapped,
Duration expirationInterval,
MemorySize manifestMaxMemory,
long manifestCacheThreshold,
Ticker ticker) {
super(wrapped);
this.manifestCacheThreshold = manifestCacheThreshold;
if (expirationInterval.isZero() || expirationInterval.isNegative()) {
throw new IllegalArgumentException(
"When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled.");
Expand All @@ -96,7 +113,27 @@ public CachingCatalog(
.expireAfterAccess(expirationInterval)
.ticker(ticker)
.build();
this.manifestCache = SegmentsCache.create(manifestMaxMemory);
this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
}

public static Catalog tryToCreate(Catalog catalog, Options options) {
if (!options.get(CACHE_ENABLED)) {
return catalog;
}

MemorySize manifestMaxMemory = options.get(CACHE_MANIFEST_SMALL_FILE_MEMORY);
long manifestThreshold = options.get(CACHE_MANIFEST_SMALL_FILE_THRESHOLD).getBytes();
Optional<MemorySize> maxMemory = options.getOptional(CACHE_MANIFEST_MAX_MEMORY);
if (maxMemory.isPresent() && maxMemory.get().compareTo(manifestMaxMemory) > 0) {
// cache all manifest files
manifestMaxMemory = maxMemory.get();
manifestThreshold = Long.MAX_VALUE;
}
return new CachingCatalog(
catalog,
options.get(CACHE_EXPIRATION_INTERVAL_MS),
manifestMaxMemory,
manifestThreshold);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,12 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.privilege.FileBasedPrivilegeManager;
import org.apache.paimon.privilege.PrivilegeManager;
import org.apache.paimon.privilege.PrivilegedCatalog;
import org.apache.paimon.utils.Preconditions;

import java.io.IOException;
import java.io.UncheckedIOException;

import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;

Expand Down Expand Up @@ -74,27 +69,9 @@ static Catalog createCatalog(CatalogContext options) {

static Catalog createCatalog(CatalogContext context, ClassLoader classLoader) {
Catalog catalog = createUnwrappedCatalog(context, classLoader);

Options options = context.options();
if (options.get(CACHE_ENABLED)) {
catalog =
new CachingCatalog(
catalog,
options.get(CACHE_EXPIRATION_INTERVAL_MS),
options.get(CACHE_MANIFEST_MAX_MEMORY));
}

PrivilegeManager privilegeManager =
new FileBasedPrivilegeManager(
catalog.warehouse(),
catalog.fileIO(),
context.options().get(PrivilegedCatalog.USER),
context.options().get(PrivilegedCatalog.PASSWORD));
if (privilegeManager.privilegeEnabled()) {
catalog = new PrivilegedCatalog(catalog, privilegeManager);
}

return catalog;
catalog = CachingCatalog.tryToCreate(catalog, options);
return PrivilegedCatalog.tryToCreate(catalog, options);
}

static Catalog createUnwrappedCatalog(CatalogContext context, ClassLoader classLoader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,26 +197,6 @@ public List<IndexManifestEntry> scanEntries(
return result;
}

public List<IndexManifestEntry> scanEntries(String indexType, BinaryRow partition, int bucket) {
Snapshot snapshot = snapshotManager.latestSnapshot();
if (snapshot == null) {
return Collections.emptyList();
}
String indexManifest = snapshot.indexManifest();
if (indexManifest == null) {
return Collections.emptyList();
}
List<IndexManifestEntry> result = new ArrayList<>();
for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
if (file.indexFile().indexType().equals(indexType)
&& file.partition().equals(partition)
&& file.bucket() == bucket) {
result.add(file);
}
}
return result;
}

public Path filePath(IndexFileMeta file) {
return pathFactory.toPath(file.fileName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ObjectsFile;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.VersionedObjectSerializer;

import javax.annotation.Nullable;
Expand All @@ -41,15 +43,16 @@ private IndexManifestFile(
FormatReaderFactory readerFactory,
FormatWriterFactory writerFactory,
String compression,
PathFactory pathFactory) {
PathFactory pathFactory,
@Nullable SegmentsCache<Path> cache) {
super(
fileIO,
new IndexManifestEntrySerializer(),
readerFactory,
writerFactory,
compression,
pathFactory,
null);
cache);
}

/** Write new index files to index manifest. */
Expand All @@ -72,16 +75,19 @@ public static class Factory {
private final FileFormat fileFormat;
private final String compression;
private final FileStorePathFactory pathFactory;
@Nullable private final SegmentsCache<Path> cache;

public Factory(
FileIO fileIO,
FileFormat fileFormat,
String compression,
FileStorePathFactory pathFactory) {
FileStorePathFactory pathFactory,
@Nullable SegmentsCache<Path> cache) {
this.fileIO = fileIO;
this.fileFormat = fileFormat;
this.compression = compression;
this.pathFactory = pathFactory;
this.cache = cache;
}

public IndexManifestFile create() {
Expand All @@ -91,7 +97,8 @@ public IndexManifestFile create() {
fileFormat.createReaderFactory(schema),
fileFormat.createWriterFactory(schema),
compression,
pathFactory.indexManifestFileFactory());
pathFactory.indexManifestFileFactory(),
cache);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -49,6 +50,19 @@ public PrivilegedCatalog(Catalog wrapped, PrivilegeManager privilegeManager) {
this.privilegeManager = privilegeManager;
}

public static Catalog tryToCreate(Catalog catalog, Options options) {
PrivilegeManager privilegeManager =
new FileBasedPrivilegeManager(
catalog.warehouse(),
catalog.fileIO(),
options.get(PrivilegedCatalog.USER),
options.get(PrivilegedCatalog.PASSWORD));
if (privilegeManager.privilegeEnabled()) {
catalog = new PrivilegedCatalog(catalog, privilegeManager);
}
return catalog;
}

public PrivilegeManager privilegeManager() {
return privilegeManager;
}
Expand Down
Loading

0 comments on commit c70a514

Please sign in to comment.