diff --git a/docs/content/how-to/altering-tables.md b/docs/content/how-to/altering-tables.md
index 39c3038b4637..c1fe579707a2 100644
--- a/docs/content/how-to/altering-tables.md
+++ b/docs/content/how-to/altering-tables.md
@@ -268,6 +268,40 @@ ALTER TABLE my_table DROP COLUMN c1;
 
 {{< /tabs >}}
 
+
+## Dropping Partitions
+
+The following SQL drops one or more partitions of the Paimon table.
+
+{{< tabs "drop-partitions" >}}
+
+{{< tab "Flink" >}}
+
+For Flink SQL, you can specify a subset of the partition columns, and you can also drop multiple partitions in one statement.
+
+```sql
+ALTER TABLE MyTable DROP PARTITION (`id` = 1);
+
+ALTER TABLE MyTable DROP PARTITION (`id` = 1, `name` = 'paimon');
+
+ALTER TABLE MyTable DROP PARTITION (`id` = 1), PARTITION (`id` = 2);
+
+```
+
+{{< /tab >}}
+
+{{< tab "Spark3" >}}
+
+For Spark SQL, you need to specify all of the partition columns.
+
+```sql
+ALTER TABLE MyTable DROP PARTITION (`id` = 1, `name` = 'paimon');
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
 ## Changing Column Nullability
 
 The following SQL changes nullability of column `coupon_info`.
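Outside the SQL clients, the same statement can be issued from a small Flink program. This is a minimal sketch and not part of the patch; the catalog name, warehouse path, and table are placeholders, and it assumes the Paimon Flink catalog is on the classpath.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DropPartitionSqlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Register a Paimon catalog; the name and warehouse path are illustrative only.
        tEnv.executeSql(
                "CREATE CATALOG paimon_catalog WITH ('type' = 'paimon', 'warehouse' = 'file:///tmp/paimon')");
        tEnv.executeSql("USE CATALOG paimon_catalog");
        // Drop a single partition of MyTable, as in the Flink tab above.
        tEnv.executeSql("ALTER TABLE MyTable DROP PARTITION (`id` = 1, `name` = 'paimon')");
    }
}
```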
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index bd938c0289cd..005212cf2c6c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -79,7 +79,7 @@ public static Object castFromCdcValueString(String s, DataType type) {
         return castFromStringInternal(s, type, true);
     }
 
-    private static Object castFromStringInternal(String s, DataType type, boolean isCdcValue) {
+    public static Object castFromStringInternal(String s, DataType type, boolean isCdcValue) {
         BinaryString str = BinaryString.fromString(s);
         switch (type.getTypeRoot()) {
             case CHAR:
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 8f56d04bb61c..11599801427e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -23,23 +23,28 @@
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.lineage.LineageMetaFactory;
+import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.utils.StringUtils;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
@@ -101,6 +106,16 @@ public void createDatabase(String name, boolean ignoreIfExists)
         createDatabaseImpl(name);
     }
 
+    @Override
+    public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
+            throws TableNotExistException {
+        Table table = getTable(identifier);
+        AbstractFileStoreTable fileStoreTable = (AbstractFileStoreTable) table;
+        FileStoreCommit commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
+        commit.dropPartitions(
+                Collections.singletonList(partitionSpec), BatchWriteBuilder.COMMIT_IDENTIFIER);
+    }
+
     protected abstract void createDatabaseImpl(String name);
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 7e8b03f13164..99295c6bdb75 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -27,6 +27,7 @@
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -191,6 +192,17 @@ void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotEx
     void alterTable(Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
             throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException;
 
+    /**
+     * Drop the partition of the specified table.
+     *
+     * @param identifier path of the table whose partition will be dropped
+     * @param partitions the partition spec to be dropped
+     * @throws TableNotExistException if the table does not exist
+     * @throws PartitionNotExistException if the partition does not exist
+     */
+    void dropPartition(Identifier identifier, Map<String, String> partitions)
+            throws TableNotExistException, PartitionNotExistException;
+
     /**
      * Modify an existing table from a {@link SchemaChange}.
     *
@@ -323,6 +335,36 @@ public Identifier identifier() {
         }
     }
 
+    /** Exception for trying to operate on a partition that doesn't exist. */
+    class PartitionNotExistException extends Exception {
+
+        private static final String MSG = "Partition %s does not exist in table %s.";
+
+        private final Identifier identifier;
+
+        private final Map<String, String> partitionSpec;
+
+        public PartitionNotExistException(
+                Identifier identifier, Map<String, String> partitionSpec) {
+            this(identifier, partitionSpec, null);
+        }
+
+        public PartitionNotExistException(
+                Identifier identifier, Map<String, String> partitionSpec, Throwable cause) {
+            super(String.format(MSG, partitionSpec, identifier.getFullName()), cause);
+            this.identifier = identifier;
+            this.partitionSpec = partitionSpec;
+        }
+
+        public Identifier identifier() {
+            return identifier;
+        }
+
+        public Map<String, String> partitionSpec() {
+            return partitionSpec;
+        }
+    }
+
     /** Exception for trying to alter a column that already exists. */
     class ColumnAlreadyExistException extends Exception {
 
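To make the new catalog-level API concrete, here is a hedged sketch of a caller using `Catalog#dropPartition` directly. It is not part of the patch; the warehouse path, database, and table names are invented, and it assumes a filesystem catalog created through `CatalogFactory`/`CatalogContext`.

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;

import java.util.HashMap;
import java.util.Map;

public class DropPartitionApiExample {
    public static void main(String[] args) throws Exception {
        // Illustrative warehouse location and table identifier.
        Catalog catalog =
                CatalogFactory.createCatalog(CatalogContext.create(new Path("file:///tmp/paimon")));
        Identifier identifier = Identifier.create("default", "MyTable");

        // Partition spec as column name -> string value, matching the new API.
        Map<String, String> partitionSpec = new HashMap<>();
        partitionSpec.put("dt", "2020-01-01");
        partitionSpec.put("hh", "10");

        try {
            catalog.dropPartition(identifier, partitionSpec);
        } catch (Catalog.TableNotExistException | Catalog.PartitionNotExistException e) {
            // The new PartitionNotExistException carries the identifier and the offending spec.
            System.err.println(e.getMessage());
        }
    }
}
```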
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 5d78e6e2a659..3eca8442f3fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -20,8 +20,14 @@
 
 import org.apache.paimon.predicate.Predicate;
 
+import java.util.Map;
+
 /** Inner {@link TableScan} contains filter push down. */
 public interface InnerTableScan extends TableScan {
 
     InnerTableScan withFilter(Predicate predicate);
+
+    default InnerTableScan withPartitionFilter(Map<String, String> partitionSpec) {
+        return this;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index cb4e39b23a5e..29f890a91784 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -25,6 +25,8 @@
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.utils.SnapshotManager;
 
+import java.util.Map;
+
 /** {@link TableScan} implementation for batch planning. */
 public class InnerTableScanImpl extends AbstractInnerTableScan {
 
@@ -51,6 +53,12 @@ public InnerTableScan withFilter(Predicate predicate) {
         return this;
     }
 
+    @Override
+    public InnerTableScan withPartitionFilter(Map<String, String> partitionSpec) {
+        snapshotReader.withPartitionFilter(partitionSpec);
+        return this;
+    }
+
     @Override
     public TableScan.Plan plan() {
         if (startingScanner == null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index 4991275fb8a6..e83ae0bfa632 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 /**
  * An interface for building the {@link TableScan} and {@link TableRead}.
  *
@@ -95,6 +96,9 @@ default ReadBuilder withFilter(List<Predicate> predicates) {
      */
     ReadBuilder withFilter(Predicate predicate);
 
+    /** Push partition filter. */
+    ReadBuilder withPartitionFilter(Map<String, String> partitionSpec);
+
     /**
      * Apply projection to the reader.
      *
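A small sketch of how the new `ReadBuilder#withPartitionFilter` hook is meant to be used when planning a scan restricted to one partition. This is illustrative only; `table` is assumed to be an already-loaded Paimon `Table`, and the partition key `dt` is made up.

```java
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class PartitionFilterScanExample {

    // Plans splits that belong to the partition dt=2020-01-01 only.
    static List<Split> planSinglePartition(Table table) {
        Map<String, String> partitionSpec = Collections.singletonMap("dt", "2020-01-01");
        ReadBuilder readBuilder = table.newReadBuilder().withPartitionFilter(partitionSpec);
        return readBuilder.newScan().plan().splits();
    }
}
```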
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index eeca2c57590b..4fb0dec377da 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -25,6 +25,7 @@
 import org.apache.paimon.utils.TypeUtils;
 
 import java.util.Arrays;
+import java.util.Map;
 import java.util.Objects;
 
 /** Implementation for {@link ReadBuilder}. */
@@ -36,6 +37,7 @@ public class ReadBuilderImpl implements ReadBuilder {
 
     private Predicate filter;
     private int[][] projection;
+    private Map<String, String> partitionSpec;
 
     public ReadBuilderImpl(InnerTable table) {
         this.table = table;
@@ -60,6 +62,12 @@ public ReadBuilder withFilter(Predicate filter) {
         return this;
     }
 
+    @Override
+    public ReadBuilder withPartitionFilter(Map<String, String> partitionSpec) {
+        this.partitionSpec = partitionSpec;
+        return this;
+    }
+
     @Override
     public ReadBuilder withProjection(int[][] projection) {
         this.projection = projection;
@@ -68,7 +76,7 @@ public ReadBuilder withProjection(int[][] projection) {
 
     @Override
     public TableScan newScan() {
-        return table.newScan().withFilter(filter);
+        return table.newScan().withFilter(filter).withPartitionFilter(partitionSpec);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index ab55deba1c6f..4af797c0a97f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -34,6 +34,7 @@
 import javax.annotation.Nullable;
 
 import java.util.List;
+import java.util.Map;
 
 /** Read splits from specified {@link Snapshot} with given configuration. */
 public interface SnapshotReader {
@@ -50,6 +51,8 @@ public interface SnapshotReader {
 
     SnapshotReader withFilter(Predicate predicate);
 
+    SnapshotReader withPartitionFilter(Map<String, String> partitionSpec);
+
     SnapshotReader withMode(ScanMode scanMode);
 
     SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index aeb1143c31de..88a17878b4e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -39,8 +39,10 @@
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TypeUtils;
 
 import javax.annotation.Nullable;
 
@@ -124,6 +126,30 @@ public SnapshotReader withSnapshot(Snapshot snapshot) {
         return this;
     }
 
+    @Override
+    public SnapshotReader withPartitionFilter(Map<String, String> partitionSpec) {
+        if (partitionSpec != null) {
+            List<String> partitionKeys = tableSchema.partitionKeys();
+            RowType rowType = tableSchema.logicalPartitionType();
+            PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
+            List<Predicate> partitionFilters =
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    m -> {
+                                        int index = partitionKeys.indexOf(m.getKey());
+                                        Object value =
+                                                TypeUtils.castFromStringInternal(
+                                                        m.getValue(),
+                                                        rowType.getTypeAt(index),
+                                                        false);
+                                        return predicateBuilder.equal(index, value);
+                                    })
+                            .collect(Collectors.toList());
+            scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
+        }
+        return this;
+    }
+
     @Override
     public SnapshotReader withFilter(Predicate predicate) {
         List<String> partitionKeys = tableSchema.partitionKeys();
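As a standalone illustration of the translation done in `withPartitionFilter` above, the following sketch turns a partition spec into an equality predicate over the partition row type. It mirrors the patch's logic but is not part of it; the two-field STRING partition type and the keys `dt`/`hh` are invented for the example.

```java
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TypeUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionSpecToPredicateExample {

    // Builds the conjunction "field0 = '2020-01-01' AND field1 = '10'" for a
    // two-STRING partition row type standing in for (dt, hh).
    static Predicate toPredicate() {
        RowType partitionType = RowType.of(DataTypes.STRING(), DataTypes.STRING());
        List<String> partitionKeys = new ArrayList<>();
        partitionKeys.add("dt");
        partitionKeys.add("hh");

        Map<String, String> spec = new HashMap<>();
        spec.put("dt", "2020-01-01");
        spec.put("hh", "10");

        PredicateBuilder builder = new PredicateBuilder(partitionType);
        List<Predicate> filters = new ArrayList<>();
        for (Map.Entry<String, String> entry : spec.entrySet()) {
            int index = partitionKeys.indexOf(entry.getKey());
            // Cast the string value to the partition column's data type, as the patch does.
            Object value =
                    TypeUtils.castFromStringInternal(
                            entry.getValue(), partitionType.getTypeAt(index), false);
            filters.add(builder.equal(index, value));
        }
        return PredicateBuilder.and(filters);
    }
}
```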
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index fb7bfa1318ba..598281195cb9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -227,6 +227,12 @@ public SnapshotReader withFilter(Predicate predicate) {
             return this;
         }
 
+        @Override
+        public SnapshotReader withPartitionFilter(Map<String, String> partitionSpec) {
+            snapshotReader.withPartitionFilter(partitionSpec);
+            return this;
+        }
+
         public SnapshotReader withMode(ScanMode scanMode) {
             snapshotReader.withMode(scanMode);
             return this;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 415480fe64f6..b49d748de5a5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -31,6 +31,7 @@
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Preconditions;
@@ -827,7 +828,19 @@ private List<CatalogPartitionSpec> getPartitionSpecs(
                 throw new TableNotPartitionedException(getName(), tablePath);
             }
 
-            List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+            ReadBuilder readBuilder = table.newReadBuilder();
+            List<Split> splits;
+            if (partitionSpec != null && partitionSpec.getPartitionSpec() != null) {
+                splits =
+                        readBuilder
+                                .withPartitionFilter(partitionSpec.getPartitionSpec())
+                                .newScan()
+                                .plan()
+                                .splits();
+            } else {
+                splits = readBuilder.newScan().plan().splits();
+            }
+
             List<BinaryRow> partitions =
                     splits.stream()
                             .map(m -> ((DataSplit) m).partition())
@@ -848,28 +861,8 @@ private List<CatalogPartitionSpec> getPartitionSpecs(
                                             Preconditions.checkNotNull(
                                                     m,
                                                     "Partition row data is null. This is unexpected."));
-                            if (partitionSpec != null
-                                    && partitionSpec.getPartitionSpec() != null) {
-                                boolean match = true;
-                                for (Map.Entry<String, String> specMapEntry :
-                                        partitionSpec.getPartitionSpec().entrySet()) {
-                                    String key = specMapEntry.getKey();
-                                    match =
-                                            match & partValues.containsKey(key)
-                                                    && partValues
-                                                            .get(key)
-                                                            .contains(specMapEntry.getValue());
-                                }
-                                if (match) {
-                                    return new CatalogPartitionSpec(partValues);
-                                }
-
-                                return null;
-                            } else {
-                                return new CatalogPartitionSpec(partValues);
-                            }
+                            return new CatalogPartitionSpec(partValues);
                         })
-                        .filter(Objects::nonNull)
                         .collect(Collectors.toList());
         } catch (Catalog.TableNotExistException e) {
             throw new TableNotExistException(getName(), tablePath);
@@ -899,7 +892,12 @@ public final CatalogPartition getPartition(
     @Override
     public final boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
             throws CatalogException {
-        return false;
+        try {
+            List<CatalogPartitionSpec> partitionSpecs = getPartitionSpecs(tablePath, partitionSpec);
+            return partitionSpecs.size() > 0;
+        } catch (TableNotPartitionedException | TableNotExistException e) {
+            throw new CatalogException(e);
+        }
     }
 
     @Override
@@ -915,8 +913,22 @@ public final void createPartition(
     @Override
     public final void dropPartition(
             ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
-            throws CatalogException {
-        throw new UnsupportedOperationException();
+            throws PartitionNotExistException, CatalogException {
+
+        if (!partitionExists(tablePath, partitionSpec)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+        }
+
+        try {
+            Identifier identifier = toIdentifier(tablePath);
+            catalog.dropPartition(identifier, partitionSpec.getPartitionSpec());
+        } catch (Catalog.TableNotExistException e) {
+            throw new CatalogException(e);
+        } catch (Catalog.PartitionNotExistException e) {
+            throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+        }
     }
 
     @Override
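For completeness, a hedged sketch of driving the new `FlinkCatalog` behavior through Flink's generic `Catalog` interface (not part of the patch; the database, table, and partition values are placeholders, and the catalog instance is assumed to be an already-opened Paimon `FlinkCatalog`):

```java
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;

import java.util.HashMap;
import java.util.Map;

public class FlinkCatalogDropPartitionExample {

    // Drops dt=2020-01-01/hh=10 if it exists; otherwise reports the missing partition.
    static void dropIfPresent(Catalog flinkCatalog) throws Exception {
        ObjectPath tablePath = new ObjectPath("default", "PartitionTable");
        Map<String, String> spec = new HashMap<>();
        spec.put("dt", "2020-01-01");
        spec.put("hh", "10");
        CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(spec);

        if (flinkCatalog.partitionExists(tablePath, partitionSpec)) {
            // ignoreIfNotExists = false: a missing partition would raise PartitionNotExistException.
            flinkCatalog.dropPartition(tablePath, partitionSpec, false);
        } else {
            System.out.println("Partition not found: " + partitionSpec);
        }
    }
}
```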
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 920c6a8b3a85..4ded19c85240 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -32,6 +32,7 @@
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
@@ -454,6 +455,50 @@ public void testShowPartitions() {
         assertThat(result.toString()).isEqualTo("[+I[dt=2020-01-02/hh=11]]");
     }
 
+    @Test
+    public void testDropPartition() {
+        sql(
+                "CREATE TABLE PartitionTable (\n"
+                        + " user_id BIGINT,\n"
+                        + " item_id BIGINT,\n"
+                        + " behavior STRING,\n"
+                        + " dt STRING,\n"
+                        + " hh STRING,\n"
+                        + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (dt, hh)");
+        sql("INSERT INTO PartitionTable select 1,1,'a','2020-01-01','10'");
+        sql("INSERT INTO PartitionTable select 2,2,'b','2020-01-02','11'");
+        sql("INSERT INTO PartitionTable select 3,3,'c','2020-01-03','11'");
+        sql("INSERT INTO PartitionTable select 4,4,'d','2020-01-04','14'");
+        sql("INSERT INTO PartitionTable select 5,5,'e','2020-01-05','15'");
+
+        assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "ALTER TABLE PartitionTable DROP PARTITION (`dt` = '2020-10-10')"))
+                .getRootCause()
+                .isInstanceOf(PartitionNotExistException.class)
+                .hasMessage(
+                        "Partition CatalogPartitionSpec{{dt=2020-10-10}} of table default.PartitionTable in catalog PAIMON does not exist.");
+
+        List<Row> result = sql("SHOW PARTITIONS PartitionTable");
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[dt=2020-01-01/hh=10], +I[dt=2020-01-02/hh=11], +I[dt=2020-01-03/hh=11], +I[dt=2020-01-04/hh=14], +I[dt=2020-01-05/hh=15]]");
+
+        // drop a partition
+        sql("ALTER TABLE PartitionTable DROP PARTITION (`dt` = '2020-01-01', `hh` = '10')");
+        result = sql("SHOW PARTITIONS PartitionTable");
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[dt=2020-01-02/hh=11], +I[dt=2020-01-03/hh=11], +I[dt=2020-01-04/hh=14], +I[dt=2020-01-05/hh=15]]");
+
+        // drop two partitions
+        sql("ALTER TABLE PartitionTable DROP PARTITION (dt ='2020-01-04'), PARTITION (hh='11')");
+        result = sql("SHOW PARTITIONS PartitionTable");
+        assertThat(result.toString()).isEqualTo("[+I[dt=2020-01-05/hh=15]]");
+    }
+
     @Test
     public void testFileFormatPerLevel() {
         sql(