diff --git a/docs/content/engines/spark.md b/docs/content/engines/spark.md
index f18b8ee905cb..a583d5a24b78 100644
--- a/docs/content/engines/spark.md
+++ b/docs/content/engines/spark.md
@@ -459,13 +459,24 @@ val query = spark.readStream
 ```
 
 Paimon Structured Streaming supports read row in the form of changelog (add rowkind column in row to represent its
-change type) by setting `read.changelog` to true (default is false).
+change type) in two ways:
+
+- Streaming read directly from the system `audit_log` table
+- Setting `read.changelog` to `true` (default is `false`), then streaming read from the table location
 
 **Example:**
 
 ```scala
+// Option 1: streaming read via the audit_log system table
+val query1 = spark.readStream
+  .format("paimon")
+  .table("`table_name$audit_log`")
+  .writeStream
+  .format("console")
+  .start()
+
+// Option 2: enable read.changelog and stream from the table location
-// no any scan-related configs are provided, that will use latest-full scan mode.
-val query = spark.readStream
+val query2 = spark.readStream
   .format("paimon")
   .option("read.changelog", "true")
   .load("/path/to/paimon/source/table")
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
index 8782aebeb416..4e850c250329 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
@@ -213,4 +213,39 @@ class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest {
     }
   }
 
+  test("Paimon CDC Source: streaming read change-log with audit_log system table") {
+    withTable("T") {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(
+            s"""
+               |CREATE TABLE T (a INT, b STRING)
+               |TBLPROPERTIES ('primary-key'='a', 'bucket'='2', 'changelog-producer' = 'lookup')
+               |""".stripMargin)
+
+          val readStream = spark.readStream
+            .format("paimon")
+            .table("`T$audit_log`")
+            .writeStream
+            .format("memory")
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .queryName("mem_table")
+            .outputMode("append")
+            .start()
+
+          val currentResult = () => spark.sql("SELECT * FROM mem_table")
+          try {
+            spark.sql(s"INSERT INTO T VALUES (1, 'v_1')")
+            readStream.processAllAvailable()
+            checkAnswer(currentResult(), Row("+I", 1, "v_1") :: Nil)
+
+            spark.sql(s"INSERT INTO T VALUES (2, 'v_2')")
+            readStream.processAllAvailable()
+            checkAnswer(currentResult(), Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Nil)
+          } finally {
+            readStream.stop()
+          }
+      }
+    }
+  }
 }
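
For completeness, the `audit_log` system table is not limited to streaming: its `rowkind` column also shows up in plain batch queries, which is a quick way to sanity-check what the new streaming source will emit. Below is a minimal sketch, assuming a Spark session with the Paimon catalog configured and the table `T` from the test above already written to (the columns `a` and `b` come from that test).

```scala
// Minimal sketch, assuming a Spark session with the Paimon catalog configured
// and the primary-key table T from the test above already populated.
// `T$audit_log` is the system table Paimon derives from T; it prepends a
// rowkind column (e.g. "+I", "-U", "+U", "-D") to T's own columns.
spark.sql("SELECT rowkind, a, b FROM `T$audit_log`").show()

// Expected output shape after the two inserts from the test
// (a plain batch scan of the latest snapshot typically reports "+I"):
// +-------+---+----+
// |rowkind|  a|   b|
// +-------+---+----+
// |     +I|  1| v_1|
// |     +I|  2| v_2|
// +-------+---+----+
```

The update and delete kinds (`-U`, `+U`, `-D`) only surface when the changelog itself is consumed, as the streaming test does via `processAllAvailable()`.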