[core] Make AbstractFileStoreTable package private (apache#2779)
tsreaper authored Jan 24, 2024
1 parent b8a88cc commit 4945781
Showing 23 changed files with 73 additions and 78 deletions.
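The change itself is mechanical: AbstractFileStoreTable drops its public modifier, its constructor and the splitGenerator()/nonPartitionFilterConsumer() hooks become protected, supportStreamingReadOverwrite() moves up to the FileStoreTable interface, and every call site outside the org.apache.paimon.table package casts to the interface instead of the concrete class. A minimal sketch of the resulting call-site pattern, using only names that appear in the diffs below (the wrapper class and method are hypothetical):

import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

import java.util.UUID;

class CommitFromTable {
    // Before this commit: call sites cast to (AbstractFileStoreTable).
    // After: the FileStoreTable interface is enough to reach store().
    static FileStoreCommit newCommit(Table table) {
        FileStoreTable fileStoreTable = (FileStoreTable) table;
        return fileStoreTable.store().newCommit(UUID.randomUUID().toString());
    }
}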
@@ -30,7 +30,6 @@
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
@@ -127,7 +126,7 @@ public Map<String, String> loadDatabaseProperties(String name)
     public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
             throws TableNotExistException {
         Table table = getTable(identifier);
-        AbstractFileStoreTable fileStoreTable = (AbstractFileStoreTable) table;
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
         FileStoreCommit commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
         commit.dropPartitions(
                 Collections.singletonList(partitionSpec), BatchWriteBuilder.COMMIT_IDENTIFIER);
@@ -446,8 +445,6 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> c
             } else if (change instanceof SchemaChange.RenameColumn) {
                 SchemaChange.RenameColumn rename = (SchemaChange.RenameColumn) change;
                 fieldNames.add(rename.newName());
-            } else {
-                // do nothing
             }
         }
         validateFieldNameCaseInsensitive(fieldNames);
@@ -459,7 +456,7 @@ private void validateFieldNameCaseInsensitive(List<String> fieldNames) {
 
     private void validateAutoCreateClose(Map<String, String> options) {
         checkArgument(
-                !Boolean.valueOf(
+                !Boolean.parseBoolean(
                         options.getOrDefault(
                                 CoreOptions.AUTO_CREATE.key(),
                                 CoreOptions.AUTO_CREATE.defaultValue().toString())),
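One change in this file is unrelated to visibility: Boolean.valueOf becomes Boolean.parseBoolean. Both parse the same strings, but parseBoolean returns a primitive boolean directly, while valueOf returns a (cached) boxed Boolean that the surrounding ! must auto-unbox. A standalone illustration, not taken from the diff:

class ParseVsValueOf {
    public static void main(String[] args) {
        String s = "false";
        boolean a = Boolean.parseBoolean(s); // primitive, no boxing
        Boolean b = Boolean.valueOf(s);      // boxed, '!' triggers auto-unboxing
        System.out.println(!a);              // true
        System.out.println(!b);              // true, same result either way
    }
}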
@@ -37,7 +37,7 @@
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
-import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.PartitionKeyExtractor;
 import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
@@ -78,12 +78,11 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
 
     private static final String INDEX_NAME = "keyIndex";
 
-    private final AbstractFileStoreTable table;
+    private final FileStoreTable table;
 
     private transient IOManager ioManager;
 
     private transient int bucketIndex;
-    private transient ProjectToRowFunction setPartition;
     private transient boolean bootstrap;
     private transient BinaryExternalSortBuffer bootstrapKeys;
     private transient RowBuffer bootstrapRecords;
@@ -103,7 +102,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
     private transient ExistingProcessor existingProcessor;
 
     public GlobalIndexAssigner(Table table) {
-        this.table = (AbstractFileStoreTable) table;
+        this.table = (FileStoreTable) table;
     }
 
     // ================== Start Public API ===================
@@ -122,7 +121,8 @@ public void open(
 
         RowType bootstrapType = IndexBootstrap.bootstrapType(table.schema());
         this.bucketIndex = bootstrapType.getFieldCount() - 1;
-        this.setPartition = new ProjectToRowFunction(table.rowType(), table.partitionKeys());
+        ProjectToRowFunction setPartition =
+                new ProjectToRowFunction(table.rowType(), table.partitionKeys());
 
         CoreOptions coreOptions = table.coreOptions();
         this.targetBucketRowNumber = (int) coreOptions.dynamicBucketTargetRowNum();
@@ -33,7 +33,7 @@
 import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.stats.BinaryTableStats;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
-import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -104,14 +104,12 @@ private static DataFileMeta constructFileMeta(
         try {
             FieldStatsCollector.Factory[] factories =
                     StatsCollectorFactories.createStatsFactories(
-                            ((AbstractFileStoreTable) table).coreOptions(),
+                            ((FileStoreTable) table).coreOptions(),
                             table.rowType().getFieldNames());
 
             TableStatsExtractor tableStatsExtractor =
                     FileFormat.getFileFormat(
-                                    ((AbstractFileStoreTable) table)
-                                            .coreOptions()
-                                            .toConfiguration(),
+                                    ((FileStoreTable) table).coreOptions().toConfiguration(),
                                     format)
                             .createStatsExtractor(table.rowType(), factories)
                             .orElseThrow(
@@ -167,7 +165,7 @@ private static DataFileMeta constructFileMeta(
                 stats,
                 0,
                 0,
-                ((AbstractFileStoreTable) table).schema().id());
+                ((FileStoreTable) table).schema().id());
     }
 
     public static BinaryRow writePartitionValue(
@@ -72,7 +72,7 @@
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Abstract {@link FileStoreTable}. */
-public abstract class AbstractFileStoreTable implements FileStoreTable {
+abstract class AbstractFileStoreTable implements FileStoreTable {
 
     private static final long serialVersionUID = 1L;
 
@@ -81,7 +81,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
     protected final TableSchema tableSchema;
     protected final CatalogEnvironment catalogEnvironment;
 
-    public AbstractFileStoreTable(
+    protected AbstractFileStoreTable(
             FileIO fileIO,
             Path path,
             TableSchema tableSchema,
@@ -165,11 +165,9 @@ public InnerStreamTableScan newStreamScan() {
                 DefaultValueAssigner.create(tableSchema));
     }
 
-    public abstract SplitGenerator splitGenerator();
+    protected abstract SplitGenerator splitGenerator();
 
-    public abstract boolean supportStreamingReadOverwrite();
-
-    public abstract BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer();
+    protected abstract BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer();
 
     protected abstract FileStoreTable copy(TableSchema newTableSchema);
 
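With the class now package-private, code outside org.apache.paimon.table can no longer reference or subclass AbstractFileStoreTable; tables are obtained as FileStoreTable, typically through FileStoreTableFactory (the import already present in the first diff above). A hedged sketch of that creation path, assuming a create(FileIO, Path) overload exists on the factory:

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;

class LoadTable {
    // The factory hands back the interface type; the concrete
    // AbstractFileStoreTable subclasses stay hidden in their package.
    static FileStoreTable load(FileIO fileIO, Path tablePath) {
        return FileStoreTableFactory.create(fileIO, tablePath);
    }
}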
@@ -89,7 +89,7 @@ public AppendOnlyFileStore store() {
     }
 
     @Override
-    public SplitGenerator splitGenerator() {
+    protected SplitGenerator splitGenerator() {
         return new AppendOnlySplitGenerator(
                 store().options().splitTargetSize(),
                 store().options().splitOpenFileCost(),
@@ -106,7 +106,7 @@ public boolean supportStreamingReadOverwrite() {
     }
 
     @Override
-    public BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer() {
+    protected BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer() {
         return (scan, predicate) -> ((AppendOnlyFileStoreScan) scan).withFilter(predicate);
     }
 
@@ -102,4 +102,6 @@ default Optional<String> comment() {
     default BinaryTableStats getSchemaFieldStats(DataFileMeta dataFileMeta) {
         return dataFileMeta.valueStats();
     }
+
+    boolean supportStreamingReadOverwrite();
 }
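Promoting supportStreamingReadOverwrite() into the FileStoreTable interface is what lets callers drop their abstract-class casts: the flag is now reachable on any FileStoreTable. A small consumer sketch (the method comes from this hunk; the validation around it is illustrative):

import org.apache.paimon.table.FileStoreTable;

class StreamingReadCheck {
    // Reject streaming reads over overwrite snapshots unless the
    // concrete table implementation declares support for them.
    static void validate(FileStoreTable table, boolean streamingReadOverwrite) {
        if (streamingReadOverwrite && !table.supportStreamingReadOverwrite()) {
            throw new IllegalArgumentException(
                    "streaming read of overwrite snapshots is not supported by this table");
        }
    }
}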
@@ -117,7 +117,7 @@ public KeyValueFileStore store() {
     }
 
     @Override
-    public SplitGenerator splitGenerator() {
+    protected SplitGenerator splitGenerator() {
         return new MergeTreeSplitGenerator(
                 store().newKeyComparator(),
                 store().options().splitTargetSize(),
@@ -130,7 +130,7 @@ public boolean supportStreamingReadOverwrite() {
     }
 
     @Override
-    public BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer() {
+    protected BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer() {
         return (scan, predicate) -> {
             // currently we can only perform filter push down on keys
             // consider this case:
@@ -22,7 +22,6 @@
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.DefaultValueAssigner;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
@@ -56,10 +55,10 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
 
     public static final String READ_OPTIMIZED = "ro";
 
-    private final AbstractFileStoreTable dataTable;
+    private final FileStoreTable dataTable;
 
     public ReadOptimizedTable(FileStoreTable dataTable) {
-        this.dataTable = (AbstractFileStoreTable) dataTable;
+        this.dataTable = dataTable;
     }
 
     @Override
@@ -22,7 +22,7 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.TraceableFileIO;
 
@@ -42,7 +42,7 @@ public abstract class PrimaryKeyTableTestBase {
 
     @TempDir protected java.nio.file.Path tempPath;
 
-    protected AbstractFileStoreTable table;
+    protected FileStoreTable table;
     protected String commitUser;
 
     @BeforeEach
@@ -63,7 +63,7 @@ public void beforeEachBase() throws Exception {
                         .options(tableOptions().toMap())
                         .build();
         catalog.createTable(identifier, schema, true);
-        table = (AbstractFileStoreTable) catalog.getTable(identifier);
+        table = (FileStoreTable) catalog.getTable(identifier);
         commitUser = UUID.randomUUID().toString();
     }
 
@@ -26,7 +26,6 @@
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -255,6 +254,6 @@ private PartitionExpire newExpire() {
         options.put(PARTITION_EXPIRATION_TIME.key(), "2 d");
         options.put(CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "1 d");
         options.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");
-        return ((AbstractFileStoreTable) table.copy(options)).store().newPartitionExpire("");
+        return table.copy(options).store().newPartitionExpire("");
     }
 }
@@ -27,7 +27,7 @@
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.DataTypes;
@@ -64,7 +64,7 @@ public void testPartitionStats() throws Exception {
                         GenericRow.of(1, 3, 1),
                         GenericRow.of(2, 1, 1));
 
-        AbstractFileStoreTable storeTable = (AbstractFileStoreTable) table;
+        FileStoreTable storeTable = (FileStoreTable) table;
         AbstractFileStore<?> store = (AbstractFileStore<?>) storeTable.store();
         String manifestListFile = storeTable.snapshotManager().latestSnapshot().deltaManifestList();
 
@@ -106,7 +106,7 @@ public void testIndexFileExpirationWithTag() throws Exception {
         assertThat(indexFileSize()).isEqualTo(5);
         assertThat(indexManifestSize()).isEqualTo(3);
 
-        TagManager tagManager = new TagManager(LocalFileIO.create(), table.path);
+        TagManager tagManager = new TagManager(LocalFileIO.create(), table.location());
         checkIndexFiles(tagManager.taggedSnapshot("tag3"));
         checkIndexFiles(tagManager.taggedSnapshot("tag5"));
     }
@@ -133,7 +133,7 @@ public void testIndexFileExpirationWhenDeletingTag() throws Exception {
         expire.expireUntil(1, 7);
         table.deleteTag("tag3");
 
-        TagManager tagManager = new TagManager(LocalFileIO.create(), table.path);
+        TagManager tagManager = new TagManager(LocalFileIO.create(), table.location());
         checkIndexFiles(7);
         checkIndexFiles(tagManager.taggedSnapshot("tag5"));
         assertThat(indexFileSize()).isEqualTo(4);
@@ -241,11 +241,12 @@ private void checkIndexFiles(Snapshot snapshot) {
     }
 
     private long indexFileSize() throws IOException {
-        return LocalFileIO.create().listStatus(new Path(table.path, "index")).length;
+        return LocalFileIO.create().listStatus(new Path(table.location(), "index")).length;
     }
 
     private long indexManifestSize() throws IOException {
-        return Arrays.stream(LocalFileIO.create().listStatus(new Path(table.path, "manifest")))
+        return Arrays.stream(
+                        LocalFileIO.create().listStatus(new Path(table.location(), "manifest")))
                 .filter(s -> s.getPath().getName().startsWith("index-"))
                 .count();
     }
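The table.path accesses become table.location() for the same reason: the test now holds a FileStoreTable, and the interface exposes the table directory through the location() accessor rather than the class's path field. The pattern extracted into a helper (helper name hypothetical; the TagManager constructor and its import package are as assumed below, with the constructor call taken from the hunk above):

import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.TagManager;

class TagManagerFromTable {
    // location() returns the table's root path via the interface,
    // replacing direct access to the now-hidden class's 'path' field.
    static TagManager forTable(FileStoreTable table) {
        return new TagManager(LocalFileIO.create(), table.location());
    }
}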
@@ -22,7 +22,6 @@
 import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -652,8 +651,8 @@ public void testWaterMarkSyncTable(String format) throws Exception {
                 syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
         runActionWithDefaultEnv(action);
 
-        AbstractFileStoreTable table =
-                (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+        FileStoreTable table =
+                (FileStoreTable) catalog.getTable(new Identifier(database, tableName));
         while (true) {
             if (table.snapshotManager().snapshotCount() > 0
                     && table.snapshotManager().latestSnapshot().watermark()
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.pulsar;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -220,8 +219,8 @@ public void testWaterMarkSyncTable() throws Exception {
                         .build();
         runActionWithDefaultEnv(action);
 
-        AbstractFileStoreTable table =
-                (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+        FileStoreTable table =
+                (FileStoreTable) catalog.getTable(new Identifier(database, tableName));
         while (true) {
             if (table.snapshotManager().snapshotCount() > 0
                     && table.snapshotManager().latestSnapshot().watermark()
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 
@@ -49,7 +48,7 @@ public DropPartitionAction(
 
         this.partitions = partitions;
 
-        AbstractFileStoreTable fileStoreTable = (AbstractFileStoreTable) table;
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
         this.commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
     }
 
@@ -20,7 +20,7 @@
 
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 
@@ -47,7 +47,7 @@ public FlinkTableSink(
     @Override
     public void executeTruncation() {
         FileStoreCommit commit =
-                ((AbstractFileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
+                ((FileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
         long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
         commit.purgeTable(identifier);
     }
