From 69bdfe089089e3c718a0df57662f7996d778f75d Mon Sep 17 00:00:00 2001 From: Zouxxyy Date: Sat, 7 Dec 2024 00:15:37 +0800 Subject: [PATCH 1/5] [test][spark] Add insert with column list test case (#4654) --- docs/content/spark/sql-write.md | 39 +++++++++----- .../sql/InsertOverwriteTableTestBase.scala | 52 +++++++++++++++++++ 2 files changed, 79 insertions(+), 12 deletions(-) diff --git a/docs/content/spark/sql-write.md b/docs/content/spark/sql-write.md index d2777110914f..5f4fa2dabc9f 100644 --- a/docs/content/spark/sql-write.md +++ b/docs/content/spark/sql-write.md @@ -26,17 +26,30 @@ under the License. # SQL Write -## Syntax +## Insert Table + +The `INSERT` statement inserts new rows into a table or overwrites the existing data in the table. The inserted rows can be specified by value expressions or result from a query. + +**Syntax** ```sql INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query }; ``` +**Parameters** + +- **table_identifier**: Specifies a table name, which may be optionally qualified with a database name. + +- **part_spec**: An optional parameter that specifies a comma-separated list of key and value pairs for partitions. -For more information, please check the syntax document: +- **column_list**: An optional parameter that specifies a comma-separated list of columns belonging to the table_identifier table. Spark will reorder the columns of the input query to match the table schema according to the specified column list. -[Spark INSERT Statement](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html) + Note: Since Spark 3.4, INSERT INTO commands with explicit column lists comprising fewer columns than the target table will automatically add the corresponding default values for the remaining columns (or NULL for any column lacking an explicitly-assigned default value). In Spark 3.3 or earlier, column_list's size must be equal to the target table's column size, otherwise these commands would have failed. -## INSERT INTO +- **value_expr** ( { value | NULL } [ , … ] ) [ , ( … ) ]: Specifies the values to be inserted. Either an explicitly specified value or a NULL can be inserted. A comma must be used to separate each value in the clause. More than one set of values can be specified to insert multiple rows. + +For more information, please check the syntax document: [Spark INSERT Statement](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html) + +### Insert Into Use `INSERT INTO` to apply records and changes to tables. @@ -44,15 +57,15 @@ Use `INSERT INTO` to apply records and changes to tables. INSERT INTO my_table SELECT ... ``` -## Overwriting the Whole Table +### Insert Overwrite -Use `INSERT OVERWRITE` to overwrite the whole unpartitioned table. +Use `INSERT OVERWRITE` to overwrite the whole table. ```sql INSERT OVERWRITE my_table SELECT ... ``` -### Overwriting a Partition +#### Insert Overwrite Partition Use `INSERT OVERWRITE` to overwrite a partition. @@ -60,7 +73,7 @@ Use `INSERT OVERWRITE` to overwrite a partition. INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ... ``` -### Dynamic Overwrite +#### Dynamic Overwrite Partition Spark's default overwrite mode is `static` partition overwrite. 
To enable dynamic overwrite you need to set the Spark session configuration `spark.sql.sources.partitionOverwriteMode` to `dynamic`.
@@ -97,13 +110,15 @@ SELECT * FROM my_table;
 */
 ```
 
-## Truncate tables
+## Truncate Table
+
+The `TRUNCATE TABLE` statement removes all the rows from a table or partition(s).
 
 ```sql
 TRUNCATE TABLE my_table;
 ```
 
-## Updating tables
+## Update Table
 
 Spark supports updating PrimitiveType and StructType, for example:
 
@@ -125,13 +140,13 @@ UPDATE t SET name = 'a_new' WHERE id = 1;
 UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
 ```
 
-## Deleting from table
+## Delete From Table
 
 ```sql
 DELETE FROM my_table WHERE currency = 'UNKNOWN';
 ```
 
-## Merging into table
+## Merge Into Table
 
 Paimon currently supports Merge Into syntax in Spark 3+, which allows a set of updates, insertions and deletions based on a source table in a single commit.
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
index 03026e857429..977b74707069 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
@@ -508,4 +508,56 @@ abstract class InsertOverwriteTableTestBase extends PaimonSparkTestBase {
     ) :: Nil
   )
 }
+
+  test("Paimon Insert: insert with column list") {
+    sql("CREATE TABLE T (name String, student_id INT) PARTITIONED BY (address STRING)")
+
+    // insert with a column list
+    sql("INSERT INTO T (name, student_id, address) VALUES ('a', '1', 'Hangzhou')")
+    // Since Spark 3.4, INSERT INTO commands with explicit column lists comprising fewer columns than the target
+    // table will automatically add the corresponding default values for the remaining columns (or NULL for any column
+    // lacking an explicitly-assigned default value). In Spark 3.3 or earlier, these commands would have failed.
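+    // That is why the else branches below spell out every column and pass NULL explicitly for the columns that the Spark 3.4+ branches omit.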
+    // See https://issues.apache.org/jira/browse/SPARK-42521
+    if (gteqSpark3_4) {
+      sql("INSERT INTO T (name) VALUES ('b')")
+      sql("INSERT INTO T (address, name) VALUES ('Hangzhou', 'c')")
+    } else {
+      sql("INSERT INTO T (name, student_id, address) VALUES ('b', null, null)")
+      sql("INSERT INTO T (name, student_id, address) VALUES ('c', null, 'Hangzhou')")
+    }
+
+    // insert with both a partition spec and a column list
+    if (gteqSpark3_4) {
+      sql("INSERT INTO T PARTITION (address='Beijing') (name) VALUES ('d')")
+    } else {
+      sql("INSERT INTO T PARTITION (address='Beijing') (name, student_id) VALUES ('d', null)")
+    }
+    sql("INSERT INTO T PARTITION (address='Hangzhou') (student_id, name) VALUES (5, 'e')")
+
+    checkAnswer(
+      sql("SELECT * FROM T ORDER BY name"),
+      Seq(
+        Row("a", 1, "Hangzhou"),
+        Row("b", null, null),
+        Row("c", null, "Hangzhou"),
+        Row("d", null, "Beijing"),
+        Row("e", 5, "Hangzhou"))
+    )
+
+    // insert overwrite with a column list
+    if (gteqSpark3_4) {
+      sql("INSERT OVERWRITE T (name, address) VALUES ('f', 'Shanghai')")
+    } else {
+      sql("INSERT OVERWRITE T (name, student_id, address) VALUES ('f', null, 'Shanghai')")
+    }
+    checkAnswer(sql("SELECT * FROM T ORDER BY name"), Row("f", null, "Shanghai"))
+
+    // insert overwrite with both a partition spec and a column list
+    if (gteqSpark3_4) {
+      sql("INSERT OVERWRITE T PARTITION (address='Shanghai') (name) VALUES ('g')")
+    } else {
+      sql("INSERT OVERWRITE T PARTITION (address='Shanghai') (name, student_id) VALUES ('g', null)")
+    }
+    checkAnswer(sql("SELECT * FROM T ORDER BY name"), Row("g", null, "Shanghai"))
+  }
 }

From 318678804b6d1b12b520c77aa50894200676b616 Mon Sep 17 00:00:00 2001
From: askwang <135721692+askwang@users.noreply.github.com>
Date: Sat, 7 Dec 2024 00:17:59 +0800
Subject: [PATCH 2/5] [core] Expire partitions add default delete num (#4652)

---
 .../generated/core_configuration.html        |  6 ++
 .../java/org/apache/paimon/CoreOptions.java  | 10 +++
 .../org/apache/paimon/AbstractFileStore.java |  3 +-
 .../paimon/operation/PartitionExpire.java    | 27 +++++---
 .../procedure/ExpirePartitionsProcedure.java |  5 +-
 .../flink/action/ExpirePartitionsAction.java |  3 +-
 .../procedure/ExpirePartitionsProcedure.java |  5 +-
 .../ExpirePartitionsProcedureITCase.java     | 37 +++++++++++
 .../procedure/ExpirePartitionsProcedure.java |  5 +-
 .../ExpirePartitionsProcedureTest.scala      | 65 +++++++++++++++++++
 10 files changed, 150 insertions(+), 16 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 6fb2c72650fe..7d6bacccb026 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -593,6 +593,12 @@ Duration The check interval of partition expiration. + +
partition.expiration-max-num
+ 100 + Integer + The maximum number of partitions that can be deleted in a single partition expiration. +
partition.expiration-strategy
values-time diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 765d5a1e32d6..8aebf2f289a0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -809,6 +809,12 @@ public class CoreOptions implements Serializable { .defaultValue(Duration.ofHours(1)) .withDescription("The check interval of partition expiration."); + public static final ConfigOption PARTITION_EXPIRATION_MAX_NUM = + key("partition.expiration-max-num") + .intType() + .defaultValue(100) + .withDescription("The default deleted num of partition expiration."); + public static final ConfigOption PARTITION_TIMESTAMP_FORMATTER = key("partition.timestamp-formatter") .stringType() @@ -2126,6 +2132,10 @@ public Duration partitionExpireCheckInterval() { return options.get(PARTITION_EXPIRATION_CHECK_INTERVAL); } + public int partitionExpireMaxNum() { + return options.get(PARTITION_EXPIRATION_MAX_NUM); + } + public PartitionExpireStrategy partitionExpireStrategy() { return options.get(PARTITION_EXPIRATION_STRATEGY); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 1a538ad89e47..54f554aa46d3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -309,7 +309,8 @@ public PartitionExpire newPartitionExpire(String commitUser) { newScan(), newCommit(commitUser), metastoreClient, - options.endInputCheckPartitionExpire()); + options.endInputCheckPartitionExpire(), + options.partitionExpireMaxNum()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java index 62a9b796476a..d432a37dfd9c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java @@ -54,7 +54,7 @@ public class PartitionExpire { private LocalDateTime lastCheck; private final PartitionExpireStrategy strategy; private final boolean endInputCheckPartitionExpire; - private int maxExpires; + private int maxExpireNum; public PartitionExpire( Duration expirationTime, @@ -63,7 +63,8 @@ public PartitionExpire( FileStoreScan scan, FileStoreCommit commit, @Nullable MetastoreClient metastoreClient, - boolean endInputCheckPartitionExpire) { + boolean endInputCheckPartitionExpire, + int maxExpireNum) { this.expirationTime = expirationTime; this.checkInterval = checkInterval; this.strategy = strategy; @@ -72,7 +73,7 @@ public PartitionExpire( this.metastoreClient = metastoreClient; this.lastCheck = LocalDateTime.now(); this.endInputCheckPartitionExpire = endInputCheckPartitionExpire; - this.maxExpires = Integer.MAX_VALUE; + this.maxExpireNum = maxExpireNum; } public PartitionExpire( @@ -81,8 +82,17 @@ public PartitionExpire( PartitionExpireStrategy strategy, FileStoreScan scan, FileStoreCommit commit, - @Nullable MetastoreClient metastoreClient) { - this(expirationTime, checkInterval, strategy, scan, commit, metastoreClient, false); + @Nullable MetastoreClient metastoreClient, + int maxExpireNum) { + this( + expirationTime, + checkInterval, + strategy, + scan, + commit, + metastoreClient, + false, + maxExpireNum); } public PartitionExpire withLock(Lock lock) { @@ -90,8 +100,8 @@ 
public PartitionExpire withLock(Lock lock) { return this; } - public PartitionExpire withMaxExpires(int maxExpires) { - this.maxExpires = maxExpires; + public PartitionExpire withMaxExpireNum(int maxExpireNum) { + this.maxExpireNum = maxExpireNum; return this; } @@ -145,6 +155,7 @@ private List> doExpire( List> expired = new ArrayList<>(); if (!expiredPartValues.isEmpty()) { + // convert partition value to partition string, and limit the partition num expired = convertToPartitionString(expiredPartValues); LOG.info("Expire Partitions: {}", expired); if (metastoreClient != null) { @@ -175,7 +186,7 @@ private List> convertToPartitionString( .sorted() .map(s -> s.split(DELIMITER)) .map(strategy::toPartitionString) - .limit(Math.min(expiredPartValues.size(), maxExpires)) + .limit(Math.min(expiredPartValues.size(), maxExpireNum)) .collect(Collectors.toList()); } } diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index c0e5a65c49ef..1c0d73cfbe38 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -93,9 +93,10 @@ public String[] call( .catalogEnvironment() .metastoreClientFactory()) .map(MetastoreClient.Factory::create) - .orElse(null)); + .orElse(null), + fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { - partitionExpire.withMaxExpires(maxExpires); + partitionExpire.withMaxExpireNum(maxExpires); } List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java index 9528bc137d6f..0fa17e1a8ddb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java @@ -72,7 +72,8 @@ public ExpirePartitionsAction( .catalogEnvironment() .metastoreClientFactory()) .map(MetastoreClient.Factory::create) - .orElse(null)); + .orElse(null), + fileStore.options().partitionExpireMaxNum()); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index ee6075a927d3..ce282c6800cc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -97,9 +97,10 @@ public String identifier() { .catalogEnvironment() .metastoreClientFactory()) .map(MetastoreClient.Factory::create) - .orElse(null)); + .orElse(null), + fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { - partitionExpire.withMaxExpires(maxExpires); + partitionExpire.withMaxExpireNum(maxExpires); } List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty() diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java index 2d1fb6dde78a..a40968e067bc 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java @@ -415,6 +415,43 @@ public void testNullPartitionExpire() { .containsExactly("No expired partitions."); } + @Test + public void testExpirePartitionsWithDefaultNum() throws Exception { + sql( + "CREATE TABLE T (" + + " k STRING," + + " dt STRING," + + " PRIMARY KEY (k, dt) NOT ENFORCED" + + ") PARTITIONED BY (dt) WITH (" + + " 'bucket' = '1'," + + " 'partition.expiration-max-num'='2'" + + ")"); + FileStoreTable table = paimonTable("T"); + + sql("INSERT INTO T VALUES ('a', '2024-06-01')"); + sql("INSERT INTO T VALUES ('b', '2024-06-02')"); + sql("INSERT INTO T VALUES ('c', '2024-06-03')"); + // This partition never expires. + sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')"); + Function consumerReadResult = + (InternalRow row) -> row.getString(0) + ":" + row.getString(1); + + assertThat(read(table, consumerReadResult)) + .containsExactlyInAnyOrder( + "a:2024-06-01", "b:2024-06-02", "c:2024-06-03", "Never-expire:9999-09-09"); + + assertThat( + callExpirePartitions( + "CALL sys.expire_partitions(" + + "`table` => 'default.T'" + + ", expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd')")) + .containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02"); + + assertThat(read(table, consumerReadResult)) + .containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09"); + } + /** Return a list of expired partitions. 
*/ public List callExpirePartitions(String callSql) { return sql(callSql).stream() diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java index 7b388227e5a4..e3a53d2bd2ef 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java @@ -107,9 +107,10 @@ public InternalRow[] call(InternalRow args) { .catalogEnvironment() .metastoreClientFactory()) .map(MetastoreClient.Factory::create) - .orElse(null)); + .orElse(null), + fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { - partitionExpire.withMaxExpires(maxExpires); + partitionExpire.withMaxExpireNum(maxExpires); } List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty() diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala index 4561e532f538..9f0d23dc9379 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala @@ -551,4 +551,69 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest } } } + + test("Paimon Procedure: expire partitions with default num") { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql( + s""" + |CREATE TABLE T (k STRING, pt STRING) + |TBLPROPERTIES ('primary-key'='k,pt', 'bucket'='1', 'partition.expiration-max-num'='2') + |PARTITIONED BY (pt) + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(String, String)] + val stream = inputData + .toDS() + .toDF("k", "pt") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T") + + try { + // snapshot-1 + inputData.addData(("a", "2024-06-01")) + stream.processAllAvailable() + + // snapshot-2 + inputData.addData(("b", "2024-06-02")) + stream.processAllAvailable() + + // snapshot-3 + inputData.addData(("c", "2024-06-03")) + stream.processAllAvailable() + + // This partition never expires. + inputData.addData(("Never-expire", "9999-09-09")) + stream.processAllAvailable() + + checkAnswer( + query(), + Row("a", "2024-06-01") :: Row("b", "2024-06-02") :: Row("c", "2024-06-03") :: Row( + "Never-expire", + "9999-09-09") :: Nil) + // call expire_partitions. 
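+          // With 'partition.expiration-max-num' = 2, a single call drops only the two oldest
+          // expired partitions (2024-06-01 and 2024-06-02); 2024-06-03 is left for a later run.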
+        checkAnswer(
+          spark.sql(
+            "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" +
+              ", timestamp_formatter => 'yyyy-MM-dd')"),
+          Row("pt=2024-06-01") :: Row("pt=2024-06-02") :: Nil
+        )
+
+        checkAnswer(query(), Row("c", "2024-06-03") :: Row("Never-expire", "9999-09-09") :: Nil)
+
+        } finally {
+          stream.stop()
+        }
+      }
+    }
+  }
 }

From 8484bb4a25e5b1873cad1716bb6076d6f60913ed Mon Sep 17 00:00:00 2001
From: HunterXHunter
Date: Sat, 7 Dec 2024 00:18:51 +0800
Subject: [PATCH 3/5] [flink] Optimizing parallelism for fixed bucket and non-partitioned table (#4643)

---
 .../org/apache/paimon/flink/sink/FlinkSinkBuilder.java | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 5703c408243b..ecaa5678dd0b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -265,6 +265,16 @@ protected DataStreamSink buildDynamicBucketSink(
 }
 
 protected DataStreamSink buildForFixedBucket(DataStream input) {
+        int bucketNums = table.bucketSpec().getNumBuckets();
+        if (parallelism == null
+                && bucketNums < input.getParallelism()
+                && table.partitionKeys().isEmpty()) {
+            // For a non-partitioned table, if bucketNums is less than the job parallelism, cap the writer parallelism at bucketNums.
+            LOG.warn(
+                    "For non-partitioned table, if bucketNums is less than the parallelism of inputOperator,"
+                            + " then the parallelism of writerOperator will be set to bucketNums.");
+            parallelism = bucketNums;
+        }
         DataStream partitioned =
                 partition(
                         input,

From 96100eb83e8a6a2faeeef4d6d092d7dbf885d02f Mon Sep 17 00:00:00 2001
From: herefree <841043203@qq.com>
Date: Sun, 8 Dec 2024 22:24:52 +0800
Subject: [PATCH 4/5] [flink] support flink sourceIdleTime metric in ReadOperator (#4644)

---
 .../metrics/FileStoreSourceReaderMetrics.java |  1 +
 .../flink/source/operator/ReadOperator.java   | 24 ++++++++++++++++++-
 .../source/operator/OperatorSourceTest.java   | 16 +++++++++++++
 3 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
index 2e1e94777949..a270e0eceecd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
@@ -29,6 +29,7 @@ public class FileStoreSourceReaderMetrics {
     private long lastSplitUpdateTime = UNDEFINED;
 
     public static final long UNDEFINED = -1L;
+    public static final long ACTIVE = Long.MAX_VALUE;
 
     public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) {
         sourceReaderMetricGroup.gauge(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index 80c85f7cdb35..d884724c6749 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -54,9 +54,11 @@ public class ReadOperator extends AbstractStreamOperator private transient IOManager ioManager; private transient FileStoreSourceReaderMetrics sourceReaderMetrics; - // we create our own gauge for currentEmitEventTimeLag, because this operator is not a FLIP-27 + // we create our own gauge for currentEmitEventTimeLag and sourceIdleTime, because this operator + // is not a FLIP-27 // source and Flink can't automatically calculate this metric private transient long emitEventTimeLag = FileStoreSourceReaderMetrics.UNDEFINED; + private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE; private transient Counter numRecordsIn; public ReadOperator(ReadBuilder readBuilder) { @@ -69,6 +71,7 @@ public void open() throws Exception { this.sourceReaderMetrics = new FileStoreSourceReaderMetrics(getMetricGroup()); getMetricGroup().gauge(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG, () -> emitEventTimeLag); + getMetricGroup().gauge(MetricNames.SOURCE_IDLE_TIME, this::getIdleTime); this.numRecordsIn = InternalSourceReaderMetricGroup.wrap(getMetricGroup()) .getIOMetricGroup() @@ -83,6 +86,7 @@ public void open() throws Exception { this.read = readBuilder.newRead().withIOManager(ioManager); this.reuseRow = new FlinkRowData(null); this.reuseRecord = new StreamRecord<>(reuseRow); + this.idlingStarted(); } @Override @@ -94,6 +98,8 @@ public void processElement(StreamRecord record) throws Exception { .earliestFileCreationEpochMillis() .orElse(FileStoreSourceReaderMetrics.UNDEFINED); sourceReaderMetrics.recordSnapshotUpdate(eventTime); + // update idleStartTime when reading a new split + idleStartTime = FileStoreSourceReaderMetrics.ACTIVE; boolean firstRecord = true; try (CloseableIterator iterator = @@ -113,6 +119,8 @@ public void processElement(StreamRecord record) throws Exception { output.collect(reuseRecord); } } + // start idle when data sending is completed + this.idlingStarted(); } @Override @@ -122,4 +130,18 @@ public void close() throws Exception { ioManager.close(); } } + + private void idlingStarted() { + if (!isIdling()) { + idleStartTime = System.currentTimeMillis(); + } + } + + private boolean isIdling() { + return idleStartTime != FileStoreSourceReaderMetrics.ACTIVE; + } + + private long getIdleTime() { + return isIdling() ? 
System.currentTimeMillis() - idleStartTime : 0; + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java index 61a03a29a21b..0bce8c8901ea 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java @@ -204,6 +204,14 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { .getValue()) .isEqualTo(-1L); + Thread.sleep(300L); + assertThat( + (Long) + TestingMetricUtils.getGauge( + readerOperatorMetricGroup, "sourceIdleTime") + .getValue()) + .isGreaterThan(299L); + harness.processElement(new StreamRecord<>(splits.get(0))); assertThat( (Long) @@ -228,6 +236,14 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { "currentEmitEventTimeLag") .getValue()) .isEqualTo(emitEventTimeLag); + + assertThat( + (Long) + TestingMetricUtils.getGauge( + readerOperatorMetricGroup, "sourceIdleTime") + .getValue()) + .isGreaterThan(99L) + .isLessThan(300L); } private T testReadSplit( From e18f6ed6fde0f3b10170540aebefa0d437bb19aa Mon Sep 17 00:00:00 2001 From: Jingsong Date: Sun, 8 Dec 2024 22:46:44 +0800 Subject: [PATCH 5/5] [parquet] Fix minor format codes in parquet readers --- .../format/parquet/position/CollectionPosition.java | 6 +++--- .../paimon/format/parquet/position/LevelDelegation.java | 1 + .../format/parquet/reader/AbstractColumnReader.java | 5 +---- .../format/parquet/reader/BooleanColumnReader.java | 5 ----- .../parquet/reader/NestedPrimitiveColumnReader.java | 4 +--- .../paimon/format/parquet/reader/ParquetReadState.java | 9 +++------ .../format/parquet/reader/TimestampColumnReader.java | 5 +++-- .../apache/paimon/format/parquet/type/ParquetField.java | 1 + 8 files changed, 13 insertions(+), 23 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/CollectionPosition.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/CollectionPosition.java index e72a4280f4aa..beb5de7a92e5 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/CollectionPosition.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/CollectionPosition.java @@ -22,14 +22,14 @@ /** To represent collection's position in repeated type. */ public class CollectionPosition { + @Nullable private final boolean[] isNull; private final long[] offsets; - private final long[] length; - private final int valueCount; - public CollectionPosition(boolean[] isNull, long[] offsets, long[] length, int valueCount) { + public CollectionPosition( + @Nullable boolean[] isNull, long[] offsets, long[] length, int valueCount) { this.isNull = isNull; this.offsets = offsets; this.length = length; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/LevelDelegation.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/LevelDelegation.java index 25bbedc861d1..8e30d90ba2c7 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/LevelDelegation.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/LevelDelegation.java @@ -20,6 +20,7 @@ /** To delegate repetition level and definition level. 
*/ public class LevelDelegation { + private final int[] repetitionLevel; private final int[] definitionLevel; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java index 5e3f4a7e6a33..d4a0ab039b53 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java @@ -69,9 +69,6 @@ public abstract class AbstractColumnReader /** If true, the current page is dictionary encoded. */ private boolean isCurrentPageDictionaryEncoded; - /** Total values in the current page. */ - // private int pageValueCount; - /** * Helper struct to track intermediate states while reading Parquet pages in the column chunk. */ @@ -90,7 +87,7 @@ public abstract class AbstractColumnReader */ /** Run length decoder for data and dictionary. */ - protected RunLengthDecoder runLenDecoder; + RunLengthDecoder runLenDecoder; /** Data input stream. */ ByteBufferInputStream dataInputStream; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java index 83d3c5a07d4b..4355392bf552 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java @@ -42,11 +42,6 @@ public BooleanColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadSt checkTypeName(PrimitiveType.PrimitiveTypeName.BOOLEAN); } - @Override - protected boolean supportLazyDecode() { - return true; - } - @Override protected void afterReadPage() { bitOffset = 0; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java index 7db7aedbf6ae..f0a82a6d711e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java @@ -65,6 +65,7 @@ /** Reader to read nested primitive column. */ public class NestedPrimitiveColumnReader implements ColumnReader { + private static final Logger LOG = LoggerFactory.getLogger(NestedPrimitiveColumnReader.class); private final IntArrayList repetitionLevelList = new IntArrayList(0); @@ -95,9 +96,6 @@ public class NestedPrimitiveColumnReader implements ColumnReaderParquet + * Timestamp TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType. */ public class TimestampColumnReader extends AbstractColumnReader { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java index 94fe6b91d9d3..291e9ebbceb3 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java @@ -22,6 +22,7 @@ /** Field that represent parquet's field type. */ public abstract class ParquetField { + private final DataType type; private final int repetitionLevel; private final int definitionLevel;
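Editor's note on the last commit: `CollectionPosition` and `LevelDelegation` carry the Parquet repetition/definition levels and the per-collection positions derived from them (`isNull`, `offsets`, `length`, `valueCount`). The following standalone sketch is not Paimon's reader code; it assumes a simplified level mapping for a single optional list column (repetition level 0 starts a new row, definition level 0 means a null list, 1 an empty list, and 2 or more a present element — real schemas derive these thresholds from the Parquet type). It only illustrates how the two level arrays can be folded into the kind of per-collection positions that `CollectionPosition` stores.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch (not Paimon code) of turning repetition/definition levels for a
 * list column into per-collection positions: isNull, offsets, length, valueCount.
 *
 * Assumed level semantics (schema-dependent in real Parquet data):
 *   repetitionLevel == 0  -> a new row (a new top-level list) starts
 *   definitionLevel == 0  -> the list itself is null
 *   definitionLevel == 1  -> the list is present but empty
 *   definitionLevel >= 2  -> a concrete element of the current list
 */
public class CollectionPositionSketch {

    public static void main(String[] args) {
        // Levels describing three rows: [10, 20], null, []
        int[] repetition = {0, 1, 0, 0};
        int[] definition = {2, 2, 0, 1};

        List<Boolean> isNull = new ArrayList<>();
        List<Long> offsets = new ArrayList<>();
        List<Long> lengths = new ArrayList<>();
        long elementsSeen = 0;

        for (int i = 0; i < repetition.length; i++) {
            if (repetition[i] == 0) {
                // A new collection starts at this position.
                isNull.add(definition[i] == 0);
                offsets.add(elementsSeen);
                lengths.add(0L);
            }
            if (definition[i] >= 2) {
                // A concrete element: extend the current collection.
                elementsSeen++;
                lengths.set(lengths.size() - 1, lengths.get(lengths.size() - 1) + 1);
            }
        }

        System.out.println("valueCount = " + isNull.size()); // 3 collections
        System.out.println("isNull     = " + isNull);        // [false, true, false]
        System.out.println("offsets    = " + offsets);       // [0, 2, 2]
        System.out.println("lengths    = " + lengths);       // [2, 0, 0]
    }
}
```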