From 36043fcd6555a9eaf6a687f57fe9c6ffa24924e6 Mon Sep 17 00:00:00 2001
From: Yann
Date: Wed, 10 Apr 2024 18:16:18 +0800
Subject: [PATCH] [core] support PartitionKeyVisitor

---
 .../paimon/predicate/PartitionKeyVisitor.java | 219 ++++++++++++++++
 .../predicate/PartitionKeyVisitorTest.java    | 246 ++++++++++++++++++
 .../DeleteFromPaimonTableCommand.scala        |  44 ++-
 3 files changed, 493 insertions(+), 16 deletions(-)
 create mode 100644 paimon-common/src/main/java/org/apache/paimon/predicate/PartitionKeyVisitor.java
 create mode 100644 paimon-common/src/test/java/org/apache/paimon/predicate/PartitionKeyVisitorTest.java

diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionKeyVisitor.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionKeyVisitor.java
new file mode 100644
index 0000000000000..77b890d31f2c0
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionKeyVisitor.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.InternalRowUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Sets;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link FunctionVisitor} that evaluates a predicate against a known set of partitions and
+ * returns the partitions selected by it.
+ *
+ * <p>Leaf predicates on a partition key are evaluated per partition. Anything that cannot be
+ * evaluated (a non-partition field, {@code startsWith}, or a failing comparison) falls back to
+ * all partitions and clears {@link #isAccurate()}. The flag is sticky: no visit method sets it
+ * back to {@code true}, so use a fresh visitor for every predicate.
+ */
+public class PartitionKeyVisitor implements FunctionVisitor<Set<BinaryRow>> {
+
+    private final Set<BinaryRow> partitions;
+
+    private final Map<String, Integer> partitionKeys = new HashMap<>();
+
+    private boolean accurate = true;
+
+    public PartitionKeyVisitor(List<String> partitionKeys, List<BinaryRow> partitions) {
+        for (int i = 0; i < partitionKeys.size(); i++) {
+            this.partitionKeys.put(partitionKeys.get(i), i);
+        }
+        this.partitions = new HashSet<>(partitions);
+    }
+
+    /** Returns {@code false} once any visit had to fall back to all partitions. */
+    public boolean isAccurate() {
+        return accurate;
+    }
+
+    @VisibleForTesting
+    public void setAccurate(boolean accurate) {
+        this.accurate = accurate;
+    }
+
+    private Set<BinaryRow> all() {
+        accurate = false;
+        return partitions;
+    }
+
+    private Set<BinaryRow> filterOrAll(int index, java.util.function.Predicate<BinaryRow> func) {
+        if (index == -1) {
+            // This isn't a partition column.
+            return all();
+        } else {
+            try {
+                return partitions.stream().filter(func).collect(Collectors.toSet());
+            } catch (Exception e) {
+                // Best effort: on evaluation failure keep all partitions (inaccurate).
+                return all();
+            }
+        }
+    }
+
+    @Override
+    public Set<BinaryRow> visitIsNotNull(FieldRef fieldRef) {
+        int index = partitionKeys.getOrDefault(fieldRef.name(), -1);
+        return filterOrAll(index, partition -> !partition.isNullAt(index));
+    }
+
+    @Override
+    public Set<BinaryRow> visitIsNull(FieldRef fieldRef) {
+        int index = partitionKeys.getOrDefault(fieldRef.name(), -1);
+        return filterOrAll(index, partition -> partition.isNullAt(index));
+    }
+
+    @Override
+    public Set<BinaryRow> visitStartsWith(FieldRef fieldRef, Object literal) {
+        return all();
+    }
+
+    @Override
+    public Set<BinaryRow> visitLessThan(FieldRef fieldRef, Object literal) {
+        int index = partitionKeys.getOrDefault(fieldRef.name(), -1);
+        DataType dataType = fieldRef.type();
+        return filterOrAll(
+                index,
+                partition -> {
+                    Object value = InternalRowUtils.get(partition, index, dataType);
+                    return InternalRowUtils.compare(value, literal, dataType.getTypeRoot()) < 0;
+                });
+    }
+
+    @Override
+    public Set<BinaryRow> visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+        int index = partitionKeys.getOrDefault(fieldRef.name(), -1);
+        DataType dataType = fieldRef.type();
+        return filterOrAll(
+                index,
+                partition -> {
+                    Object value = InternalRowUtils.get(partition, index, dataType);
+                    return InternalRowUtils.compare(value, literal, dataType.getTypeRoot()) >= 0;
+                });
+    }
+
+    @Override
+    public Set<BinaryRow> visitNotEqual(FieldRef fieldRef, Object literal) {
+        int index = partitionKeys.getOrDefault(fieldRef.name(), -1);
+        DataType dataType = fieldRef.type();
+        return filterOrAll(
+                index,
+                partition -> {
+                    Object value = InternalRowUtils.get(partition, index, dataType);
+                    return InternalRowUtils.compare(value, literal, dataType.getTypeRoot()) != 0;
+                });
+    }
+
+    @Override
+    public Set<BinaryRow> visitLessOrEqual(FieldRef fieldRef, Object literal) {
+        int index = partitionKeys.getOrDefault(fieldRef.name(), -1);
+        DataType dataType = fieldRef.type();
+        return filterOrAll(
+                index,
+                partition -> {
+                    Object value = InternalRowUtils.get(partition, index, dataType);
+                    return InternalRowUtils.compare(value, literal, dataType.getTypeRoot()) <= 0;
+                });
+    }
+
+    @Override
+    public Set<BinaryRow> visitEqual(FieldRef fieldRef, Object literal) {
+        int index = partitionKeys.getOrDefault(fieldRef.name(), -1);
+        DataType dataType = fieldRef.type();
+        return filterOrAll(
+                index,
+                partition -> {
+                    Object value = InternalRowUtils.get(partition, index, dataType);
+                    return InternalRowUtils.compare(value, literal, dataType.getTypeRoot()) == 0;
+                });
+    }
+
+    @Override
+    public Set<BinaryRow> visitGreaterThan(FieldRef fieldRef, Object literal) {
+        int index = partitionKeys.getOrDefault(fieldRef.name(), -1);
+        DataType dataType = fieldRef.type();
+        return filterOrAll(
+                index,
+                partition -> {
+                    Object value = InternalRowUtils.get(partition, index, dataType);
+                    return InternalRowUtils.compare(value, literal, dataType.getTypeRoot()) > 0;
+                });
+    }
+
+    @Override
+    public Set<BinaryRow> visitIn(FieldRef fieldRef, List<Object> literals) {
+        int index = partitionKeys.getOrDefault(fieldRef.name(), -1);
+        DataType dataType = fieldRef.type();
+        return filterOrAll(
+                index,
+                partition -> {
+                    // SQL semantics: a null partition value never matches IN.
+                    if (partition.isNullAt(index)) {
+                        return false;
+                    }
+                    Object value = InternalRowUtils.get(partition, index, dataType);
+                    return literals.contains(value);
+                });
+    }
+
+    @Override
+    public Set<BinaryRow> visitNotIn(FieldRef fieldRef, List<Object> literals) {
+        int index = partitionKeys.getOrDefault(fieldRef.name(), -1);
+        DataType dataType = fieldRef.type();
+        boolean hasNullLiteral = literals.stream().anyMatch(literal -> literal == null);
+        return filterOrAll(
+                index,
+                partition -> {
+                    // SQL semantics: NOT IN is never true for a null value or a null literal.
+                    if (hasNullLiteral || partition.isNullAt(index)) {
+                        return false;
+                    }
+                    Object value = InternalRowUtils.get(partition, index, dataType);
+                    return !literals.contains(value);
+                });
+    }
+
+    @Override
+    public Set<BinaryRow> visitAnd(List<Set<BinaryRow>> children) {
+        return children.stream().reduce(Sets::intersection).get();
+    }
+
+    @Override
+    public Set<BinaryRow> visitOr(List<Set<BinaryRow>> children) {
+        return children.stream().reduce(Sets::union).get();
+    }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionKeyVisitorTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionKeyVisitorTest.java
new file mode 100644
index 0000000000000..77dada3e25fec
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionKeyVisitorTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.data.BinaryString.fromString;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class PartitionKeyVisitorTest {
+
+    @Test
+    public void testSinglePartition() {
+        List<String> partitionKeys = Collections.singletonList("year");
+
+        BinaryRow pt1 = buildPartitionRow("2022");
+        BinaryRow pt2 = buildPartitionRow("2023");
+        BinaryRow pt3 = buildPartitionRow("2024");
+        BinaryRow pt4 = buildPartitionRow("2025");
+        List<BinaryRow> partitions = Arrays.asList(pt1, pt2, pt3, pt4);
+
+        PartitionKeyVisitor visitor = new PartitionKeyVisitor(partitionKeys, partitions);
+
+        Predicate isNull =
+                new LeafPredicate(IsNull.INSTANCE, DataTypes.STRING(), 100, "year", null);
+        assertThat(isNull.visit(visitor)).hasSameElementsAs(Collections.emptyList());
+
+        Predicate isNotNull =
+                new LeafPredicate(IsNotNull.INSTANCE, DataTypes.STRING(), 100, "year", null);
+        assertThat(isNotNull.visit(visitor)).hasSameElementsAs(Arrays.asList(pt1, pt2, pt3, pt4));
+
+        Predicate equal =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        DataTypes.STRING(),
+                        100,
+                        "year",
+                        Collections.singletonList(BinaryString.fromString("2024")));
+        assertThat(equal.visit(visitor)).hasSameElementsAs(Collections.singletonList(pt3));
+        assertThat(visitor.isAccurate()).isEqualTo(true);
+
+        Predicate notEqual =
+                new LeafPredicate(
+                        NotEqual.INSTANCE,
+                        DataTypes.STRING(),
+                        100,
+                        "year",
+                        Collections.singletonList(BinaryString.fromString("2024")));
+        assertThat(notEqual.visit(visitor)).hasSameElementsAs(Arrays.asList(pt1, pt2, pt4));
+        assertThat(visitor.isAccurate()).isEqualTo(true);
+
+        Predicate gt =
+                new LeafPredicate(
+                        GreaterThan.INSTANCE,
+                        DataTypes.STRING(),
+                        100,
+                        "year",
+                        Collections.singletonList(BinaryString.fromString("2024")));
+        assertThat(gt.visit(visitor)).hasSameElementsAs(Collections.singletonList(pt4));
+        assertThat(visitor.isAccurate()).isEqualTo(true);
+
+        Predicate ge =
+                new LeafPredicate(
+                        GreaterOrEqual.INSTANCE,
+                        DataTypes.STRING(),
+                        100,
+                        "year",
+                        Collections.singletonList(BinaryString.fromString("2024")));
+        assertThat(ge.visit(visitor)).hasSameElementsAs(Arrays.asList(pt3, pt4));
+        assertThat(visitor.isAccurate()).isEqualTo(true);
+
+        Predicate lt =
+                new LeafPredicate(
+                        LessThan.INSTANCE,
+                        DataTypes.STRING(),
+                        100,
+                        "year",
+                        Collections.singletonList(BinaryString.fromString("2024")));
+        assertThat(lt.visit(visitor)).hasSameElementsAs(Arrays.asList(pt1, pt2));
+        assertThat(visitor.isAccurate()).isEqualTo(true);
+
+        Predicate le =
+                new LeafPredicate(
+                        LessOrEqual.INSTANCE,
+                        DataTypes.STRING(),
+                        100,
+                        "year",
+                        Collections.singletonList(BinaryString.fromString("2024")));
+        assertThat(le.visit(visitor)).hasSameElementsAs(Arrays.asList(pt1, pt2, pt3));
+        assertThat(visitor.isAccurate()).isEqualTo(true);
+
+        Predicate in =
+                new LeafPredicate(
+                        In.INSTANCE,
+                        DataTypes.STRING(),
+                        100,
+                        "year",
+                        Arrays.asList(
+                                BinaryString.fromString("2023"), BinaryString.fromString("2024")));
+        assertThat(in.visit(visitor)).hasSameElementsAs(Arrays.asList(pt2, pt3));
+        assertThat(visitor.isAccurate()).isEqualTo(true);
+
+        Predicate notIn =
+                new LeafPredicate(
+                        NotIn.INSTANCE,
+                        DataTypes.STRING(),
+                        100,
+                        "year",
+                        Arrays.asList(
+                                BinaryString.fromString("2023"), BinaryString.fromString("2024")));
+        assertThat(notIn.visit(visitor)).hasSameElementsAs(Arrays.asList(pt1, pt4));
+        assertThat(visitor.isAccurate()).isEqualTo(true);
+
+        Predicate and = PredicateBuilder.and(Arrays.asList(equal, isNotNull));
+        assertThat(and.visit(visitor)).hasSameElementsAs(Collections.singletonList(pt3));
+        assertThat(visitor.isAccurate()).isEqualTo(true);
+
+        Predicate or = PredicateBuilder.or(Arrays.asList(lt, gt));
+        assertThat(or.visit(visitor)).hasSameElementsAs(Arrays.asList(pt1, pt2, pt4));
+        assertThat(visitor.isAccurate()).isEqualTo(true);
+
+        Predicate startsWith =
+                new LeafPredicate(
+                        StartsWith.INSTANCE,
+                        DataTypes.STRING(),
+                        100,
+                        "year",
+                        Collections.singletonList(BinaryString.fromString("20")));
+        assertThat(startsWith.visit(visitor)).hasSameElementsAs(Arrays.asList(pt1, pt2, pt3, pt4));
+        assertThat(visitor.isAccurate()).isEqualTo(false);
+
+        visitor.setAccurate(true);
+        assertThat(visitor.isAccurate()).isEqualTo(true);
+
+        Predicate invalidField =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        DataTypes.STRING(),
+                        100,
+                        "month",
+                        Collections.singletonList(BinaryString.fromString("12")));
+        assertThat(invalidField.visit(visitor))
+                .hasSameElementsAs(Arrays.asList(pt1, pt2, pt3, pt4));
+        assertThat(visitor.isAccurate()).isEqualTo(false);
+    }
+
+    @Test
+    public void testMultiPartitions() {
+        List<String> partitionKeys = Arrays.asList("year", "region");
+
+        BinaryRow pt1 = buildPartitionRow("2024", 123);
+        BinaryRow pt2 = buildPartitionRow("2024", 456);
+        BinaryRow pt3 = buildPartitionRow("2025", 123);
+        BinaryRow pt4 = buildPartitionRow("2025", 456);
+        List<BinaryRow> partitions = Arrays.asList(pt1, pt2, pt3, pt4);
+
+        PartitionKeyVisitor visitor = new PartitionKeyVisitor(partitionKeys, partitions);
+
+        Predicate equalYear2024 =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        DataTypes.STRING(),
+                        100,
+                        "year",
+                        Collections.singletonList(BinaryString.fromString("2024")));
+        Predicate equalYear2025 =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        DataTypes.STRING(),
+                        100,
+                        "year",
+                        Collections.singletonList(BinaryString.fromString("2025")));
+        Predicate equalRegion123 =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        DataTypes.INT(),
+                        101,
+                        "region",
+                        Collections.singletonList(123));
+        Predicate equalRegion456 =
+                new LeafPredicate(
+                        Equal.INSTANCE,
+                        DataTypes.INT(),
+                        101,
+                        "region",
+                        Collections.singletonList(456));
+
+        assertThat(equalYear2024.visit(visitor)).hasSameElementsAs(Arrays.asList(pt1, pt2));
+        assertThat(equalRegion123.visit(visitor)).hasSameElementsAs(Arrays.asList(pt1, pt3));
+
+        Predicate year2024AndRegion123 =
+                PredicateBuilder.and(Arrays.asList(equalYear2024, equalRegion123));
+        assertThat(year2024AndRegion123.visit(visitor))
+                .hasSameElementsAs(Collections.singletonList(pt1));
+
+        Predicate year2024OrRegion456 =
+                PredicateBuilder.or(Arrays.asList(equalYear2024, equalRegion456));
+        assertThat(year2024OrRegion456.visit(visitor))
+                .hasSameElementsAs(Arrays.asList(pt1, pt2, pt4));
+
+        assertThat(
+                        PredicateBuilder.or(Arrays.asList(equalYear2024, year2024AndRegion123))
+                                .visit(visitor))
+                .hasSameElementsAs(Arrays.asList(pt1, pt2));
+    }
+
+    private BinaryRow buildPartitionRow(String value) {
+        BinaryRow row = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        writer.writeString(0, fromString(value));
+        return row;
+    }
+
+    private BinaryRow buildPartitionRow(String value1, int value2) {
+        BinaryRow row = new BinaryRow(2);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        writer.writeString(0, fromString(value1));
+        writer.writeInt(1, value2);
+        return row;
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 457467da1b655..4a1714321b700 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -18,7 +18,9 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.predicate.PartitionKeyVisitor
 import org.apache.paimon.spark.PaimonSplitScan
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
@@ -27,6 +29,7 @@ import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
 import org.apache.paimon.types.RowKind
+import org.apache.paimon.utils.RowDataPartitionComputer
 
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.Utils.createDataset
@@ -36,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.lit
 
-import java.util.{Collections, UUID}
+import java.util.UUID
 
 import scala.collection.JavaConverters._
 
@@ -62,25 +65,26 @@ case class DeleteFromPaimonTableCommand(
       table.partitionKeys().asScala,
       sparkSession.sessionState.conf.resolver)
 
-    // TODO: provide another partition visitor to support more partition predicate.
-    val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
-    val partitionPredicate = if (partitionCondition.isEmpty) {
-      None
+    val visitor =
+      new PartitionKeyVisitor(table.partitionKeys, table.newReadBuilder.newScan.listPartitions)
+
+    val (partitionPredicate, candidatePartitions) = if (partitionCondition.isEmpty) {
+      (None, List.empty[BinaryRow])
     } else {
-      convertConditionToPaimonPredicate(
+      val predicate = convertConditionToPaimonPredicate(
         partitionCondition.reduce(And),
         relation.output,
         rowType,
         ignoreFailure = true)
+      (predicate, predicate.map(_.visit(visitor).asScala.toList).getOrElse(List.empty[BinaryRow]))
     }
 
-    // We do not have to scan table if the following three requirements are met:
-    // 1) no other predicate;
-    // 2) partition condition can convert to paimon predicate;
-    // 3) partition predicate can be visit by OnlyPartitionKeyEqualVisitor.
+    // We have to delete by rows (i.e. scan the table) if any of the following holds:
+    // 1) there is a predicate on non-partition columns;
+    // 2) there is no partition predicate, or it cannot be converted to a paimon predicate;
+    // 3) the visitor could not evaluate the partition predicate accurately.
     val forceDeleteByRows =
-      otherCondition.nonEmpty || partitionPredicate.isEmpty || !partitionPredicate.get.visit(
-        visitor)
+      otherCondition.nonEmpty || partitionPredicate.isEmpty || !visitor.isAccurate
 
     if (forceDeleteByRows) {
       val commitMessages = if (withPrimaryKeys) {
@@ -90,9 +94,17 @@ case class DeleteFromPaimonTableCommand(
       }
       writer.commit(commitMessages)
-    } else {
-      val dropPartitions = visitor.partitions()
+    } else if (candidatePartitions.nonEmpty) {
+      // Commit only if some existing partition matches; otherwise there is nothing to drop.
+      // NOTE(review): uses the option's default, not the table's configured value - confirm.
+      val rowDataPartitionComputer = new RowDataPartitionComputer(
+        CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
+        table.schema().logicalPartitionType(),
+        table.partitionKeys.asScala.toArray
+      )
       commit.dropPartitions(
-        Collections.singletonList(dropPartitions),
+        candidatePartitions.map {
+          partition => rowDataPartitionComputer.generatePartValues(partition).asScala.asJava
+        }.asJava,
         BatchWriteBuilder.COMMIT_IDENTIFIER)
     }
   }