Skip to content

Commit

Permalink
[core] Introduce listPartitions interface to Catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Nov 5, 2024
1 parent 5a8bde0 commit 2f4c8d0
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.Lock;
Expand Down Expand Up @@ -198,6 +199,12 @@ public void dropPartition(Identifier identifier, Map<String, String> partitionSp
}
}

/**
 * Lists the partition entries of the given table.
 *
 * <p>Resolves the table through {@code getTable(identifier)}, then builds a new scan from a new
 * read builder and returns that scan's partition entries.
 *
 * @param identifier path of the table to list partitions
 * @return the result of {@code listPartitionEntries()} on a fresh scan of the table
 * @throws TableNotExistException if the table does not exist
 */
@Override
public List<PartitionEntry> listPartitions(Identifier identifier)
throws TableNotExistException {
// Each call creates a new read builder and scan; nothing is cached at this level.
return getTable(identifier).newReadBuilder().newScan().listPartitionEntries();
}

protected abstract void createDatabaseImpl(String name, Map<String, String> properties);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalListener;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Weigher;

import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
Expand Down Expand Up @@ -63,9 +64,10 @@ public class CachingCatalog extends DelegateCatalog {

protected final Cache<String, Map<String, String>> databaseCache;
protected final Cache<Identifier, Table> tableCache;
@Nullable protected final Cache<Identifier, List<PartitionEntry>> partitionCache;
@Nullable protected final SegmentsCache<Path> manifestCache;
private final long cachedPartitionMaxNum;

// partition cache will affect data latency
@Nullable protected final Cache<Identifier, List<PartitionEntry>> partitionCache;

public CachingCatalog(Catalog wrapped) {
this(
Expand Down Expand Up @@ -126,12 +128,13 @@ public CachingCatalog(
.softValues()
.executor(Runnable::run)
.expireAfterAccess(expirationInterval)
.weigher(this::weigh)
.weigher(
(Weigher<Identifier, List<PartitionEntry>>)
(identifier, v) -> v.size())
.maximumWeight(cachedPartitionMaxNum)
.ticker(ticker)
.build();
this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
this.cachedPartitionMaxNum = cachedPartitionMaxNum;
}

public static Catalog tryToCreate(Catalog catalog, Options options) {
Expand Down Expand Up @@ -248,40 +251,19 @@ private void putTableCache(Identifier identifier, Table table) {
tableCache.put(identifier, table);
}

public List<PartitionEntry> getPartitions(Identifier identifier) throws TableNotExistException {
Table table = this.getTable(identifier);
if (partitionCacheEnabled(table)) {
List<PartitionEntry> partitions;
partitions = partitionCache.getIfPresent(identifier);
if (partitions == null || partitions.isEmpty()) {
partitions = this.refreshPartitions(identifier);
}
return partitions;
}
return ((FileStoreTable) table).newSnapshotReader().partitionEntries();
}

public List<PartitionEntry> refreshPartitions(Identifier identifier)
@Override
public List<PartitionEntry> listPartitions(Identifier identifier)
throws TableNotExistException {
Table table = this.getTable(identifier);
List<PartitionEntry> partitions =
((FileStoreTable) table).newSnapshotReader().partitionEntries();
if (partitionCacheEnabled(table)
&& partitionCache.asMap().values().stream().mapToInt(List::size).sum()
< this.cachedPartitionMaxNum) {
partitionCache.put(identifier, partitions);
if (partitionCache == null) {
return wrapped.listPartitions(identifier);
}
return partitions;
}

private boolean partitionCacheEnabled(Table table) {
return partitionCache != null
&& table instanceof FileStoreTable
&& !table.partitionKeys().isEmpty();
}

private int weigh(Identifier identifier, List<PartitionEntry> partitions) {
return partitions.size();
List<PartitionEntry> result = partitionCache.getIfPresent(identifier);
if (result == null) {
result = wrapped.listPartitions(identifier);
partitionCache.put(identifier, result);
}
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -290,6 +291,14 @@ void createPartition(Identifier identifier, Map<String, String> partitionSpec)
void dropPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException, PartitionNotExistException;

/**
 * List the {@link PartitionEntry} of every partition of the table.
 *
 * @param identifier path of the table to list partitions
 * @return the partition entries of the table
 * @throws TableNotExistException if the table does not exist
 */
List<PartitionEntry> listPartitions(Identifier identifier) throws TableNotExistException;

/**
* Modify an existing table from a {@link SchemaChange}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -192,6 +193,12 @@ public void dropPartition(Identifier identifier, Map<String, String> partitions)
wrapped.dropPartition(identifier, partitions);
}

/**
 * Lists the partition entries of the given table by forwarding the call unchanged to the
 * wrapped catalog.
 *
 * @param identifier path of the table to list partitions
 * @return whatever the wrapped catalog returns for {@code identifier}
 * @throws TableNotExistException if the wrapped catalog reports that the table does not exist
 */
@Override
public List<PartitionEntry> listPartitions(Identifier identifier)
throws TableNotExistException {
return wrapped.listPartitions(identifier);
}

@Override
public void repairCatalog() {
wrapped.repairCatalog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public void testPartitionCache() throws Exception {
Collections.emptyMap(),
"");
catalog.createTable(tableIdent, schema, false);
List<PartitionEntry> partitionEntryList = catalog.getPartitions(tableIdent);
List<PartitionEntry> partitionEntryList = catalog.listPartitions(tableIdent);
assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent);
List<PartitionEntry> partitionEntryListFromCache =
catalog.partitionCache().getIfPresent(tableIdent);
Expand Down

0 comments on commit 2f4c8d0

Please sign in to comment.