Commit 11108f0

Merge branch 'master' into dev/support_parallel_close

neuyilan committed Oct 16, 2024
2 parents 9239267 + 3825f43
Showing 266 changed files with 7,481 additions and 1,375 deletions.
File renamed without changes.
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################

name: UTCase and ITCase Non Flink on JDK 11
name: UTCase and ITCase Others on JDK 11

on:
  issue_comment:
@@ -52,6 +52,11 @@ jobs:
          . .github/workflows/utils.sh
          jvm_timezone=$(random_timezone)
          echo "JVM timezone is set to $jvm_timezone"
          mvn -T 1C -B clean install -pl '!paimon-e2e-tests,!org.apache.paimon:paimon-hive-connector-3.1' -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone
          test_modules="!paimon-e2e-tests,!org.apache.paimon:paimon-hive-connector-3.1,"
          for suffix in 3.5 3.4 3.3 3.2 common; do
            test_modules+="!org.apache.paimon:paimon-spark-${suffix},"
          done
          test_modules="${test_modules%,}"
          mvn -T 1C -B clean install -pl "${test_modules}" -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone
        env:
          MAVEN_OPTS: -Xmx4096m
63 changes: 63 additions & 0 deletions .github/workflows/utitcase-spark-scala2.13.yml
@@ -0,0 +1,63 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

name: UTCase and ITCase Spark on Scala 2.13

on:
  push:
  pull_request:
    paths-ignore:
      - 'docs/**'
      - '**/*.md'

env:
  JDK_VERSION: 8

concurrency:
  group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
  cancel-in-progress: true

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      - name: Set up JDK ${{ env.JDK_VERSION }}
        uses: actions/setup-java@v2
        with:
          java-version: ${{ env.JDK_VERSION }}
          distribution: 'adopt'
      - name: Build Spark
        run: mvn -T 1C -B clean install -DskipTests -Pscala-2.13
      - name: Test Spark
        timeout-minutes: 60
        run: |
          # run tests with random timezone to find out timezone related bugs
          . .github/workflows/utils.sh
          jvm_timezone=$(random_timezone)
          echo "JVM timezone is set to $jvm_timezone"
          test_modules=""
          for suffix in common 3.5 3.4 3.3 3.2; do
            test_modules+="org.apache.paimon:paimon-spark-${suffix},"
          done
          test_modules="${test_modules%,}"
          mvn -T 1C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pscala-2.13
        env:
          MAVEN_OPTS: -Xmx4096m
63 changes: 63 additions & 0 deletions .github/workflows/utitcase-spark.yml
@@ -0,0 +1,63 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

name: UTCase and ITCase Spark

on:
  push:
  pull_request:
    paths-ignore:
      - 'docs/**'
      - '**/*.md'

env:
  JDK_VERSION: 8

concurrency:
  group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
  cancel-in-progress: true

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      - name: Set up JDK ${{ env.JDK_VERSION }}
        uses: actions/setup-java@v2
        with:
          java-version: ${{ env.JDK_VERSION }}
          distribution: 'adopt'
      - name: Build Spark
        run: mvn -T 1C -B clean install -DskipTests
      - name: Test Spark
        timeout-minutes: 60
        run: |
          # run tests with random timezone to find out timezone related bugs
          . .github/workflows/utils.sh
          jvm_timezone=$(random_timezone)
          echo "JVM timezone is set to $jvm_timezone"
          test_modules=""
          for suffix in common 3.5 3.4 3.3 3.2; do
            test_modules+="org.apache.paimon:paimon-spark-${suffix},"
          done
          test_modules="${test_modules%,}"
          mvn -T 1C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone
        env:
          MAVEN_OPTS: -Xmx4096m
9 changes: 7 additions & 2 deletions .github/workflows/utitcase.yml
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################

name: UTCase and ITCase Non Flink
name: UTCase and ITCase Others

on:
  push:
@@ -53,6 +53,11 @@ jobs:
          . .github/workflows/utils.sh
          jvm_timezone=$(random_timezone)
          echo "JVM timezone is set to $jvm_timezone"
          mvn -T 1C -B clean install -pl '!paimon-e2e-tests' -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone
          test_modules="!paimon-e2e-tests,"
          for suffix in 3.5 3.4 3.3 3.2 common; do
            test_modules+="!org.apache.paimon:paimon-spark-${suffix},"
          done
          test_modules="${test_modules%,}"
          mvn -T 1C -B clean install -pl "${test_modules}" -Pskip-paimon-flink-tests -Duser.timezone=$jvm_timezone
        env:
          MAVEN_OPTS: -Xmx4096m
1 change: 1 addition & 0 deletions .gitignore
@@ -17,4 +17,5 @@ target
.DS_Store
*.ipr
*.iws
.java-version
dependency-reduced-pom.xml
120 changes: 120 additions & 0 deletions docs/content/flink/consumer-id.md
@@ -0,0 +1,120 @@
---
title: "Consumer ID"
weight: 5
type: docs
aliases:
- /flink/consumer-id.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Consumer ID

Consumer ID can help you accomplish the following two things:

1. Safe consumption: when deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in
the file system, and if any consumer still depends on this snapshot, the snapshot will not be deleted by expiration.
2. Resume from breakpoint: when a previous job is stopped, a newly started job can continue to consume from the
previous progress without restoring from the Flink state.

## Usage

You can specify the `consumer-id` when streaming-reading a table.
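
For example, the consumer ID can be set through a dynamic table option hint. A minimal sketch, reusing the option syntax shown later on this page (the table name `t` and the ID `myid` are illustrative):

```sql
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;
```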

The consumer will prevent expiration of the snapshots it depends on. To avoid retaining too many snapshots by mistake,
you need to specify `'consumer.expiration-time'` to manage the lifetime of consumers.

```sql
ALTER TABLE t SET ('consumer.expiration-time' = '1 d');
```

Then, restart the streaming write job of this table; expiration of consumers will be triggered in the write job.

```sql
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.mode' = 'at-least-once') */;
```

## Ignore Progress

Sometimes you only want the 'safe consumption' feature and prefer to determine the snapshot progress afresh when
restarting the streaming consumption job. In that case, you can enable the `'consumer.ignore-progress'` option.

```sql
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.ignore-progress' = 'true') */;
```

On startup, the job will then retrieve the snapshot to read anew, ignoring the previously recorded progress.

## Consumer Mode

By default, the consumption of snapshots is strictly aligned within checkpoints to make the 'resume from breakpoint'
feature exactly-once.

In scenarios where you don't need 'resume from breakpoint', or don't need it to be strict, you can consider enabling
the `'consumer.mode' = 'at-least-once'` mode. This mode:
1. Allows readers to consume snapshots at different rates and records the slowest snapshot ID among all readers into
the consumer. It doesn't affect the checkpoint time and has good performance.
2. Provides more capabilities, such as watermark alignment.

{{< hint >}}
Since the implementations of the `exactly-once` mode and the `at-least-once` mode of `'consumer.mode'` are completely
different, the Flink state is incompatible between them and cannot be restored when switching modes.
{{< /hint >}}

## Reset Consumer

You can reset a consumer to a given next snapshot ID, or delete a consumer, by specifying its consumer ID. First,
you need to stop the streaming task using this consumer ID, and then execute the reset-consumer action job.

Run the following command:

{{< tabs "reset_consumer" >}}

{{< tab "Flink SQL" >}}

```sql
CALL sys.reset_consumer(
    `table` => 'database_name.table_name',
    consumer_id => 'consumer_id',
    next_snapshot_id => <snapshot_id>
);
-- No next_snapshot_id if you want to delete the consumer
```
{{< /tab >}}

{{< tab "Flink Action" >}}

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    reset-consumer \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --consumer_id <consumer-id> \
    [--next_snapshot <next-snapshot-id>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]

## No next_snapshot if you want to delete the consumer
```
{{< /tab >}}
{{< /tabs >}}
21 changes: 19 additions & 2 deletions docs/content/flink/procedures.md
@@ -221,6 +221,20 @@ All available procedures are listed below.
CALL sys.delete_tag(`table` => 'default.T', tag => 'my_tag')
</td>
</tr>
<tr>
<td>expire_tags</td>
<td>
CALL [catalog.]sys.expire_tags('identifier', 'older_than')
</td>
<td>
To expire tags by time. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>older_than: tagCreateTime before which tags will be removed.</li>
</td>
<td>
CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00')
</td>
</tr>
<tr>
<td>merge_into</td>
<td>
@@ -241,10 +255,13 @@ All available procedures are listed below.
matched_upsert_setting => 'matchedUpsertSetting',<br/>
not_matched_insert_condition => 'notMatchedInsertCondition',<br/>
not_matched_insert_values => 'notMatchedInsertValues',<br/>
matched_delete_condition => 'matchedDeleteCondition') <br/><br/>
matched_delete_condition => 'matchedDeleteCondition',<br/>
not_matched_by_source_upsert_condition => 'notMatchedBySourceUpsertCondition',<br/>
not_matched_by_source_upsert_setting => 'notMatchedBySourceUpsertSetting',<br/>
not_matched_by_source_delete_condition => 'notMatchedBySourceDeleteCondition') <br/><br/>
</td>
<td>
To perform "MERGE INTO" syntax. See <a href="/how-to/writing-tables#merging-into-table">merge_into action</a> for
To perform "MERGE INTO" syntax. See <a href="/flink/action-jars#merging-into-table">merge_into action</a> for
details of arguments.
</td>
<td>
2 changes: 1 addition & 1 deletion docs/content/flink/sql-alter.md
@@ -1,6 +1,6 @@
---
title: "SQL Alter"
weight: 6
weight: 7
type: docs
aliases:
- /flink/sql-alter.html
3 changes: 3 additions & 0 deletions docs/content/flink/sql-ddl.md
@@ -203,6 +203,9 @@ Paimon will automatically collect the statistics of the data file for speeding up the query
The statistics collector mode can be configured by `'metadata.stats-mode'`, by default is `'truncate(16)'`.
You can configure the field level by setting `'fields.{field_name}.stats-mode'`.

For the stats mode `none`, we suggest configuring `metadata.stats-dense-store` = `true`, which will
significantly reduce the storage size of the manifest.
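
As a sketch of how these two options might be combined when creating a table (the table and column names are illustrative, not part of this commit):

```sql
CREATE TABLE my_table (
    item_id BIGINT,
    price DOUBLE
) WITH (
    'metadata.stats-mode' = 'none',
    'metadata.stats-dense-store' = 'true'
);
```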

### Field Default Value

Paimon table currently supports setting default values for fields in table properties by `'fields.item_id.default-value'`,
Expand Down
2 changes: 1 addition & 1 deletion docs/content/flink/sql-lookup.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: "SQL Lookup"
weight: 5
weight: 6
type: docs
aliases:
- /flink/sql-lookup.html