Skip to content

Commit

Permalink
[spark] update branch UT and doc (apache#4169)
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron authored Sep 13, 2024
1 parent 7babea6 commit 15d3302
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 2 deletions.
43 changes: 43 additions & 0 deletions docs/content/maintenance/manage-branches.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ Run the following command:
```
{{< /tab >}}
{{< tab "Spark SQL" >}}
Run the following sql:
```sql
-- create branch named 'branch1' from tag 'tag1'
CALL sys.create_branch('default.T', 'branch1', 'tag1');

-- create empty branch named 'branch1'
CALL sys.create_branch('default.T', 'empty_branch');
```
{{< /tab >}}
{{< /tabs >}}
## Delete Branches
Expand Down Expand Up @@ -101,6 +114,15 @@ Run the following command:
```
{{< /tab >}}
{{< tab "Spark SQL" >}}
Run the following sql:
```sql
CALL sys.delete_branch('default.T', 'branch1');
```
{{< /tab >}}
{{< /tabs >}}
## Read / Write With Branch
Expand All @@ -122,6 +144,27 @@ INSERT INTO `t$branch_branch1` SELECT ...
{{< /tab >}}
{{< tab "Spark SQL" >}}
```sql
-- read from branch 'branch1'
SELECT * FROM `t$branch_branch1`;

-- write to branch 'branch1'
INSERT INTO `t$branch_branch1` SELECT ...
```
{{< /tab >}}
{{< tab "Spark DataFrame" >}}
```sql
-- read from branch 'branch1'
spark.read.format("paimon").option("branch", "branch1").table("t")
```
{{< /tab >}}
{{< /tabs >}}
## Fast Forward
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest

class CreateAndDeleteBranchProcedureTest extends PaimonSparkTestBase with StreamTest {
class BranchProcedureTest extends PaimonSparkTestBase with StreamTest {

import testImplicits._
test("Paimon Procedure: create and delete branch") {
test("Paimon Procedure: create, query, write and delete branch") {
failAfter(streamingTimeout) {
withTempDir {
checkpointDir =>
Expand Down Expand Up @@ -86,19 +86,43 @@ class CreateAndDeleteBranchProcedureTest extends PaimonSparkTestBase with Stream
val branchManager = table.branchManager()
assert(branchManager.branchExists("test_branch"))

// query from branch
checkAnswer(
spark.sql("SELECT * FROM `T$branch_test_branch` ORDER BY a"),
Row(1, "a") :: Row(2, "b") :: Nil
)
checkAnswer(
spark.read.format("paimon").option("branch", "test_branch").table("T").orderBy("a"),
Row(1, "a") :: Row(2, "b") :: Nil
)

// update branch
spark.sql("INSERT INTO `T$branch_test_branch` VALUES (3, 'c')")
checkAnswer(
spark.sql("SELECT * FROM `T$branch_test_branch` ORDER BY a"),
Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil
)

// create empty branch
checkAnswer(
spark.sql(
"CALL paimon.sys.create_branch(table => 'test.T', branch => 'empty_branch')"),
Row(true) :: Nil)
assert(branchManager.branchExists("empty_branch"))
checkAnswer(
spark.sql("SELECT * FROM `T$branch_empty_branch` ORDER BY a"),
Nil
)

// delete branch
checkAnswer(
spark.sql(
"CALL paimon.sys.delete_branch(table => 'test.T', branch => 'test_branch')"),
Row(true) :: Nil)
assert(!branchManager.branchExists("test_branch"))
intercept[Exception] {
spark.sql("SELECT * FROM `T$branch_test_branch` ORDER BY a")
}

} finally {
stream.stop()
Expand Down

0 comments on commit 15d3302

Please sign in to comment.