diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 22f1b4ac8db0..89fd52f6cfe6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -30,7 +30,6 @@
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
@@ -127,7 +126,7 @@ public Map<String, String> loadDatabaseProperties(String name)
     public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
             throws TableNotExistException {
         Table table = getTable(identifier);
-        AbstractFileStoreTable fileStoreTable = (AbstractFileStoreTable) table;
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
         FileStoreCommit commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
         commit.dropPartitions(
                 Collections.singletonList(partitionSpec), BatchWriteBuilder.COMMIT_IDENTIFIER);
@@ -446,8 +445,6 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> c
             } else if (change instanceof SchemaChange.RenameColumn) {
                 SchemaChange.RenameColumn rename = (SchemaChange.RenameColumn) change;
                 fieldNames.add(rename.newName());
-            } else {
-                // do nothing
             }
         }
         validateFieldNameCaseInsensitive(fieldNames);
@@ -459,7 +456,7 @@ private void validateFieldNameCaseInsensitive(List<String> fieldNames) {
 
     private void validateAutoCreateClose(Map<String, String> options) {
         checkArgument(
-                !Boolean.valueOf(
+                !Boolean.parseBoolean(
                         options.getOrDefault(
                                 CoreOptions.AUTO_CREATE.key(),
                                 CoreOptions.AUTO_CREATE.defaultValue().toString())),
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index f003ffc10e6c..5b4ee8a43c3d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -37,7 +37,7 @@
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
-import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.PartitionKeyExtractor;
 import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
@@ -78,12 +78,11 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
 
     private static final String INDEX_NAME = "keyIndex";
 
-    private final AbstractFileStoreTable table;
+    private final FileStoreTable table;
 
     private transient IOManager ioManager;
 
     private transient int bucketIndex;
-    private transient ProjectToRowFunction setPartition;
     private transient boolean bootstrap;
     private transient BinaryExternalSortBuffer bootstrapKeys;
     private transient RowBuffer bootstrapRecords;
@@ -103,7 +102,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
     private transient ExistingProcessor existingProcessor;
 
     public GlobalIndexAssigner(Table table) {
-        this.table = (AbstractFileStoreTable) table;
+        this.table = (FileStoreTable) table;
     }
 
     // ================== Start Public API ===================
@@ -122,7 +121,8 @@ public void open(
 
         RowType bootstrapType = IndexBootstrap.bootstrapType(table.schema());
         this.bucketIndex = bootstrapType.getFieldCount() - 1;
-        this.setPartition = new ProjectToRowFunction(table.rowType(), table.partitionKeys());
+        ProjectToRowFunction setPartition =
+                new ProjectToRowFunction(table.rowType(), table.partitionKeys());
 
         CoreOptions coreOptions = table.coreOptions();
         this.targetBucketRowNumber = (int) coreOptions.dynamicBucketTargetRowNum();
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 57b4bf8a277c..2b4612229bc1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -33,7 +33,7 @@
 import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.stats.BinaryTableStats;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
-import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -104,14 +104,12 @@ private static DataFileMeta constructFileMeta(
         try {
             FieldStatsCollector.Factory[] factories =
                     StatsCollectorFactories.createStatsFactories(
-                            ((AbstractFileStoreTable) table).coreOptions(),
+                            ((FileStoreTable) table).coreOptions(),
                             table.rowType().getFieldNames());
 
             TableStatsExtractor tableStatsExtractor =
                     FileFormat.getFileFormat(
-                                    ((AbstractFileStoreTable) table)
-                                            .coreOptions()
-                                            .toConfiguration(),
+                                    ((FileStoreTable) table).coreOptions().toConfiguration(),
                                     format)
                             .createStatsExtractor(table.rowType(), factories)
                             .orElseThrow(
@@ -167,7 +165,7 @@ private static DataFileMeta constructFileMeta(
                 stats,
                 0,
                 0,
-                ((AbstractFileStoreTable) table).schema().id());
+                ((FileStoreTable) table).schema().id());
     }
 
     public static BinaryRow writePartitionValue(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 041470055125..9900d9ecf0f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -72,7 +72,7 @@
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Abstract {@link FileStoreTable}. */
-public abstract class AbstractFileStoreTable implements FileStoreTable {
+abstract class AbstractFileStoreTable implements FileStoreTable {
 
     private static final long serialVersionUID = 1L;
 
@@ -81,7 +81,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
     protected final TableSchema tableSchema;
     protected final CatalogEnvironment catalogEnvironment;
 
-    public AbstractFileStoreTable(
+    protected AbstractFileStoreTable(
             FileIO fileIO,
             Path path,
             TableSchema tableSchema,
@@ -165,11 +165,9 @@ public InnerStreamTableScan newStreamScan() {
                 DefaultValueAssigner.create(tableSchema));
     }
 
-    public abstract SplitGenerator splitGenerator();
+    protected abstract SplitGenerator splitGenerator();
 
-    public abstract boolean supportStreamingReadOverwrite();
-
-    public abstract BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer();
+    protected abstract BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer();
 
     protected abstract FileStoreTable copy(TableSchema newTableSchema);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 0ebcd44aeb11..678453729748 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -89,7 +89,7 @@ public AppendOnlyFileStore store() {
     }
 
     @Override
-    public SplitGenerator splitGenerator() {
+    protected SplitGenerator splitGenerator() {
         return new AppendOnlySplitGenerator(
                 store().options().splitTargetSize(),
                 store().options().splitOpenFileCost(),
@@ -106,7 +106,7 @@ public boolean supportStreamingReadOverwrite() {
     }
 
     @Override
-    public BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer() {
+    protected BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer() {
         return (scan, predicate) -> ((AppendOnlyFileStoreScan) scan).withFilter(predicate);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index b457f562abc6..b183d6ad63a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -102,4 +102,6 @@ default Optional<String> comment() {
     default BinaryTableStats getSchemaFieldStats(DataFileMeta dataFileMeta) {
         return dataFileMeta.valueStats();
     }
+
+    boolean supportStreamingReadOverwrite();
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index ef91edf31244..d815aae9fd4d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -117,7 +117,7 @@ public KeyValueFileStore store() {
     }
 
     @Override
-    public SplitGenerator splitGenerator() {
+    protected SplitGenerator splitGenerator() {
         return new MergeTreeSplitGenerator(
                 store().newKeyComparator(),
                 store().options().splitTargetSize(),
@@ -130,7 +130,7 @@ public boolean supportStreamingReadOverwrite() {
     }
 
     @Override
-    public BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer() {
+    protected BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer() {
         return (scan, predicate) -> {
             // currently we can only perform filter push down on keys
             // consider this case:
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 55542373458b..8daff265fad1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -22,7 +22,6 @@
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.DefaultValueAssigner;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
@@ -56,10 +55,10 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
 
     public static final String READ_OPTIMIZED = "ro";
 
-    private final AbstractFileStoreTable dataTable;
+    private final FileStoreTable dataTable;
 
     public ReadOptimizedTable(FileStoreTable dataTable) {
-        this.dataTable = (AbstractFileStoreTable) dataTable;
+        this.dataTable = dataTable;
     }
 
     @Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
index cc765786ff16..4c334e5bfa6c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
@@ -22,7 +22,7 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.TraceableFileIO;
 
@@ -42,7 +42,7 @@ public abstract class PrimaryKeyTableTestBase {
 
     @TempDir protected java.nio.file.Path tempPath;
 
-    protected AbstractFileStoreTable table;
+    protected FileStoreTable table;
     protected String commitUser;
 
     @BeforeEach
@@ -63,7 +63,7 @@ public void beforeEachBase() throws Exception {
                         .options(tableOptions().toMap())
                         .build();
         catalog.createTable(identifier, schema, true);
-        table = (AbstractFileStoreTable) catalog.getTable(identifier);
+        table = (FileStoreTable) catalog.getTable(identifier);
         commitUser = UUID.randomUUID().toString();
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 564d177870df..94dc3cfe46a4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -26,7 +26,6 @@
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -255,6 +254,6 @@ private PartitionExpire newExpire() {
         options.put(PARTITION_EXPIRATION_TIME.key(), "2 d");
         options.put(CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "1 d");
         options.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");
-        return ((AbstractFileStoreTable) table.copy(options)).store().newPartitionExpire("");
+        return table.copy(options).store().newPartitionExpire("");
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
index 23ff1e8227f0..ce98052e5049 100644
--- a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
@@ -27,7 +27,7 @@
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.DataTypes;
@@ -64,7 +64,7 @@ public void testPartitionStats() throws Exception {
                 GenericRow.of(1, 3, 1),
                 GenericRow.of(2, 1, 1));
 
-        AbstractFileStoreTable storeTable = (AbstractFileStoreTable) table;
+        FileStoreTable storeTable = (FileStoreTable) table;
         AbstractFileStore<?> store = (AbstractFileStore<?>) storeTable.store();
         String manifestListFile =
                 storeTable.snapshotManager().latestSnapshot().deltaManifestList();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
index 98816dd04cce..b7117cf0a600 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
@@ -106,7 +106,7 @@ public void testIndexFileExpirationWithTag() throws Exception {
         assertThat(indexFileSize()).isEqualTo(5);
         assertThat(indexManifestSize()).isEqualTo(3);
 
-        TagManager tagManager = new TagManager(LocalFileIO.create(), table.path);
+        TagManager tagManager = new TagManager(LocalFileIO.create(), table.location());
         checkIndexFiles(tagManager.taggedSnapshot("tag3"));
         checkIndexFiles(tagManager.taggedSnapshot("tag5"));
     }
@@ -133,7 +133,7 @@ public void testIndexFileExpirationWhenDeletingTag() throws Exception {
         expire.expireUntil(1, 7);
         table.deleteTag("tag3");
 
-        TagManager tagManager = new TagManager(LocalFileIO.create(), table.path);
+        TagManager tagManager = new TagManager(LocalFileIO.create(), table.location());
         checkIndexFiles(7);
         checkIndexFiles(tagManager.taggedSnapshot("tag5"));
         assertThat(indexFileSize()).isEqualTo(4);
@@ -241,11 +241,12 @@ private void checkIndexFiles(Snapshot snapshot) {
     }
 
     private long indexFileSize() throws IOException {
-        return LocalFileIO.create().listStatus(new Path(table.path, "index")).length;
+        return LocalFileIO.create().listStatus(new Path(table.location(), "index")).length;
     }
 
     private long indexManifestSize() throws IOException {
-        return Arrays.stream(LocalFileIO.create().listStatus(new Path(table.path, "manifest")))
+        return Arrays.stream(
+                        LocalFileIO.create().listStatus(new Path(table.location(), "manifest")))
                 .filter(s -> s.getPath().getName().startsWith("index-"))
                 .count();
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index aabb5d53c6d4..da38b2a0ce7f 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -22,7 +22,6 @@
 import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -652,8 +651,8 @@ public void testWaterMarkSyncTable(String format) throws Exception {
                 syncTableActionBuilder(kafkaConfig).withTableConfig(config).build();
         runActionWithDefaultEnv(action);
 
-        AbstractFileStoreTable table =
-                (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+        FileStoreTable table =
+                (FileStoreTable) catalog.getTable(new Identifier(database, tableName));
         while (true) {
             if (table.snapshotManager().snapshotCount() > 0
                     && table.snapshotManager().latestSnapshot().watermark()
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java
index 43d396008c04..a4fb332d5033 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc.pulsar;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -220,8 +219,8 @@ public void testWaterMarkSyncTable() throws Exception {
                         .build();
         runActionWithDefaultEnv(action);
 
-        AbstractFileStoreTable table =
-                (AbstractFileStoreTable) catalog.getTable(new Identifier(database, tableName));
+        FileStoreTable table =
+                (FileStoreTable) catalog.getTable(new Identifier(database, tableName));
         while (true) {
             if (table.snapshotManager().snapshotCount() > 0
                     && table.snapshotManager().latestSnapshot().watermark()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
index a1bd77544e2f..d6a70f375f7f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 
@@ -49,7 +48,7 @@ public DropPartitionAction(
 
         this.partitions = partitions;
 
-        AbstractFileStoreTable fileStoreTable = (AbstractFileStoreTable) table;
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
         this.commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index 98bb6a69e910..50bc45b752f8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -20,7 +20,7 @@
 
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 
@@ -47,7 +47,7 @@ public FlinkTableSink(
     @Override
     public void executeTruncation() {
         FileStoreCommit commit =
-                ((AbstractFileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
+                ((FileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
         long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
         commit.purgeTable(identifier);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
index b0b9cf2b948f..9713421fdd6a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -22,7 +22,7 @@
 import org.apache.paimon.index.HashBucketAssigner;
 import org.apache.paimon.index.SimpleHashBucketAssigner;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.PartitionKeyExtractor;
 import org.apache.paimon.utils.MathUtils;
@@ -42,7 +42,7 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple
     private static final long serialVersionUID = 1L;
 
     private final String initialCommitUser;
-    private final AbstractFileStoreTable table;
+    private final FileStoreTable table;
     private final Integer numAssigners;
     private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction;
     private final boolean overwrite;
@@ -57,7 +57,7 @@ public HashBucketAssignerOperator(
             SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction,
             boolean overwrite) {
         this.initialCommitUser = commitUser;
-        this.table = (AbstractFileStoreTable) table;
+        this.table = (FileStoreTable) table;
         this.numAssigners = numAssigners;
         this.extractorFunction = extractorFunction;
         this.overwrite = overwrite;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index b628b1cbcdaa..4150b7f3a374 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -28,8 +28,8 @@
 import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.PrimaryKeyFileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableUtils;
@@ -169,7 +169,7 @@ public boolean applyDeleteFilters(List<ResolvedExpression> list) {
     @Override
     public Optional<Long> executeDeletion() {
         FileStoreCommit commit =
-                ((AbstractFileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
+                ((FileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
         long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
         if (deletePredicate == null) {
             commit.purgeTable(identifier);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
index 06aaa23ca630..c4f928ef42e4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
@@ -25,7 +26,7 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.types.Row;
 import org.assertj.core.api.Assertions;
@@ -111,10 +112,9 @@ public void testOverwrite() throws Exception {
         // overwrite the whole table, we should update the index file by this sql
         sql("INSERT OVERWRITE T SELECT * FROM T LIMIT 4");
 
-        AbstractFileStoreTable table =
-                (AbstractFileStoreTable)
-                        (CatalogFactory.createCatalog(CatalogContext.create(new Path(path))))
-                                .getTable(Identifier.create("default", "T"));
+        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(path)));
+
+        FileStoreTable table = (FileStoreTable) catalog.getTable(Identifier.create("default", "T"));
         IndexFileHandler indexFileHandler = table.store().newIndexFileHandler();
         List<BinaryRow> partitions = table.newScan().listPartitions();
         List<IndexManifestEntry> entries = new ArrayList<>();
@@ -123,5 +123,7 @@ public void testOverwrite() throws Exception {
         Long records =
                 entries.stream().map(entry -> entry.indexFile().rowCount()).reduce(Long::sum).get();
         Assertions.assertThat(records).isEqualTo(4);
+
+        catalog.close();
     }
 }
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index c60c5d3effdf..c199d7e893f4 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -30,9 +30,10 @@
 import org.apache.paimon.migrate.FileMetaUtils;
 import org.apache.paimon.migrate.Migrator;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
@@ -111,8 +112,7 @@ public void executeMigrate() throws Exception {
         }
 
         try {
-            AbstractFileStoreTable paimonTable =
-                    (AbstractFileStoreTable) hiveCatalog.getTable(identifier);
+            FileStoreTable paimonTable = (FileStoreTable) hiveCatalog.getTable(identifier);
             checkPaimonTable(paimonTable);
 
             List<String> partitionsNames =
@@ -165,7 +165,9 @@ public void executeMigrate() throws Exception {
                 throw new RuntimeException("Migrating failed because exception happens", e);
             }
 
-            paimonTable.newBatchWriteBuilder().newCommit().commit(new ArrayList<>(commitMessages));
+            try (BatchTableCommit commit = paimonTable.newBatchWriteBuilder().newCommit()) {
+                commit.commit(new ArrayList<>(commitMessages));
+            }
         } catch (Exception e) {
             if (!alreadyExist) {
                 hiveCatalog.dropTable(identifier, true);
@@ -184,7 +186,7 @@ private void checkPrimaryKey() throws Exception {
         }
     }
 
-    private void checkPaimonTable(AbstractFileStoreTable paimonTable) {
+    private void checkPaimonTable(FileStoreTable paimonTable) {
         if (!(paimonTable instanceof AppendOnlyFileStoreTable)) {
             throw new IllegalArgumentException(
                     "Hive migrator only support append only table target table");
@@ -231,7 +233,7 @@ private List<MigrateTask> importPartitionedTableTask(
             FileIO fileIO,
             List<String> partitionNames,
             Table sourceTable,
-            AbstractFileStoreTable paimonTable,
+            FileStoreTable paimonTable,
             Map<Path, Path> rollback)
             throws Exception {
         List<MigrateTask> migrateTasks = new ArrayList<>();
@@ -265,7 +267,7 @@
     public MigrateTask importUnPartitionedTableTask(
             FileIO fileIO,
             Table sourceTable,
-            AbstractFileStoreTable paimonTable,
+            FileStoreTable paimonTable,
             Map<Path, Path> rollback) {
         String format = parseFormat(sourceTable.getSd().getSerdeInfo().toString());
         String location = sourceTable.getSd().getLocation();
@@ -274,7 +276,7 @@ public MigrateTask importUnPartitionedTableTask(
                 fileIO, format, location, paimonTable, BinaryRow.EMPTY_ROW, path, rollback);
     }
 
-    private void checkCompatible(Table sourceHiveTable, AbstractFileStoreTable paimonTable) {
+    private void checkCompatible(Table sourceHiveTable, FileStoreTable paimonTable) {
         List<FieldSchema> sourceFields = new ArrayList<>(sourceHiveTable.getPartitionKeys());
         List<DataField> targetFields =
                 new ArrayList<>(
@@ -321,7 +323,7 @@ public static class MigrateTask implements Callable<CommitMessage> {
         private final FileIO fileIO;
         private final String format;
         private final String location;
-        private final AbstractFileStoreTable paimonTable;
+        private final FileStoreTable paimonTable;
         private final BinaryRow partitionRow;
         private final Path newDir;
         private final Map<Path, Path> rollback;
@@ -330,7 +332,7 @@ public MigrateTask(
                 FileIO fileIO,
                 String format,
                 String location,
-                AbstractFileStoreTable paimonTable,
+                FileStoreTable paimonTable,
                 BinaryRow partitionRow,
                 Path newDir,
                 Map<Path, Path> rollback) {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index c1872804f48a..c4149fa91578 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -18,7 +18,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.operation.FileStoreCommit
-import org.apache.paimon.table.AbstractFileStoreTable
+import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.BatchWriteBuilder
 import org.apache.paimon.types.RowType
 import org.apache.paimon.utils.{FileStorePathFactory, RowDataPartitionComputer}
@@ -58,7 +58,7 @@ trait PaimonPartitionManagement extends SupportsPartitionManagement {
         partitionKeys.asScala.toArray)
     val partitionMap = rowDataPartitionComputer.generatePartValues(new SparkRow(tableRowType, row))
     getTable match {
-      case table: AbstractFileStoreTable =>
+      case table: FileStoreTable =>
        val commit: FileStoreCommit = table.store.newCommit(UUID.randomUUID.toString)
        commit.dropPartitions(
          Collections.singletonList(partitionMap),
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 2aa259ae781f..39fb60945f1c 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.options.Options
 import org.apache.paimon.spark.catalog.Catalogs
 import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
 import org.apache.paimon.spark.sql.WithTableOptions
-import org.apache.paimon.table.AbstractFileStoreTable
+import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.SparkConf
 import org.apache.spark.paimon.Utils
@@ -97,8 +97,8 @@ class PaimonSparkTestBase extends QueryTest with SharedSparkSession with WithTab
     CatalogFactory.createCatalog(catalogContext)
   }
 
-  def loadTable(tableName: String): AbstractFileStoreTable = {
-    catalog.getTable(Identifier.create(dbName0, tableName)).asInstanceOf[AbstractFileStoreTable]
+  def loadTable(tableName: String): FileStoreTable = {
+    catalog.getTable(Identifier.create(dbName0, tableName)).asInstanceOf[FileStoreTable]
   }
 
   protected def createRelationV2(tableName: String): LogicalPlan = {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 2a0cc043d807..23dee044b85f 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -19,7 +19,7 @@ package org.apache.paimon.spark.procedure
 
 import org.apache.paimon.Snapshot.CommitKind
 import org.apache.paimon.spark.PaimonSparkTestBase
-import org.apache.paimon.table.AbstractFileStoreTable
+import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
@@ -422,11 +422,11 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
       Assertions.assertThat(where).isEqualTo(whereExpected)
   }
 
-  def lastSnapshotCommand(table: AbstractFileStoreTable): CommitKind = {
+  def lastSnapshotCommand(table: FileStoreTable): CommitKind = {
     table.snapshotManager().latestSnapshot().commitKind()
  }
 
-  def lastSnapshotId(table: AbstractFileStoreTable): Long = {
+  def lastSnapshotId(table: FileStoreTable): Long = {
    table.snapshotManager().latestSnapshotId()
  }
 }
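The pattern this patch pushes downstream code toward: `AbstractFileStoreTable` is now package-private, so callers outside `org.apache.paimon.table` should cast to the public `FileStoreTable` interface (which now also declares `supportStreamingReadOverwrite()`), and commits should be closed via try-with-resources, as HiveMigrator now does. A minimal sketch of that calling pattern, not part of the patch itself — the warehouse path, table name, and row values are illustrative assumptions:

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;

public class FileStoreTableUsage {

    public static void main(String[] args) throws Exception {
        // Hypothetical warehouse path; any catalog implementation behaves the same way.
        try (Catalog catalog =
                CatalogFactory.createCatalog(CatalogContext.create(new Path("/tmp/warehouse")))) {
            // Hypothetical table; assumed to already exist with a matching row type.
            Table table = catalog.getTable(Identifier.create("default", "T"));

            // Cast to the public interface, not to AbstractFileStoreTable (now package-private).
            FileStoreTable fileStoreTable = (FileStoreTable) table;
            System.out.println(
                    "streaming read of overwrite supported: "
                            + fileStoreTable.supportStreamingReadOverwrite());

            // BatchTableWrite and BatchTableCommit are AutoCloseable; closing the commit via
            // try-with-resources mirrors the HiveMigrator change in this patch.
            BatchWriteBuilder writeBuilder = fileStoreTable.newBatchWriteBuilder();
            try (BatchTableWrite write = writeBuilder.newWrite();
                    BatchTableCommit commit = writeBuilder.newCommit()) {
                write.write(GenericRow.of(1, 1, 1));
                commit.commit(write.prepareCommit());
            }
        }
    }
}
```

Since `FileStoreTable` exposes everything these call sites need (`store()`, `schema()`, `coreOptions()`, `snapshotManager()`, `location()`), narrowing `AbstractFileStoreTable` to package-private costs callers nothing while hiding the implementation hierarchy.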