Skip to content

Commit

Permalink
[core] Introduce SYSTEM_TABLE_LOADERS to SystemTableLoader (#3831)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jul 29, 2024
1 parent 775c086 commit 405779a
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
String type = splits[1];
FileStoreTable originTable =
getDataTable(new Identifier(identifier.getDatabaseName(), tableName));
Table table = SystemTableLoader.load(type, fileIO, originTable);
Table table = SystemTableLoader.load(type, originTable);
if (table == null) {
throw new TableNotExistException(identifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
Expand Down Expand Up @@ -62,7 +63,7 @@ public class AggregationFieldsTable implements ReadonlyTable {

private static final long serialVersionUID = 1L;

public static final String AGGREGATION = "aggregation_fields";
public static final String AGGREGATION_FIELDS = "aggregation_fields";

public static final RowType TABLE_TYPE =
new RowType(
Expand All @@ -77,14 +78,18 @@ public class AggregationFieldsTable implements ReadonlyTable {
private final FileIO fileIO;
private final Path location;

public AggregationFieldsTable(FileStoreTable dataTable) {
this(dataTable.fileIO(), dataTable.location());
}

public AggregationFieldsTable(FileIO fileIO, Path location) {
this.fileIO = fileIO;
this.location = location;
}

@Override
public String name() {
return location.getName() + SYSTEM_TABLE_SPLITTER + AGGREGATION;
return location.getName() + SYSTEM_TABLE_SPLITTER + AGGREGATION_FIELDS;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public class BranchesTable implements ReadonlyTable {
private final FileIO fileIO;
private final Path location;

public BranchesTable(FileStoreTable dataTable) {
this(dataTable.fileIO(), dataTable.location());
}

public BranchesTable(FileIO fileIO, Path location) {
this.fileIO = fileIO;
this.location = location;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
Expand Down Expand Up @@ -71,6 +72,10 @@ public class ConsumersTable implements ReadonlyTable {
private final FileIO fileIO;
private final Path location;

public ConsumersTable(FileStoreTable dataTable) {
this(dataTable.fileIO(), dataTable.location());
}

public ConsumersTable(FileIO fileIO, Path location) {
this.fileIO = fileIO;
this.location = location;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
Expand Down Expand Up @@ -69,6 +70,10 @@ public class OptionsTable implements ReadonlyTable {
private final FileIO fileIO;
private final Path location;

public OptionsTable(FileStoreTable dataTable) {
this(dataTable.fileIO(), dataTable.location());
}

public OptionsTable(FileIO fileIO, Path location) {
this.fileIO = fileIO;
this.location = location;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
Expand Down Expand Up @@ -83,6 +84,10 @@ public class SchemasTable implements ReadonlyTable {
private final FileIO fileIO;
private final Path location;

public SchemasTable(FileStoreTable dataTable) {
this(dataTable.fileIO(), dataTable.location());
}

public SchemasTable(FileIO fileIO, Path location) {
this.fileIO = fileIO;
this.location = location;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ public class SnapshotsTable implements ReadonlyTable {

private final FileStoreTable dataTable;

public SnapshotsTable(FileStoreTable dataTable) {
this(dataTable.fileIO(), dataTable.location(), dataTable);
}

public SnapshotsTable(FileIO fileIO, Path location, FileStoreTable dataTable) {
this.fileIO = fileIO;
this.location = location;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public class StatisticTable implements ReadonlyTable {

private final FileStoreTable dataTable;

public StatisticTable(FileStoreTable dataTable) {
this(dataTable.fileIO(), dataTable.location(), dataTable);
}

public StatisticTable(FileIO fileIO, Path location, FileStoreTable dataTable) {
this.fileIO = fileIO;
this.location = location;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,20 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.table.system.AggregationFieldsTable.AGGREGATION;
import static org.apache.paimon.table.system.AggregationFieldsTable.AGGREGATION_FIELDS;
import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
import static org.apache.paimon.table.system.BranchesTable.BRANCHES;
Expand All @@ -55,39 +60,30 @@
/** Loader to load system {@link Table}s. */
public class SystemTableLoader {

public static final Map<String, Function<FileStoreTable, Table>> SYSTEM_TABLE_LOADERS =
new ImmutableMap.Builder<String, Function<FileStoreTable, Table>>()
.put(MANIFESTS, ManifestsTable::new)
.put(SNAPSHOTS, SnapshotsTable::new)
.put(OPTIONS, OptionsTable::new)
.put(SCHEMAS, SchemasTable::new)
.put(PARTITIONS, PartitionsTable::new)
.put(AUDIT_LOG, AuditLogTable::new)
.put(FILES, FilesTable::new)
.put(TAGS, TagsTable::new)
.put(BRANCHES, BranchesTable::new)
.put(CONSUMERS, ConsumersTable::new)
.put(READ_OPTIMIZED, ReadOptimizedTable::new)
.put(AGGREGATION_FIELDS, AggregationFieldsTable::new)
.put(STATISTICS, StatisticTable::new)
.build();

public static final List<String> SYSTEM_TABLES = new ArrayList<>(SYSTEM_TABLE_LOADERS.keySet());

@Nullable
public static Table load(String type, FileIO fileIO, FileStoreTable dataTable) {
Path location = dataTable.location();
switch (type.toLowerCase()) {
case MANIFESTS:
return new ManifestsTable(dataTable);
case SNAPSHOTS:
return new SnapshotsTable(fileIO, location, dataTable);
case OPTIONS:
return new OptionsTable(fileIO, location);
case SCHEMAS:
return new SchemasTable(fileIO, location);
case PARTITIONS:
return new PartitionsTable(dataTable);
case AUDIT_LOG:
return new AuditLogTable(dataTable);
case FILES:
return new FilesTable(dataTable);
case TAGS:
return new TagsTable(fileIO, location);
case BRANCHES:
return new BranchesTable(fileIO, location);
case CONSUMERS:
return new ConsumersTable(fileIO, location);
case READ_OPTIMIZED:
return new ReadOptimizedTable(dataTable);
case AGGREGATION:
return new AggregationFieldsTable(fileIO, location);
case STATISTICS:
return new StatisticTable(fileIO, location, dataTable);
default:
return null;
}
public static Table load(String type, FileStoreTable dataTable) {
return Optional.ofNullable(SYSTEM_TABLE_LOADERS.get(type.toLowerCase()))
.map(f -> f.apply(dataTable))
.orElse(null);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
Expand Down Expand Up @@ -82,6 +83,10 @@ public class TagsTable implements ReadonlyTable {
private final FileIO fileIO;
private final Path location;

public TagsTable(FileStoreTable dataTable) {
this(dataTable.fileIO(), dataTable.location());
}

public TagsTable(FileIO fileIO, Path location) {
this.fileIO = fileIO;
this.location = location;
Expand Down

0 comments on commit 405779a

Please sign in to comment.