diff --git a/paimon-format/src/main/java/org/apache/paimon/format/OrcOptions.java b/paimon-format/src/main/java/org/apache/paimon/format/OrcOptions.java index 28780cfea94e..24cad16ad43c 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/OrcOptions.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/OrcOptions.java @@ -20,10 +20,6 @@ import org.apache.paimon.options.ConfigOption; -import org.apache.orc.OrcConf; - -import static org.apache.orc.OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD; -import static org.apache.orc.OrcConf.DIRECT_ENCODING_COLUMNS; import static org.apache.paimon.options.ConfigOptions.key; /** Options for orc format. */ @@ -36,7 +32,7 @@ public class OrcOptions { .withDescription("write batch size for orc."); public static final ConfigOption ORC_COMPRESS = - key(OrcConf.COMPRESS.getAttribute()) + key("orc.compress") .stringType() .defaultValue("lz4") .withDescription( @@ -45,16 +41,16 @@ public class OrcOptions { + "orc.compression.zstd.level"); public static final ConfigOption ORC_COLUMN_ENCODING_DIRECT = - key(DIRECT_ENCODING_COLUMNS.getAttribute()) + key("orc.column.encoding.direct") .intType() .noDefaultValue() .withDescription( "Comma-separated list of fields for which dictionary encoding is to be skipped in orc."); public static final ConfigOption ORC_DICTIONARY_KEY_THRESHOLD = - key(DICTIONARY_KEY_SIZE_THRESHOLD.getAttribute()) + key("orc.dictionary.key.threshold") .doubleType() - .defaultValue((Double) DICTIONARY_KEY_SIZE_THRESHOLD.getDefaultValue()) + .defaultValue(0.8) .withDescription( "If the number of distinct keys in a dictionary is greater than this " + "fraction of the total number of non-null rows, turn off " diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala index a654da55af82..c4b861b29752 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala @@ -91,6 +91,34 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes "CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), Row(true) :: Nil) checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) + + // snapshot-4 + inputData.addData((2, "c1")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "c1") :: Nil) + + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 's4')"), + Row(true) :: Nil) + + // snapshot-5 + inputData.addData((3, "c2")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "c1") :: Row(3, "c2") :: Nil) + + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 's5')"), + Row(true) :: Nil) + + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("s4") :: Row("s5") :: Nil) + + checkAnswer( + spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 's4,s5')"), + Row(true) :: Nil) + + checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) } finally { stream.stop() } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteTagProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteTagProcedure.java index ca9bce12b6d9..4868ac0a0fc6 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteTagProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteTagProcedure.java @@ -60,12 +60,15 @@ public StructType outputType() { @Override public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - String tag = args.getString(1); + String tagStr = args.getString(1); + String[] tags = tagStr.split(","); return modifyPaimonTable( tableIdent, table -> { - table.deleteTag(tag); + for (String tag : tags) { + table.deleteTag(tag); + } InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; }); diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala index 30a2e6eae5d9..6ce042e41c1b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala @@ -85,7 +85,11 @@ case class DeleteFromPaimonTableCommand( val dropPartitions = matchedPartitions.map { partition => rowDataPartitionComputer.generatePartValues(partition).asScala.asJava } - commit.dropPartitions(dropPartitions.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER) + if (dropPartitions.nonEmpty) { + commit.dropPartitions(dropPartitions.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER) + } else { + writer.commit(Seq.empty) + } } else { val commitMessages = if (withPrimaryKeys) { performDeleteForPkTable(sparkSession) diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala index 81f0d775325e..7c76dd2365ff 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala @@ -85,6 +85,12 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase { spark.sql("SELECT * FROM T ORDER BY id"), Seq((2, "b", "2024")).toDF() ) + + spark.sql("DELETE FROM T WHERE dt < '2023' OR dt > '2025'") + checkAnswer( + spark.sql("SELECT * FROM T ORDER BY id"), + Seq((2, "b", "2024")).toDF() + ) } test("Paimon Delete: append-only table, condition contains IN/NOT IN subquery") {