From d463b1d6751255c75ba8932720252cb6c3adf4d1 Mon Sep 17 00:00:00 2001 From: chouc <32946731+choucmei@users.noreply.github.com> Date: Wed, 13 Mar 2024 13:39:36 +0800 Subject: [PATCH] [spark] Support DeleteFromPaimonTableCommand for Spark 3.2 (#2947) --- .../DeleteFromPaimonTableCommand.scala | 29 +++++++++++++++++++ .../DeleteFromPaimonTableCommand.scala | 29 +++++++++++++++++++ .../spark/sql/DeleteFromTableTest.scala | 21 ++++++++++++++ .../spark/sql/DeleteFromTableTest.scala | 21 ++++++++++++++ .../spark/sql/DeleteFromTableTest.scala | 21 ++++++++++++++ .../spark/sql/DeleteFromTableTest.scala | 21 ++++++++++++++ .../DeleteFromPaimonTableCommand.scala | 15 ++++++---- .../spark/sql/DeleteFromTableTest.scala | 4 ++- 8 files changed, 155 insertions(+), 6 deletions(-) create mode 100644 paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala create mode 100644 paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala create mode 100644 paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala create mode 100644 paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala create mode 100644 paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala create mode 100644 paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala diff --git a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala new file mode 100644 index 000000000000..b131384debee --- /dev/null +++ b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.commands + +import org.apache.paimon.spark.SparkTable + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable + +case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable) + extends DeleteFromPaimonTableCommandBase { + override def condition(): Expression = delete.condition.orNull +} diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala new file mode 100644 index 000000000000..b131384debee --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.commands + +import org.apache.paimon.spark.SparkTable + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable + +case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable) + extends DeleteFromPaimonTableCommandBase { + override def condition(): Expression = delete.condition.orNull +} diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala new file mode 100644 index 000000000000..09554a1dbf8d --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class DeleteFromTableTest extends DeleteFromTableTestBase {} diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala new file mode 100644 index 000000000000..09554a1dbf8d --- /dev/null +++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class DeleteFromTableTest extends DeleteFromTableTestBase {} diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala new file mode 100644 index 000000000000..09554a1dbf8d --- /dev/null +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class DeleteFromTableTest extends DeleteFromTableTestBase {} diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala new file mode 100644 index 000000000000..09554a1dbf8d --- /dev/null +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class DeleteFromTableTest extends DeleteFromTableTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala index a30b47dbc496..3b8e801a2e3a 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala @@ -29,6 +29,7 @@ import org.apache.paimon.types.RowKind import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.Utils.createDataset +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter} import org.apache.spark.sql.functions.lit @@ -37,14 +38,13 @@ import java.util.{Collections, UUID} import scala.util.control.NonFatal -case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable) - extends PaimonLeafRunnableCommand - with PaimonCommand { - +trait DeleteFromPaimonTableCommandBase extends PaimonLeafRunnableCommand with PaimonCommand { + self: DeleteFromPaimonTableCommand => override def table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable] private val relation = delete.table - private val condition = delete.condition + + def condition(): Expression private lazy val (deletePredicate, forceDeleteByRows) = if (condition == null || condition == TrueLiteral) { @@ -87,3 +87,8 @@ case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromT WriteIntoPaimonTable(table, InsertInto, df, new Options()).run(sparkSession) } } + +case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable) + extends DeleteFromPaimonTableCommandBase { + override def condition(): Expression = delete.condition +} diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala index 7c76d28ec5d2..3646cd73d5d6 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala @@ -24,7 +24,7 @@ import org.apache.paimon.spark.catalyst.analysis.Delete import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy} -class DeleteFromTableTest extends PaimonSparkTestBase { +abstract class DeleteFromTableTestBase extends PaimonSparkTestBase { test(s"test delete from append only table") { spark.sql(s""" @@ -184,3 +184,5 @@ class DeleteFromTableTest extends PaimonSparkTestBase { assertThat(rows4.toString).isEqualTo("[]") } } + +class DeleteFromTableTest extends DeleteFromTableTestBase {}