diff --git a/.github/workflows/publish_snapshot.yml b/.github/workflows/publish_snapshot.yml
index 7d370814da94..c7f97cab991f 100644
--- a/.github/workflows/publish_snapshot.yml
+++ b/.github/workflows/publish_snapshot.yml
@@ -64,6 +64,6 @@ jobs:
echo "$ASF_PASSWORD" >> $tmp_settings
echo "" >> $tmp_settings
- mvn --settings $tmp_settings clean deploy -Dgpg.skip -Drat.skip -DskipTests -Papache-release
+ mvn --settings $tmp_settings clean deploy -Dgpg.skip -Drat.skip -DskipTests -Papache-release,spark3
rm $tmp_settings
diff --git a/.github/workflows/unitcase-flink-jdk11.yml b/.github/workflows/utitcase-flink-jdk11.yml
similarity index 100%
rename from .github/workflows/unitcase-flink-jdk11.yml
rename to .github/workflows/utitcase-flink-jdk11.yml
diff --git a/.github/workflows/unitcase-jdk11.yml b/.github/workflows/utitcase-jdk11.yml
similarity index 82%
rename from .github/workflows/unitcase-jdk11.yml
rename to .github/workflows/utitcase-jdk11.yml
index 1baed87f9027..878ce5f96898 100644
--- a/.github/workflows/unitcase-jdk11.yml
+++ b/.github/workflows/utitcase-jdk11.yml
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-name: UTCase and ITCase Non Flink on JDK 11
+name: UTCase and ITCase Others on JDK 11
on:
issue_comment:
@@ -52,6 +52,11 @@ jobs:
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
- mvn -T 1C -B clean install -pl '!paimon-e2e-tests,!org.apache.paimon:paimon-hive-connector-3.1' -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone
+ test_modules="!paimon-e2e-tests,!org.apache.paimon:paimon-hive-connector-3.1,"
+ for suffix in 3.5 3.4 3.3 3.2 ut; do
+ test_modules+="!org.apache.paimon:paimon-spark-${suffix},"
+ done
+ test_modules="${test_modules%,}"
+ mvn -T 1C -B clean install -pl "${test_modules}" -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/utitcase-spark-3.x.yml b/.github/workflows/utitcase-spark-3.x.yml
new file mode 100644
index 000000000000..2d3df5f4d005
--- /dev/null
+++ b/.github/workflows/utitcase-spark-3.x.yml
@@ -0,0 +1,63 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+name: UTCase and ITCase Spark 3.x
+
+on:
+ push:
+ pull_request:
+ paths-ignore:
+ - 'docs/**'
+ - '**/*.md'
+
+env:
+ JDK_VERSION: 8
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
+ cancel-in-progress: true
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v2
+ - name: Set up JDK ${{ env.JDK_VERSION }}
+ uses: actions/setup-java@v2
+ with:
+ java-version: ${{ env.JDK_VERSION }}
+ distribution: 'adopt'
+ - name: Build Spark
+ run: mvn -T 1C -B clean install -DskipTests
+ - name: Test Spark
+ timeout-minutes: 60
+ run: |
+ # run tests with random timezone to find out timezone related bugs
+ . .github/workflows/utils.sh
+ jvm_timezone=$(random_timezone)
+ echo "JVM timezone is set to $jvm_timezone"
+ test_modules=""
+ for suffix in ut 3.5 3.4 3.3 3.2; do
+ test_modules+="org.apache.paimon:paimon-spark-${suffix},"
+ done
+ test_modules="${test_modules%,}"
+ mvn -T 1C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone
+ env:
+ MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/utitcase-spark-4.x.yml b/.github/workflows/utitcase-spark-4.x.yml
new file mode 100644
index 000000000000..c58fd7c03be2
--- /dev/null
+++ b/.github/workflows/utitcase-spark-4.x.yml
@@ -0,0 +1,63 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+name: UTCase and ITCase Spark 4.x
+
+on:
+ push:
+ pull_request:
+ paths-ignore:
+ - 'docs/**'
+ - '**/*.md'
+
+env:
+ JDK_VERSION: 17
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
+ cancel-in-progress: true
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v2
+ - name: Set up JDK ${{ env.JDK_VERSION }}
+ uses: actions/setup-java@v2
+ with:
+ java-version: ${{ env.JDK_VERSION }}
+ distribution: 'adopt'
+ - name: Build Spark
+ run: mvn -T 1C -B clean install -DskipTests -Pspark4
+ - name: Test Spark
+ timeout-minutes: 60
+ run: |
+ # run tests with random timezone to find out timezone related bugs
+ . .github/workflows/utils.sh
+ jvm_timezone=$(random_timezone)
+ echo "JVM timezone is set to $jvm_timezone"
+ test_modules=""
+ for suffix in ut 4.0; do
+ test_modules+="org.apache.paimon:paimon-spark-${suffix},"
+ done
+ test_modules="${test_modules%,}"
+ mvn -T 1C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4
+ env:
+ MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/utitcase.yml b/.github/workflows/utitcase.yml
index 431b44332232..8aa33f5b8218 100644
--- a/.github/workflows/utitcase.yml
+++ b/.github/workflows/utitcase.yml
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-name: UTCase and ITCase Non Flink
+name: UTCase and ITCase Others
on:
push:
@@ -53,6 +53,11 @@ jobs:
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
- mvn -T 1C -B clean install -pl '!paimon-e2e-tests' -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone
+ test_modules="!paimon-e2e-tests,"
+ for suffix in 3.5 3.4 3.3 3.2 ut; do
+ test_modules+="!org.apache.paimon:paimon-spark-${suffix},"
+ done
+ test_modules="${test_modules%,}"
+ mvn -T 1C -B clean install -pl "${test_modules}" -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 3831eb7e9ef1..25ebf232470a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,4 +17,5 @@ target
.DS_Store
*.ipr
*.iws
+.java-version
dependency-reduced-pom.xml
diff --git a/docs/content/append-table/query.md b/docs/content/append-table/query-performance.md
similarity index 94%
rename from docs/content/append-table/query.md
rename to docs/content/append-table/query-performance.md
index 4f7e0ab9f66f..7ec745468ef5 100644
--- a/docs/content/append-table/query.md
+++ b/docs/content/append-table/query-performance.md
@@ -1,9 +1,9 @@
---
-title: "Query"
+title: "Query Performance"
weight: 3
type: docs
aliases:
-- /append-table/query.html
+- /append-table/query-performance.html
---
-# Query
+# Query Performance
## Data Skipping By Order
@@ -57,8 +57,6 @@ multiple columns.
Different file indexes may be efficient in different scenarios. For example, a bloom filter may speed up queries in point lookup
scenarios. Using a bitmap may consume more space but can result in greater accuracy.
-Currently, file index is only supported in append-only table.
-
`Bloom Filter`:
* `file-index.bloom-filter.columns`: specify the columns that need bloom filter index.
* `file-index.bloom-filter.<column_name>.fpp` to config false positive probability.
@@ -67,6 +65,9 @@ Currently, file index is only supported in append-only table.
`Bitmap`:
* `file-index.bitmap.columns`: specify the columns that need bitmap index.
+`Bit-Slice Index Bitmap`:
+* `file-index.bsi.columns`: specify the columns that need bsi index.
+
More filter types will be supported...
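+
+For example, a minimal sketch (assuming Flink SQL; the table and column names are hypothetical) of declaring these
+file indexes through table options:
+
+```sql
+CREATE TABLE my_append_table (
+    order_id BIGINT,
+    user_id BIGINT,
+    dt STRING
+) WITH (
+    'file-index.bloom-filter.columns' = 'order_id',
+    'file-index.bitmap.columns' = 'dt',
+    'file-index.bsi.columns' = 'user_id'
+);
+```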
If you want to add file index to existing table, without any rewrite, you can use `rewrite_file_index` procedure. Before
diff --git a/docs/content/flink/cdc-ingestion/_index.md b/docs/content/cdc-ingestion/_index.md
similarity index 90%
rename from docs/content/flink/cdc-ingestion/_index.md
rename to docs/content/cdc-ingestion/_index.md
index 0c76825b0f6a..b3d4dce2a0c7 100644
--- a/docs/content/flink/cdc-ingestion/_index.md
+++ b/docs/content/cdc-ingestion/_index.md
@@ -1,7 +1,9 @@
---
title: CDC Ingestion
+icon:
+bold: true
bookCollapseSection: true
-weight: 95
+weight: 91
---
+
+# Catalog
+
+Paimon provides a Catalog abstraction to manage the table of contents and metadata. The Catalog abstraction provides
+a series of ways to help you better integrate with computing engines. We always recommend that you use Catalog to
+access the Paimon table.
+
+## Catalogs
+
+Paimon catalogs currently support three types of metastores:
+
+* `filesystem` metastore (default), which stores both metadata and table files in filesystems.
+* `hive` metastore, which additionally stores metadata in Hive metastore. Users can directly access the tables from Hive.
+* `jdbc` metastore, which additionally stores metadata in relational databases such as MySQL, Postgres, etc.
+
+## Filesystem Catalog
+
+Metadata and table files are stored under `hdfs:///path/to/warehouse`.
+
+```sql
+-- Flink SQL
+CREATE CATALOG my_catalog WITH (
+ 'type' = 'paimon',
+ 'warehouse' = 'hdfs:///path/to/warehouse'
+);
+```
+
+## Hive Catalog
+
+By using Paimon Hive catalog, changes to the catalog will directly affect the corresponding Hive metastore. Tables
+created in such catalog can also be accessed directly from Hive. Metadata and table files are stored under
+`hdfs:///path/to/warehouse`. In addition, schema is also stored in Hive metastore.
+
+```sql
+-- Flink SQL
+CREATE CATALOG my_hive WITH (
+ 'type' = 'paimon',
+ 'metastore' = 'hive',
+ -- 'warehouse' = 'hdfs:///path/to/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf
+);
+```
+
+By default, Paimon does not synchronize newly created partitions into Hive metastore. Users will see an unpartitioned
+table in Hive. Partition push-down will be carried out by filter push-down instead.
+
+If you want to see a partitioned table in Hive and also synchronize newly created partitions into Hive metastore,
+please set the table option `metastore.partitioned-table` to true.
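+
+A minimal sketch (assuming Flink SQL and the Hive catalog above; the table and column names are hypothetical):
+
+```sql
+CREATE TABLE my_hive.my_db.orders (
+    order_id BIGINT,
+    dt STRING
+) PARTITIONED BY (dt) WITH (
+    -- synchronize newly created partitions into Hive metastore
+    'metastore.partitioned-table' = 'true'
+);
+```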
+
+## JDBC Catalog
+
+By using the Paimon JDBC catalog, changes to the catalog will be directly stored in relational databases such as SQLite,
+MySQL, Postgres, etc.
+
+```sql
+-- Flink SQL
+CREATE CATALOG my_jdbc WITH (
+ 'type' = 'paimon',
+ 'metastore' = 'jdbc',
+ 'uri' = 'jdbc:mysql://:/',
+ 'jdbc.user' = '...',
+ 'jdbc.password' = '...',
+ 'catalog-key'='jdbc',
+ 'warehouse' = 'hdfs:///path/to/warehouse'
+);
+```
diff --git a/docs/content/concepts/data-types.md b/docs/content/concepts/data-types.md
new file mode 100644
index 000000000000..b33dcd428399
--- /dev/null
+++ b/docs/content/concepts/data-types.md
@@ -0,0 +1,179 @@
+---
+title: "Data Types"
+weight: 7
+type: docs
+aliases:
+- /concepts/data-types.html
+---
+
+
+# Data Types
+
+A data type describes the logical type of a value in the table ecosystem. It can be used to declare input and/or output types of operations.
+
+All data types supported by Paimon are as follows:
+
+| DataType | Description |
+|----------|-------------|
+| `BOOLEAN` | Data type of a boolean with a (possibly) three-valued logic of TRUE, FALSE, and UNKNOWN. |
+| `CHAR`, `CHAR(n)` | Data type of a fixed-length character string. The type can be declared using `CHAR(n)` where `n` is the number of code points. `n` must have a value between 1 and 2,147,483,647 (both inclusive). If no length is specified, `n` is equal to 1. |
+| `VARCHAR`, `VARCHAR(n)`, `STRING` | Data type of a variable-length character string. The type can be declared using `VARCHAR(n)` where `n` is the maximum number of code points. `n` must have a value between 1 and 2,147,483,647 (both inclusive). If no length is specified, `n` is equal to 1. `STRING` is a synonym for `VARCHAR(2147483647)`. |
+| `BINARY`, `BINARY(n)` | Data type of a fixed-length binary string (=a sequence of bytes). The type can be declared using `BINARY(n)` where `n` is the number of bytes. `n` must have a value between 1 and 2,147,483,647 (both inclusive). If no length is specified, `n` is equal to 1. |
+| `VARBINARY`, `VARBINARY(n)`, `BYTES` | Data type of a variable-length binary string (=a sequence of bytes). The type can be declared using `VARBINARY(n)` where `n` is the maximum number of bytes. `n` must have a value between 1 and 2,147,483,647 (both inclusive). If no length is specified, `n` is equal to 1. `BYTES` is a synonym for `VARBINARY(2147483647)`. |
+| `DECIMAL`, `DECIMAL(p)`, `DECIMAL(p, s)` | Data type of a decimal number with fixed precision and scale. The type can be declared using `DECIMAL(p, s)` where `p` is the number of digits in a number (precision) and `s` is the number of digits to the right of the decimal point in a number (scale). `p` must have a value between 1 and 38 (both inclusive). `s` must have a value between 0 and `p` (both inclusive). The default value for `p` is 10. The default value for `s` is 0. |
+| `TINYINT` | Data type of a 1-byte signed integer with values from -128 to 127. |
+| `SMALLINT` | Data type of a 2-byte signed integer with values from -32,768 to 32,767. |
+| `INT` | Data type of a 4-byte signed integer with values from -2,147,483,648 to 2,147,483,647. |
+| `BIGINT` | Data type of an 8-byte signed integer with values from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807. |
+| `FLOAT` | Data type of a 4-byte single precision floating point number. Compared to the SQL standard, the type does not take parameters. |
+| `DOUBLE` | Data type of an 8-byte double precision floating point number. |
+| `DATE` | Data type of a date consisting of year-month-day with values ranging from 0000-01-01 to 9999-12-31. Compared to the SQL standard, the range starts at year 0000. |
+| `TIME`, `TIME(p)` | Data type of a time without time zone consisting of hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 00:00:00.000000000 to 23:59:59.999999999. The type can be declared using `TIME(p)` where `p` is the number of digits of fractional seconds (precision). `p` must have a value between 0 and 9 (both inclusive). If no precision is specified, `p` is equal to 0. |
+| `TIMESTAMP`, `TIMESTAMP(p)` | Data type of a timestamp without time zone consisting of year-month-day hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 to 9999-12-31 23:59:59.999999999. The type can be declared using `TIMESTAMP(p)` where `p` is the number of digits of fractional seconds (precision). `p` must have a value between 0 and 9 (both inclusive). If no precision is specified, `p` is equal to 6. |
+| `TIMESTAMP WITH TIME ZONE`, `TIMESTAMP(p) WITH TIME ZONE` | Data type of a timestamp with time zone consisting of year-month-day hour:minute:second[.fractional] zone with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 +14:59 to 9999-12-31 23:59:59.999999999 -14:59. This type fills the gap between time zone free and time zone mandatory timestamp types by allowing the interpretation of UTC timestamps according to the configured session time zone. A conversion from and to int describes the number of seconds since epoch. A conversion from and to long describes the number of milliseconds since epoch. |
+| `ARRAY<t>` | Data type of an array of elements with same subtype. Compared to the SQL standard, the maximum cardinality of an array cannot be specified but is fixed at 2,147,483,647. Also, any valid type is supported as a subtype. The type can be declared using `ARRAY<t>` where `t` is the data type of the contained elements. |
+| `MAP<kt, vt>` | Data type of an associative array that maps keys (including NULL) to values (including NULL). A map cannot contain duplicate keys; each key can map to at most one value. There is no restriction of element types; it is the responsibility of the user to ensure uniqueness. The type can be declared using `MAP<kt, vt>` where `kt` is the data type of the key elements and `vt` is the data type of the value elements. |
+| `MULTISET<t>` | Data type of a multiset (=bag). Unlike a set, it allows for multiple instances for each of its elements with a common subtype. Each unique value (including NULL) is mapped to some multiplicity. There is no restriction of element types; it is the responsibility of the user to ensure uniqueness. The type can be declared using `MULTISET<t>` where `t` is the data type of the contained elements. |
+| `ROW<n0 t0 'd0', n1 t1 'd1', ...>` | A field consists of a field name, field type, and an optional description. The most specific type of a row of a table is a row type. In this case, each column of the row corresponds to the field of the row type that has the same ordinal position as the column. Compared to the SQL standard, an optional field description simplifies the handling with complex structures. A row type is similar to the STRUCT type known from other non-standard-compliant frameworks. The type can be declared using `ROW<n0 t0 'd0', n1 t1 'd1', ...>` where `n` is the unique name of a field, `t` is the logical type of a field, `d` is the description of a field. |
+
diff --git a/docs/content/concepts/spec/_index.md b/docs/content/concepts/spec/_index.md
index 3bd8e657ffbc..cc148d6a8b53 100644
--- a/docs/content/concepts/spec/_index.md
+++ b/docs/content/concepts/spec/_index.md
@@ -1,7 +1,7 @@
---
title: Specification
bookCollapseSection: true
-weight: 4
+weight: 8
---
+
+# Table Types
+
+Paimon supports table types:
+
+1. table with pk: Paimon Data Table with Primary key
+2. table w/o pk: Paimon Data Table without Primary key
+3. view: metastore required, views in SQL are a kind of virtual table
+4. format-table: file format table refers to a directory that contains multiple files of the same format, where
+ operations on this table allow for reading or writing to these files, compatible with Hive tables
+5. object table: provides metadata indexes for unstructured data objects in the specified Object Storage directory.
+6. materialized-table: aimed at simplifying both batch and stream data pipelines, providing a consistent development
+ experience, see [Flink Materialized Table](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/overview/)
+
+## Table with PK
+
+See [Paimon with Primary key]({{< ref "primary-key-table/overview" >}}).
+
+Primary keys consist of a set of columns that contain unique values for each record. Paimon enforces data ordering by
+sorting the primary key within each bucket, allowing streaming update and streaming changelog read.
+
+The definition of primary key is similar to that of standard SQL, as it ensures that there is only one data entry for
+the same primary key during batch queries.
+
+{{< tabs "primary-table" >}}
+{{< tab "Flink SQL" >}}
+
+```sql
+CREATE TABLE my_table (
+ a INT PRIMARY KEY NOT ENFORCED,
+ b STRING
+) WITH (
+ 'bucket'='8'
+)
+```
+{{< /tab >}}
+
+{{< tab "Spark SQL" >}}
+
+```sql
+CREATE TABLE my_table (
+ a INT,
+ b STRING
+) TBLPROPERTIES (
+ 'primary-key' = 'a',
+ 'bucket' = '8'
+)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+## Table w/o PK
+
+See [Paimon w/o Primary key]({{< ref "append-table/overview" >}}).
+
+If a table does not have a primary key defined, it is an append table. Compared to the primary key table, it cannot
+directly receive changelogs and cannot be updated through streaming upsert; it can only accept appended data.
+
+However, it also supports batch SQL: DELETE, UPDATE, and MERGE INTO.
+
+```sql
+CREATE TABLE my_table (
+ a INT,
+ b STRING
+)
+```
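+
+For example, a minimal sketch (assuming Spark SQL in batch mode and the table above) of the batch DELETE and UPDATE
+support mentioned earlier:
+
+```sql
+-- update rows of the append table created above
+UPDATE my_table SET b = 'new_value' WHERE a = 1;
+
+-- delete rows from the append table
+DELETE FROM my_table WHERE a = 2;
+```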
+
+## View
+
+Views are supported when the metastore supports them, for example the Hive metastore. If you don't have a metastore, you
+can only use temporary views, which exist only in the current session. This chapter mainly describes persistent views.
+
+A view currently stores the original SQL. If you need to use a view across engines, you can write a cross-engine
+SQL statement. For example:
+
+{{< tabs "view" >}}
+{{< tab "Flink SQL" >}}
+
+```sql
+CREATE VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
+ [( columnName [, columnName ]* )] [COMMENT view_comment]
+AS query_expression;
+
+DROP VIEW [IF EXISTS] [catalog_name.][db_name.]view_name;
+
+SHOW VIEWS;
+
+SHOW CREATE VIEW my_view;
+```
+{{< /tab >}}
+
+{{< tab "Spark SQL" >}}
+
+```sql
+CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
+ [( columnName [, columnName ]* )] [COMMENT view_comment]
+AS query_expression;
+
+DROP VIEW [IF EXISTS] [catalog_name.][db_name.]view_name;
+
+SHOW VIEWS;
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Format Table
+
+Format tables are supported when the metastore supports them, for example the Hive metastore. The Hive tables
+inside the metastore will be mapped to Paimon's Format Table for computing engines (Spark, Hive, Flink) to read and write.
+
+Format table refers to a directory that contains multiple files of the same format, where operations on this table
+allow for reading or writing to these files, facilitating the retrieval of existing data and the addition of new files.
+
+A partitioned format table works just like the standard Hive format: partitions are discovered and inferred based on
+the directory structure.
+
+Format Table is enabled by default; you can disable it by configuring the catalog option `'format-table.enabled'`.
+
+Currently only `CSV`, `Parquet`, and `ORC` formats are supported.
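+
+For example, a minimal sketch (assuming Flink SQL and a Hive metastore; the catalog name is hypothetical) of disabling
+the mapping at the catalog level:
+
+```sql
+CREATE CATALOG my_hive WITH (
+    'type' = 'paimon',
+    'metastore' = 'hive',
+    -- turn off the Format Table mapping described above
+    'format-table.enabled' = 'false'
+);
+```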
+
+{{< tabs "format-table" >}}
+{{< tab "Flink-CSV" >}}
+
+```sql
+CREATE TABLE my_csv_table (
+ a INT,
+ b STRING
+) WITH (
+ 'type'='format-table',
+ 'file.format'='csv',
+ 'field-delimiter'=','
+)
+```
+{{< /tab >}}
+
+{{< tab "Spark-CSV" >}}
+
+```sql
+CREATE TABLE my_csv_table (
+ a INT,
+ b STRING
+) USING csv OPTIONS ('field-delimiter' ',')
+```
+
+{{< /tab >}}
+
+{{< tab "Flink-Parquet" >}}
+
+```sql
+CREATE TABLE my_parquet_table (
+ a INT,
+ b STRING
+) WITH (
+ 'type'='format-table',
+ 'file.format'='parquet'
+)
+```
+{{< /tab >}}
+
+{{< tab "Spark-Parquet" >}}
+
+```sql
+CREATE TABLE my_parquet_table (
+ a INT,
+ b STRING
+) USING parquet
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Object Table
+
+Object Table provides metadata indexes for unstructured data objects in the specified Object Storage directory.
+Object tables allow users to analyze unstructured data in Object Storage:
+
+1. Use the Python API to manipulate this unstructured data, such as converting images to PDF format.
+2. Model functions can also be used to perform inference, and the results of these operations can then be combined
+   with other structured data in the Catalog.
+
+The object table is managed by the Catalog and can also have access permissions and data lineage management.
+
+{{< tabs "object-table" >}}
+
+{{< tab "Flink-SQL" >}}
+
+```sql
+-- Create Object Table
+
+CREATE TABLE `my_object_table` WITH (
+ 'type' = 'object-table',
+ 'object-location' = 'oss://my_bucket/my_location'
+);
+
+-- Refresh Object Table
+
+CALL sys.refresh_object_table('mydb.my_object_table');
+
+-- Query Object Table
+
+SELECT * FROM `my_object_table`;
+
+-- Query Object Table with Time Travel
+
+SELECT * FROM `my_object_table` /*+ OPTIONS('scan.snapshot-id' = '1') */;
+```
+
+{{< /tab >}}
+
+{{< tab "Spark-SQL" >}}
+
+```sql
+-- Create Object Table
+
+CREATE TABLE `my_object_table` TBLPROPERTIES (
+ 'type' = 'object-table',
+ 'object-location' = 'oss://my_bucket/my_location'
+);
+
+-- Refresh Object Table
+
+CALL sys.refresh_object_table('mydb.my_object_table');
+
+-- Query Object Table
+
+SELECT * FROM `my_object_table`;
+
+-- Query Object Table with Time Travel
+
+SELECT * FROM `my_object_table` VERSION AS OF 1;
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Materialized Table
+
+Materialized Tables aim to simplify both batch and streaming data pipelines, providing a consistent development
+experience, see [Flink Materialized Table](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/overview/).
+
+Currently only Flink SQL integrates with Materialized Tables; we plan to support them in Spark SQL too.
+
+```sql
+CREATE MATERIALIZED TABLE continuous_users_shops
+PARTITIONED BY (ds)
+FRESHNESS = INTERVAL '30' SECOND
+AS SELECT
+ user_id,
+ ds,
+ SUM (payment_amount_cents) AS payed_buy_fee_sum,
+ SUM (1) AS PV
+FROM (
+ SELECT user_id, order_created_at AS ds, payment_amount_cents
+ FROM json_source
+ ) AS tmp
+GROUP BY user_id, ds;
+```
diff --git a/docs/content/engines/doris.md b/docs/content/engines/doris.md
index 634e7f7c71da..6d22bc376a88 100644
--- a/docs/content/engines/doris.md
+++ b/docs/content/engines/doris.md
@@ -73,13 +73,13 @@ See [Apache Doris Website](https://doris.apache.org/docs/lakehouse/datalake-anal
1. Query Paimon table with full qualified name
- ```
+ ```sql
SELECT * FROM paimon_hdfs.paimon_db.paimon_table;
```
2. Switch to Paimon Catalog and query
- ```
+ ```sql
SWITCH paimon_hdfs;
USE paimon_db;
SELECT * FROM paimon_table;
@@ -89,11 +89,11 @@ See [Apache Doris Website](https://doris.apache.org/docs/lakehouse/datalake-anal
- Read optimized for Primary Key Table
- Doris can utilize the [Read optimized](https://paimon.apache.org/releases/release-0.6/#read-optimized) feature for Primary Key Table(release in Paimon 0.6), by reading base data files using native Parquet/ORC reader and delta file using JNI.
+ Doris can utilize the [Read optimized](https://paimon.apache.org/docs/0.8/primary-key-table/read-optimized/) feature for Primary Key Table(release in Paimon 0.6), by reading base data files using native Parquet/ORC reader and delta file using JNI.
- Deletion Vectors
- Doris(2.1.4+) natively supports [Deletion Vectors](https://paimon.apache.org/releases/release-0.8/#deletion-vectors)(released in Paimon 0.8).
+ Doris(2.1.4+) natively supports [Deletion Vectors](https://paimon.apache.org/docs/0.8/primary-key-table/deletion-vectors/)(released in Paimon 0.8).
## Doris to Paimon type mapping
diff --git a/docs/content/engines/presto.md b/docs/content/engines/presto.md
deleted file mode 100644
index c336226bcf0a..000000000000
--- a/docs/content/engines/presto.md
+++ /dev/null
@@ -1,321 +0,0 @@
----
-title: "Presto"
-weight: 6
-type: docs
-aliases:
-- /engines/presto.html
----
-
-
-# Presto
-
-This documentation is a guide for using Paimon in Presto.
-
-## Version
-
-Paimon currently supports Presto 0.236 and above.
-
-## Preparing Paimon Jar File
-
-{{< stable >}}
-
-Download from master:
-https://paimon.apache.org/docs/master/project/download/
-
-{{< /stable >}}
-
-{{< unstable >}}
-
-| Version | Jar |
-|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
-| [0.236, 0.268) | [paimon-presto-0.236-{{< version >}}-plugin.tar.gz](https://repository.apache.org/snapshots/org/apache/paimon/paimon-presto-0.236/{{< version >}}/) |
-| [0.268, 0.273) | [paimon-presto-0.268-{{< version >}}-plugin.tar.gz](https://repository.apache.org/snapshots/org/apache/paimon/paimon-presto-0.268/{{< version >}}/) |
-| [0.273, latest] | [paimon-presto-0.273-{{< version >}}-plugin.tar.gz](https://repository.apache.org/snapshots/org/apache/paimon/paimon-presto-0.273/{{< version >}}/) |
-
-{{< /unstable >}}
-
-You can also manually build a bundled jar from the source code.
-
-To build from the source code, [clone the git repository]({{< presto_github_repo >}}).
-
-Build presto connector plugin with the following command.
-
-```
-mvn clean install -DskipTests
-```
-
-After the packaging is complete, you can choose the corresponding connector based on your own Presto version:
-
-| Version | Package |
-|-----------------|----------------------------------------------------------------------------------|
-| [0.236, 0.268) | `./paimon-presto-0.236/target/paimon-presto-0.236-{{< version >}}-plugin.tar.gz` |
-| [0.268, 0.273) | `./paimon-presto-0.268/target/paimon-presto-0.268-{{< version >}}-plugin.tar.gz` |
-| [0.273, latest] | `./paimon-presto-0.273/target/paimon-presto-0.273-{{< version >}}-plugin.tar.gz` |
-
-Of course, we also support different versions of Hive and Hadoop. But note that we utilize
-Presto-shaded versions of Hive and Hadoop packages to address dependency conflicts.
-You can check the following two links to select the appropriate versions of Hive and Hadoop:
-
-[hadoop-apache2](https://mvnrepository.com/artifact/com.facebook.presto.hadoop/hadoop-apache2)
-
-[hive-apache](https://mvnrepository.com/artifact/com.facebook.presto.hive/hive-apache)
-
-Both Hive 2 and 3, as well as Hadoop 2 and 3, are supported.
-
-For example, if your presto version is 0.274, hive and hadoop version is 2.x, you could run:
-
-```bash
-mvn clean install -DskipTests -am -pl paimon-presto-0.273 -Dpresto.version=0.274 -Dhadoop.apache2.version=2.7.4-9 -Dhive.apache.version=1.2.2-2
-```
-
-## Tmp Dir
-
-Paimon will unzip some jars to the tmp directory for codegen. By default, Presto will use `'/tmp'` as the temporary
-directory, but `'/tmp'` may be periodically deleted.
-
-You can configure this environment variable when Presto starts:
-```shell
--Djava.io.tmpdir=/path/to/other/tmpdir
-```
-
-Let Paimon use a secure temporary directory.
-
-## Configure Paimon Catalog
-
-### Install Paimon Connector
-
-```bash
-tar -zxf paimon-presto-${PRESTO_VERSION}/target/paimon-presto-${PRESTO_VERSION}-${PAIMON_VERSION}-plugin.tar.gz -C ${PRESTO_HOME}/plugin
-```
-
-Note that, the variable `PRESTO_VERSION` is module name, must be one of 0.236, 0.268, 0.273.
-
-### Configuration
-
-```bash
-cd ${PRESTO_HOME}
-mkdir -p etc/catalog
-```
-
-```properties
-connector.name=paimon
-# set your filesystem path, such as hdfs://namenode01:8020/path and s3://${YOUR_S3_BUCKET}/path
-warehouse=${YOUR_FS_PATH}
-```
-
-If you are using HDFS FileSystem, you will also need to do one more thing: choose one of the following ways to configure your HDFS:
-
-- set environment variable HADOOP_HOME.
-- set environment variable HADOOP_CONF_DIR.
-- configure `hadoop-conf-dir` in the properties.
-
-If you are using S3 FileSystem, you need to add `paimon-s3-${PAIMON_VERSION}.jar` in `${PRESTO_HOME}/plugin/paimon` and additionally configure the following properties in `paimon.properties`:
-
-```properties
-s3.endpoint=${YOUR_ENDPOINTS}
-s3.access-key=${YOUR_AK}
-s3.secret-key=${YOUR_SK}
-```
-
-**Query HiveCatalog table:**
-
-```bash
-vim etc/catalog/paimon.properties
-```
-
-and set the following config:
-
-```properties
-connector.name=paimon
-# set your filesystem path, such as hdfs://namenode01:8020/path and s3://${YOUR_S3_BUCKET}/path
-warehouse=${YOUR_FS_PATH}
-metastore=hive
-uri=thrift://${YOUR_HIVE_METASTORE}:9083
-```
-
-## Kerberos
-
-You can configure kerberos keytab file when using KERBEROS authentication in the properties.
-
-```
-security.kerberos.login.principal=hadoop-user
-security.kerberos.login.keytab=/etc/presto/hdfs.keytab
-```
-
-Keytab files must be distributed to every node in the cluster that runs Presto.
-
-## Create Schema
-
-```
-CREATE SCHEMA paimon.test_db;
-```
-
-## Create Table
-
-```
-CREATE TABLE paimon.test_db.orders (
- order_key bigint,
- order_status varchar,
- total_price decimal(18,4),
- order_date date
-)
-WITH (
- file_format = 'ORC',
- primary_key = ARRAY['order_key','order_date'],
- partitioned_by = ARRAY['order_date'],
- bucket = '2',
- bucket_key = 'order_key',
- changelog_producer = 'input'
-)
-```
-
-## Add Column
-
-```
-CREATE TABLE paimon.test_db.orders (
- order_key bigint,
- orders_tatus varchar,
- total_price decimal(18,4),
- order_date date
-)
-WITH (
- file_format = 'ORC',
- primary_key = ARRAY['order_key','order_date'],
- partitioned_by = ARRAY['order_date'],
- bucket = '2',
- bucket_key = 'order_key',
- changelog_producer = 'input'
-)
-
-ALTER TABLE paimon.test_db.orders ADD COLUMN "shipping_address varchar;
-```
-
-## Query
-
-```
-SELECT * FROM paimon.default.MyTable
-```
-
-## Presto to Paimon type mapping
-
-This section lists all supported type conversion between Presto and Paimon.
-All Presto's data types are available in package ` com.facebook.presto.common.type`.
-
-| Presto Data Type | Paimon Data Type | Atomic Type |
-|------------------|------------------|-------------|
-| `RowType` | `RowType` | false |
-| `MapType` | `MapType` | false |
-| `ArrayType` | `ArrayType` | false |
-| `BooleanType` | `BooleanType` | true |
-| `TinyintType` | `TinyIntType` | true |
-| `SmallintType` | `SmallIntType` | true |
-| `IntegerType` | `IntType` | true |
-| `BigintType` | `BigIntType` | true |
-| `RealType` | `FloatType` | true |
-| `DoubleType` | `DoubleType` | true |
-| `CharType(length)` | `CharType(length)` | true |
-| `VarCharType(VarCharType.MAX_LENGTH)` | `VarCharType(VarCharType.MAX_LENGTH)` | true |
-| `VarCharType(length)` | `VarCharType(length)`, length is less than VarCharType.MAX_LENGTH | true |
-| `DateType` | `DateType` | true |
-| `TimestampType` | `TimestampType` | true |
-| `DecimalType(precision, scale)` | `DecimalType(precision, scale)` | true |
-| `VarBinaryType(length)` | `VarBinaryType(length)` | true |
-| `TimestampWithTimeZoneType` | `LocalZonedTimestampType` | true |
-
diff --git a/docs/content/engines/starrocks.md b/docs/content/engines/starrocks.md
index 1ab821a9a103..dda22d35f76a 100644
--- a/docs/content/engines/starrocks.md
+++ b/docs/content/engines/starrocks.md
@@ -81,7 +81,7 @@ SELECT * FROM paimon_catalog.test_db.partition_tbl$partitions;
## StarRocks to Paimon type mapping
This section lists all supported type conversion between StarRocks and Paimon.
-All StarRocks’s data types can be found in this doc [StarRocks Data type overview](https://docs.starrocks.io/docs/sql-reference/data-types/data-type-list/).
+All StarRocks’s data types can be found in this doc [StarRocks Data type overview](https://docs.starrocks.io/docs/sql-reference/data-types/).
diff --git a/docs/content/engines/trino.md b/docs/content/engines/trino.md
index 0f0fe8b94bf9..bef10f9d2870 100644
--- a/docs/content/engines/trino.md
+++ b/docs/content/engines/trino.md
@@ -30,36 +30,22 @@ This documentation is a guide for using Paimon in Trino.
## Version
-Paimon currently supports Trino 420 and above.
+Paimon currently supports Trino 440.
## Filesystem
-From version 0.8, paimon share trino filesystem for all actions, which means, you should
-config trino filesystem before using trino-paimon. You can find information about how to config
-filesystems for trino on trino official website.
+From version 0.8, Paimon shares the Trino filesystem for all actions, which means you should
+configure the Trino filesystem before using trino-paimon. You can find information about how to configure
+filesystems for Trino on the Trino official website.
## Preparing Paimon Jar File
-{{< stable >}}
-
-Download from master:
-https://paimon.apache.org/docs/master/project/download/
-
-{{< /stable >}}
-
-{{< unstable >}}
-
-| Version | Package |
-|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------|
-| [420, 426] | [paimon-trino-420-{{< version >}}-plugin.tar.gz](https://repository.apache.org/snapshots/org/apache/paimon/paimon-trino-420/{{< version >}}/) |
-| [427, latest] | [paimon-trino-427-{{< version >}}-plugin.tar.gz](https://repository.apache.org/snapshots/org/apache/paimon/paimon-trino-427/{{< version >}}/) |
-
-{{< /unstable >}}
+[Download]({{< ref "project/download" >}})
You can also manually build a bundled jar from the source code. However, there are a few preliminary steps that need to be taken before compiling:
- To build from the source code, [clone the git repository]({{< trino_github_repo >}}).
-- Install JDK17 locally, and configure JDK17 as a global environment variable;
+- Install JDK21 locally, and configure JDK21 as a global environment variable;
Then,you can build bundled jar with the following command:
@@ -78,28 +64,17 @@ For example, if you want to use Hadoop 3.3.5-1, you can use the following comman
mvn clean install -DskipTests -Dhadoop.apache.version=3.3.5-1
```
-## Tmp Dir
-
-Paimon will unzip some jars to the tmp directory for codegen. By default, Trino will use `'/tmp'` as the temporary
-directory, but `'/tmp'` may be periodically deleted.
-
-You can configure this environment variable when Trino starts:
-```shell
--Djava.io.tmpdir=/path/to/other/tmpdir
-```
-
-Let Paimon use a secure temporary directory.
-
## Configure Paimon Catalog
### Install Paimon Connector
```bash
tar -zxf paimon-trino--{{< version >}}-plugin.tar.gz -C ${TRINO_HOME}/plugin
```
-the variable `trino-version` is module name, must be one of 420, 427.
-> NOTE: For JDK 17, when Deploying Trino, should add jvm options: `--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED`
+
+> NOTE: For JDK 21, when deploying Trino, you should add the JVM options: `--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED`
### Configure
+
Catalogs are registered by creating a catalog properties file in the etc/catalog directory. For example, create etc/catalog/paimon.properties with the following contents to mount the paimon connector as the paimon catalog:
```
@@ -113,7 +88,7 @@ If you are using HDFS, choose one of the following ways to configure your HDFS:
- set environment variable HADOOP_CONF_DIR.
- configure `hadoop-conf-dir` in the properties.
-If you are using a hadoop filesystem, you can still use trino-hdfs and trino-hive to config it.
+If you are using a Hadoop filesystem, you can still use trino-hdfs and trino-hive to config it.
For example, if you use oss as a storage, you can write in `paimon.properties` according to [Trino Reference](https://trino.io/docs/current/connector/hive.html#hdfs-configuration):
```
@@ -186,9 +161,6 @@ SELECT * FROM paimon.test_db.orders
```
## Query with Time Traveling
-{{< tabs "time-travel-example" >}}
-
-{{< tab "version >=420" >}}
```sql
-- read the snapshot from specified timestamp
@@ -208,10 +180,15 @@ you have a tag named '1' based on snapshot 2, the statement `SELECT * FROM paimo
instead of snapshot 1.
{{< /hint >}}
-{{< /tab >}}
+## Insert
+```
+INSERT INTO paimon.test_db.orders VALUES (.....);
+```
-{{< /tabs >}}
+Supports:
+- primary key table with fixed bucket.
+- non-primary-key table with bucket -1.
## Trino to Paimon type mapping
@@ -319,3 +296,15 @@ All Trino's data types are available in package `io.trino.spi.type`.
+
+## Tmp Dir
+
+Paimon will unzip some jars to the tmp directory for codegen. By default, Trino will use `'/tmp'` as the temporary
+directory, but `'/tmp'` may be periodically deleted.
+
+You can configure this environment variable when Trino starts:
+```shell
+-Djava.io.tmpdir=/path/to/other/tmpdir
+```
+
+Let Paimon use a secure temporary directory.
diff --git a/docs/content/filesystems/hdfs.md b/docs/content/filesystems/hdfs.md
deleted file mode 100644
index ace26d1a84a0..000000000000
--- a/docs/content/filesystems/hdfs.md
+++ /dev/null
@@ -1,129 +0,0 @@
----
-title: "HDFS"
-weight: 2
-type: docs
-aliases:
-- /filesystems/hdfs.html
----
-
-
-# HDFS
-
-You don't need any additional dependencies to access HDFS because you have already taken care of the Hadoop dependencies.
-
-## HDFS Configuration
-
-For HDFS, the most important thing is to be able to read your HDFS configuration.
-
-{{< tabs "hdfs conf" >}}
-
-{{< tab "Flink/Trino/JavaAPI" >}}
-
-You may not have to do anything, if you are in a hadoop environment. Otherwise pick one of the following ways to
-configure your HDFS:
-
-1. Set environment variable `HADOOP_HOME` or `HADOOP_CONF_DIR`.
-2. Configure `'hadoop-conf-dir'` in the paimon catalog.
-3. Configure Hadoop options through prefix `'hadoop.'` in the paimon catalog.
-
-The first approach is recommended.
-
-If you do not want to include the value of the environment variable, you can configure `hadoop-conf-loader` to `option`.
-
-{{< /tab >}}
-
-{{< tab "Hive/Spark" >}}
-
-HDFS Configuration is available directly through the computation cluster, see cluster configuration of Hive and Spark for details.
-
-{{< /tab >}}
-
-{{< /tabs >}}
-
-## Hadoop-compatible file systems (HCFS)
-
-All Hadoop file systems are automatically available when the Hadoop libraries are on the classpath.
-
-This way, Paimon seamlessly supports all of Hadoop file systems implementing the `org.apache.hadoop.fs.FileSystem`
-interface, and all Hadoop-compatible file systems (HCFS).
-
-- HDFS
-- Alluxio (see configuration specifics below)
-- XtreemFS
-- …
-
-The Hadoop configuration has to have an entry for the required file system implementation in the `core-site.xml` file.
-
-For Alluxio support add the following entry into the core-site.xml file:
-
-```shell
-
- fs.alluxio.impl
- alluxio.hadoop.FileSystem
-
-```
-
-## Kerberos
-
-{{< tabs "Kerberos" >}}
-
-{{< tab "Flink" >}}
-
-It is recommended to use [Flink Kerberos Keytab](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/security/security-kerberos/).
-
-{{< /tab >}}
-
-{{< tab "Spark" >}}
-
-It is recommended to use [Spark Kerberos Keytab](https://spark.apache.org/docs/latest/security.html#using-a-keytab).
-
-{{< /tab >}}
-
-{{< tab "Hive" >}}
-
-An intuitive approach is to configure Hive's kerberos authentication.
-
-{{< /tab >}}
-
-{{< tab "Trino/JavaAPI" >}}
-
-Configure the following three options in your catalog configuration:
-
-- security.kerberos.login.keytab: Absolute path to a Kerberos keytab file that contains the user credentials.
- Please make sure it is copied to each machine.
-- security.kerberos.login.principal: Kerberos principal name associated with the keytab.
-- security.kerberos.login.use-ticket-cache: True or false, indicates whether to read from your Kerberos ticket cache.
-
-For JavaAPI:
-```
-SecurityContext.install(catalogOptions);
-```
-
-{{< /tab >}}
-
-{{< /tabs >}}
-
-## HDFS HA
-
-Ensure that `hdfs-site.xml` and `core-site.xml` contain the necessary [HA configuration](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithNFS.html).
-
-## HDFS ViewFS
-
-Ensure that `hdfs-site.xml` and `core-site.xml` contain the necessary [ViewFs configuration](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/ViewFs.html).
diff --git a/docs/content/filesystems/oss.md b/docs/content/filesystems/oss.md
deleted file mode 100644
index b381350a5c9d..000000000000
--- a/docs/content/filesystems/oss.md
+++ /dev/null
@@ -1,120 +0,0 @@
----
-title: "OSS"
-weight: 3
-type: docs
-aliases:
-- /filesystems/oss.html
----
-
-
-# OSS
-
-{{< stable >}}
-
-Download [paimon-oss-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-oss/{{< version >}}/paimon-oss-{{< version >}}.jar).
-
-{{< /stable >}}
-
-{{< unstable >}}
-
-Download [paimon-oss-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-oss/{{< version >}}/).
-
-{{< /unstable >}}
-
-{{< tabs "oss" >}}
-
-{{< tab "Flink" >}}
-
-{{< hint info >}}
-If you have already configured [oss access through Flink](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/oss/) (Via Flink FileSystem),
-here you can skip the following configuration.
-{{< /hint >}}
-
-Put `paimon-oss-{{< version >}}.jar` into `lib` directory of your Flink home, and create catalog:
-
-```sql
-CREATE CATALOG my_catalog WITH (
- 'type' = 'paimon',
- 'warehouse' = 'oss:///',
- 'fs.oss.endpoint' = 'oss-cn-hangzhou.aliyuncs.com',
- 'fs.oss.accessKeyId' = 'xxx',
- 'fs.oss.accessKeySecret' = 'yyy'
-);
-```
-
-{{< /tab >}}
-
-{{< tab "Spark" >}}
-
-{{< hint info >}}
-If you have already configured oss access through Spark (Via Hadoop FileSystem), here you can skip the following configuration.
-{{< /hint >}}
-
-Place `paimon-oss-{{< version >}}.jar` together with `paimon-spark-{{< version >}}.jar` under Spark's jars directory, and start like
-
-```shell
-spark-sql \
- --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
- --conf spark.sql.catalog.paimon.warehouse=oss:/// \
- --conf spark.sql.catalog.paimon.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com \
- --conf spark.sql.catalog.paimon.fs.oss.accessKeyId=xxx \
- --conf spark.sql.catalog.paimon.fs.oss.accessKeySecret=yyy
-```
-
-{{< /tab >}}
-
-{{< tab "Hive" >}}
-
-{{< hint info >}}
-If you have already configured oss access through Hive (Via Hadoop FileSystem), here you can skip the following configuration.
-{{< /hint >}}
-
-NOTE: You need to ensure that Hive metastore can access `oss`.
-
-Place `paimon-oss-{{< version >}}.jar` together with `paimon-hive-connector-{{< version >}}.jar` under Hive's auxlib directory, and start like
-
-```sql
-SET paimon.fs.oss.endpoint=oss-cn-hangzhou.aliyuncs.com;
-SET paimon.fs.oss.accessKeyId=xxx;
-SET paimon.fs.oss.accessKeySecret=yyy;
-```
-
-And read table from hive metastore, table can be created by Flink or Spark, see [Catalog with Hive Metastore]({{< ref "flink/sql-ddl" >}})
-```sql
-SELECT * FROM test_table;
-SELECT COUNT(1) FROM test_table;
-```
-
-{{< /tab >}}
-
-{{< tab "Trino" >}}
-
-From version 0.8, paimon-trino uses trino filesystem as basic file read and write system. We strongly recommend you to use jindo-sdk in trino.
-
-You can find [How to config jindo sdk on trino](https://github.com/aliyun/alibabacloud-jindodata/blob/master/docs/user/4.x/4.6.x/4.6.12/oss/presto/jindosdk_on_presto.md) here.
-Please note that:
- * Use paimon to replace hive-hadoop2 when you decompress the plugin jar and find location to put in.
- * You can specify the `core-site.xml` in `paimon.properties` on configuration [hive.config.resources](https://trino.io/docs/current/connector/hive.html#hdfs-configuration).
- * Presto and Jindo use the same configuration method.
-
-
-{{< /tab >}}
-
-{{< /tabs >}}
diff --git a/docs/content/filesystems/overview.md b/docs/content/filesystems/overview.md
deleted file mode 100644
index 3de3c2ec500a..000000000000
--- a/docs/content/filesystems/overview.md
+++ /dev/null
@@ -1,60 +0,0 @@
----
-title: "Overview"
-weight: 1
-type: docs
-aliases:
-- /filesystems/overview.html
----
-
-
-# Overview
-
-Apache Paimon utilizes the same pluggable file systems as Apache Flink. Users can follow the
-[standard plugin mechanism](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/)
-to configure the plugin structure if using Flink as compute engine. However, for other engines like Spark
-or Hive, the provided opt jars (by Flink) may get conflicts and cannot be used directly. It is not convenient
-for users to fix class conflicts, thus Paimon provides the self-contained and engine-unified
-FileSystem pluggable jars for user to query tables from Spark/Hive side.
-
-## Supported FileSystems
-
-| FileSystem | URI Scheme | Pluggable | Description |
-|:------------------|:-----------|-----------|:-----------------------------------------------------------------------|
-| Local File System | file:// | N | Built-in Support |
-| HDFS | hdfs:// | N | Built-in Support, ensure that the cluster is in the hadoop environment |
-| Aliyun OSS | oss:// | Y | |
-| S3 | s3:// | Y | |
-
-## Dependency
-
-We recommend you to download the jar directly: [Download Link]({{< ref "project/download#filesystem-jars" >}}).
-
-You can also manually build bundled jar from the source code.
-
-To build from source code, [clone the git repository]({{< github_repo >}}).
-
-Build shaded jar with the following command.
-
-```bash
-mvn clean install -DskipTests
-```
-
-You can find the shaded jars under
-`./paimon-filesystems/paimon-${fs}/target/paimon-${fs}-{{< version >}}.jar`.
diff --git a/docs/content/filesystems/s3.md b/docs/content/filesystems/s3.md
deleted file mode 100644
index 3085d820b67e..000000000000
--- a/docs/content/filesystems/s3.md
+++ /dev/null
@@ -1,143 +0,0 @@
----
-title: "S3"
-weight: 4
-type: docs
-aliases:
-- /filesystems/s3.html
----
-
-
-# S3
-
-{{< stable >}}
-
-Download [paimon-s3-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-s3/{{< version >}}/paimon-s3-{{< version >}}.jar).
-
-{{< /stable >}}
-
-{{< unstable >}}
-
-Download [paimon-s3-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-s3/{{< version >}}/).
-
-{{< /unstable >}}
-
-{{< tabs "oss" >}}
-
-{{< tab "Flink" >}}
-
-{{< hint info >}}
-If you have already configured [s3 access through Flink](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/s3/) (Via Flink FileSystem),
-here you can skip the following configuration.
-{{< /hint >}}
-
-Put `paimon-s3-{{< version >}}.jar` into `lib` directory of your Flink home, and create catalog:
-
-```sql
-CREATE CATALOG my_catalog WITH (
- 'type' = 'paimon',
- 'warehouse' = 's3:///',
- 's3.endpoint' = 'your-endpoint-hostname',
- 's3.access-key' = 'xxx',
- 's3.secret-key' = 'yyy'
-);
-```
-
-{{< /tab >}}
-
-{{< tab "Spark" >}}
-
-{{< hint info >}}
-If you have already configured s3 access through Spark (Via Hadoop FileSystem), here you can skip the following configuration.
-{{< /hint >}}
-
-Place `paimon-s3-{{< version >}}.jar` together with `paimon-spark-{{< version >}}.jar` under Spark's jars directory, and start like
-
-```shell
-spark-sql \
- --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
- --conf spark.sql.catalog.paimon.warehouse=s3:/// \
- --conf spark.sql.catalog.paimon.s3.endpoint=your-endpoint-hostname \
- --conf spark.sql.catalog.paimon.s3.access-key=xxx \
- --conf spark.sql.catalog.paimon.s3.secret-key=yyy
-```
-
-{{< /tab >}}
-
-{{< tab "Hive" >}}
-
-{{< hint info >}}
-If you have already configured s3 access through Hive ((Via Hadoop FileSystem)), here you can skip the following configuration.
-{{< /hint >}}
-
-NOTE: You need to ensure that Hive metastore can access `s3`.
-
-Place `paimon-s3-{{< version >}}.jar` together with `paimon-hive-connector-{{< version >}}.jar` under Hive's auxlib directory, and start like
-
-```sql
-SET paimon.s3.endpoint=your-endpoint-hostname;
-SET paimon.s3.access-key=xxx;
-SET paimon.s3.secret-key=yyy;
-```
-
-And read table from hive metastore, table can be created by Flink or Spark, see [Catalog with Hive Metastore]({{< ref "flink/sql-ddl" >}})
-```sql
-SELECT * FROM test_table;
-SELECT COUNT(1) FROM test_table;
-```
-
-{{< /tab >}}
-
-{{< tab "Trino" >}}
-
-Paimon use shared trino filesystem as basic read and write system.
-
-Please refer to [Trino S3](https://trino.io/docs/current/object-storage/file-system-s3.html) to config s3 filesystem in trino.
-
-{{< /tab >}}
-
-{{< /tabs >}}
-
-## S3 Complaint Object Stores
-
-The S3 Filesystem also support using S3 compliant object stores such as MinIO, Tencent's COS and IBM’s Cloud Object
-Storage. Just configure your endpoint to the provider of the object store service.
-
-```yaml
-s3.endpoint: your-endpoint-hostname
-```
-
-## Configure Path Style Access
-
-Some S3 compliant object stores might not have virtual host style addressing enabled by default, for example when using Standalone MinIO for testing purpose.
-In such cases, you will have to provide the property to enable path style access.
-
-```yaml
-s3.path.style.access: true
-```
-
-## S3A Performance
-
-[Tune Performance](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/performance.html) for `S3AFileSystem`.
-
-If you encounter the following exception:
-```shell
-Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool.
-```
-Try to configure this in catalog options: `fs.s3a.connection.maximum=1000`.
diff --git a/docs/content/flink/_index.md b/docs/content/flink/_index.md
index c39ff01d8760..6ec757fa520f 100644
--- a/docs/content/flink/_index.md
+++ b/docs/content/flink/_index.md
@@ -3,7 +3,7 @@ title: Engine Flink
icon:
bold: true
bookCollapseSection: true
-weight: 4
+weight: 5
---
+
+# Consumer ID
+
+Consumer ID can help you accomplish the following two things:
+
+1. Safe consumption: When deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in
+   the file system, and if there are consumers that still depend on this snapshot, then this snapshot will not be
+   deleted by expiration.
+2. Resume from breakpoint: When the previous job is stopped, a newly started job can continue to consume from the
+   previous progress without restoring from the state.
+
+## Usage
+
+You can specify the `consumer-id` when reading a table in streaming mode.
+
+The consumer will prevent expiration of the snapshots. To avoid retaining too many snapshots by mistake,
+you need to specify `'consumer.expiration-time'` to manage the lifetime of consumers.
+
+```sql
+ALTER TABLE t SET ('consumer.expiration-time' = '1 d');
+```
+
+Then, restart the streaming write job of this table; expiration of consumers will be triggered in the write job.
+
+```sql
+SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.mode' = 'at-least-once') */;
+```
+
+## Ignore Progress
+
+Sometimes, you only want the 'Safe Consumption' feature. If you want the restarted streaming consumption job to start
+from a fresh snapshot progress instead of the recorded one, you can enable the `'consumer.ignore-progress'` option.
+
+```sql
+SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.ignore-progress' = 'true') */;
+```
+
+The startup of this job will retrieve the snapshot that should be read again.
+
+## Consumer Mode
+
+By default, the consumption of snapshots is strictly aligned with checkpoints to make the 'Resume from breakpoint'
+feature exactly-once.
+
+But in scenarios where you don't need 'Resume from breakpoint', or don't need it to be strict,
+you can consider enabling the `'consumer.mode' = 'at-least-once'` mode. This mode:
+1. Allows readers to consume snapshots at different rates and records the slowest snapshot-id among all readers into the
+   consumer. It doesn't affect the checkpoint time and has good performance.
+2. Provides more capabilities, such as watermark alignment.
+
+{{< hint >}}
+About `'consumer.mode'`: since the implementations of `exactly-once` mode and `at-least-once` mode are completely
+different, the Flink state is incompatible and cannot be restored from state when switching modes.
+{{< /hint >}}
+
+## Reset Consumer
+
+You can reset a consumer to a given next snapshot ID, or delete a consumer, with a given consumer ID. First, you need
+to stop the streaming task using this consumer ID, and then execute the reset consumer action job.
+
+Run the following command:
+
+{{< tabs "reset_consumer" >}}
+
+{{< tab "Flink SQL" >}}
+
+```sql
+CALL sys.reset_consumer(
+ `table` => 'database_name.table_name',
+ consumer_id => 'consumer_id',
+ next_snapshot_id =>
+);
+-- No next_snapshot_id if you want to delete the consumer
+```
+{{< /tab >}}
+
+{{< tab "Flink Action" >}}
+
+```bash
+/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ reset-consumer \
+ --warehouse \
+ --database \
+ --table \
+ --consumer_id \
+ [--next_snapshot ] \
+ [--catalog_conf [--catalog_conf ...]]
+
+## No next_snapshot if you want to delete the consumer
+```
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/docs/content/flink/expire-partition.md b/docs/content/flink/expire-partition.md
index 3acf6e59d58c..226017513fee 100644
--- a/docs/content/flink/expire-partition.md
+++ b/docs/content/flink/expire-partition.md
@@ -134,7 +134,7 @@ More options:
end-input.check-partition-expire
false
Boolean
-
Whether check partition expire after batch mode or bounded stream job finish.
+
Whether check partition expire after batch mode or bounded stream job finish.
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index ce8c8043ae40..8eb1786a08b3 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -67,14 +67,17 @@ All available procedures are listed below.
order_by => 'order_by',
options => 'options',
`where` => 'where',
- partition_idle_time => 'partition_idle_time')
To compact a table. Arguments:
@@ -85,6 +88,7 @@ All available procedures are listed below.
options(optional): additional dynamic options of the table.
where(optional): partition predicate (can't be used together with "partitions"). Note: as where is a keyword, a pair of backticks needs to be added around it, like `where`.
partition_idle_time(optional): this is used to do a full compaction for partition which had not received any new data for 'partition_idle_time'. And only these partitions will be compacted. This argument can not be used with order compact.
+
compact_strategy(optional): this determines how to pick files to be merged, the default is determined by the runtime execution mode. 'full' strategy only supports batch mode. All files will be selected for merging. 'minor' strategy: Pick the set of files that need to be merged based on specified conditions.
-- use partition filter
@@ -103,7 +107,8 @@ All available procedures are listed below.
including_tables => 'includingTables',
excluding_tables => 'excludingTables',
table_options => 'tableOptions',
- partition_idle_time => 'partitionIdleTime')
To compact databases. Arguments:
@@ -123,6 +129,7 @@ All available procedures are listed below.
excludingTables: to specify tables that are not compacted. You can use regular expression.
tableOptions: additional dynamic options of the table.
partition_idle_time: this is used to do a full compaction for partition which had not received any new data for 'partition_idle_time'. And only these partitions will be compacted.
+
compact_strategy(optional): this determines how to pick files to be merged, the default is determined by the runtime execution mode. 'full' strategy only supports batch mode. All files will be selected for merging. 'minor' strategy: Pick the set of files that need to be merged based on specified conditions.
@@ -221,6 +229,46 @@ All available procedures are listed below.
CALL sys.delete_tag(`table` => 'default.T', tag => 'my_tag')
+
+
replace_tag
+
+ -- Use named argument
+ -- replace tag with new time retained
+ CALL [catalog.]sys.replace_tag(`table` => 'identifier', tag => 'tagName', time_retained => 'timeRetained')
+ -- replace tag with new snapshot id and time retained
+ CALL [catalog.]sys.replace_tag(`table` => 'identifier', snapshot_id => 'snapshotId')
+ -- Use indexed argument
+ -- replace tag with new snapshot id and time retained
+ CALL [catalog.]sys.replace_tag('identifier', 'tagName', 'snapshotId', 'timeRetained')
+
+
+ To replace an existing tag with new tag info. Arguments:
+
table: the target table identifier. Cannot be empty.
+
tag: name of the existing tag. Cannot be empty.
+
snapshot(Long): id of the snapshot which the tag is based on, it is optional.
+
time_retained: The maximum time retained for the existing tag, it is optional.
- To perform "MERGE INTO" syntax. See merge_into action for
+ To perform "MERGE INTO" syntax. See merge_into action for
details of arguments.
@@ -339,11 +390,76 @@ All available procedures are listed below.
CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)
+
+
rollback_to_timestamp
+
+ -- for Flink 1.18
+      -- rollback to the snapshot which is earlier than or equal to the timestamp.
+ CALL sys.rollback_to_timestamp('identifier', timestamp)
+ -- for Flink 1.19 and later
+      -- rollback to the snapshot which is earlier than or equal to the timestamp.
+ CALL sys.rollback_to_timestamp(`table` => 'default.T', `timestamp` => timestamp)
+
+
+      To roll back to the snapshot which is earlier than or equal to the timestamp. Arguments:
+
identifier: the target table identifier. Cannot be empty.
+
timestamp (Long): Roll back to the snapshot which is earlier than or equal to the timestamp.
+
+
+ -- for Flink 1.18
+ CALL sys.rollback_to_timestamp('default.T', 10)
+ -- for Flink 1.19 and later
+ CALL sys.rollback_to_timestamp(`table` => 'default.T', timestamp => 1730292023000)
+
+
+
+
rollback_to_watermark
+
+ -- for Flink 1.18
+      -- rollback to the snapshot which is earlier than or equal to the watermark.
+ CALL sys.rollback_to_watermark('identifier', watermark)
+ -- for Flink 1.19 and later
+      -- rollback to the snapshot which is earlier than or equal to the watermark.
+ CALL sys.rollback_to_watermark(`table` => 'default.T', `watermark` => watermark)
+
+
+      To roll back to the snapshot which is earlier than or equal to the watermark. Arguments:
+
identifier: the target table identifier. Cannot be empty.
+
watermark (Long): Roll back to the snapshot which is earlier than or equal to the watermark.
+
+
+ -- for Flink 1.18
+ CALL sys.rollback_to_watermark('default.T', 1730292023000)
+ -- for Flink 1.19 and later
+ CALL sys.rollback_to_watermark(`table` => 'default.T', watermark => 1730292023000)
+
+
+
+
purge_files
+
+ -- for Flink 1.18
+      -- clear the table by purging files directly.
+ CALL sys.purge_files('identifier')
+ -- for Flink 1.19 and later
+      -- clear the table by purging files directly.
+ CALL sys.purge_files(`table` => 'default.T')
+
+
+      To clear the table by purging files directly. Argument:
+
identifier: the target table identifier. Cannot be empty.
+
+
+ -- for Flink 1.18
+ CALL sys.purge_files('default.T')
+ -- for Flink 1.19 and later
+ CALL sys.purge_files(`table` => 'default.T')
+
+
expire_snapshots
-- Use named argument
- CALL [catalog.]sys.reset_consumer(
+ CALL [catalog.]sys.expire_snapshots(
`table` => 'identifier',
retain_max => 'retain_max',
retain_min => 'retain_min',
diff --git a/docs/content/flink/quick-start.md b/docs/content/flink/quick-start.md
index 62559065ec9a..e50acfe484e1 100644
--- a/docs/content/flink/quick-start.md
+++ b/docs/content/flink/quick-start.md
@@ -269,11 +269,16 @@ SELECT * FROM ....;
## Setting dynamic options
When interacting with the Paimon table, table options can be tuned without changing the options in the catalog. Paimon will extract job-level dynamic options and take effect in the current session.
-The dynamic option's key format is `paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. The catalogName/dbName/tableName can be `*`, which means matching all the specific parts.
+The dynamic table option's key format is `paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. The catalogName/dbName/tableName can be `*`, which means matching all the specific parts.
+The dynamic global option's key format is `${config_key}`. Global options will take effect for all the tables. Table options will override global options if there are conflicts.
For example:
```sql
+-- set scan.timestamp-millis=1697018249001 for all tables
+SET 'scan.timestamp-millis' = '1697018249001';
+SELECT * FROM T;
+
-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;
@@ -281,4 +286,10 @@ SELECT * FROM T;
-- set scan.timestamp-millis=1697018249000 for the table default.T in any catalog
SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;
+
+-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T1
+-- set scan.timestamp-millis=1697018249001 for other tables
+SET 'paimon.mycatalog.default.T1.scan.timestamp-millis' = '1697018249000';
+SET 'scan.timestamp-millis' = '1697018249001';
+SELECT * FROM T1 JOIN T2 ON xxxx;
```
diff --git a/docs/content/flink/savepoint.md b/docs/content/flink/savepoint.md
index 16139f0b0fc8..b9d353c1de33 100644
--- a/docs/content/flink/savepoint.md
+++ b/docs/content/flink/savepoint.md
@@ -41,12 +41,12 @@ metadata left. This is very safe, so we recommend using this feature to stop and
## Tag with Savepoint
-In Flink, we may consume from kafka and then write to paimon. Since flink's checkpoint only retains a limited number,
+In Flink, we may consume from Kafka and then write to Paimon. Since Flink's checkpoint only retains a limited number,
we will trigger a savepoint at certain time (such as code upgrades, data updates, etc.) to ensure that the state can
be retained for a longer time, so that the job can be restored incrementally.
-Paimon's snapshot is similar to flink's checkpoint, and both will automatically expire, but the tag feature of paimon
-allows snapshots to be retained for a long time. Therefore, we can combine the two features of paimon's tag and flink's
+Paimon's snapshot is similar to Flink's checkpoint, and both will automatically expire, but the tag feature of Paimon
+allows snapshots to be retained for a long time. Therefore, we can combine the two features of Paimon's tag and Flink's
savepoint to achieve incremental recovery of job from the specified savepoint.
{{< hint warning >}}
@@ -64,17 +64,17 @@ You can set `sink.savepoint.auto-tag` to `true` to enable the feature of automat
**Step 2: Trigger savepoint.**
-You can refer to [flink savepoint](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/savepoints/#operations)
+You can refer to [Flink savepoint](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/savepoints/#operations)
to learn how to configure and trigger savepoint.
**Step 3: Choose the tag corresponding to the savepoint.**
The tag corresponding to the savepoint will be named in the form of `savepoint-${savepointID}`. You can refer to
-[Tags Table]({{< ref "maintenance/system-tables#tags-table" >}}) to query.
+[Tags Table]({{< ref "concepts/system-tables#tags-table" >}}) to query.
**Step 4: Rollback the paimon table.**
-[Rollback]({{< ref "maintenance/manage-tags#rollback-to-tag" >}}) the paimon table to the specified tag.
+[Rollback]({{< ref "maintenance/manage-tags#rollback-to-tag" >}}) the Paimon table to the specified tag.
**Step 5: Restart from the savepoint.**
diff --git a/docs/content/flink/sql-alter.md b/docs/content/flink/sql-alter.md
index fe96ec413796..877995cc631b 100644
--- a/docs/content/flink/sql-alter.md
+++ b/docs/content/flink/sql-alter.md
@@ -1,6 +1,6 @@
---
title: "SQL Alter"
-weight: 6
+weight: 7
type: docs
aliases:
- /flink/sql-alter.html
@@ -78,6 +78,10 @@ If you use object storage, such as S3 or OSS, please use this syntax carefully,
The following SQL adds two columns `c1` and `c2` to table `my_table`.
+{{< hint info >}}
+To add a column in a row type, see [Changing Column Type](#changing-column-type).
+{{< /hint >}}
+
```sql
ALTER TABLE my_table ADD (c1 INT, c2 STRING);
```
@@ -99,6 +103,10 @@ otherwise this operation may fail, throws an exception like `The following colum
ALTER TABLE my_table DROP (c1, c2);
```
+{{< hint info >}}
+To drop a column in a row type, see [Changing Column Type](#changing-column-type).
+{{< /hint >}}
+
## Dropping Partitions
The following SQL drops the partitions of the paimon table.
@@ -114,6 +122,21 @@ ALTER TABLE my_table DROP PARTITION (`id` = 1), PARTITION (`id` = 2);
```
+## Adding Partitions
+
+The following SQL adds partitions to the Paimon table.
+
+For Flink SQL, you can specify a subset of the partition columns, and you can also specify multiple partition specs at the same time. This only works when the metastore is configured with `metastore.partitioned-table = true`.
+
+```sql
+ALTER TABLE my_table ADD PARTITION (`id` = 1);
+
+ALTER TABLE my_table ADD PARTITION (`id` = 1, `name` = 'paimon');
+
+ALTER TABLE my_table ADD PARTITION (`id` = 1), PARTITION (`id` = 2);
+
+```
+
## Changing Column Nullability
The following SQL changes nullability of column `coupon_info`.
@@ -170,6 +193,14 @@ The following SQL changes type of column `col_a` to `DOUBLE`.
ALTER TABLE my_table MODIFY col_a DOUBLE;
```
+Paimon also supports changing columns of row type, array type, and map type.
+
+```sql
+-- col_a previously has type ARRAY
-
-
lastSkippedByPartitionAndStats
-
Gauge
-
Skipped table files by partition filter and value / key stats information in the last scan.
-
-
-
lastSkippedByWholeBucketFilesFilter
-
Gauge
-
Skipped table files by bucket level value filter (only primary key table) in the last scan.
-
lastScanSkippedTableFiles
Gauge
@@ -181,6 +171,16 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
Gauge
Number of buckets written in the last commit.
+
+
lastCompactionInputFileSize
+
Gauge
+
Total size of the input files for the last compaction.
+
+
+
lastCompactionOutputFileSize
+
Gauge
+
Total size of the output files for the last compaction.
+
@@ -232,23 +232,53 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
maxLevel0FileCount
Gauge
-
The maximum number of level 0 files currently handled by this writer. This value will become larger if asynchronous compaction cannot be done in time.
+
The maximum number of level 0 files currently handled by this task. This value will become larger if asynchronous compaction cannot be done in time.
avgLevel0FileCount
Gauge
-
The average number of level 0 files currently handled by this writer. This value will become larger if asynchronous compaction cannot be done in time.
+
The average number of level 0 files currently handled by this task. This value will become larger if asynchronous compaction cannot be done in time.
compactionThreadBusy
Gauge
-
The maximum business of compaction threads in this parallelism. Currently, there is only one compaction thread in each parallelism, so value of business ranges from 0 (idle) to 100 (compaction running all the time).
+
The maximum busyness of compaction threads in this task. Currently, there is only one compaction thread in each parallelism, so the value of busyness ranges from 0 (idle) to 100 (compaction running all the time).
avgCompactionTime
Gauge
The average runtime of compaction threads, calculated based on recorded compaction time data in milliseconds. The value represents the average duration of compaction operations. Higher values indicate longer average compaction times, which may suggest the need for performance optimization.
+
+
compactionCompletedCount
+
Counter
+
The total number of compactions that have completed.
+
+
+
compactionQueuedCount
+
Counter
+
The total number of compactions that are queued/running.
+
+
+
maxCompactionInputSize
+
Gauge
+
The maximum input file size for this task's compaction.
+
+
+
+        avgCompactionInputSize
+
Gauge
+
The average input file size for this task's compaction.
+
+
+
maxCompactionOutputSize
+
Gauge
+
The maximum output file size for this task's compaction.
+
+
+
avgCompactionOutputSize
+
Gauge
+
The average output file size for this task's compaction.
+
diff --git a/docs/content/maintenance/write-performance.md b/docs/content/maintenance/write-performance.md
index 03e734874c05..ade2c3353e3c 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -1,6 +1,6 @@
---
title: "Write Performance"
-weight: 2
+weight: 3
type: docs
aliases:
- /maintenance/write-performance.html
diff --git a/docs/content/migration/iceberg-compatibility.md b/docs/content/migration/iceberg-compatibility.md
new file mode 100644
index 000000000000..b6fcaa282615
--- /dev/null
+++ b/docs/content/migration/iceberg-compatibility.md
@@ -0,0 +1,535 @@
+---
+title: "Iceberg Compatibility"
+weight: 4
+type: docs
+aliases:
+- /migration/iceberg-compatibility.html
+---
+
+
+# Iceberg Compatibility
+
+Paimon supports generating Iceberg compatible metadata,
+so that Paimon tables can be consumed directly by Iceberg readers.
+
+Set the following table options, so that Paimon tables can generate Iceberg compatible metadata.
+
+
+
+
+
Option
+
Default
+
Type
+
Description
+
+
+
+
+
metadata.iceberg.storage
+
disabled
+
Enum
+
+ When set, produce Iceberg metadata after a snapshot is committed, so that Iceberg readers can read Paimon's raw data files.
+
+
disabled: Disable Iceberg compatibility support.
+
table-location: Store Iceberg metadata in each table's directory.
+
hadoop-catalog: Store Iceberg metadata in a separate directory. This directory can be specified as the warehouse directory of an Iceberg Hadoop catalog.
+
hive-catalog: Not only store Iceberg metadata like hadoop-catalog, but also create Iceberg external table in Hive.
+
+
+
+
+
+
+For most SQL users, we recommend setting `'metadata.iceberg.storage' = 'hadoop-catalog'`
+or `'metadata.iceberg.storage' = 'hive-catalog'`,
+so that all tables can be accessed as an Iceberg warehouse.
+For Iceberg Java API users, you might consider setting `'metadata.iceberg.storage' = 'table-location'`,
+so you can access each table with its table path.
+
+## Append Tables
+
+Let's walk through a simple example, where we query Paimon tables with Iceberg connectors in Flink and Spark.
+Before trying out this example, make sure that your compute engine already supports Iceberg.
+Please refer to Iceberg's document if you haven't set up Iceberg.
+* Flink: [Preparation when using Flink SQL Client](https://iceberg.apache.org/docs/latest/flink/#preparation-when-using-flink-sql-client)
+* Spark: [Using Iceberg in Spark 3](https://iceberg.apache.org/docs/latest/spark-getting-started/#using-iceberg-in-spark-3)
+
+Let's now create a Paimon append only table with Iceberg compatibility enabled and insert some data.
+
+{{< tabs "create-paimon-append-only-table" >}}
+
+{{< tab "Flink SQL" >}}
+```sql
+CREATE CATALOG paimon_catalog WITH (
+ 'type' = 'paimon',
+ 'warehouse' = ''
+);
+
+CREATE TABLE paimon_catalog.`default`.cities (
+ country STRING,
+ name STRING
+) WITH (
+ 'metadata.iceberg.storage' = 'hadoop-catalog'
+);
+
+INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'new york'), ('germany', 'berlin'), ('usa', 'chicago'), ('germany', 'hamburg');
+```
+{{< /tab >}}
+
+{{< tab "Spark SQL" >}}
+Start `spark-sql` with the following command line.
+
+```bash
+spark-sql --jars \
+ --conf spark.sql.catalog.paimon_catalog=org.apache.paimon.spark.SparkCatalog \
+ --conf spark.sql.catalog.paimon_catalog.warehouse= \
+ --packages org.apache.iceberg:iceberg-spark-runtime- \
+ --conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
+ --conf spark.sql.catalog.iceberg_catalog.type=hadoop \
+ --conf spark.sql.catalog.iceberg_catalog.warehouse=/iceberg \
+ --conf spark.sql.catalog.iceberg_catalog.cache-enabled=false \ # disable iceberg catalog caching to quickly see the result
+ --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+```
+
+Run the following Spark SQL to create Paimon table and insert data.
+
+```sql
+CREATE TABLE paimon_catalog.`default`.cities (
+ country STRING,
+ name STRING
+) TBLPROPERTIES (
+ 'metadata.iceberg.storage' = 'hadoop-catalog'
+);
+
+INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'new york'), ('germany', 'berlin'), ('usa', 'chicago'), ('germany', 'hamburg');
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+Now let's query this Paimon table with Iceberg connector.
+
+{{< tabs "query-paimon-append-only-table" >}}
+
+{{< tab "Flink SQL" >}}
+```sql
+CREATE CATALOG iceberg_catalog WITH (
+ 'type' = 'iceberg',
+ 'catalog-type' = 'hadoop',
+ 'warehouse' = '/iceberg',
+ 'cache-enabled' = 'false' -- disable iceberg catalog caching to quickly see the result
+);
+
+SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
+/*
++----+--------------------------------+--------------------------------+
+| op | country | name |
++----+--------------------------------+--------------------------------+
+| +I | germany | berlin |
+| +I | germany | hamburg |
++----+--------------------------------+--------------------------------+
+*/
+```
+{{< /tab >}}
+
+{{< tab "Spark SQL" >}}
+```sql
+SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
+/*
+germany berlin
+germany hamburg
+*/
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+Let's insert more data and query again.
+
+{{< tabs "query-paimon-append-only-table-again" >}}
+
+{{< tab "Flink SQL" >}}
+```sql
+INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'houston'), ('germany', 'munich');
+
+SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
+/*
++----+--------------------------------+--------------------------------+
+| op | country | name |
++----+--------------------------------+--------------------------------+
+| +I | germany | munich |
+| +I | germany | berlin |
+| +I | germany | hamburg |
++----+--------------------------------+--------------------------------+
+*/
+```
+{{< /tab >}}
+
+{{< tab "Spark SQL" >}}
+```sql
+INSERT INTO paimon_catalog.`default`.cities VALUES ('usa', 'houston'), ('germany', 'munich');
+
+SELECT * FROM iceberg_catalog.`default`.cities WHERE country = 'germany';
+/*
+germany munich
+germany berlin
+germany hamburg
+*/
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Primary Key Tables
+
+{{< tabs "paimon-primary-key-table" >}}
+
+{{< tab "Flink SQL" >}}
+```sql
+CREATE CATALOG paimon_catalog WITH (
+ 'type' = 'paimon',
+ 'warehouse' = ''
+);
+
+CREATE TABLE paimon_catalog.`default`.orders (
+ order_id BIGINT,
+ status STRING,
+ payment DOUBLE,
+ PRIMARY KEY (order_id) NOT ENFORCED
+) WITH (
+ 'metadata.iceberg.storage' = 'hadoop-catalog',
+ 'compaction.optimization-interval' = '1ms' -- ATTENTION: this option is only for testing, see "timeliness" section below for more information
+);
+
+INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'SUBMITTED', CAST(NULL AS DOUBLE)), (2, 'COMPLETED', 200.0), (3, 'SUBMITTED', CAST(NULL AS DOUBLE));
+
+CREATE CATALOG iceberg_catalog WITH (
+ 'type' = 'iceberg',
+ 'catalog-type' = 'hadoop',
+ 'warehouse' = '/iceberg',
+ 'cache-enabled' = 'false' -- disable iceberg catalog caching to quickly see the result
+);
+
+SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED';
+/*
++----+----------------------+--------------------------------+--------------------------------+
+| op | order_id | status | payment |
++----+----------------------+--------------------------------+--------------------------------+
+| +I | 2 | COMPLETED | 200.0 |
++----+----------------------+--------------------------------+--------------------------------+
+*/
+
+INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'COMPLETED', 100.0);
+
+SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED';
+/*
++----+----------------------+--------------------------------+--------------------------------+
+| op | order_id | status | payment |
++----+----------------------+--------------------------------+--------------------------------+
+| +I | 1 | COMPLETED | 100.0 |
+| +I | 2 | COMPLETED | 200.0 |
++----+----------------------+--------------------------------+--------------------------------+
+*/
+```
+{{< /tab >}}
+
+{{< tab "Spark SQL" >}}
+Start `spark-sql` with the following command line.
+
+```bash
+spark-sql --jars \
+ --conf spark.sql.catalog.paimon_catalog=org.apache.paimon.spark.SparkCatalog \
+ --conf spark.sql.catalog.paimon_catalog.warehouse= \
+ --packages org.apache.iceberg:iceberg-spark-runtime- \
+ --conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
+ --conf spark.sql.catalog.iceberg_catalog.type=hadoop \
+ --conf spark.sql.catalog.iceberg_catalog.warehouse=/iceberg \
+ --conf spark.sql.catalog.iceberg_catalog.cache-enabled=false \ # disable iceberg catalog caching to quickly see the result
+ --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+```
+
+Run the following Spark SQL to create Paimon table, insert/update data, and query with Iceberg catalog.
+
+```sql
+CREATE TABLE paimon_catalog.`default`.orders (
+ order_id BIGINT,
+ status STRING,
+ payment DOUBLE
+) TBLPROPERTIES (
+ 'primary-key' = 'order_id',
+ 'metadata.iceberg.storage' = 'hadoop-catalog',
+ 'compaction.optimization-interval' = '1ms' -- ATTENTION: this option is only for testing, see "timeliness" section below for more information
+);
+
+INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'SUBMITTED', CAST(NULL AS DOUBLE)), (2, 'COMPLETED', 200.0), (3, 'SUBMITTED', CAST(NULL AS DOUBLE));
+
+SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED';
+/*
+2 COMPLETED 200.0
+*/
+
+INSERT INTO paimon_catalog.`default`.orders VALUES (1, 'COMPLETED', 100.0);
+
+SELECT * FROM iceberg_catalog.`default`.orders WHERE status = 'COMPLETED';
+/*
+2 COMPLETED 200.0
+1 COMPLETED 100.0
+*/
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+Paimon primary key tables organize data files as LSM trees, so data files must be merged in memory before querying.
+However, Iceberg readers are not able to merge data files, so they can only query data files on the highest level of LSM trees.
+Data files on the highest level are produced by the full compaction process.
+So **to conclude, for primary key tables, Iceberg readers can only query data after full compaction**.
+
+By default, there is no guarantee on how frequently Paimon will perform full compaction.
+You can configure the following table option, so that Paimon is forced to perform full compaction after several commits.
+
+
+
+
+
Option
+
Default
+
Type
+
Description
+
+
+
+
+
compaction.optimization-interval
+
(none)
+
Duration
+
Full compaction will be constantly triggered per time interval. First compaction after the job starts will always be full compaction.
+
+
+
full-compaction.delta-commits
+
(none)
+
Integer
+
Full compaction will be constantly triggered after delta commits. Only implemented in Flink.
+
+
+
+
+Note that full compaction is a resource-consuming process, so the value of this table option should not be too small.
+We recommend performing full compaction once or twice per hour.
+
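+For example, a minimal sketch of asking Paimon to fully compact the `orders` table from the example above after every
+10 delta commits (the value 10 is illustrative; tune it to your latency requirements):
+
+```sql
+ALTER TABLE paimon_catalog.`default`.orders SET ('full-compaction.delta-commits' = '10');
+```
+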
+## Hive Catalog
+
+When creating a Paimon table, set `'metadata.iceberg.storage' = 'hive-catalog'`.
+This option value not only stores Iceberg metadata like hadoop-catalog, but also creates an Iceberg external table in Hive.
+This Paimon table can be accessed from the Iceberg Hive catalog later.
+
+To provide information about Hive metastore,
+you also need to set some (or all) of the following table options when creating Paimon table.
+
+
+
+
+
Option
+
Default
+
Type
+
Description
+
+
+
+
+
metadata.iceberg.uri
+
+
String
+
Hive metastore uri for Iceberg Hive catalog.
+
+
+
metadata.iceberg.hive-conf-dir
+
+
String
+
hive-conf-dir for Iceberg Hive catalog.
+
+
+
metadata.iceberg.hadoop-conf-dir
+
+
String
+
hadoop-conf-dir for Iceberg Hive catalog.
+
+
+
metadata.iceberg.manifest-compression
+
snappy
+
String
+
Compression for Iceberg manifest files.
+
+
+
metadata.iceberg.manifest-legacy-version
+
false
+
Boolean
+
Should use the legacy manifest version to generate Iceberg's 1.4 manifest files.
+
+## AWS Glue Catalog
+
+You can use Hive Catalog to connect to the AWS Glue metastore by setting `'metadata.iceberg.hive-client-class'` to
+`'com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient'`.
+
+> **Note:** You can use this [repo](https://github.com/promotedai/aws-glue-data-catalog-client-for-apache-hive-metastore) to build the required jar, include it in your path and configure the AWSCatalogMetastoreClient.
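+
+For example, a minimal sketch of the table options involved (the table name is illustrative, and it assumes the Glue
+client jar is already on the classpath):
+
+```sql
+ALTER TABLE my_table SET (
+    'metadata.iceberg.storage' = 'hive-catalog',
+    'metadata.iceberg.hive-client-class' = 'com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient'
+);
+```
+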
+## AWS Athena
+
+AWS Athena may use an old manifest reader to read Iceberg manifests by name, so Paimon should produce the legacy Iceberg
+manifest list file. You can enable this with `'metadata.iceberg.manifest-legacy-version' = 'true'`.
+
+## DuckDB
+
+DuckDB may rely on files placed in the `root/data` directory, while Paimon data files are usually placed directly in the
+`root` directory, so you can configure the table option `'data-file.path-directory' = 'data'` to achieve compatibility.
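+
+A minimal sketch (table and column names are illustrative):
+
+```sql
+CREATE TABLE my_duck_table (
+    id INT,
+    name STRING
+) WITH (
+    'data-file.path-directory' = 'data'
+);
+```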
+
+## Trino Iceberg
+
+In this example, we use Trino Iceberg connector to access Paimon table through Iceberg Hive catalog.
+Before trying out this example, make sure that you have configured Trino Iceberg connector.
+See [Trino's document](https://trino.io/docs/current/connector/iceberg.html#general-configuration) for more information.
+
+Let's first create a Paimon table with Iceberg compatibility enabled.
+
+{{< tabs "paimon-append-only-table-trino-1" >}}
+
+{{< tab "Flink SQL" >}}
+```sql
+CREATE CATALOG paimon_catalog WITH (
+ 'type' = 'paimon',
+ 'warehouse' = ''
+);
+
+CREATE TABLE paimon_catalog.`default`.animals (
+ kind STRING,
+ name STRING
+) WITH (
+ 'metadata.iceberg.storage' = 'hive-catalog',
+ 'metadata.iceberg.uri' = 'thrift://:'
+);
+
+INSERT INTO paimon_catalog.`default`.animals VALUES ('mammal', 'cat'), ('mammal', 'dog'), ('reptile', 'snake'), ('reptile', 'lizard');
+```
+{{< /tab >}}
+
+{{< tab "Spark SQL" >}}
+Start `spark-sql` with the following command line.
+
+```bash
+spark-sql --jars \
+ --conf spark.sql.catalog.paimon_catalog=org.apache.paimon.spark.SparkCatalog \
+ --conf spark.sql.catalog.paimon_catalog.warehouse= \
+ --packages org.apache.iceberg:iceberg-spark-runtime- \
+ --conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
+ --conf spark.sql.catalog.iceberg_catalog.type=hadoop \
+ --conf spark.sql.catalog.iceberg_catalog.warehouse=/iceberg \
+ --conf spark.sql.catalog.iceberg_catalog.cache-enabled=false \ # disable iceberg catalog caching to quickly see the result
+ --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+```
+
+Run the following Spark SQL to create Paimon table, insert/update data, and query with Iceberg catalog.
+
+```sql
+CREATE TABLE paimon_catalog.`default`.animals (
+ kind STRING,
+ name STRING
+) TBLPROPERTIES (
+ 'metadata.iceberg.storage' = 'hive-catalog',
+ 'metadata.iceberg.uri' = 'thrift://:'
+);
+
+INSERT INTO paimon_catalog.`default`.animals VALUES ('mammal', 'cat'), ('mammal', 'dog'), ('reptile', 'snake'), ('reptile', 'lizard');
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+Start Trino using Iceberg catalog and query from Paimon table.
+
+```sql
+SELECT * FROM animals WHERE kind = 'mammal';
+/*
+ kind | name
+--------+------
+ mammal | cat
+ mammal | dog
+*/
+```
+
+## Supported Types
+
+Paimon Iceberg compatibility currently supports the following data types.
+
+| Paimon Data Type | Iceberg Data Type |
+|------------------|-------------------|
+| `BOOLEAN` | `boolean` |
+| `INT` | `int` |
+| `BIGINT` | `long` |
+| `FLOAT` | `float` |
+| `DOUBLE` | `double` |
+| `DECIMAL` | `decimal` |
+| `CHAR` | `string` |
+| `VARCHAR` | `string` |
+| `BINARY` | `binary` |
+| `VARBINARY` | `binary` |
+| `DATE` | `date` |
+| `TIMESTAMP`* | `timestamp` |
+| `TIMESTAMP_LTZ`* | `timestamptz` |
+| `ARRAY` | `list` |
+| `MAP` | `map` |
+| `ROW` | `struct` |
+
+*: `TIMESTAMP` and `TIMESTAMP_LTZ` types only support precision from 4 to 6
+
+## Table Options
+
+
+
+
+
Option
+
Default
+
Type
+
Description
+
+
+
+
+
metadata.iceberg.compaction.min.file-num
+
10
+
Integer
+
Minimum number of Iceberg metadata files to trigger metadata compaction.
+
+
+
metadata.iceberg.compaction.max.file-num
+
50
+
Integer
+
If number of small Iceberg metadata files exceeds this limit, always trigger metadata compaction regardless of their total size.
+
+
+
diff --git a/docs/content/migration/upsert-to-partitioned.md b/docs/content/migration/upsert-to-partitioned.md
index b82bc942a626..d4fd00ae630b 100644
--- a/docs/content/migration/upsert-to-partitioned.md
+++ b/docs/content/migration/upsert-to-partitioned.md
@@ -26,6 +26,10 @@ under the License.
# Upsert To Partitioned
+{{< hint warning >}}
+__Note:__ Only Hive Engine can be used to query these upsert-to-partitioned tables.
+{{< /hint >}}
+
The [Tag Management]({{< ref "maintenance/manage-tags" >}}) will maintain the manifests and data files of the snapshot.
A typical usage is creating tags daily, then you can maintain the historical data of each day for batch reading.
diff --git a/docs/content/primary-key-table/changelog-producer.md b/docs/content/primary-key-table/changelog-producer.md
index bf7a23fae2a5..a9364ee9f07c 100644
--- a/docs/content/primary-key-table/changelog-producer.md
+++ b/docs/content/primary-key-table/changelog-producer.md
@@ -58,9 +58,11 @@ By specifying `'changelog-producer' = 'input'`, Paimon writers rely on their inp
## Lookup
-If your input can’t produce a complete changelog but you still want to get rid of the costly normalized operator, you may consider using the `'lookup'` changelog producer.
+If your input can’t produce a complete changelog but you still want to get rid of the costly normalized operator, you
+may consider using the `'lookup'` changelog producer.
-By specifying `'changelog-producer' = 'lookup'`, Paimon will generate changelog through `'lookup'` before committing the data writing.
+By specifying `'changelog-producer' = 'lookup'`, Paimon will generate changelog through `'lookup'` before committing
+the data writing (You can also enable [Async Compaction]({{< ref "primary-key-table/compaction#asynchronous-compaction" >}})).
{{< img src="/img/changelog-producer-lookup.png">}}
@@ -105,23 +107,37 @@ important for performance).
## Full Compaction
-If you think the resource consumption of 'lookup' is too large, you can consider using 'full-compaction' changelog producer,
-which can decouple data writing and changelog generation, and is more suitable for scenarios with high latency (For example, 10 minutes).
+You can also consider using the 'full-compaction' changelog producer to generate changelog, which is more suitable for
+scenarios with large latency (for example, 30 minutes).
-By specifying `'changelog-producer' = 'full-compaction'`, Paimon will compare the results between full compactions and produce the differences as changelog. The latency of changelog is affected by the frequency of full compactions.
+1. By specifying `'changelog-producer' = 'full-compaction'`, Paimon will compare the results between full compactions and
+produce the differences as changelog. The latency of changelog is affected by the frequency of full compactions.
+2. By specifying the `full-compaction.delta-commits` table property, full compaction will be constantly triggered after delta
+commits (checkpoints). This is set to 1 by default, so each checkpoint will perform a full compaction and generate a
+changelog.
-By specifying `full-compaction.delta-commits` table property, full compaction will be constantly triggered after delta commits (checkpoints). This is set to 1 by default, so each checkpoint will have a full compression and generate a change log.
+Generally speaking, the cost of full compaction is high, so we recommend using the `'lookup'` changelog
+producer.
{{< img src="/img/changelog-producer-full-compaction.png">}}
{{< hint info >}}
-Full compaction changelog producer can produce complete changelog for any type of source. However it is not as efficient as the input changelog producer and the latency to produce changelog might be high.
+Full compaction changelog producer can produce complete changelog for any type of source. However it is not as
+efficient as the input changelog producer and the latency to produce changelog might be high.
{{< /hint >}}
Full-compaction changelog-producer supports `changelog-producer.row-deduplicate` to avoid generating -U, +U
changelog for the same record.
-(Note: Please increase `'execution.checkpointing.max-concurrent-checkpoints'` Flink configuration, this is very
-important for performance).
+## Changelog Merging
+
+This applies to the `input`, `lookup` and `full-compaction` changelog producers.
+
+If Flink's checkpoint interval is short (for example, 30 seconds) and the number of buckets is large, each snapshot may
+produce lots of small changelog files. Too many files may put a burden on the distributed storage cluster.
+
+In order to compact small changelog files into large ones, you can set the table option `changelog.precommit-compact = true`.
+The default value of this option is false. If true, Paimon will add a compact coordinator and worker operator after the writer
+operator, which merges small changelog files into large ones.
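+
+For example, a minimal sketch of enabling this option on an existing table (the table name is illustrative):
+
+```sql
+ALTER TABLE my_table SET ('changelog.precommit-compact' = 'true');
+```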
diff --git a/docs/content/primary-key-table/compaction.md b/docs/content/primary-key-table/compaction.md
index ada7e0289b35..bee8c16e46e9 100644
--- a/docs/content/primary-key-table/compaction.md
+++ b/docs/content/primary-key-table/compaction.md
@@ -76,7 +76,6 @@ In compaction, you can configure record-Level expire time to expire records, you
1. `'record-level.expire-time'`: time retain for records.
2. `'record-level.time-field'`: time field for record level expire.
-3. `'record-level.time-field-type'`: time field type for record level expire, it can be seconds-int,seconds-long or millis-long.
Expiration happens in compaction, and there is no strong guarantee to expire records in time.
diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md b/docs/content/primary-key-table/merge-engine/aggregation.md
index a9b9b5a38cb0..0cc6507f2b4c 100644
--- a/docs/content/primary-key-table/merge-engine/aggregation.md
+++ b/docs/content/primary-key-table/merge-engine/aggregation.md
@@ -3,7 +3,7 @@ title: "Aggregation"
weight: 3
type: docs
aliases:
-- /cdc-ingestion/merge-engin/aggregation.html
+- /primary-key-table/merge-engin/aggregation.html
---
+
+# Query Performance
+
+## Table Mode
+
+The table schema has the greatest impact on query performance. See [Table Mode]({{< ref "primary-key-table/table-mode" >}}).
+
+For a Merge On Read table, the most important thing to pay attention to is the number of buckets, which limits
+the concurrency of reading data.
+
+For a MOW (Deletion Vectors) or COW table or a [Read Optimized]({{< ref "concepts/system-tables#read-optimized-table" >}}) table,
+there is no limit on the concurrency of reading data, and they can also utilize filter conditions on non-primary-key columns.
+
+## Data Skipping By Primary Key Filter
+
+For a regular bucketed table (for example, bucket = 5), filter conditions on the primary key will greatly
+accelerate queries and avoid reading a large number of files.
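+
+For instance, a minimal sketch (table and column names are illustrative) of a query that benefits from this:
+
+```sql
+-- the primary key filter lets Paimon skip buckets and data files that cannot contain order_id = 12345
+SELECT * FROM orders WHERE order_id = 12345;
+```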
+
+## Data Skipping By File Index
+
+You can add a file index to a table with Deletion Vectors enabled; it filters files by index on the read side.
+
+```sql
+CREATE TABLE my_table WITH (
+    'deletion-vectors.enabled' = 'true',
+ 'file-index.bloom-filter.columns' = 'c1,c2',
+ 'file-index.bloom-filter.c1.items' = '200'
+);
+```
+
+Supported filter types:
+
+`Bloom Filter`:
+* `file-index.bloom-filter.columns`: specify the columns that need bloom filter index.
+* `file-index.bloom-filter.<column_name>.fpp`: to configure the false positive probability.
+* `file-index.bloom-filter.<column_name>.items`: to configure the expected number of distinct items in one data file.
+
+`Bitmap`:
+* `file-index.bitmap.columns`: specify the columns that need bitmap index.
+
+`Bit-Slice Index Bitmap`:
+* `file-index.bsi.columns`: specify the columns that need bsi index.
+
+More filter types will be supported...
+
+If you want to add a file index to an existing table without rewriting data files, you can use the `rewrite_file_index` procedure.
+Before using the procedure, you should set the appropriate options on the target table. You can use the ALTER clause to set
+`file-index.<filter-type>.columns` for the table.
+
+How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" >}})
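+
+A minimal sketch (table, database and column names are illustrative), assuming the `rewrite_file_index` procedure
+signature described on the Flink procedures page:
+
+```sql
+ALTER TABLE my_table SET ('file-index.bloom-filter.columns' = 'c1');
+CALL sys.rewrite_file_index('my_db.my_table');
+```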
diff --git a/docs/content/primary-key-table/table-mode.md b/docs/content/primary-key-table/table-mode.md
index d7bc2efb9109..c8cce7c8bce5 100644
--- a/docs/content/primary-key-table/table-mode.md
+++ b/docs/content/primary-key-table/table-mode.md
@@ -110,7 +110,7 @@ If you don't want to use Deletion Vectors mode, you want to query fast enough in
older data, you can also:
1. Configure 'compaction.optimization-interval' when writing data.
-2. Query from [read-optimized system table]({{< ref "maintenance/system-tables#read-optimized-table" >}}). Reading from
+2. Query from [read-optimized system table]({{< ref "concepts/system-tables#read-optimized-table" >}}). Reading from
results of optimized files avoids merging records with the same key, thus improving reading performance.
You can flexibly balance query performance and data latency when reading.
diff --git a/docs/content/program-api/catalog-api.md b/docs/content/program-api/catalog-api.md
index d016cfa7b204..570577437d86 100644
--- a/docs/content/program-api/catalog-api.md
+++ b/docs/content/program-api/catalog-api.md
@@ -1,6 +1,6 @@
---
title: "Catalog API"
-weight: 4
+weight: 3
type: docs
aliases:
- /api/catalog-api.html
diff --git a/docs/content/program-api/flink-api.md b/docs/content/program-api/flink-api.md
index 7cf9d1932ce5..6ecac3909ced 100644
--- a/docs/content/program-api/flink-api.md
+++ b/docs/content/program-api/flink-api.md
@@ -26,12 +26,8 @@ under the License.
# Flink API
-{{< hint warning >}}
-We do not recommend using programming API. Paimon is designed for SQL first, unless you are a professional Flink developer, even if you do, it can be very difficult.
-
-We strongly recommend that you use Flink SQL or Spark SQL, or simply use SQL APIs in programs.
-
-The following documents are not detailed and are for reference only.
+{{< hint info >}}
+If possible, recommend using Flink SQL or Spark SQL, or simply use SQL APIs in programs.
{{< /hint >}}
## Dependency
diff --git a/docs/content/program-api/java-api.md b/docs/content/program-api/java-api.md
index 97f4ac5ae3dd..1bf0ce82f6b0 100644
--- a/docs/content/program-api/java-api.md
+++ b/docs/content/program-api/java-api.md
@@ -26,12 +26,8 @@ under the License.
# Java API
-{{< hint warning >}}
-We do not recommend using the Paimon API naked, unless you are a professional downstream ecosystem developer, and even if you do, there will be significant difficulties.
-
-If you are only using Paimon, we strongly recommend using computing engines such as Flink SQL or Spark SQL.
-
-The following documents are not detailed and are for reference only.
+{{< hint info >}}
+If possible, recommend using computing engines such as Flink SQL or Spark SQL.
{{< /hint >}}
## Dependency
diff --git a/docs/content/program-api/python-api.md b/docs/content/program-api/python-api.md
index fbaa6181124d..079170760b25 100644
--- a/docs/content/program-api/python-api.md
+++ b/docs/content/program-api/python-api.md
@@ -1,6 +1,6 @@
---
title: "Python API"
-weight: 3
+weight: 4
type: docs
aliases:
- /api/python-api.html
@@ -34,9 +34,9 @@ Java-based implementation will launch a JVM and use `py4j` to execute Java code
### SDK Installing
-SDK is published at [paimon-python](https://pypi.org/project/paimon-python/). You can install by
+SDK is published at [pypaimon](https://pypi.org/project/pypaimon/). You can install by
```shell
-pip install paimon-python
+pip install pypaimon
```
### Java Runtime Environment
@@ -67,7 +67,7 @@ classpath via one of the following ways:
```python
import os
-from paimon_python_java import constants
+from pypaimon.py4j import constants
os.environ[constants.PYPAIMON_JAVA_CLASSPATH] = '/path/to/jars/*'
```
@@ -81,7 +81,7 @@ You can set JVM args via one of the following ways:
```python
import os
-from paimon_python_java import constants
+from pypaimon.py4j import constants
os.environ[constants.PYPAIMON_JVM_ARGS] = 'arg1 arg2 ...'
```
@@ -98,7 +98,7 @@ Otherwise, you should set hadoop classpath via one of the following ways:
```python
import os
-from paimon_python_java import constants
+from pypaimon.py4j import constants
os.environ[constants.PYPAIMON_HADOOP_CLASSPATH] = '/path/to/jars/*'
```
@@ -111,7 +111,7 @@ If you just want to test codes in local, we recommend to use [Flink Pre-bundled
Before coming into contact with the Table, you need to create a Catalog.
```python
-from paimon_python_java import Catalog
+from pypaimon.py4j import Catalog
# Note that keys and values are all string
catalog_options = {
@@ -121,6 +121,94 @@ catalog_options = {
catalog = Catalog.create(catalog_options)
```
+## Create Database & Table
+
+You can use the catalog to create table for writing data.
+
+### Create Database (optional)
+A table is located in a database. If you want to create a table in a new database, you should create the database first.
+
+```python
+catalog.create_database(
+ name='database_name',
+ ignore_if_exists=True, # If you want to raise error if the database exists, set False
+ properties={'key': 'value'} # optional database properties
+)
+```
+
+### Create Schema
+
+A table schema contains field definitions, partition keys, primary keys, table options and a comment.
+The field definitions are described by `pyarrow.Schema`. All arguments except the field definitions are optional.
+
+Generally, there are two ways to build `pyarrow.Schema`.
+
+First, you can use `pyarrow.schema` method directly, for example:
+
+```python
+import pyarrow as pa
+
+from pypaimon import Schema
+
+pa_schema = pa.schema([
+ ('dt', pa.string()),
+ ('hh', pa.string()),
+ ('pk', pa.int64()),
+ ('value', pa.string())
+])
+
+schema = Schema(
+ pa_schema=pa_schema,
+ partition_keys=['dt', 'hh'],
+ primary_keys=['dt', 'hh', 'pk'],
+ options={'bucket': '2'},
+ comment='my test table'
+)
+```
+
+See [Data Types]({{< ref "python-api#data-types" >}}) for all supported `pyarrow-to-paimon` data types mapping.
+
+Second, if you have some Pandas data, the `pa_schema` can be extracted from `DataFrame`:
+
+```python
+import pandas as pd
+import pyarrow as pa
+
+from pypaimon import Schema
+
+# Example DataFrame data
+data = {
+ 'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],
+ 'hh': ['12', '15', '20'],
+ 'pk': [1, 2, 3],
+ 'value': ['a', 'b', 'c'],
+}
+dataframe = pd.DataFrame(data)
+
+# Get Paimon Schema
+record_batch = pa.RecordBatch.from_pandas(dataframe)
+schema = Schema(
+ pa_schema=record_batch.schema,
+ partition_keys=['dt', 'hh'],
+ primary_keys=['dt', 'hh', 'pk'],
+ options={'bucket': '2'},
+ comment='my test table'
+)
+```
+
+### Create Table
+
+After building the table schema, you can create the corresponding table:
+
+```python
+schema = ...
+catalog.create_table(
+ identifier='database_name.table_name',
+ schema=schema,
+ ignore_if_exists=True # If you want to raise error if the table exists, set False
+)
+```
+
## Get Table
The Table interface provides tools to read and write table.
@@ -131,35 +219,164 @@ table = catalog.get_table('database_name.table_name')
## Batch Read
-TableRead interface provides parallelly reading for multiple splits. You can set `'max-workers': 'N'` in `catalog_options`
-to set thread numbers when reading splits. `max-workers` is 1 by default, that means TableRead will read splits sequentially
+### Set Read Parallelism
+
+The TableRead interface provides parallel reading for multiple splits. You can set `'max-workers': 'N'` in `catalog_options`
+to set the number of threads for reading splits. `max-workers` is 1 by default, which means TableRead will read splits sequentially
if you don't set `max-workers`.
+### Get ReadBuilder and Perform pushdown
+
+A `ReadBuilder` is used to build reading utils and perform filter and projection pushdown.
+
```python
table = catalog.get_table('database_name.table_name')
-
-# 1. Create table scan and read
read_builder = table.new_read_builder()
+```
+
+You can use `PredicateBuilder` to build filters and pushdown them by `ReadBuilder`:
+
+```python
+# Example filter: ('f0' < 3 OR 'f1' > 6) AND 'f3' = 'A'
+
+predicate_builder = read_builder.new_predicate_builder()
+
+predicate1 = predicate_builder.less_than('f0', 3)
+predicate2 = predicate_builder.greater_than('f1', 6)
+predicate3 = predicate_builder.or_predicates([predicate1, predicate2])
+
+predicate4 = predicate_builder.equal('f3', 'A')
+predicate5 = predicate_builder.and_predicates([predicate3, predicate4])
+
+read_builder = read_builder.with_filter(predicate5)
+```
+
+See [Predicate]({{< ref "python-api#predicate" >}}) for all supported filters and building methods.
+
+You can also pushdown projection by `ReadBuilder`:
+
+```python
+# select f3 and f2 columns
+read_builder = read_builder.with_projection(['f3', 'f2'])
+```
+
+### Scan Plan
+
+Then you can step into the Scan Plan stage to get `splits`:
+
+```python
table_scan = read_builder.new_scan()
-table_read = read_builder.new_read()
+splits = table_scan.splits()
+```
+
+### Read Splits
-# 2. Get splits
-splits = table_scan.plan().splits()
+Finally, you can read data from the `splits` into various data formats.
-# 3. Read splits. Support 3 methods:
-# 3.1 Read as pandas.DataFrame
-dataframe = table_read.to_pandas(splits)
+#### Apache Arrow
-# 3.2 Read as pyarrow.Table
+This requires `pyarrow` to be installed.
+
+You can read all the data into a `pyarrow.Table`:
+
+```python
+table_read = read_builder.new_read()
pa_table = table_read.to_arrow(splits)
+print(pa_table)
+
+# pyarrow.Table
+# f0: int32
+# f1: string
+# ----
+# f0: [[1,2,3],[4,5,6],...]
+# f1: [["a","b","c"],["d","e","f"],...]
+```
+
+You can also read data into a `pyarrow.RecordBatchReader` and iterate record batches:
-# 3.3 Read as pyarrow.RecordBatchReader
-record_batch_reader = table_read.to_arrow_batch_reader(splits)
+```python
+table_read = read_builder.new_read()
+for batch in table_read.to_arrow_batch_reader(splits):
+ print(batch)
+
+# pyarrow.RecordBatch
+# f0: int32
+# f1: string
+# ----
+# f0: [1,2,3]
+# f1: ["a","b","c"]
+```
+
+#### Pandas
+
+This requires `pandas` to be installed.
+
+You can read all the data into a `pandas.DataFrame`:
+
+```python
+table_read = read_builder.new_read()
+df = table_read.to_pandas(splits)
+print(df)
+
+# f0 f1
+# 0 1 a
+# 1 2 b
+# 2 3 c
+# 3 4 d
+# ...
+```
+
+#### DuckDB
+
+This requires `duckdb` to be installed.
+
+You can convert the splits into an in-memory DuckDB table and query it:
+
+```python
+table_read = read_builder.new_read()
+duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
+
+print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf())
+# f0 f1
+# 0 1 a
+# 1 2 b
+# 2 3 c
+# 3 4 d
+# ...
+
+print(duckdb_con.query("SELECT * FROM duckdb_table WHERE f0 = 1").fetchdf())
+# f0 f1
+# 0 1 a
+```
+
+#### Ray
+
+This requires `ray` to be installed.
+
+You can convert the splits into a Ray dataset and handle it by Ray API:
+
+```python
+table_read = read_builder.new_read()
+ray_dataset = table_read.to_ray(splits)
+
+print(ray_dataset)
+# MaterializedDataset(num_blocks=1, num_rows=9, schema={f0: int32, f1: string})
+
+print(ray_dataset.take(3))
+# [{'f0': 1, 'f1': 'a'}, {'f0': 2, 'f1': 'b'}, {'f0': 3, 'f1': 'c'}]
+
+print(ray_dataset.to_pandas())
+# f0 f1
+# 0 1 a
+# 1 2 b
+# 2 3 c
+# 3 4 d
+# ...
```
## Batch Write
-Paimon table write is Two-Phase Commit, you can write many times, but once committed, no more data can be write.
+Paimon table writes use two-phase commit; you can write many times, but once committed, no more data can be written.
{{< hint warning >}}
Currently, Python SDK doesn't support writing primary key table with `bucket=-1`.
@@ -170,12 +387,6 @@ table = catalog.get_table('database_name.table_name')
# 1. Create table write and commit
write_builder = table.new_batch_write_builder()
-# By default, write data will be appended to table.
-# If you want to overwrite table:
-# write_builder.overwrite()
-# If you want to overwrite partition 'dt=2024-01-01':
-# write_builder.overwrite({'dt': '2024-01-01'})
-
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
@@ -199,7 +410,16 @@ table_commit.commit(commit_messages)
# 4. Close resources
table_write.close()
table_commit.close()
+```
+By default, the data will be appended to the table. If you want to overwrite the table, you should use the `overwrite` API of the write builder:
+
+```python
+# overwrite whole table
+write_builder.overwrite()
+
+# overwrite partition 'dt=2024-01-01'
+write_builder.overwrite({'dt': '2024-01-01'})
```
## Data Types
@@ -214,3 +434,25 @@ table_commit.close()
| pyarrow.float64() | DOUBLE |
| pyarrow.string() | STRING |
| pyarrow.boolean() | BOOLEAN |
+
+## Predicate
+
+| Predicate kind | Predicate method |
+|:----------------------|:----------------------------------------------|
+| p1 and p2 | PredicateBuilder.and_predicates([p1, p2]) |
+| p1 or p2 | PredicateBuilder.or_predicates([p1, p2]) |
+| f = literal | PredicateBuilder.equal(f, literal) |
+| f != literal | PredicateBuilder.not_equal(f, literal) |
+| f < literal | PredicateBuilder.less_than(f, literal) |
+| f <= literal | PredicateBuilder.less_or_equal(f, literal) |
+| f > literal | PredicateBuilder.greater_than(f, literal) |
+| f >= literal | PredicateBuilder.greater_or_equal(f, literal) |
+| f is null | PredicateBuilder.is_null(f) |
+| f is not null | PredicateBuilder.is_not_null(f) |
+| f.startswith(literal) | PredicateBuilder.startswith(f, literal) |
+| f.endswith(literal) | PredicateBuilder.endswith(f, literal) |
+| f.contains(literal) | PredicateBuilder.contains(f, literal) |
+| f is in [l1, l2] | PredicateBuilder.is_in(f, [l1, l2]) |
+| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
+| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
+
diff --git a/docs/content/project/download.md b/docs/content/project/download.md
index 5e49811076a6..23d0112b09a2 100644
--- a/docs/content/project/download.md
+++ b/docs/content/project/download.md
@@ -49,13 +49,8 @@ This documentation is a guide for downloading Paimon Jars.
| Hive 2.3 | [paimon-hive-connector-2.3-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-hive-connector-2.3/{{< version >}}/) |
| Hive 2.2 | [paimon-hive-connector-2.2-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-hive-connector-2.2/{{< version >}}/) |
| Hive 2.1 | [paimon-hive-connector-2.1-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-hive-connector-2.1/{{< version >}}/) |
-| Hive 2.1-cdh-6.3 | [paimon-hive-connector-2.1-cdh-6.3-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-hive-connector-2.1-cdh-6.3/{{< version >}}/) |
-| Presto 0.236 | [paimon-presto-0.236-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-presto-0.236/{{< version >}}/) |
-| Presto 0.268 | [paimon-presto-0.268-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-presto-0.268/{{< version >}}/) |
-| Presto 0.273 | [paimon-presto-0.273-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-presto-0.273/{{< version >}}/) |
-| Presto SQL 332 | [paimon-prestosql-332-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-prestosql-332/{{< version >}}/) |
-| Trino 420 | [paimon-trino-420-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-trino-420/{{< version >}}/) |
-| Trino 427 | [paimon-trino-427-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-trino-427/{{< version >}}/) |
+| Hive 2.1-cdh-6.3 | [paimon-hive-connector-2.1-cdh-6.3-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-hive-connector-2.1-cdh-6.3/{{< version >}}/) | |
+| Trino 440 | [paimon-trino-440-{{< version >}}-plugin.tar.gz](https://repository.apache.org/content/repositories/snapshots/org/apache/paimon/paimon-trino-440/{{< version >}}/) |
{{< /unstable >}}
@@ -79,7 +74,6 @@ This documentation is a guide for downloading Paimon Jars.
| Hive 2.2 | [paimon-hive-connector-2.2-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-hive-connector-2.2/{{< version >}}/paimon-hive-connector-2.2-{{< version >}}.jar) |
| Hive 2.1 | [paimon-hive-connector-2.1-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-hive-connector-2.1/{{< version >}}/paimon-hive-connector-2.1-{{< version >}}.jar) |
| Hive 2.1-cdh-6.3 | [paimon-hive-connector-2.1-cdh-6.3-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-hive-connector-2.1-cdh-6.3/{{< version >}}/paimon-hive-connector-2.1-cdh-6.3-{{< version >}}.jar) |
-| Presto | [Download from master](https://paimon.apache.org/docs/master/project/download/) |
| Trino | [Download from master](https://paimon.apache.org/docs/master/project/download/) |
{{< /stable >}}
diff --git a/docs/content/project/roadmap.md b/docs/content/project/roadmap.md
index 2f6b63af00a1..34628e28c80f 100644
--- a/docs/content/project/roadmap.md
+++ b/docs/content/project/roadmap.md
@@ -26,16 +26,6 @@ under the License.
# Roadmap
-## Native Format IO
-
-Integrate native Parquet & ORC reader & writer.
-
-## Deletion Vectors (Merge On Write)
-
-1. Primary Key Table Deletion Vectors Mode supports async compaction.
-2. Append Table supports DELETE & UPDATE with Deletion Vectors Mode. (Now only Spark SQL)
-3. Optimize lookup performance for HDD disk.
-
## Flink Lookup Join
Support Flink Custom Data Distribution Lookup Join to reach large-scale data lookup join.
@@ -44,51 +34,24 @@ Support Flink Custom Data Distribution Lookup Join to reach large-scale data loo
Introduce a mode to produce Iceberg snapshots.
-## Branch
-
-Branch production ready.
-
-## Changelog life cycle decouple
-
-Changelog life cycle decouple supports none changelog-producer.
-
-## Partition Mark Done
-
-Support partition mark done.
-
-## Default File Format
-
-- Default compression is ZSTD with level 1.
-- Parquet supports filter push down.
-- Parquet supports arrow with row type element.
-- Parquet becomes default file format.
-
## Variant Type
Support Variant Type with Spark 4.0 and Flink 2.0, unlocking support for semi-structured data.
-## Bucketed Join
-
-Support Bucketed Join with Spark SQL to reduce shuffler in Join.
-
## File Index
Add more indexes:
-1. Bitmap
-2. Inverse
-## Column Family
+1. Inverse
+
+## Vector Compaction
-Support Column Family for super Wide Table.
+Support Vector Compaction for super-wide tables.
-## View & Function support
+## Function support
-Paimon Catalog supports views and functions.
+Paimon Catalog supports functions.
## Files Schema Evolution Ingestion
Introduce file ingestion with schema evolution.
-
-## Foreign Key Join
-
-Explore Foreign Key Join solution.
diff --git a/docs/content/spark/_index.md b/docs/content/spark/_index.md
index 24661e56f25a..07128574b0e7 100644
--- a/docs/content/spark/_index.md
+++ b/docs/content/spark/_index.md
@@ -3,7 +3,7 @@ title: Engine Spark
icon:
bold: true
bookCollapseSection: true
-weight: 5
+weight: 6
---
+*/}}
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>field-delimiter</h5></td>
+        <td style="word-wrap: break-word;">","</td>
+        <td>String</td>
+        <td>Optional field delimiter character for CSV (',' by default).</td>
+    </tr>
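The row above only documents the key. A minimal, hedged sketch of how such a delimiter could be attached to a CSV table definition through Paimon's Java API follows; the Schema.Builder#option call, the "file.format" key and the column names are assumptions for illustration, and only "field-delimiter" itself comes from the row above:

    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataTypes;

    public class CsvDelimiterExample {
        // Builds a schema for a CSV table whose fields are separated by ';'
        // instead of the default ','. Everything except "field-delimiter"
        // is an illustrative assumption, not part of this patch.
        public static Schema csvSchema() {
            return Schema.newBuilder()
                    .column("id", DataTypes.INT())
                    .column("name", DataTypes.STRING())
                    .option("file.format", "csv")
                    .option("field-delimiter", ";")
                    .build();
        }
    }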
            Specify client cache key, multiple elements separated by commas.<br />
            "ugi": the Hadoop UserGroupInformation instance that represents the current user using the cache.<br />
            "user_name" similar to UGI but only includes the user's name determined by UserGroupInformation#getUserName.<br />
            "conf": name of an arbitrary configuration. The value of the configuration will be extracted from catalog properties and added to the cache key. A conf element should start with a "conf:" prefix which is followed by the configuration name. E.g. specifying "conf:a.b.c" will add "a.b.c" to the key, and so that configurations with different default catalog wouldn't share the same client pool. Multiple conf elements can be specified.</td>
        </tr>
-        <tr>
-            <td><h5>format-table.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to support format tables, format table corresponds to a regular Hive table, allowing read and write operations. However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in the metastore and need to be manually added as separate partition operations.</td>
-        </tr>
        <tr>
            <td><h5>hadoop-conf-dir</h5></td>
            <td style="word-wrap: break-word;">(none)</td>
@@ -71,5 +65,12 @@
you can set this option to true.
It is false by default; in this case, if a MySQL table name exists in Paimon and their schemas are incompatible, an exception will be thrown. You can set it to true explicitly to ignore the incompatible tables and the exception.
+        <tr>
+            <td><h5>--table_mapping</h5></td>
+            <td>The table name mapping between source database and Paimon. For example, if you want to synchronize a source table named "test" to a Paimon table named "paimon_test", you can specify "--table_mapping test=paimon_test". Multiple mappings can be specified with multiple "--table_mapping" options. "--table_mapping" has higher priority than "--table_prefix" and "--table_suffix".</td>
+        </tr>
+        <tr>
+            <td><h5>--table_prefix_db</h5></td>
+            <td>The prefix of the Paimon tables to be synchronized from the specified db. For example, if you want to prefix the tables from db1 with "ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". "--table_prefix_db" has higher priority than "--table_prefix".</td>
+        </tr>
        <tr>
            <td><h5>--table_prefix</h5></td>
-            <td>The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_".</td>
+            <td>The prefix of all Paimon tables to be synchronized, except those specified by "--table_mapping" or "--table_prefix_db". For example, if you want all synchronized tables to have "ods_" as a prefix, you can specify "--table_prefix ods_".</td>
        </tr>
+        <tr>
+            <td><h5>--table_suffix_db</h5></td>
+            <td>The suffix of the Paimon tables to be synchronized from the specified db. The usage is the same as "--table_prefix_db".</td>
+        </tr>
        <tr>
            <td><h5>--table_suffix</h5></td>
-            <td>The suffix of all Paimon tables to be synchronized. The usage is same as "--table_prefix".</td>
+            <td>The suffix of all Paimon tables to be synchronized, except those specified by "--table_mapping" or "--table_suffix_db". The usage is the same as "--table_prefix".</td>
        </tr>
        <tr>
            <td><h5>--including_tables</h5></td>
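To make the precedence between these flags concrete, a small illustrative sketch of one sync-database argument list follows. The action name "mysql_sync_database" and the --warehouse/--database flags are assumptions drawn from the existing CDC documentation rather than from this patch, and all values are placeholders: db1.test is renamed by --table_mapping, other db1 tables get the ods_db1_ prefix from --table_prefix_db, and every remaining table falls back to --table_prefix and --table_suffix.

    public class SyncDatabaseArgsExample {
        public static void main(String[] args) {
            // Illustrative flag combination only; precedence is
            // --table_mapping > --table_prefix_db/--table_suffix_db > --table_prefix/--table_suffix.
            String[] syncArgs = {
                "mysql_sync_database",
                "--warehouse", "hdfs:///path/to/warehouse",
                "--database", "ods",
                "--table_mapping", "test=paimon_test",
                "--table_prefix_db", "db1=ods_db1_",
                "--table_prefix", "ods_",
                "--table_suffix", "_rt"
            };
            System.out.println(String.join(" ", syncArgs));
        }
    }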
diff --git a/docs/static/img/unaware-bucket-topo.png b/docs/static/img/unaware-bucket-topo.png
index 73bc862053fd..f530fc4a225c 100644
Binary files a/docs/static/img/unaware-bucket-topo.png and b/docs/static/img/unaware-bucket-topo.png differ
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
index 0cf40ad9faae..b3925a0a769e 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
@@ -22,6 +22,7 @@
import org.apache.paimon.arrow.writer.ArrowFieldWriter;
import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -48,6 +49,7 @@
import java.io.OutputStream;
import java.time.Instant;
import java.time.ZoneId;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -56,6 +58,8 @@
/** Utilities for creating Arrow objects. */
public class ArrowUtils {
+ static final String PARQUET_FIELD_ID = "PARQUET:field_id";
+
public static VectorSchemaRoot createVectorSchemaRoot(
RowType rowType, BufferAllocator allocator) {
return createVectorSchemaRoot(rowType, allocator, true);
@@ -69,7 +73,9 @@ public static VectorSchemaRoot createVectorSchemaRoot(
f ->
toArrowField(
allowUpperCase ? f.name() : f.name().toLowerCase(),
- f.type()))
+ f.id(),
+ f.type(),
+ 0))
.collect(Collectors.toList());
return VectorSchemaRoot.create(new Schema(fields), allocator);
}
@@ -78,40 +84,105 @@ public static FieldVector createVector(
DataField dataField, BufferAllocator allocator, boolean allowUpperCase) {
return toArrowField(
allowUpperCase ? dataField.name() : dataField.name().toLowerCase(),
- dataField.type())
+ dataField.id(),
+ dataField.type(),
+ 0)
.createVector(allocator);
}
- public static Field toArrowField(String fieldName, DataType dataType) {
+ public static Field toArrowField(String fieldName, int fieldId, DataType dataType, int depth) {
FieldType fieldType = dataType.accept(ArrowFieldTypeConversion.ARROW_FIELD_TYPE_VISITOR);
+ fieldType =
+ new FieldType(
+ fieldType.isNullable(),
+ fieldType.getType(),
+ fieldType.getDictionary(),
+ Collections.singletonMap(PARQUET_FIELD_ID, String.valueOf(fieldId)));
List<Field> children = null;
if (dataType instanceof ArrayType) {
- children =
- Collections.singletonList(
- toArrowField(
- ListVector.DATA_VECTOR_NAME,
- ((ArrayType) dataType).getElementType()));
+ Field field =
+ toArrowField(
+ ListVector.DATA_VECTOR_NAME,
+ fieldId,
+ ((ArrayType) dataType).getElementType(),
+ depth + 1);
+ FieldType typeInner = field.getFieldType();
+ field =
+ new Field(
+ field.getName(),
+ new FieldType(
+ typeInner.isNullable(),
+ typeInner.getType(),
+ typeInner.getDictionary(),
+ Collections.singletonMap(
+ PARQUET_FIELD_ID,
+ String.valueOf(
+ SpecialFields.getArrayElementFieldId(
+ fieldId, depth + 1)))),
+ field.getChildren());
+ children = Collections.singletonList(field);
} else if (dataType instanceof MapType) {
MapType mapType = (MapType) dataType;
- children =
- Collections.singletonList(
- new Field(
- MapVector.DATA_VECTOR_NAME,
- // data vector, key vector and value vector CANNOT be null
- new FieldType(false, Types.MinorType.STRUCT.getType(), null),
- Arrays.asList(
- toArrowField(
- MapVector.KEY_NAME,
- mapType.getKeyType().notNull()),
- toArrowField(
- MapVector.VALUE_NAME,
- mapType.getValueType().notNull()))));
+
+ Field keyField =
+ toArrowField(
+ MapVector.KEY_NAME, fieldId, mapType.getKeyType().notNull(), depth + 1);
+ FieldType keyType = keyField.getFieldType();
+ keyField =
+ new Field(
+ keyField.getName(),
+ new FieldType(
+ keyType.isNullable(),
+ keyType.getType(),
+ keyType.getDictionary(),
+ Collections.singletonMap(
+ PARQUET_FIELD_ID,
+ String.valueOf(
+ SpecialFields.getMapKeyFieldId(
+ fieldId, depth + 1)))),
+ keyField.getChildren());
+
+ Field valueField =
+ toArrowField(
+ MapVector.VALUE_NAME,
+ fieldId,
+ mapType.getValueType().notNull(),
+ depth + 1);
+ FieldType valueType = valueField.getFieldType();
+ valueField =
+ new Field(
+ valueField.getName(),
+ new FieldType(
+ valueType.isNullable(),
+ valueType.getType(),
+ valueType.getDictionary(),
+ Collections.singletonMap(
+ PARQUET_FIELD_ID,
+ String.valueOf(
+ SpecialFields.getMapValueFieldId(
+ fieldId, depth + 1)))),
+ valueField.getChildren());
+
+ FieldType structType =
+ new FieldType(
+ false,
+ Types.MinorType.STRUCT.getType(),
+ null,
+ Collections.singletonMap(PARQUET_FIELD_ID, String.valueOf(fieldId)));
+ Field mapField =
+ new Field(
+ MapVector.DATA_VECTOR_NAME,
+ // data vector, key vector and value vector CANNOT be null
+ structType,
+ Arrays.asList(keyField, valueField));
+
+ children = Collections.singletonList(mapField);
} else if (dataType instanceof RowType) {
RowType rowType = (RowType) dataType;
- children =
- rowType.getFields().stream()
- .map(f -> toArrowField(f.name(), f.type()))
- .collect(Collectors.toList());
+ children = new ArrayList<>();
+ for (DataField field : rowType.getFields()) {
+ children.add(toArrowField(field.name(), field.id(), field.type(), 0));
+ }
}
return new Field(fieldName, fieldType, children);
}
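To see what the new field-id plumbing produces end to end, a minimal hedged sketch follows; the RowType and its column types are illustrative, while createVectorSchemaRoot, the "PARQUET:field_id" metadata key and the SpecialFields id helpers are the pieces added or used by this hunk:

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.types.pojo.Field;
    import org.apache.paimon.arrow.ArrowUtils;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    public class ArrowFieldIdExample {
        public static void main(String[] args) {
            // Two top-level columns; RowType.of assigns them field ids 0 and 1.
            RowType rowType = RowType.of(DataTypes.INT(), DataTypes.ARRAY(DataTypes.STRING()));
            try (BufferAllocator allocator = new RootAllocator();
                    VectorSchemaRoot root = ArrowUtils.createVectorSchemaRoot(rowType, allocator)) {
                for (Field field : root.getSchema().getFields()) {
                    // Each Arrow field now carries its Paimon field id under the
                    // "PARQUET:field_id" metadata key; nested array/map children get
                    // ids derived via SpecialFields (not printed here).
                    System.out.println(
                            field.getName() + " -> " + field.getMetadata().get("PARQUET:field_id"));
                }
            }
        }
    }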
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
index 9c1a55ec33ea..eef53009cec3 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
@@ -158,6 +158,7 @@ public ArrowFieldWriterFactory visit(MapType mapType) {
ArrowFieldWriterFactory valueWriterFactory = mapType.getValueType().accept(this);
return fieldVector -> {
MapVector mapVector = (MapVector) fieldVector;
+ mapVector.reAlloc();
List<FieldVector> keyValueVectors = mapVector.getDataVector().getChildrenFromFields();
return new ArrowFieldWriters.MapWriter(
fieldVector,
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java
new file mode 100644
index 000000000000..319df13ba10b
--- /dev/null
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.arrow;
+
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Random;
+
+/** Test for {@link ArrowUtils}. */
+public class ArrowUtilsTest {
+
+ private static final Random RANDOM = new Random();
+
+ @Test
+ public void testParquetFieldId() {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.INT());
+ schemaBuilder.column("f2", DataTypes.SMALLINT());
+ schemaBuilder.column("f3", DataTypes.STRING());
+ schemaBuilder.column("f4", DataTypes.DOUBLE());
+ schemaBuilder.column("f5", DataTypes.STRING());
+ schemaBuilder.column("F6", DataTypes.STRING());
+ schemaBuilder.column("f7", DataTypes.BOOLEAN());
+ schemaBuilder.column("f8", DataTypes.DATE());
+ schemaBuilder.column("f10", DataTypes.TIMESTAMP(6));
+ schemaBuilder.column("f11", DataTypes.DECIMAL(7, 2));
+ schemaBuilder.column("f12", DataTypes.BYTES());
+ schemaBuilder.column("f13", DataTypes.FLOAT());
+ schemaBuilder.column("f14", DataTypes.BINARY(10));
+ schemaBuilder.column("f15", DataTypes.VARBINARY(10));
+ schemaBuilder.column(
+ "f16",
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "f0", DataTypes.INT()),
+ DataTypes.FIELD(1, "f1", DataTypes.SMALLINT()),
+ DataTypes.FIELD(2, "f2", DataTypes.STRING()),
+ DataTypes.FIELD(3, "f3", DataTypes.DOUBLE()),
+ DataTypes.FIELD(4, "f4", DataTypes.BOOLEAN()),
+ DataTypes.FIELD(5, "f5", DataTypes.DATE()),
+ DataTypes.FIELD(6, "f6", DataTypes.TIMESTAMP(6)),
+ DataTypes.FIELD(7, "f7", DataTypes.DECIMAL(7, 2)),
+ DataTypes.FIELD(8, "f8", DataTypes.BYTES()),
+ DataTypes.FIELD(9, "f9", DataTypes.FLOAT()),
+ DataTypes.FIELD(10, "f10", DataTypes.BINARY(10)))));
+
+ RowType rowType = schemaBuilder.build().rowType();
+
+ List<Field> fields =
+ ArrowUtils.createVectorSchemaRoot(rowType, new RootAllocator())
+ .getSchema()
+ .getFields();
+
+ for (int i = 0; i < 16; i++) {
+ Assertions.assertThat(
+ Integer.parseInt(
+ fields.get(i).getMetadata().get(ArrowUtils.PARQUET_FIELD_ID)))
+ .isEqualTo(i);
+ }
+
+ fields = fields.get(15).getChildren().get(0).getChildren();
+ for (int i = 16; i < 26; i++) {
+ Assertions.assertThat(
+ Integer.parseInt(
+ fields.get(i - 16)
+ .getMetadata()
+ .get(ArrowUtils.PARQUET_FIELD_ID)))
+ .isEqualTo(i);
+ }
+ }
+}
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java
index c726283f0044..aef589d91242 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java
@@ -910,8 +910,9 @@ private boolean isVectorizedWithDv(RecordReader.RecordIterator iter
private Object[] randomRowValues(boolean[] nullable) {
Object[] values = new Object[18];
- values[0] = BinaryString.fromString(StringUtils.getRandomString(RND, 10, 10));
- values[1] = BinaryString.fromString(StringUtils.getRandomString(RND, 1, 20));
+ // The orc char reader will trim the string. See TreeReaderFactory.CharTreeReader
+ values[0] = BinaryString.fromString(StringUtils.getRandomString(RND, 9, 9) + "A");
+ values[1] = BinaryString.fromString(StringUtils.getRandomString(RND, 1, 19) + "A");
values[2] = RND.nextBoolean();
values[3] = randomBytes(10, 10);
values[4] = randomBytes(1, 20);
diff --git a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java
index b07cdef8465e..8bfe4b6c9c03 100644
--- a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java
+++ b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java
@@ -77,7 +77,7 @@ public Result run() {
String sinkPathConfig =
BenchmarkGlobalConfiguration.loadConfiguration()
- .getString(BenchmarkOptions.SINK_PATH);
+ .get(BenchmarkOptions.SINK_PATH);
if (sinkPathConfig == null) {
throw new IllegalArgumentException(
BenchmarkOptions.SINK_PATH.key() + " must be set");
diff --git a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/metric/cpu/SysInfoLinux.java b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/metric/cpu/SysInfoLinux.java
index e4a5cfa570c7..041637c2dd2f 100644
--- a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/metric/cpu/SysInfoLinux.java
+++ b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/metric/cpu/SysInfoLinux.java
@@ -18,7 +18,12 @@
package org.apache.paimon.benchmark.metric.cpu;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.HashMap;
diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/cache/CacheManagerBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/cache/CacheManagerBenchmark.java
new file mode 100644
index 000000000000..9a64322e0bde
--- /dev/null
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/cache/CacheManagerBenchmark.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.benchmark.cache;
+
+import org.apache.paimon.benchmark.Benchmark;
+import org.apache.paimon.io.cache.Cache;
+import org.apache.paimon.io.cache.CacheKey;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.options.MemorySize;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.file.Path;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Benchmark for measuring cache performance. */
+public class CacheManagerBenchmark {
+ @TempDir Path tempDir;
+
+ @Test
+ public void testCache() throws Exception {
+ Benchmark benchmark =
+ new Benchmark("cache-benchmark", 100)
+ .setNumWarmupIters(1)
+ .setOutputPerIteration(true);
+ File file1 = new File(tempDir.toFile(), "cache-benchmark1");
+ assertThat(file1.createNewFile()).isTrue();
+ CacheKey key1 = CacheKey.forPageIndex(new RandomAccessFile(file1, "r"), 0, 0);
+
+ File file2 = new File(tempDir.toFile(), "cache-benchmark2");
+ assertThat(file2.createNewFile()).isTrue();
+ CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, "r"), 0, 0);
+
+ for (Cache.CacheType cacheType : Cache.CacheType.values()) {
+ CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10), 0.1);
+ benchmark.addCase(
+ String.format("cache-%s", cacheType.toString()),
+ 5,
+ () -> {
+ try {
+ final int count = 10;
+ for (int i = 0; i < count; i++) {
+ cacheManager.getPage(
+ i < count / 2 ? key1 : key2,
+ key -> {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return new byte[6];
+ },
+ key -> {});
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ benchmark.run();
+ }
+}
diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java
index 430a207f5c36..653bfee6cc00 100644
--- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java
@@ -27,11 +27,13 @@
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BloomFilter;
import org.apache.paimon.utils.Pair;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
@@ -47,6 +49,15 @@ abstract class AbstractLookupBenchmark {
new RowCompactedSerializer(RowType.of(new IntType()));
private final GenericRow reusedKey = new GenericRow(1);
+ protected static List<List<Object>> getCountBloomList() {
+ List<List<Object>> countBloomList = new ArrayList<>();
+ for (Integer recordCount : RECORD_COUNT_LIST) {
+ countBloomList.add(Arrays.asList(recordCount, false));
+ countBloomList.add(Arrays.asList(recordCount, true));
+ }
+ return countBloomList;
+ }
+
protected byte[][] generateSequenceInputs(int start, int end) {
int count = end - start;
byte[][] result = new byte[count][6];
@@ -74,7 +85,12 @@ protected byte[] intToByteArray(int value) {
}
protected Pair<String, LookupStoreFactory.Context> writeData(
- Path tempDir, CoreOptions options, byte[][] inputs, int valueLength, boolean sameValue)
+ Path tempDir,
+ CoreOptions options,
+ byte[][] inputs,
+ int valueLength,
+ boolean sameValue,
+ boolean bloomFilterEnabled)
throws IOException {
byte[] value1 = new byte[valueLength];
byte[] value2 = new byte[valueLength];
@@ -86,8 +102,11 @@ protected Pair writeData(
new CacheManager(MemorySize.ofMebiBytes(10)),
keySerializer.createSliceComparator());
- File file = new File(tempDir.toFile(), UUID.randomUUID().toString());
- LookupStoreWriter writer = factory.createWriter(file, null);
+ String name =
+ String.format(
+ "%s-%s-%s", options.lookupLocalFileType(), valueLength, bloomFilterEnabled);
+ File file = new File(tempDir.toFile(), UUID.randomUUID() + "-" + name);
+ LookupStoreWriter writer = factory.createWriter(file, createBloomFiler(bloomFilterEnabled));
int i = 0;
for (byte[] input : inputs) {
if (sameValue) {
@@ -104,4 +123,11 @@ protected Pair writeData(
LookupStoreFactory.Context context = writer.close();
return Pair.of(file.getAbsolutePath(), context);
}
+
+ private BloomFilter.Builder createBloomFiler(boolean enabled) {
+ if (!enabled) {
+ return null;
+ }
+ return BloomFilter.builder(5000000, 0.01);
+ }
}
diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java
index 6327d703afe0..2d8de84327d4 100644
--- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java
@@ -49,25 +49,38 @@
public class LookupReaderBenchmark extends AbstractLookupBenchmark {
private static final int QUERY_KEY_COUNT = 10000;
private final int recordCount;
+ private final boolean bloomFilterEnabled;
@TempDir Path tempDir;
- public LookupReaderBenchmark(int recordCount) {
- this.recordCount = recordCount;
+ public LookupReaderBenchmark(List