[datalake](hudi) add hudi docker compose to run hudi examples (apache#37451)

## Proposed changes

**Doris+Hudi+MinIO Environments**:
Launch Spark, Doris, Hive, Hudi, and MinIO test environments, and walk through examples of querying Hudi tables in Doris.

## Launch Docker Compose
**Create Network**
```shell
sudo docker network create -d bridge hudi-net
```
**Launch all components in docker**
```shell
sudo ./start-hudi-compose.sh
```
**Log in to Spark**
```shell
sudo ./login-spark.sh
```
**Log in to Doris**
```shell
sudo ./login-doris.sh
```
AshinGau authored Jul 9, 2024
1 parent b48b5da commit e338cce
Showing 16 changed files with 602 additions and 0 deletions.
1 change: 1 addition & 0 deletions .licenserc.yaml
@@ -40,6 +40,7 @@ header:
- "**/*.sql"
- "**/*.lock"
- "**/*.out"
- "**/*.parquet"
- "docs/.markdownlintignore"
- "fe/fe-core/src/test/resources/data/net_snmp_normal"
- "fe/fe-core/src/main/antlr4/org/apache/doris/nereids/JavaLexer.g4"
207 changes: 207 additions & 0 deletions samples/datalake/hudi/README.md
@@ -0,0 +1,207 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Doris+Hudi+MinIO Environments
Launch Spark, Doris, Hive, Hudi, and MinIO test environments, and walk through examples of querying Hudi tables in Doris.

## Launch Docker Compose
**Create Network**
```shell
sudo docker network create -d bridge hudi-net
```
**Launch all components in docker**
```shell
sudo ./start-hudi-compose.sh
```
**Log in to Spark**
```shell
sudo ./login-spark.sh
```
**Log in to Doris**
```shell
sudo ./login-doris.sh
```
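
Once the script finishes, you can check that the containers are up before moving on (a quick sanity check; the container names come from `hudi-compose.yml`):
```shell
# List the containers attached to the hudi-net network created above
sudo docker ps --filter network=hudi-net --format "table {{.Names}}\t{{.Status}}"
# Expect spark-hudi-hive, doris-hudi-env and minio-hudi-storage among the results
```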

## Prepare Hudi Data
A Hive table named `customer` already exists in the `default` database. Create Hudi tables from it:
```sql
-- ./login-spark.sh
use default;

-- create a COW table
CREATE TABLE customer_cow
USING hudi
TBLPROPERTIES (
  type = 'cow',
  primaryKey = 'c_custkey',
  preCombineField = 'c_name'
)
PARTITIONED BY (c_nationkey)
AS SELECT * FROM customer;

-- create a MOR table
CREATE TABLE customer_mor
USING hudi
TBLPROPERTIES (
  type = 'mor',
  primaryKey = 'c_custkey',
  preCombineField = 'c_name'
)
PARTITIONED BY (c_nationkey)
AS SELECT * FROM customer;
```
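
To confirm the CTAS statements finished, a quick row-count comparison in the same spark-sql session is enough (a minimal check; the two Hudi tables should match the source `customer` table):
```sql
-- verify that both Hudi tables were populated from the Hive table
spark-sql> select count(*) from customer;
spark-sql> select count(*) from customer_cow;
spark-sql> select count(*) from customer_mor;
```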

## Query Data
Doris refreshes the Hive catalog on a [10-minute interval by default](https://doris.apache.org/docs/lakehouse/datalake-analytics/hive/#metadata-cache--refresh).
Users can refresh the catalog immediately to access the new Hudi tables in Doris with `doris> refresh catalog hive;`.
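
For example, to see the new tables right away, refresh the catalog and list the tables in the `default` database (a minimal sketch using the `hive` catalog configured by this environment):
```sql
doris> refresh catalog hive;
doris> use hive.default;
doris> show tables;
-- customer, customer_cow and customer_mor should all be listed
```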

Once the Hudi tables are visible in Doris, all subsequent operations on them are picked up automatically, so there is no need to refresh the catalog or tables again.

Insert new data into the Hudi tables in spark-sql:
```sql
spark-sql> insert into customer_cow values (100, "Customer#000000100", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 25);
spark-sql> insert into customer_mor values (100, "Customer#000000100", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 25);
```
`c_nationkey=25` is a new partition; Doris can query the new data immediately without a refresh:
```sql
doris> use hive.default;
doris> select * from customer_cow where c_custkey = 100;
doris> select * from customer_mor where c_custkey = 100;
```
Inserting a record with `c_custkey=32` (a primary key that already exists in the table) replaces the old record:
```sql
spark-sql> insert into customer_cow values (32, "Customer#000000032_update", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 15);
spark-sql> insert into customer_mor values (32, "Customer#000000032_update", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 15);
```
Query the updated data immediately in Doris:
```sql
doris> select * from customer_cow where c_custkey = 32;
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| c_custkey | c_name | c_address | c_phone | c_acctbal | c_mktsegment | c_comment | c_nationkey |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| 32 | Customer#000000032_update | jD2xZzi | 25-430-914-2194 | 3471.59 | BUILDING | cial ideas. final, furious requests | 15 |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
doris> select * from customer_mor where c_custkey = 32;
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| c_custkey | c_name | c_address | c_phone | c_acctbal | c_mktsegment | c_comment | c_nationkey |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| 32 | Customer#000000032_update | jD2xZzi | 25-430-914-2194 | 3471.59 | BUILDING | cial ideas. final, furious requests | 15 |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
```

## Query Optimization
Doris uses a native C++ reader for the data files of a **COW** table, and the Java SDK (calling hudi-bundle through JNI) for the data files of a **MOR** table. In an upsert scenario, an MOR table may still contain base files that have not been updated, and these can be read through the native reader. Users can inspect the execution plan of a Hudi scan with the `explain` command, where `hudiNativeReadSplits` shows how many of the splits are read through the native reader.
```sql
-- COW table is read natively
doris> explain select * from customer_cow where c_custkey = 32;
| 0:VHUDI_SCAN_NODE(68) |
| table: customer_cow |
| predicates: (c_custkey[#5] = 32) |
| inputSplitNum=101, totalFileSize=45338886, scanRanges=101 |
| partition=26/26 |
| cardinality=1, numNodes=1 |
| pushdown agg=NONE |
| hudiNativeReadSplits=101/101 |

-- MOR table: only the base file containing `c_custkey = 32` has been updated, so 100 splits are read natively, while the split with a log file is read by JNI.
doris> explain select * from customer_mor where c_custkey = 32;
| 0:VHUDI_SCAN_NODE(68) |
| table: customer_mor |
| predicates: (c_custkey[#5] = 32) |
| inputSplitNum=101, totalFileSize=45340731, scanRanges=101 |
| partition=26/26 |
| cardinality=1, numNodes=1 |
| pushdown agg=NONE |
| hudiNativeReadSplits=100/101 |

-- Use delete statement to see more differences
spark-sql> delete from customer_cow where c_custkey = 64;
doris> explain select * from customer_cow where c_custkey = 64;

spark-sql> delete from customer_mor where c_custkey = 64;
doris> explain select * from customer_mor where c_custkey = 64;

-- customer_xxx is partitioned by c_nationkey, we can use the partition column to prune data
doris> explain select * from customer_mor where c_custkey = 64 and c_nationkey = 15;
| 0:VHUDI_SCAN_NODE(68) |
| table: customer_mor |
| predicates: (c_custkey[#5] = 64), (c_nationkey[#12] = 15) |
| inputSplitNum=4, totalFileSize=1798186, scanRanges=4 |
| partition=1/26 |
| cardinality=1, numNodes=1 |
| pushdown agg=NONE |
| hudiNativeReadSplits=3/4 |
```
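
As an optional experiment, some Doris builds expose a `force_jni_scanner` session variable that routes every split through the JNI reader, which makes the native-vs-JNI split counts easy to compare. The sketch below assumes this variable exists in your version; skip it if the variable is not recognized.
```sql
-- Assumption: the force_jni_scanner session variable is available in this Doris build
doris> set force_jni_scanner = true;
doris> explain select * from customer_cow where c_custkey = 32;
-- hudiNativeReadSplits is expected to drop to 0/101 while the variable is on
doris> set force_jni_scanner = false;
```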

## Time Travel
See the commit metadata in spark-sql:
```sql
spark-sql> call show_commits(table => 'customer_cow', limit => 10);
20240603033556094 20240603033558249 commit 448833 0 1 1 183 0 0
20240603015444737 20240603015446588 commit 450238 0 1 1 202 1 0
20240603015018572 20240603015020503 commit 436692 1 0 1 1 0 0
20240603013858098 20240603013907467 commit 44902033 100 0 25 18751 0 0

spark-sql> call show_commits(table => 'customer_mor', limit => 10);
20240603033745977 20240603033748021 deltacommit 1240 0 1 1 0 0 0
20240603015451860 20240603015453539 deltacommit 1434 0 1 1 1 1 0
20240603015058442 20240603015100120 deltacommit 436691 1 0 1 1 0 0
20240603013918515 20240603013922961 deltacommit 44904040 100 0 25 18751 0 0
```
Let's travel back to the commit in which `c_custkey=100` was inserted but `c_custkey=32` had not yet been updated, using Doris:
```sql
doris> select * from customer_cow for time as of '20240603015018572' where c_custkey = 32 or c_custkey = 100;
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
| c_custkey | c_name | c_address | c_phone | c_acctbal | c_mktsegment | c_comment | c_nationkey |
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
| 32 | Customer#000000032 | jD2xZzi UmId,DCtNBLXKj9q0Tlp2iQ6ZcO3J | 25-430-914-2194 | 3471.53 | BUILDING | cial ideas. final, furious requests across the e | 15 |
| 100 | Customer#000000100 | jD2xZzi | 25-430-914-2194 | 3471.59 | BUILDING | cial ideas. final, furious requests | 25 |
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
-- compare with spark-sql
spark-sql> select * from customer_mor timestamp as of '20240603015018572' where c_custkey = 32 or c_custkey = 100;

doris> select * from customer_mor for time as of '20240603015058442' where c_custkey = 32 or c_custkey = 100;
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
| c_custkey | c_name | c_address | c_phone | c_acctbal | c_mktsegment | c_comment | c_nationkey |
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
| 100 | Customer#000000100 | jD2xZzi | 25-430-914-2194 | 3471.59 | BUILDING | cial ideas. final, furious requests | 25 |
| 32 | Customer#000000032 | jD2xZzi UmId,DCtNBLXKj9q0Tlp2iQ6ZcO3J | 25-430-914-2194 | 3471.53 | BUILDING | cial ideas. final, furious requests across the e | 15 |
+-----------+--------------------+---------------------------------------+-----------------+-----------+--------------+--------------------------------------------------+-------------+
spark-sql> select * from customer_mor timestamp as of '20240603015058442' where c_custkey = 32 or c_custkey = 100;
```

## Incremental Read
See the data that changed after the commit that inserted `c_custkey=100`:
```sql
doris> select * from customer_cow@incr('beginTime'='20240603015018572');
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| c_custkey | c_name | c_address | c_phone | c_acctbal | c_mktsegment | c_comment | c_nationkey |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| 32 | Customer#000000032_update | jD2xZzi | 25-430-914-2194 | 3471.59 | BUILDING | cial ideas. final, furious requests | 15 |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
spark-sql> select * from hudi_table_changes('customer_cow', 'latest_state', '20240603015018572');

doris> select * from customer_mor@incr('beginTime'='20240603015058442');
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| c_custkey | c_name | c_address | c_phone | c_acctbal | c_mktsegment | c_comment | c_nationkey |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
| 32 | Customer#000000032_update | jD2xZzi | 25-430-914-2194 | 3471.59 | BUILDING | cial ideas. final, furious requests | 15 |
+-----------+---------------------------+-----------+-----------------+-----------+--------------+-------------------------------------+-------------+
spark-sql> select * from hudi_table_changes('customer_mor', 'latest_state', '20240603015058442');
```
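
The incremental window can also be bounded on both sides. The sketch below assumes the `@incr` syntax accepts an `endTime` property and that `hudi_table_changes` takes an end commit as a fourth argument, using commit timestamps from the `show_commits` output above:
```sql
-- Assumed syntax: bound the incremental read between two commits
doris> select * from customer_cow@incr('beginTime'='20240603015018572', 'endTime'='20240603033556094');
spark-sql> select * from hudi_table_changes('customer_cow', 'latest_state', '20240603015018572', '20240603033556094');
```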
21 changes: 21 additions & 0 deletions samples/datalake/hudi/hudi-compose.env
@@ -0,0 +1,21 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

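# Host-side ports mapped in hudi-compose.yml to the Hive metastore thrift port (9083), the MinIO service port (9000), and the Doris FE MySQL query port (9030)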
HIVE_THRIFT_PORT=9771
MINIO_SERVICE_PORT=9772
DORIS_QUERY_PORT=9773
109 changes: 109 additions & 0 deletions samples/datalake/hudi/hudi-compose.yml
@@ -0,0 +1,109 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

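# Compose stack for the Hudi sample: a Postgres-backed Hive metastore (sharing a container with Spark), a Doris node, and MinIO object storage, all joined to the external hudi-net network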
version: "3.9"
services:
  metastore_db:
    image: postgres:11
    hostname: metastore_db
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore

  hive-metastore:
    hostname: hive-metastore
    container_name: spark-hudi-hive
    image: 'starburstdata/hive:3.1.2-e.18'
    volumes:
      - './packages/spark-3.4.2-bin-hadoop3:/opt/spark-3.4.2-bin-hadoop3'
      - './scripts:/opt/scripts'
    ports:
      - '${HIVE_THRIFT_PORT}:9083' # Metastore Thrift
    environment:
      HIVE_METASTORE_DRIVER: org.postgresql.Driver
      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
      HIVE_METASTORE_USER: hive
      HIVE_METASTORE_PASSWORD: hive
      HIVE_METASTORE_WAREHOUSE_DIR: s3://datalake/
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY: minio
      S3_SECRET_KEY: minio123
      S3_PATH_STYLE_ACCESS: "true"
      REGION: ""
      GOOGLE_CLOUD_KEY_FILE_PATH: ""
      AZURE_ADL_CLIENT_ID: ""
      AZURE_ADL_CREDENTIAL: ""
      AZURE_ADL_REFRESH_URL: ""
      AZURE_ABFS_STORAGE_ACCOUNT: ""
      AZURE_ABFS_ACCESS_KEY: ""
      AZURE_WASB_STORAGE_ACCOUNT: ""
      AZURE_ABFS_OAUTH: ""
      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
      AZURE_ABFS_OAUTH_CLIENT_ID: ""
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "hive"
    depends_on:
      - metastore_db

  doris-hudi-env:
    hostname: doris-hudi-env
    container_name: doris-hudi-env
    image: 'apache/hadoop:3.3.6'
    environment:
      LD_LIBRARY_PATH: /opt/doris/be/lib
    ports:
      - '${DORIS_QUERY_PORT}:9030'
    volumes:
      - './packages/jdk-17.0.2:/opt/jdk-17.0.2'
      - './packages/doris-bin:/opt/doris-bin'
      - './scripts:/opt/scripts'
    command: sh /opt/scripts/start_doris.sh

  minio:
    hostname: minio
    image: 'minio/minio:RELEASE.2022-05-26T05-48-41Z'
    container_name: minio-hudi-storage
    ports:
      - '${MINIO_SERVICE_PORT}:9000'
    environment:
      MINIO_ACCESS_KEY: minio
      MINIO_SECRET_KEY: minio123
    command: server /data --console-address ":9001"

  # This job will create the "datalake" bucket on Minio
  mc-job:
    image: 'minio/mc:RELEASE.2022-05-09T04-08-26Z'
    volumes:
      - './data:/data'
    entrypoint: |
      /bin/bash -c "
      sleep 5;
      /usr/bin/mc config --quiet host add myminio http://minio:9000 minio minio123;
      /usr/bin/mc mb --quiet myminio/datalake;
      /usr/bin/mc mb --quiet myminio/data;
      /usr/bin/mc mirror /data myminio/data
      "
    depends_on:
      - minio

networks:
  default:
    name: hudi-net
    external: true
20 changes: 20 additions & 0 deletions samples/datalake/hudi/login-doris.sh
@@ -0,0 +1,20 @@
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

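# Open a MySQL client session against the Doris FE (query port 9030) from inside the spark-hudi-hive container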
docker exec -it spark-hudi-hive mysql -u root -h doris-hudi-env -P 9030
20 changes: 20 additions & 0 deletions samples/datalake/hudi/login-spark.sh
@@ -0,0 +1,20 @@
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

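# Open an interactive spark-sql session preconfigured for Hudi inside the spark-hudi-hive container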
docker exec -it spark-hudi-hive /opt/scripts/spark-hudi.sh