diff --git a/.github/workflows/unitcase-flink-jdk11.yml b/.github/workflows/utitcase-flink-jdk11.yml similarity index 100% rename from .github/workflows/unitcase-flink-jdk11.yml rename to .github/workflows/utitcase-flink-jdk11.yml diff --git a/.github/workflows/unitcase-jdk11.yml b/.github/workflows/utitcase-jdk11.yml similarity index 81% rename from .github/workflows/unitcase-jdk11.yml rename to .github/workflows/utitcase-jdk11.yml index 1baed87f9027..f1d7c25cbe44 100644 --- a/.github/workflows/unitcase-jdk11.yml +++ b/.github/workflows/utitcase-jdk11.yml @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -name: UTCase and ITCase Non Flink on JDK 11 +name: UTCase and ITCase Others on JDK 11 on: issue_comment: @@ -52,6 +52,11 @@ jobs: . .github/workflows/utils.sh jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" - mvn -T 1C -B clean install -pl '!paimon-e2e-tests,!org.apache.paimon:paimon-hive-connector-3.1' -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone + test_modules="!paimon-e2e-tests,!org.apache.paimon:paimon-hive-connector-3.1," + for suffix in 3.5 3.4 3.3 3.2 common; do + test_modules+="!org.apache.paimon:paimon-spark-${suffix}," + done + test_modules="${test_modules%,}" + mvn -T 1C -B clean install -pl "${test_modules}" -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone env: MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/.github/workflows/utitcase-spark-scala2.13.yml b/.github/workflows/utitcase-spark-scala2.13.yml new file mode 100644 index 000000000000..05ee066c94bd --- /dev/null +++ b/.github/workflows/utitcase-spark-scala2.13.yml @@ -0,0 +1,63 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +name: UTCase and ITCase Spark on Scala 2.13 + +on: + push: + pull_request: + paths-ignore: + - 'docs/**' + - '**/*.md' + +env: + JDK_VERSION: 8 + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }} + cancel-in-progress: true + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v2 + - name: Set up JDK ${{ env.JDK_VERSION }} + uses: actions/setup-java@v2 + with: + java-version: ${{ env.JDK_VERSION }} + distribution: 'adopt' + - name: Build Spark + run: mvn -T 1C -B clean install -DskipTests -Pscala-2.13 + - name: Test Spark + timeout-minutes: 60 + run: | + # run tests with random timezone to find out timezone related bugs + . 
.github/workflows/utils.sh + jvm_timezone=$(random_timezone) + echo "JVM timezone is set to $jvm_timezone" + test_modules="" + for suffix in common 3.5 3.4 3.3 3.2; do + test_modules+="org.apache.paimon:paimon-spark-${suffix}," + done + test_modules="${test_modules%,}" + mvn -T 1C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pscala-2.13 + env: + MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/.github/workflows/utitcase-spark.yml b/.github/workflows/utitcase-spark.yml new file mode 100644 index 000000000000..0561b3857072 --- /dev/null +++ b/.github/workflows/utitcase-spark.yml @@ -0,0 +1,63 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +name: UTCase and ITCase Spark + +on: + push: + pull_request: + paths-ignore: + - 'docs/**' + - '**/*.md' + +env: + JDK_VERSION: 8 + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }} + cancel-in-progress: true + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v2 + - name: Set up JDK ${{ env.JDK_VERSION }} + uses: actions/setup-java@v2 + with: + java-version: ${{ env.JDK_VERSION }} + distribution: 'adopt' + - name: Build Spark + run: mvn -T 1C -B clean install -DskipTests + - name: Test Spark + timeout-minutes: 60 + run: | + # run tests with random timezone to find out timezone related bugs + . .github/workflows/utils.sh + jvm_timezone=$(random_timezone) + echo "JVM timezone is set to $jvm_timezone" + test_modules="" + for suffix in common 3.5 3.4 3.3 3.2; do + test_modules+="org.apache.paimon:paimon-spark-${suffix}," + done + test_modules="${test_modules%,}" + mvn -T 1C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone + env: + MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/.github/workflows/utitcase.yml b/.github/workflows/utitcase.yml index 431b44332232..7963e7c210cb 100644 --- a/.github/workflows/utitcase.yml +++ b/.github/workflows/utitcase.yml @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -name: UTCase and ITCase Non Flink +name: UTCase and ITCase Others on: push: @@ -53,6 +53,11 @@ jobs: . 
.github/workflows/utils.sh jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" - mvn -T 1C -B clean install -pl '!paimon-e2e-tests' -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone + test_modules="!paimon-e2e-tests," + for suffix in 3.5 3.4 3.3 3.2 common; do + test_modules+="!org.apache.paimon:paimon-spark-${suffix}," + done + test_modules="${test_modules%,}" + mvn -T 1C -B clean install -pl "${test_modules}" -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone env: MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/.gitignore b/.gitignore index 3831eb7e9ef1..25ebf232470a 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,5 @@ target .DS_Store *.ipr *.iws +.java-version dependency-reduced-pom.xml diff --git a/docs/content/flink/consumer-id.md b/docs/content/flink/consumer-id.md new file mode 100644 index 000000000000..89450bf57d7e --- /dev/null +++ b/docs/content/flink/consumer-id.md @@ -0,0 +1,120 @@ +--- +title: "Consumer ID" +weight: 5 +type: docs +aliases: +- /flink/consumer-id.html +--- + + +# Consumer ID + +Consumer id can help you accomplish the following two things: + +1. Safe consumption: When deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in + the file system, and if there are consumers that still depend on this snapshot, then this snapshot will not be + deleted by expiration. +2. Resume from breakpoint: When previous job is stopped, the newly started job can continue to consume from the previous + progress without resuming from the state. + +## Usage + +You can specify the `consumer-id` when streaming read table. + +The consumer will prevent expiration of the snapshot. In order to prevent too many snapshots caused by mistakes, +you need to specify `'consumer.expiration-time'` to manage the lifetime of consumers. + +```sql +ALTER TABLE t SET ('consumer.expiration-time' = '1 d'); +``` + +Then, restart streaming write job of this table, expiration of consumers will be triggered in writing job. + +```sql +SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.mode' = 'at-least-once') */; +``` + +## Ignore Progress + +Sometimes, you only want the feature of 'Safe Consumption'. You want to get a new snapshot progress when restarting the +stream consumption job , you can enable the `'consumer.ignore-progress'` option. + +```sql +SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.ignore-progress' = 'true') */; +``` + +The startup of this job will retrieve the snapshot that should be read again. + +## Consumer Mode + +By default, the consumption of snapshots is strictly aligned within the checkpoint to make 'Resume from breakpoint' +feature exactly-once. + +But in some scenarios where you don't need 'Resume from breakpoint', or you don't need strict 'Resume from breakpoint', +you can consider enabling `'consumer.mode' = 'at-least-once'` mode. This mode: +1. Allow readers consume snapshots at different rates and record the slowest snapshot-id among all readers into the + consumer. It doesn't affect the checkpoint time and have good performance. +2. This mode can provide more capabilities, such as watermark alignment. + +{{< hint >}} +About `'consumer.mode'`, since the implementation of `exactly-once` mode and `at-least-once` mode are completely +different, the state of flink is incompatible and cannot be restored from the state when switching modes. 
+
{{< /hint >}} + +## Reset Consumer + +You can reset a consumer with a given consumer ID and next snapshot ID, or delete a consumer with a given +consumer ID. First, you need to stop the streaming task using this consumer ID, and then execute the reset consumer +action job. + +Run the following command: + +{{< tabs "reset_consumer" >}} + +{{< tab "Flink SQL" >}} + +```sql +CALL sys.reset_consumer( + `table` => 'database_name.table_name', + consumer_id => 'consumer_id', + next_snapshot_id => +); +-- No next_snapshot_id if you want to delete the consumer +``` +{{< /tab >}} + +{{< tab "Flink Action" >}} + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + reset-consumer \ + --warehouse \ + --database \ + --table \ + --consumer_id \ + [--next_snapshot ] \ + [--catalog_conf [--catalog_conf ...]] + +## No next_snapshot if you want to delete the consumer +``` +{{< /tab >}} + +{{< /tabs >}} diff --git a/docs/content/flink/procedures.md index ce8c8043ae40..2ac9fb6258cb 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -221,6 +221,20 @@ All available procedures are listed below. CALL sys.delete_tag(`table` => 'default.T', tag => 'my_tag') + + expire_tags + + CALL [catalog.]sys.expire_tags('identifier', 'older_than') + + + To expire tags by time. Arguments: +
  • identifier: the target table identifier. Cannot be empty.
  • older_than: tagCreateTime before which tags will be removed.

      CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00')

      merge_into
@@ -241,10 +255,13 @@ All available procedures are listed below.
    matched_upsert_setting => 'matchedUpsertSetting',
    not_matched_insert_condition => 'notMatchedInsertCondition',
    not_matched_insert_values => 'notMatchedInsertValues',
    - matched_delete_condition => 'matchedDeleteCondition')

    + matched_delete_condition => 'matchedDeleteCondition',
    + not_matched_by_source_upsert_condition => 'notMatchedBySourceUpsertCondition',
    + not_matched_by_source_upsert_setting => 'notMatchedBySourceUpsertSetting',
    + not_matched_by_source_delete_condition => 'notMatchedBySourceDeleteCondition')
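
For context, a hedged sketch of a full procedure call that exercises the newly added not-matched-by-source arguments listed above. The table names, conditions, and settings are illustrative placeholders, and the remaining argument names (`target_table`, `source_table`, `merge_condition`) are assumed from the existing merge_into procedure rather than taken from this change:

```sql
-- Sketch only: upsert matched rows from S into T, insert new rows from S,
-- and delete rows of T that have no match in S and satisfy the given condition.
CALL sys.merge_into(
    target_table => 'default.T',
    source_table => 'default.S',
    merge_condition => 'T.id = S.id',
    matched_upsert_setting => 'price = S.price',
    not_matched_insert_values => '*',
    not_matched_by_source_delete_condition => 'T.price < 100');
```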

    - To perform "MERGE INTO" syntax. See merge_into action for + To perform "MERGE INTO" syntax. See merge_into action for details of arguments. diff --git a/docs/content/flink/sql-alter.md b/docs/content/flink/sql-alter.md index fe96ec413796..a3dbde541f11 100644 --- a/docs/content/flink/sql-alter.md +++ b/docs/content/flink/sql-alter.md @@ -1,6 +1,6 @@ --- title: "SQL Alter" -weight: 6 +weight: 7 type: docs aliases: - /flink/sql-alter.html diff --git a/docs/content/flink/sql-ddl.md b/docs/content/flink/sql-ddl.md index 363d7475761c..0324e6655689 100644 --- a/docs/content/flink/sql-ddl.md +++ b/docs/content/flink/sql-ddl.md @@ -203,6 +203,9 @@ Paimon will automatically collect the statistics of the data file for speeding u The statistics collector mode can be configured by `'metadata.stats-mode'`, by default is `'truncate(16)'`. You can configure the field level by setting `'fields.{field_name}.stats-mode'`. +For the stats mode of `none`, we suggest that you configure `metadata.stats-dense-store` = `true`, which will +significantly reduce the storage size of the manifest. + ### Field Default Value Paimon table currently supports setting default values for fields in table properties by `'fields.item_id.default-value'`, diff --git a/docs/content/flink/sql-lookup.md b/docs/content/flink/sql-lookup.md index 296b9c862eee..8b95c6cb1b15 100644 --- a/docs/content/flink/sql-lookup.md +++ b/docs/content/flink/sql-lookup.md @@ -1,6 +1,6 @@ --- title: "SQL Lookup" -weight: 5 +weight: 6 type: docs aliases: - /flink/sql-lookup.html diff --git a/docs/content/flink/sql-query.md b/docs/content/flink/sql-query.md index bb26d5d3c66a..89136b0b0635 100644 --- a/docs/content/flink/sql-query.md +++ b/docs/content/flink/sql-query.md @@ -172,70 +172,6 @@ prevent you from reading older incremental data. So, Paimon also provides anothe SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */; ``` -### Consumer ID - -You can specify the `consumer-id` when streaming read table: -```sql -SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.expiration-time' = '1 d', 'consumer.mode' = 'at-least-once') */; -``` - -When stream read Paimon tables, the next snapshot id to be recorded into the file system. This has several advantages: - -1. When previous job is stopped, the newly started job can continue to consume from the previous progress without - resuming from the state. The newly reading will start reading from next snapshot id found in consumer files. - If you don't want this behavior, you can set `'consumer.ignore-progress'` to true. -2. When deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in the file system, - and if there are consumers that still depend on this snapshot, then this snapshot will not be deleted by expiration. - -{{< hint warning >}} -NOTE 1: The consumer will prevent expiration of the snapshot. You can specify `'consumer.expiration-time'` to manage the -lifetime of consumers. - -NOTE 2: If you don't want to affect the checkpoint time, you need to configure `'consumer.mode' = 'at-least-once'`. -This mode allow readers consume snapshots at different rates and record the slowest snapshot-id among all readers into -the consumer. This mode can provide more capabilities, such as watermark alignment. - -NOTE 3: About `'consumer.mode'`, since the implementation of `exactly-once` mode and `at-least-once` mode are completely -different, the state of flink is incompatible and cannot be restored from the state when switching modes. 
-{{< /hint >}} - -You can reset a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given consumer ID. -First, you need to stop the streaming task using this consumer ID, and then execute the reset consumer action job. - -Run the following command: - -{{< tabs "reset_consumer" >}} - -{{< tab "Flink SQL" >}} - -```sql -CALL sys.reset_consumer( - `table` => 'database_name.table_name', - consumer_id => 'consumer_id', - next_snapshot_id -> -); -``` -{{< /tab >}} - -{{< tab "Flink Action" >}} - -```bash -/bin/flink run \ - /path/to/paimon-flink-action-{{< version >}}.jar \ - reset-consumer \ - --warehouse \ - --database \ - --table \ - --consumer_id \ - [--next_snapshot ] \ - [--catalog_conf [--catalog_conf ...]] -``` -{{< /tab >}} - -{{< /tabs >}} - -please don't specify --next_snapshot parameter if you want to delete the consumer. - ### Read Overwrite Streaming reading will ignore the commits generated by `INSERT OVERWRITE` by default. If you want to read the diff --git a/docs/content/maintenance/system-tables.md b/docs/content/maintenance/system-tables.md index 05a0c0db7324..a94031e48c5b 100644 --- a/docs/content/maintenance/system-tables.md +++ b/docs/content/maintenance/system-tables.md @@ -283,6 +283,17 @@ SELECT * FROM my_table$manifests /*+ OPTIONS('scan.tag-name'='tag1') */; +--------------------------------+-------------+------------------+-------------------+---------------+ 1 rows in set */ + +- You can also query the manifest with specified timestamp in unix milliseconds +SELECT * FROM my_table$manifests /*+ OPTIONS('scan.timestamp-millis'='1678883047356') */; +/* ++--------------------------------+-------------+------------------+-------------------+---------------+ +| file_name | file_size | num_added_files | num_deleted_files | schema_id | ++--------------------------------+-------------+------------------+-------------------+---------------+ +| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 | ++--------------------------------+-------------+------------------+-------------------+---------------+ +1 rows in set +*/ ``` ### Aggregation fields Table diff --git a/docs/content/migration/iceberg-compatibility.md b/docs/content/migration/iceberg-compatibility.md new file mode 100644 index 000000000000..466f672a1634 --- /dev/null +++ b/docs/content/migration/iceberg-compatibility.md @@ -0,0 +1,395 @@ +--- +title: "Iceberg Compatibility" +weight: 4 +type: docs +aliases: +- /migration/iceberg-compatibility.html +--- + + +# Iceberg Compatibility + +Paimon supports generating Iceberg compatible metadata, +so that Paimon tables can be consumed directly by Iceberg readers. + +## Enable Iceberg Compatibility + +Set the following table options, so that Paimon tables can generate Iceberg compatible metadata. + + + + + + + + + + + + + + + + + + +
| Option                     | Default    | Type | Description |
|----------------------------|------------|------|-------------|
| `metadata.iceberg.storage` | `disabled` | Enum | When set, produce Iceberg metadata after a snapshot is committed, so that Iceberg readers can read Paimon's raw data files.<br/>• `disabled`: Disable Iceberg compatibility support.<br/>• `table-location`: Store Iceberg metadata in each table's directory.<br/>• `hadoop-catalog`: Store Iceberg metadata in a separate directory. This directory can be specified as the warehouse directory of an Iceberg Hadoop catalog. |
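
Since this is a regular table option, it can presumably also be switched on for an existing table with an ordinary `ALTER TABLE ... SET` statement rather than only at creation time (the worked examples below set it when the table is created). A minimal sketch, assuming a table named `paimon_catalog.default.cities` and the `hadoop-catalog` storage recommended next:

```sql
-- Sketch only: enable Iceberg-compatible metadata on an existing Paimon table.
-- New snapshots committed after this change are expected to get Iceberg metadata;
-- behaviour for snapshots committed before the change is not covered here.
ALTER TABLE paimon_catalog.`default`.cities SET (
    'metadata.iceberg.storage' = 'hadoop-catalog'
);
```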
    + +For most SQL users, we recommend setting `'metadata.iceberg.storage' = 'hadoop-catalog'`, +so that all tables can be visited as an Iceberg warehouse. +For Iceberg Java API users, you might consider setting `'metadata.iceberg.storage' = 'table-location'`, +so you can visit each table with its table path. + +## Example: Query Paimon Append Only Tables with Iceberg Connector + +Let's walk through a simple example, where we query Paimon tables with Iceberg connectors in Flink and Spark. +Before trying out this example, make sure that your compute engine already supports Iceberg. +Please refer to Iceberg's document if you haven't set up Iceberg. +* Flink: [Preparation when using Flink SQL Client](https://iceberg.apache.org/docs/latest/flink/#preparation-when-using-flink-sql-client) +* Spark: [Using Iceberg in Spark 3](https://iceberg.apache.org/docs/latest/spark-getting-started/#using-iceberg-in-spark-3) + +Let's now create a Paimon append only table with Iceberg compatibility enabled and insert some data. + +{{< tabs "create-paimon-append-only-table" >}} + +{{< tab "Flink SQL" >}} +```sql +CREATE CATALOG paimon_catalog WITH ( + 'type' = 'paimon', + 'warehouse' = '' +); + +CREATE TABLE paimon_catalog.`default`.cities ( + country STRING, + name STRING +) WITH ( + 'metadata.iceberg.storage' = 'hadoop-catalog' +); + +INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'new york'), ('germany', 'berlin'), ('usa', 'chicago'), ('germany', 'hamburg'); +``` +{{< /tab >}} + +{{< tab "Spark SQL" >}} +Start `spark-sql` with the following command line. + +```bash +spark-sql --jars \ + --conf spark.sql.catalog.paimon_catalog=org.apache.paimon.spark.SparkCatalog \ + --conf spark.sql.catalog.paimon_catalog.warehouse=/tmp/sparkware \ + --packages org.apache.iceberg:iceberg-spark-runtime- \ + --conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.iceberg_catalog.type=hadoop \ + --conf spark.sql.catalog.iceberg_catalog.warehouse=/iceberg \ + --conf spark.sql.catalog.iceberg_catalog.cache-enabled=false \ # disable iceberg catalog caching to quickly see the result + --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions +``` + +Run the following Spark SQL to create Paimon table and insert data. + +```sql +CREATE TABLE paimon_catalog.`default`.cities ( + country STRING, + name STRING +) TBLPROPERTIES ( + 'metadata.iceberg.storage' = 'hadoop-catalog' +); + +INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'new york'), ('germany', 'berlin'), ('usa', 'chicago'), ('germany', 'hamburg'); +``` +{{< /tab >}} + +{{< /tabs >}} + +Now let's query this Paimon table with Iceberg connector. 
+ +{{< tabs "query-paimon-append-only-table" >}} + +{{< tab "Flink SQL" >}} +```sql +CREATE TABLE cities ( + country STRING, + name STRING +) WITH ( + 'connector' = 'iceberg', + 'catalog-type' = 'hadoop', + 'catalog-name' = 'test', + 'catalog-database' = 'default', + 'warehouse' = '/iceberg' +); + +SELECT * FROM cities WHERE country = 'germany'; +/* ++----+--------------------------------+--------------------------------+ +| op | country | name | ++----+--------------------------------+--------------------------------+ +| +I | germany | berlin | +| +I | germany | hamburg | ++----+--------------------------------+--------------------------------+ +*/ +``` +{{< /tab >}} + +{{< tab "Spark SQL" >}} +```sql +SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany'; +/* +germany berlin +germany hamburg +*/ +``` +{{< /tab >}} + +{{< /tabs >}} + +Let's insert more data and query again. + +{{< tabs "query-paimon-append-only-table-again" >}} + +{{< tab "Flink SQL" >}} +```sql +INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'houston'), ('germany', 'munich'); + +SELECT * FROM cities WHERE country = 'germany'; +/* ++----+--------------------------------+--------------------------------+ +| op | country | name | ++----+--------------------------------+--------------------------------+ +| +I | germany | munich | +| +I | germany | berlin | +| +I | germany | hamburg | ++----+--------------------------------+--------------------------------+ +*/ +``` +{{< /tab >}} + +{{< tab "Spark SQL" >}} +```sql +INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'houston'), ('germany', 'munich'); + +SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany'; +/* +germany munich +germany berlin +germany hamburg +*/ +``` +{{< /tab >}} + +{{< /tabs >}} + +## Example: Query Paimon Primary Key Tables with Iceberg Connector + +{{< tabs "paimon-primary-key-table" >}} + +{{< tab "Flink SQL" >}} +```sql +CREATE CATALOG paimon_catalog WITH ( + 'type' = 'paimon', + 'warehouse' = '' +); + +CREATE TABLE paimon_catalog.`default`.orders ( + order_id BIGINT, + status STRING, + payment DOUBLE, + PRIMARY KEY (order_id) NOT ENFORCED +) WITH ( + 'metadata.iceberg.storage' = 'hadoop-catalog', + 'compaction.optimization-interval' = '1ms' -- ATTENTION: this option is only for testing, see "timeliness" section below for more information +); + +INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'SUBMITTED', CAST(NULL AS DOUBLE)), (2, 'COMPLETED', 200.0), (3, 'SUBMITTED', CAST(NULL AS DOUBLE)); + +CREATE TABLE orders ( + order_id BIGINT, + status STRING, + payment DOUBLE +) WITH ( + 'connector' = 'iceberg', + 'catalog-type' = 'hadoop', + 'catalog-name' = 'test', + 'catalog-database' = 'default', + 'warehouse' = '/iceberg' +); + +SELECT * FROM orders WHERE status = 'COMPLETED'; +/* ++----+----------------------+--------------------------------+--------------------------------+ +| op | order_id | status | payment | ++----+----------------------+--------------------------------+--------------------------------+ +| +I | 2 | COMPLETED | 200.0 | ++----+----------------------+--------------------------------+--------------------------------+ +*/ + +INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'COMPLETED', 100.0); + +SELECT * FROM orders WHERE status = 'COMPLETED'; +/* ++----+----------------------+--------------------------------+--------------------------------+ +| op | order_id | status | payment | 
++----+----------------------+--------------------------------+--------------------------------+ +| +I | 1 | COMPLETED | 100.0 | +| +I | 2 | COMPLETED | 200.0 | ++----+----------------------+--------------------------------+--------------------------------+ +*/ +``` +{{< /tab >}} + +{{< tab "Spark SQL" >}} +Start `spark-sql` with the following command line. + +```bash +spark-sql --jars \ + --conf spark.sql.catalog.paimon_catalog=org.apache.paimon.spark.SparkCatalog \ + --conf spark.sql.catalog.paimon_catalog.warehouse=/tmp/sparkware \ + --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1 \ + --conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.iceberg_catalog.type=hadoop \ + --conf spark.sql.catalog.iceberg_catalog.warehouse=/iceberg \ + --conf spark.sql.catalog.iceberg_catalog.cache-enabled=false \ # disable iceberg catalog caching to quickly see the result + --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions +``` + +Run the following Spark SQL to create Paimon table, insert/update data, and query with Iceberg catalog. + +```sql +CREATE TABLE paimon_catalog.`default`.orders ( + order_id BIGINT, + status STRING, + payment DOUBLE +) TBLPROPERTIES ( + 'primary-key' = 'order_id', + 'metadata.iceberg.storage' = 'hadoop-catalog', + 'compaction.optimization-interval' = '1ms' -- ATTENTION: this option is only for testing, see "timeliness" section below for more information +); + +INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'SUBMITTED', CAST(NULL AS DOUBLE)), (2, 'COMPLETED', 200.0), (3, 'SUBMITTED', CAST(NULL AS DOUBLE)); + +SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED'; +/* +2 COMPLETED 200.0 +*/ + +INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'COMPLETED', 100.0); + +SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED'; +/* +2 COMPLETED 200.0 +1 COMPLETED 100.0 +*/ +``` +{{< /tab >}} + +{{< /tabs >}} + +### Timeliness + +Paimon primary key tables organize data files as LSM trees, so data files must be merged in memory before querying. +However, Iceberg readers are not able to merge data files, so they can only query data files on the highest level of LSM trees. +Data files on the highest level are produced by the full compaction process. +So **to conclude, for primary key tables, Iceberg readers can only query data after full compaction**. + +By default, there is no guarantee on how frequently Paimon will perform full compaction. +You can configure the following table option, so that Paimon is forced to perform full compaction after several commits. + + + + + + + + + + + + + + + + + + + + + + + + +
| Option | Default | Type | Description |
|--------|---------|------|-------------|
| `compaction.optimization-interval` | (none) | Duration | Full compaction will be constantly triggered per time interval. The first compaction after the job starts will always be a full compaction. |
| `full-compaction.delta-commits` | (none) | Integer | Full compaction will be constantly triggered after delta commits. Only implemented in Flink. |
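
Either knob is set like any other table option. A minimal sketch, assuming the `orders` table from the example above (the concrete values are illustrative, not recommendations from this page):

```sql
-- Sketch: force a full compaction roughly once per hour via the time-based option.
ALTER TABLE paimon_catalog.`default`.orders SET ('compaction.optimization-interval' = '1 h');

-- Flink-only alternative, based on the number of delta commits instead of time:
-- ALTER TABLE paimon_catalog.`default`.orders SET ('full-compaction.delta-commits' = '20');
```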
    + +Note that full compaction is a resource-consuming process, so the value of this table option should not be too small. +We recommend full compaction to be performed once or twice per hour. + +## Supported Types + +Paimon Iceberg compatibility currently supports the following data types. + +| Paimon Data Type | Iceberg Data Type | +|----------------------------|-------------------| +| `BOOLEAN` | `boolean` | +| `INT` | `int` | +| `BIGINT` | `long` | +| `FLOAT` | `float` | +| `DOUBLE` | `double` | +| `DECIMAL` | `decimal` | +| `CHAR` | `string` | +| `VARCHAR` | `string` | +| `BINARY` | `binary` | +| `VARBINARY` | `binary` | +| `DATE` | `date` | +| `TIMESTAMP`* | `timestamp` | +| `TIMESTAMP_LTZ`* | `timestamptz` | + +*: `TIMESTAMP` and `TIMESTAMP_LTZ` type only support precision from 4 to 6 + +## Other Related Table Options + + + + + + + + + + + + + + + + + + + + + + + + +
| Option | Default | Type | Description |
|--------|---------|------|-------------|
| `metadata.iceberg.compaction.min.file-num` | 10 | Integer | Minimum number of Iceberg metadata files to trigger metadata compaction. |
| `metadata.iceberg.compaction.max.file-num` | 50 | Integer | If the number of small Iceberg metadata files exceeds this limit, always trigger metadata compaction regardless of their total size. |
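
These are plain table options as well. A hedged sketch of raising both thresholds on an existing table (the values are arbitrary examples, not tuning advice from this page):

```sql
-- Sketch: make Iceberg metadata compaction less aggressive than the defaults (10 / 50).
ALTER TABLE paimon_catalog.`default`.cities SET (
    'metadata.iceberg.compaction.min.file-num' = '20',
    'metadata.iceberg.compaction.max.file-num' = '100'
);
```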
    diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md b/docs/content/primary-key-table/merge-engine/aggregation.md index a9b9b5a38cb0..fa667ed35687 100644 --- a/docs/content/primary-key-table/merge-engine/aggregation.md +++ b/docs/content/primary-key-table/merge-engine/aggregation.md @@ -3,7 +3,7 @@ title: "Aggregation" weight: 3 type: docs aliases: -- /cdc-ingestion/merge-engin/aggregation.html +- /primary-key-table/merge-engin/aggregation.html --- @@ -338,6 +343,19 @@ under the License. + + + scala-2.13 + + 2.13 + ${scala213.version} + + + + scala-2.13 + + + @@ -615,10 +633,10 @@ under the License. - org.apache.flink:flink-table-planner_${scala.binary.version} + org.apache.flink:flink-table-planner_${flink.scala.binary.version} - org.apache.flink:flink-table-planner_${scala.binary.version}:*:*:test + org.apache.flink:flink-table-planner_${flink.scala.binary.version}:*:*:test Direct dependencies on flink-table-planner are not allowed.