[spark] Spark supports read changelog (#2107)
Zouxxyy authored Oct 10, 2023
1 parent 471aee1 commit fe7ff3d
Showing 9 changed files with 288 additions and 18 deletions.
21 changes: 21 additions & 0 deletions docs/content/engines/spark3.md
@@ -402,6 +402,27 @@ val query = spark.readStream
.start()
```

Paimon Structured Streaming supports reading rows in the form of a changelog (a rowkind column is added to each row to represent its change type) by setting `read.changelog` to `true` (the default is `false`).

**Example:**

```scala
// When no scan-related configs are provided, the latest-full scan mode is used.
val query = spark.readStream
.format("paimon")
.option("read.changelog", "true")
.load("/path/to/paimon/source/table")
.writeStream
.format("console")
.start()

/*
+I 1 Hi
+I 2 Hello
*/
```
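
Because the change type is exposed as a regular column, the changelog stream can be processed with ordinary DataFrame operations. A minimal sketch (assuming the change-type column is named `rowkind`, as in Paimon's audit-log representation) that keeps only deletions:

```scala
// Keep only deleted rows ("-D") from the changelog stream and print them to the console.
val query = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .load("/path/to/paimon/source/table")
  .filter("rowkind = '-D'")
  .writeStream
  .format("console")
  .start()
```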

## Spark Type Conversion

This section lists all supported type conversions between Spark and Paimon.
@@ -56,6 +56,12 @@
<td>Long</td>
<td>The minimum number of rows returned in a single batch, which is used to create MinRowsReadLimit together with read.stream.maxTriggerDelayMs.</td>
</tr>
<tr>
<td><h5>read.changelog</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to read rows in the form of a changelog (a rowkind column is added to each row to represent its change type).</td>
</tr>
<tr>
<td><h5>write.merge-schema</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -51,6 +51,15 @@ public static RowType project(RowType inputType, int[] mapping) {
Arrays.stream(mapping).mapToObj(fields::get).collect(Collectors.toList()));
}

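/**
 * Projects {@code inputType} to the fields with the given names, in the order of {@code names}.
 */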
public static RowType project(RowType inputType, List<String> names) {
List<DataField> fields = inputType.getFields();
List<String> fieldNames = fields.stream().map(DataField::name).collect(Collectors.toList());
return new RowType(
names.stream()
.map(k -> fields.get(fieldNames.indexOf(k)))
.collect(Collectors.toList()));
}

public static Object castFromString(String s, DataType type) {
return castFromStringInternal(s, type, false);
}
@@ -69,4 +69,11 @@ public class SparkConnectorOptions {
.noDefaultValue()
.withDescription(
"The maximum delay between two adjacent batches, which used to create MinRowsReadLimit with read.stream.minRowsPerTrigger together.");

public static final ConfigOption<Boolean> READ_CHANGELOG =
key("read.changelog")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to read row in the form of changelog (add rowkind column in row to represent its change type).");
}
@@ -19,7 +19,7 @@
package org.apache.paimon.spark;

import org.apache.paimon.spark.sources.PaimonMicroBatchStream;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -83,7 +83,7 @@ public PartitionReaderFactory createReaderFactory() {

@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
return new PaimonMicroBatchStream((FileStoreTable) table, readBuilder, checkpointLocation);
return new PaimonMicroBatchStream((DataTable) table, readBuilder, checkpointLocation);
}

protected List<Split> splits() {
@@ -21,7 +21,8 @@ import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.options.Options
import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.spark.sources.PaimonSink
import org.apache.paimon.table.{FileStoreTable, FileStoreTableFactory}
import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory}
import org.apache.paimon.table.system.AuditLogTable

import org.apache.spark.sql.{DataFrame, SaveMode => SparkSaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table}
@@ -70,17 +71,22 @@ class SparkSource
mode: SparkSaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
val table = loadTable(parameters.asJava)
val table = loadTable(parameters.asJava).asInstanceOf[FileStoreTable]
WriteIntoPaimonTable(table, SaveMode.transform(mode), data, Options.fromMap(parameters.asJava))
.run(sqlContext.sparkSession)
SparkSource.toBaseRelation(table, sqlContext)
}

private def loadTable(options: JMap[String, String]): FileStoreTable = {
private def loadTable(options: JMap[String, String]): DataTable = {
val catalogContext = CatalogContext.create(
Options.fromMap(options),
SparkSession.active.sessionState.newHadoopConf())
FileStoreTableFactory.create(catalogContext)
val table = FileStoreTableFactory.create(catalogContext)
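// When read.changelog is enabled, wrap the table in an AuditLogTable so each row exposes its rowkind (change type).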
if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
new AuditLogTable(table)
} else {
table
}
}

override def createSink(
@@ -91,7 +97,7 @@ class SparkSource
if (outputMode != OutputMode.Append && outputMode != OutputMode.Complete) {
throw new RuntimeException("Paimon supports only Complete and Append output mode.")
}
val table = loadTable(parameters.asJava)
val table = loadTable(parameters.asJava).asInstanceOf[FileStoreTable]
val options = Options.fromMap(parameters.asJava)
new PaimonSink(sqlContext, table, partitionColumns, outputMode, options)
}
@@ -19,7 +19,7 @@ package org.apache.paimon.spark.sources

import org.apache.paimon.options.Options
import org.apache.paimon.spark.{PaimonImplicits, SparkConnectorOptions, SparkInputPartition, SparkReaderFactory}
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.DataTable
import org.apache.paimon.table.source.ReadBuilder

import org.apache.spark.internal.Logging
@@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset,
import scala.collection.mutable

class PaimonMicroBatchStream(
originTable: FileStoreTable,
originTable: DataTable,
readBuilder: ReadBuilder,
checkpointLocation: String)
extends MicroBatchStream
@@ -144,6 +144,6 @@

override def stop(): Unit = {}

override def table: FileStoreTable = originTable
override def table: DataTable = originTable

}
@@ -20,11 +20,11 @@ package org.apache.paimon.spark.sources
import org.apache.paimon.CoreOptions
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.spark.SparkTypeUtils
import org.apache.paimon.spark.commands.WithFileStoreTable
import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan, ScanMode}
import org.apache.paimon.table.DataTable
import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan}
import org.apache.paimon.table.source.TableScan.Plan
import org.apache.paimon.table.source.snapshot.StartingContext
import org.apache.paimon.utils.RowDataPartitionComputer
import org.apache.paimon.utils.{RowDataPartitionComputer, TypeUtils}

import org.apache.spark.sql.connector.read.streaming.ReadLimit
import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -35,7 +35,9 @@ import scala.collection.mutable

case class IndexedDataSplit(snapshotId: Long, index: Long, entry: DataSplit)

trait StreamHelper extends WithFileStoreTable {
trait StreamHelper {

def table: DataTable

val initOffset: PaimonSourceOffset

@@ -44,12 +46,12 @@ trait StreamHelper extends WithFileStoreTable {
private lazy val streamScan: InnerStreamTableScan = table.newStreamScan()

private lazy val partitionSchema: StructType =
SparkTypeUtils.fromPaimonRowType(table.schema().logicalPartitionType())
SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(), table.partitionKeys()))

private lazy val partitionComputer: RowDataPartitionComputer = new RowDataPartitionComputer(
new CoreOptions(table.schema.options).partitionDefaultName,
table.schema.logicalPartitionType,
table.schema.partitionKeys.asScala.toArray
new CoreOptions(table.options).partitionDefaultName,
TypeUtils.project(table.rowType(), table.partitionKeys()),
table.partitionKeys().asScala.toArray
)

// Used to get the initial offset.