Skip to content

Commit

Permalink
Merge branch 'apache:master' into fix-doc
Browse files Browse the repository at this point in the history
  • Loading branch information
eric666666 authored Apr 28, 2024
2 parents 777acc0 + 82f5a1d commit fcfe6a8
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@

import org.apache.paimon.options.ConfigOption;

import org.apache.orc.OrcConf;

import static org.apache.orc.OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD;
import static org.apache.orc.OrcConf.DIRECT_ENCODING_COLUMNS;
import static org.apache.paimon.options.ConfigOptions.key;

/** Options for orc format. */
Expand All @@ -36,7 +32,7 @@ public class OrcOptions {
.withDescription("write batch size for orc.");

public static final ConfigOption<String> ORC_COMPRESS =
key(OrcConf.COMPRESS.getAttribute())
key("orc.compress")
.stringType()
.defaultValue("lz4")
.withDescription(
Expand All @@ -45,16 +41,16 @@ public class OrcOptions {
+ "orc.compression.zstd.level");

public static final ConfigOption<Integer> ORC_COLUMN_ENCODING_DIRECT =
key(DIRECT_ENCODING_COLUMNS.getAttribute())
key("orc.column.encoding.direct")
.intType()
.noDefaultValue()
.withDescription(
"Comma-separated list of fields for which dictionary encoding is to be skipped in orc.");

public static final ConfigOption<Double> ORC_DICTIONARY_KEY_THRESHOLD =
key(DICTIONARY_KEY_SIZE_THRESHOLD.getAttribute())
key("orc.dictionary.key.threshold")
.doubleType()
.defaultValue((Double) DICTIONARY_KEY_SIZE_THRESHOLD.getDefaultValue())
.defaultValue(0.8)
.withDescription(
"If the number of distinct keys in a dictionary is greater than this "
+ "fraction of the total number of non-null rows, turn off "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,34 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
"CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
Row(true) :: Nil)
checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)

// snapshot-4
inputData.addData((2, "c1"))
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "c1") :: Nil)

checkAnswer(
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 's4')"),
Row(true) :: Nil)

// snapshot-5
inputData.addData((3, "c2"))
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "c1") :: Row(3, "c2") :: Nil)

checkAnswer(
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 's5')"),
Row(true) :: Nil)

checkAnswer(
spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
Row("s4") :: Row("s5") :: Nil)

checkAnswer(
spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 's4,s5')"),
Row(true) :: Nil)

checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
} finally {
stream.stop()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,15 @@ public StructType outputType() {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String tag = args.getString(1);
String tagStr = args.getString(1);
String[] tags = tagStr.split(",");

return modifyPaimonTable(
tableIdent,
table -> {
table.deleteTag(tag);
for (String tag : tags) {
table.deleteTag(tag);
}
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ case class DeleteFromPaimonTableCommand(
val dropPartitions = matchedPartitions.map {
partition => rowDataPartitionComputer.generatePartValues(partition).asScala.asJava
}
commit.dropPartitions(dropPartitions.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER)
if (dropPartitions.nonEmpty) {
commit.dropPartitions(dropPartitions.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER)
} else {
writer.commit(Seq.empty)
}
} else {
val commitMessages = if (withPrimaryKeys) {
performDeleteForPkTable(sparkSession)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
spark.sql("SELECT * FROM T ORDER BY id"),
Seq((2, "b", "2024")).toDF()
)

spark.sql("DELETE FROM T WHERE dt < '2023' OR dt > '2025'")
checkAnswer(
spark.sql("SELECT * FROM T ORDER BY id"),
Seq((2, "b", "2024")).toDF()
)
}

test("Paimon Delete: append-only table, condition contains IN/NOT IN subquery") {
Expand Down

0 comments on commit fcfe6a8

Please sign in to comment.