[doc] Update spark streaming read change log (apache#2669)
Zouxxyy authored Jan 10, 2024
1 parent 7992ee9 commit ec041d3
Showing 2 changed files with 49 additions and 3 deletions.
17 changes: 14 additions & 3 deletions docs/content/engines/spark.md
@@ -459,13 +459,24 @@ val query = spark.readStream
```

Paimon Structured Streaming supports reading rows in the form of a changelog (an extra rowkind column is added to each row to represent its change type) in two ways:

- Stream directly from the system `audit_log` table
- Set `read.changelog` to true (default is false), then stream from the table location
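Before wiring up a stream, the rowkind column can also be inspected with a plain batch query against the `audit_log` system table. This is a sketch assuming an active `SparkSession` named `spark` with the Paimon catalog configured; `my_table` is a placeholder name:

```scala
// Batch query of the audit_log system table (my_table is a placeholder).
// Each returned row carries a rowkind column, e.g. +I, -U, +U, -D.
spark.sql("SELECT * FROM `my_table$audit_log`").show()
```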

**Example:**

```scala
// When no scan-related configs are provided, the latest-full scan mode is used.
// Option 1
val query1 = spark.readStream
.format("paimon")
.table("`table_name$audit_log`")
.writeStream
.format("console")
.start()

// Option 2
val query2 = spark.readStream
.format("paimon")
.option("read.changelog", "true")
.load("/path/to/paimon/source/table")
@@ -213,4 +213,39 @@ class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest {
}
}

  test("Paimon CDC Source: streaming read change-log with audit_log system table") {
    withTable("T") {
      withTempDir { checkpointDir =>
        spark.sql(
          s"""
             |CREATE TABLE T (a INT, b STRING)
             |TBLPROPERTIES ('primary-key'='a', 'bucket'='2', 'changelog-producer' = 'lookup')
             |""".stripMargin)

        val readStream = spark.readStream
          .format("paimon")
          .table("`T$audit_log`")
          .writeStream
          .format("memory")
          .option("checkpointLocation", checkpointDir.getCanonicalPath)
          .queryName("mem_table")
          .outputMode("append")
          .start()

        val currentResult = () => spark.sql("SELECT * FROM mem_table")
        try {
          spark.sql(s"INSERT INTO T VALUES (1, 'v_1')")
          readStream.processAllAvailable()
          checkAnswer(currentResult(), Row("+I", 1, "v_1") :: Nil)

          spark.sql(s"INSERT INTO T VALUES (2, 'v_2')")
          readStream.processAllAvailable()
          checkAnswer(currentResult(), Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Nil)
        } finally {
          readStream.stop()
        }
      }
    }
  }
}
