[feature](hudi) support hudi incremental read (apache#32052)
* [feature](hudi) support incremental read for hudi table

* fix jdk17 java options
AshinGau authored Mar 14, 2024
1 parent 2815f97 commit aafd28f
Showing 23 changed files with 1,095 additions and 118 deletions.
2 changes: 1 addition & 1 deletion conf/be.conf
@@ -25,7 +25,7 @@ JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xloggc:$DORIS_HOME/log/b
JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives"

# For jdk 17+, this JAVA_OPTS will be used as default JVM options
JAVA_OPTS_FOR_JDK_17="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.management/sun.management=ALL-UNNAMED"
JAVA_OPTS_FOR_JDK_17="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.management/sun.management=ALL-UNNAMED"

# since 1.2, the JAVA_HOME need to be set to run BE process.
# JAVA_HOME=/path/to/jdk/
33 changes: 23 additions & 10 deletions docs/en/docs/lakehouse/multi-catalog/hudi.md
@@ -33,8 +33,8 @@ under the License.

| Table Type | Supported Query types |
| ---- | ---- |
| Copy On Write | Snapshot Query + Time Travel |
| Merge On Read | Snapshot Queries + Read Optimized Queries + Time Travel |
| Copy On Write | Snapshot Query, Time Travel, Incremental Read |
| Merge On Read | Snapshot Queries, Read Optimized Queries, Time Travel, Incremental Read |

2. Doris supports Hive Metastore(Including catalogs compatible with Hive MetaStore, like [AWS Glue](./hive.md)/[Alibaba DLF](./dlf.md)) Catalogs.

@@ -94,16 +94,29 @@ Users can view the performance of the Java SDK through [profile](../../admin-manual/ht

## Time Travel

Supports reading snapshots specified in Hudi table.

Every write operation to the Hudi table will generate a new snapshot.

By default, query requests will only read the latest version of the snapshot.
Every write operation to a Hudi table generates a new snapshot. [Time Travel](https://hudi.apache.org/docs/0.14.0/quick-start-guide/#timetravel) supports reading a specified snapshot of a Hudi table. By default, query requests only read the latest snapshot.

You can use the `FOR TIME AS OF` statement to read historical data based on the snapshot time. Examples are as follows:
```
SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";
SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";
SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07";
```
Hudi table does not support the `FOR VERSION AS OF` statement. Using this syntax to query the Hudi table will throw an error.

`SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";`
## Incremental Read
Incremental Read obtains a set of records that changed between a start and end commit time, providing you with the "latest state" for each such record as of the end commit time.

`SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";`
Doris uses the `@incr` syntax to support Incremental Read:
```
SELECT * from hudi_table@incr('beginTime'='xxx', ['endTime'='xxx'], ['hoodie.read.timeline.holes.resolution.policy'='FAIL'], ...);
```
`beginTime` is required; the time format is consistent with [hudi_table_changes](https://hudi.apache.org/docs/0.14.0/quick-start-guide/#incremental-query), and "earliest" is also supported. `endTime` is optional and defaults to the latest commit time. The remaining optional parameters can be any of the [Spark Read Options](https://hudi.apache.org/docs/0.14.0/configurations#Read-Options).

Hudi table does not support the `FOR VERSION AS OF` statement. Using this syntax to query the Hudi table will throw an error.
Incremental Read requires the Nereids Planner to be enabled. Doris translates `@incr` into `predicates` and pushes them down to `VHUDI_SCAN_NODE`:
```
| 0:VHUDI_SCAN_NODE(113) |
| table: lineitem_mor |
| predicates: (_hoodie_commit_time[#0] >= '20240311151019723'), (_hoodie_commit_time[#0] <= '20240311151606605') |
| inputSplitNum=1, totalFileSize=13099711, scanRanges=1 |
```
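
For illustration (not part of the committed docs), here are a couple of hedged usage sketches of the `@incr` syntax described above; `hudi_table` is the placeholder table name used earlier on this page, and the commit timestamps are the example values from the plan output above, not values from a real deployment:
```
-- Read all records changed since the first commit ('earliest' is accepted for beginTime).
SELECT * FROM hudi_table@incr('beginTime'='earliest');

-- Bounded incremental read; when 'endTime' is omitted it defaults to the latest commit time.
SELECT * FROM hudi_table@incr('beginTime'='20240311151019723', 'endTime'='20240311151606605');
```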
37 changes: 25 additions & 12 deletions docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md
@@ -29,12 +29,12 @@ under the License.

## Usage Limitations

1. The following query types are supported for Hudi tables; Incremental Query will be supported later.
1. The following query types are supported for Hudi tables; CDC will be supported later.

| Table Type | Supported Query Types |
| ---- | ---- |
| Copy On Write | Snapshot Query + Time Travel |
| Merge On Read | Snapshot Queries + Read Optimized Queries + Time Travel |
| Copy On Write | Snapshot Query, Time Travel, Incremental Read |
| Merge On Read | Snapshot Queries, Read Optimized Queries, Time Travel, Incremental Read |

2. Currently, Hive Metastore and catalogs compatible with Hive Metastore (for example, [AWS Glue](./hive.md)/[Alibaba DLF](./dlf.md)) are supported.

@@ -96,16 +96,29 @@ Doris uses the parquet native reader to read the data files of COW tables, and uses the Java S

## Time Travel

Supports reading a specified snapshot of a Hudi table.
Every write operation to a Hudi table generates a new snapshot, and Time Travel supports reading a specified snapshot of a Hudi table. By default, query requests only read the latest snapshot.

Every write operation to a Hudi table generates a new snapshot.

By default, query requests only read the latest snapshot.

You can use the `FOR TIME AS OF` statement to read historical data based on the snapshot time (the [time format](https://hudi.apache.org/docs/quick-start-guide#time-travel-query) is consistent with the Hudi website). Examples are as follows:
You can use the `FOR TIME AS OF` statement to read historical data based on the snapshot time (the [time format](https://hudi.apache.org/docs/0.14.0/quick-start-guide/#timetravel) is consistent with the Hudi website). Examples are as follows:
```
SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";
SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";
SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07";
```
Hudi tables do not support the `FOR VERSION AS OF` statement; querying a Hudi table with this syntax will throw an error.

`SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";`
## Incremental Read
Incremental Read can query the data that changed between startTime and endTime; the returned result set is the final state of the data as of endTime.

`SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";`
Doris provides the `@incr` syntax to support Incremental Read:
```
SELECT * from hudi_table@incr('beginTime'='xxx', ['endTime'='xxx'], ['hoodie.read.timeline.holes.resolution.policy'='FAIL'], ...);
```
`beginTime` is required; the time format is consistent with the Hudi documentation for [hudi_table_changes](https://hudi.apache.org/docs/0.14.0/quick-start-guide/#incremental-query), and "earliest" is also supported. `endTime` is optional and defaults to the latest commit time. The remaining options are compatible with the [Spark Read Options](https://hudi.apache.org/docs/0.14.0/configurations#Read-Options).

Hudi tables do not support the `FOR VERSION AS OF` statement; querying a Hudi table with this syntax will throw an error.
Incremental Read requires the [new optimizer (Nereids Planner)](../../query-acceleration/nereids.md) to be enabled, which is on by default. By viewing the execution plan with `desc`, you can see that Doris translates `@incr` into `predicates` and pushes them down to `VHUDI_SCAN_NODE`:
```
| 0:VHUDI_SCAN_NODE(113) |
| table: lineitem_mor |
| predicates: (_hoodie_commit_time[#0] >= '20240311151019723'), (_hoodie_commit_time[#0] <= '20240311151606605') |
| inputSplitNum=1, totalFileSize=13099711, scanRanges=1 |
```
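
As a hedged sketch (not from the committed docs) of how a plan like the one above could be inspected, assuming Doris's `EXPLAIN` statement as one way to view a query plan; `lineitem_mor` and the commit timestamps are simply the values shown in the plan output:
```
-- Illustrative only: check how @incr is rewritten into _hoodie_commit_time
-- range predicates on VHUDI_SCAN_NODE before running the query.
EXPLAIN SELECT * FROM lineitem_mor@incr(
    'beginTime'='20240311151019723',
    'endTime'='20240311151606605'
);
```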
@@ -174,9 +174,15 @@ public void open() throws IOException {
cleanResolverLock.readLock().lock();
try {
lastUpdateTime.set(System.currentTimeMillis());
recordIterator = HadoopUGI.ugiDoAs(
AuthenticationConfig.getKerberosConfig(split.hadoopConf()), () -> new MORSnapshotSplitReader(
split).buildScanIterator(new Filter[0]));
if (split.incrementalRead()) {
recordIterator = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(
split.hadoopConf()),
() -> new MORIncrementalSplitReader(split).buildScanIterator(new Filter[0]));
} else {
recordIterator = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(
split.hadoopConf()),
() -> new MORSnapshotSplitReader(split).buildScanIterator(new Filter[0]));
}
if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) {
cachedResolvers.computeIfAbsent(Thread.currentThread().getId(),
threadId -> AVRO_RESOLVER_CACHE.get());
@@ -125,6 +125,10 @@ class HoodieSplit(private val params: jutil.Map[String, String]) {
conf
}

def incrementalRead: Boolean = {
"true".equalsIgnoreCase(optParams.getOrElse("hoodie.datasource.read.incr.operation", "false"))
}

// NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain
// Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. In that
// case we opt in to not be providing catalog's schema, and instead force Hudi relations to fetch the schema
@@ -169,6 +173,8 @@ abstract class BaseSplitReader(val split: HoodieSplit) {

protected val tableInformation: HoodieTableInformation = cache.get(split)

protected val timeline: HoodieTimeline = tableInformation.timeline

protected val sparkSession: SparkSession = tableInformation.sparkSession
protected val sqlContext: SQLContext = sparkSession.sqlContext
imbueConfigs(sqlContext)
@@ -578,8 +584,6 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
)
}

protected val timeline: HoodieTimeline = tableInformation.timeline

protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: Option[InternalSchema]): Configuration = {
val internalSchema = internalSchemaOpt.getOrElse(InternalSchema.getEmptyInternalSchema)
val querySchemaString = SerDeHelper.toJson(internalSchema)
@@ -20,11 +20,14 @@ package org.apache.doris.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.{DataSourceReadOptions, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, LogFileIterator, RecordMergingFileIterator}
import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.catalyst.InternalRow

import java.io.Closeable
import java.util.function.Predicate

/**
* Class holding base-file readers for 3 different use-cases:
@@ -84,29 +87,61 @@ class HoodieMORRecordIterator(config: Configuration,
requiredSchema: HoodieTableSchema,
tableState: HoodieTableState,
mergeType: String,
fileSplit: HoodieMergeOnReadFileSplit) extends Iterator[InternalRow] with Closeable {
fileSplit: HoodieMergeOnReadFileSplit,
includeStartTime: Boolean = false,
startTimestamp: String = null,
endTimestamp: String = null) extends Iterator[InternalRow] with Closeable {
protected val maxCompactionMemoryInBytes: Long = config.getLongBytes(
"hoodie.compaction.memory", 512 * 1024 * 1024)

protected val recordIterator: Iterator[InternalRow] = fileSplit match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema)
projectedReader(dataFileOnlySplit.dataFile.get)
protected val recordIterator: Iterator[InternalRow] = {
val iter = fileSplit match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema)
projectedReader(dataFileOnlySplit.dataFile.get)

case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, config)
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, config)

case split => mergeType match {
case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
// val reader = fileReaders.requiredSchemaReaderSkipMerging
// new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, config)
throw new UnsupportedOperationException("Skip merge is optimized by native read")
case split => mergeType match {
case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
val reader = fileReaders.requiredSchemaReaderSkipMerging
new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, config)

case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
val reader = pickBaseFileReader()
new RecordMergingFileIterator(split, reader, tableSchema, requiredSchema, tableState, config)
case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
val reader = pickBaseFileReader()
new RecordMergingFileIterator(split, reader, tableSchema, requiredSchema, tableState, config)

case _ => throw new UnsupportedOperationException(s"Not supported merge type ($mergeType)")
case _ => throw new UnsupportedOperationException(s"Not supported merge type ($mergeType)")
}
}

val commitTimeMetadataFieldIdx = requiredSchema.structTypeSchema.fieldNames.indexOf(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
val needsFiltering = commitTimeMetadataFieldIdx >= 0 && !StringUtils.isNullOrEmpty(startTimestamp) && !StringUtils.isNullOrEmpty(endTimestamp)
if (needsFiltering) {
val filterT: Predicate[InternalRow] = getCommitTimeFilter(includeStartTime, commitTimeMetadataFieldIdx)
iter.filter(filterT.test)
}
else {
iter
}
}

private def getCommitTimeFilter(includeStartTime: Boolean, commitTimeMetadataFieldIdx: Int): Predicate[InternalRow] = {
if (includeStartTime) {
new Predicate[InternalRow] {
override def test(row: InternalRow): Boolean = {
val commitTime = row.getString(commitTimeMetadataFieldIdx)
commitTime >= startTimestamp && commitTime <= endTimestamp
}
}
} else {
new Predicate[InternalRow] {
override def test(row: InternalRow): Boolean = {
val commitTime = row.getString(commitTimeMetadataFieldIdx)
commitTime > startTimestamp && commitTime <= endTimestamp
}
}
}
}
