From 29babfb78d3ae17acc86414f1f2ec4a0a945f47d Mon Sep 17 00:00:00 2001
From: Michael Koepf <47541996+michaelkoepf@users.noreply.github.com>
Date: Mon, 23 Dec 2024 18:26:41 +0100
Subject: [PATCH] [docs] Split up multi-command Flink SQL code listings into
 separate code listings

- Split up Flink SQL code blocks
- Harmonized code listing titles
- Harmonized sections across deployment options

Issue #244
---
 website/docs/engine-flink/ddl.md              | 28 ++++++----
 website/docs/engine-flink/getting-started.md  | 26 ++++++---
 website/docs/engine-flink/lookups.md          | 18 +++++--
 website/docs/engine-flink/reads.md            | 49 +++++++++++++----
 website/docs/engine-flink/writes.md           | 26 ++++++---
 .../deploying-distributed-cluster.md          |  8 +--
 .../install-deploy/deploying-local-cluster.md |  8 +--
 website/docs/maintenance/configuration.md     |  6 +--
 website/docs/quickstart/flink.md              | 54 +++++++++----------
 .../integrate-data-lakes/paimon.md            |  6 ++-
 .../data-distribution/partitioning.md         |  2 +-
 11 files changed, 152 insertions(+), 79 deletions(-)

diff --git a/website/docs/engine-flink/ddl.md b/website/docs/engine-flink/ddl.md
index 183d6943..3c18004b 100644
--- a/website/docs/engine-flink/ddl.md
+++ b/website/docs/engine-flink/ddl.md
@@ -7,12 +7,14 @@ sidebar_position: 2
 ## Create Catalog
 Fluss supports creating and managing tables through the Fluss Catalog.
 
-```sql
+```sql title="Flink SQL"
 CREATE CATALOG fluss_catalog WITH (
   'type' = 'fluss',
   'bootstrap.servers' = 'fluss-server-1:9123'
 );
+```
 
+```sql title="Flink SQL"
 USE CATALOG fluss_catalog;
 ```
 The following properties can be set if using the Fluss catalog:
@@ -29,8 +31,11 @@ The following introduced statements assume the current catalog is switched
 By default, FlussCatalog will use the `fluss` database in Flink. Use the following example to create a separate database to avoid creating tables under the default `fluss` database:
 
-```sql
+```sql title="Flink SQL"
 CREATE DATABASE my_db;
+```
+
+```sql title="Flink SQL"
 USE my_db;
 ```
 
@@ -38,9 +43,12 @@ USE my_db;
 To delete a database, run the following statements. Note that this will drop all the tables in the database as well:
 
-```sql
+```sql title="Flink SQL"
 -- Flink doesn't allow dropping the current database, switch to the Fluss default database
 USE fluss;
+```
+
+```sql title="Flink SQL"
 -- drop the database
 DROP DATABASE my_db;
 ```
@@ -50,7 +58,7 @@ DROP DATABASE my_db;
 ### PrimaryKey Table
 The following SQL statement will create a [PrimaryKey Table](table-design/table-types/pk-table.md) with a primary key consisting of shop_id and user_id.
-```sql
+```sql title="Flink SQL"
 CREATE TABLE my_pk_table (
   shop_id BIGINT,
   user_id BIGINT,
@@ -66,7 +74,7 @@ CREATE TABLE my_pk_table (
 The following SQL statement creates a [Log Table](table-design/table-types/log-table.md) by not specifying a primary key clause.
 
-```sql
+```sql title="Flink SQL"
 CREATE TABLE my_log_table (
   order_id BIGINT,
   item_id BIGINT,
@@ -86,7 +94,7 @@ Currently, Fluss only supports one partitioned field with `STRING` type.
 Currently, a partitioned table must enable auto partitioning and set the auto partition time unit.
 :::
 
-```sql
+```sql title="Flink SQL"
 CREATE TABLE my_part_pk_table (
   dt STRING,
   shop_id BIGINT,
@@ -103,7 +111,7 @@ CREATE TABLE my_part_pk_table (
 The following SQL statement creates a Partitioned Log Table in Fluss.
-```sql
+```sql title="Flink SQL"
 CREATE TABLE my_part_log_table (
   order_id BIGINT,
   item_id BIGINT,
@@ -131,7 +139,7 @@ The supported options in "with" parameters when creating a table are as follows:
 
 To create a table with the same schema, partitioning, and table properties as another table, use `CREATE TABLE LIKE`.
 
-```sql
+```sql title="Flink SQL"
 -- there is a temporary datagen table
 CREATE TEMPORARY TABLE datagen (
   user_id BIGINT,
@@ -143,7 +151,9 @@ CREATE TEMPORARY TABLE datagen (
   'connector' = 'datagen',
   'rows-per-second' = '10'
 );
+```
 
+```sql title="Flink SQL"
 -- creates a Fluss table which derives the metadata from the temporary table, excluding options
 CREATE TABLE my_table LIKE datagen (EXCLUDING OPTIONS);
 ```
@@ -154,7 +164,7 @@ For more details, refer to the [Flink CREATE TABLE](https://nightlies.apache.org
 
 To delete a table, run:
 
-```sql
+```sql title="Flink SQL"
 DROP TABLE my_table;
 ```
 
diff --git a/website/docs/engine-flink/getting-started.md b/website/docs/engine-flink/getting-started.md
index 13b63e8e..6ef50679 100644
--- a/website/docs/engine-flink/getting-started.md
+++ b/website/docs/engine-flink/getting-started.md
@@ -70,7 +70,7 @@ To quickly stop the cluster and all running components, you can use the provided
 ## Creating a Catalog
 You can use the following SQL statement to create a catalog.
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 CREATE CATALOG fluss_catalog WITH (
   'type'='fluss',
   'bootstrap.servers' = 'localhost:9123'
@@ -86,9 +86,11 @@ CREATE CATALOG fluss_catalog WITH (
 :::
 
 ## Creating a Table
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 USE CATALOG `fluss_catalog`;
+```
 
+```sql title="Flink SQL"
 CREATE TABLE pk_table (
   shop_id BIGINT,
   user_id BIGINT,
@@ -102,12 +104,17 @@ CREATE TABLE pk_table (
 ## Data Writing
 To append new data to a table, you can use `INSERT INTO` in batch mode or streaming mode:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- Execute the flink job in batch mode for current session context
 SET 'execution.runtime-mode' = 'batch';
+```
+
+```sql title="Flink SQL"
 -- use tableau result mode
 SET 'sql-client.execution.result-mode' = 'tableau';
+```
 
+```sql title="Flink SQL"
 INSERT INTO pk_table VALUES
   (1234, 1234, 1, 1),
   (12345, 12345, 2, 2),
@@ -115,14 +122,14 @@ INSERT INTO pk_table VALUES
 ```
 
 To update the data record with the primary key (1234, 1234), use the UPDATE statement in a Flink batch job as follows:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- should run in batch mode
 UPDATE pk_table SET total_amount = 4 WHERE shop_id = 1234 and user_id = 1234;
 ```
 
 To delete the data record with primary key `(12345, 12345)`, use `DELETE FROM`:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- should run in batch mode
 DELETE FROM pk_table WHERE shop_id = 12345 and user_id = 12345;
 ```
@@ -130,21 +137,24 @@ DELETE FROM pk_table WHERE shop_id = 12345 and user_id = 12345;
 ## Data Reading
 
 To retrieve data with the primary key `(1234, 1234)`, you can perform a point query by applying a filter on the primary key:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- should run in batch mode
 SELECT * FROM pk_table WHERE shop_id = 1234 and user_id = 1234;
 ```
 
 To preview a subset of the data in a table, you can use a `LIMIT` clause.
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- should run in batch mode
 SELECT * FROM pk_table LIMIT 10;
 ```
 
 Fluss supports incremental data reading in Flink streaming jobs:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- Submit the flink job in streaming mode for current session.
 SET 'execution.runtime-mode' = 'streaming';
+```
+
+```sql title="Flink SQL"
 -- reading changelogs from the primary-key table from the beginning.
 SELECT * FROM pk_table /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
 ```
diff --git a/website/docs/engine-flink/lookups.md b/website/docs/engine-flink/lookups.md
index 1d0b0ad0..4087b3b7 100644
--- a/website/docs/engine-flink/lookups.md
+++ b/website/docs/engine-flink/lookups.md
@@ -13,7 +13,7 @@ Flink lookup joins are important because they enable efficient, real-time enrich
 ## Examples
 1. Create two tables.
-```sql
+```sql title="Flink SQL"
 CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
   `o_orderkey` INT NOT NULL,
   `o_custkey` INT NOT NULL,
@@ -26,8 +26,9 @@ CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
   `o_comment` STRING NOT NULL,
   PRIMARY KEY (o_orderkey) NOT ENFORCED
 );
+```
 
-
+```sql title="Flink SQL"
 CREATE TABLE `fluss_catalog`.`my_db`.`customer` (
   `c_custkey` INT NOT NULL,
   `c_name` STRING NOT NULL,
@@ -42,10 +43,15 @@ CREATE TABLE `fluss_catalog`.`my_db`.`customer` (
 ```
 
 2. Perform lookup join.
-```sql
+```sql title="Flink SQL"
 USE CATALOG `fluss_catalog`;
+```
+
+```sql title="Flink SQL"
 USE my_db;
+```
 
+```sql title="Flink SQL"
 CREATE TEMPORARY TABLE lookup_join_sink
 (
   order_key INT NOT NULL,
@@ -53,7 +59,9 @@ CREATE TEMPORARY TABLE lookup_join_sink
   customer_name STRING NOT NULL,
   customer_address STRING NOT NULL
 ) WITH ('connector' = 'blackhole');
+```
 
+```sql title="Flink SQL"
 -- look up join in asynchronous mode.
 INSERT INTO lookup_join_sink
 SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
@@ -62,8 +70,9 @@ FROM
 LEFT JOIN `customer`
 FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
 ON `o`.`o_custkey` = `c`.`c_custkey`;
+```
 
-
+```sql title="Flink SQL"
 -- look up join in synchronous mode.
 INSERT INTO lookup_join_sink
 SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
@@ -72,7 +81,6 @@ FROM
 LEFT JOIN `customer` /*+ OPTIONS('lookup.async' = 'false') */
 FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
 ON `o`.`o_custkey` = `c`.`c_custkey`;
-
 ```
 
 ## Lookup Options
diff --git a/website/docs/engine-flink/reads.md b/website/docs/engine-flink/reads.md
index 6a90fb28..63cc59f0 100644
--- a/website/docs/engine-flink/reads.md
+++ b/website/docs/engine-flink/reads.md
@@ -5,10 +5,12 @@ sidebar_position: 4
 # Flink Reads
 Fluss supports streaming and batch reads with [Apache Flink](https://flink.apache.org/)'s SQL & Table API. Execute the following SQL command to switch execution mode from streaming to batch, and vice versa:
-```sql
+```sql title="Flink SQL"
 -- Execute the flink job in streaming mode for current session context
 SET 'execution.runtime-mode' = 'streaming';
+```
 
+```sql title="Flink SQL"
 -- Execute the flink job in batch mode for current session context
 SET 'execution.runtime-mode' = 'batch';
 ```
@@ -19,13 +21,16 @@ By default, Streaming read produces the latest snapshot on the table upon first
 Fluss by default ensures that your startup is properly processed with all data included.
 
 Fluss Source in streaming mode is unbounded, like a queue that never ends.
-```sql
+```sql title="Flink SQL"
 SET 'execution.runtime-mode' = 'streaming';
+```
+
+```sql title="Flink SQL"
 SELECT * FROM my_table;
 ```
 
 To do a streaming read without reading the snapshot data, use the `latest` scan mode, which only reads the changelogs (or logs) from the latest offset:
-```sql
+```sql title="Flink SQL"
 SELECT * FROM my_table /*+ OPTIONS('scan.startup.mode' = 'latest') */;
 ```
 
@@ -36,7 +41,7 @@ The Fluss source supports limiting reads for both primary-key tables and log ta
 
 #### Example
 1. Create a table and prepare data
-```sql
+```sql title="Flink SQL"
 CREATE TABLE log_table (
   `c_custkey` INT NOT NULL,
   `c_name` STRING NOT NULL,
@@ -47,7 +52,9 @@ CREATE TABLE log_table (
   `c_mktsegment` STRING NOT NULL,
   `c_comment` STRING NOT NULL
 );
+```
 
+```sql title="Flink SQL"
 INSERT INTO log_table
 VALUES (1, 'Customer1', 'IVhzIApeRb ot,c,E', 15, '25-989-741-2988', 711.56, 'BUILDING', 'comment1'),
        (2, 'Customer2', 'XSTf4,NCwDVaWNe6tEgvwfmRchLXak', 13, '23-768-687-3665', 121.65, 'AUTOMOBILE', 'comment2'),
@@ -56,11 +63,16 @@ VALUES (1, 'Customer1', 'IVhzIApeRb ot,c,E', 15, '25-989-741-2988', 711.56, 'BUI
 ```
 
 2. Query from the table.
-```sql
+```sql title="Flink SQL"
 -- Execute the flink job in batch mode for current session context
 SET 'execution.runtime-mode' = 'batch';
+```
+
+```sql title="Flink SQL"
 SET 'sql-client.execution.result-mode' = 'tableau';
+```
 
+```sql title="Flink SQL"
 SELECT * FROM log_table LIMIT 10;
 ```
 
@@ -71,7 +83,7 @@ The Fluss source supports point queries for primary-key tables, allowing you to
 
 #### Example
 1. Create a table and prepare data
-```sql
+```sql title="Flink SQL"
 CREATE TABLE pk_table (
   `c_custkey` INT NOT NULL,
   `c_name` STRING NOT NULL,
@@ -83,6 +95,9 @@ CREATE TABLE pk_table (
   `c_comment` STRING NOT NULL,
   PRIMARY KEY (c_custkey) NOT ENFORCED
 );
+```
+
+```sql title="Flink SQL"
 INSERT INTO pk_table
 VALUES (1, 'Customer1', 'IVhzIApeRb ot,c,E', 15, '25-989-741-2988', 711.56, 'BUILDING', 'comment1'),
        (2, 'Customer2', 'XSTf4,NCwDVaWNe6tEgvwfmRchLXak', 13, '23-768-687-3665', 121.65, 'AUTOMOBILE', 'comment2'),
@@ -91,21 +106,33 @@ VALUES (1, 'Customer1', 'IVhzIApeRb ot,c,E', 15, '25-989-741-2988', 711.56, 'BUI
 ```
 
 2. Query from the table.
-```sql
+```sql title="Flink SQL"
 -- Execute the flink job in batch mode for current session context
 SET 'execution.runtime-mode' = 'batch';
+```
+
+```sql title="Flink SQL"
 SET 'sql-client.execution.result-mode' = 'tableau';
+```
 
+```sql title="Flink SQL"
 SELECT * FROM pk_table WHERE c_custkey = 1;
 ```
 
 ### Aggregations
 The Fluss source supports pushdown count aggregation for log tables in batch mode. It is useful to preview the total number of rows in a log table:
-```sql
+```sql title="Flink SQL"
 -- Execute the flink job in batch mode for current session context
 SET 'execution.runtime-mode' = 'batch';
+```
+
+```sql title="Flink SQL"
+-- use tableau result mode
 SET 'sql-client.execution.result-mode' = 'tableau';
+```
 
+```sql title="Flink SQL"
+-- count the total number of rows in the log table
 SELECT COUNT(*) FROM log_table;
 ```
 
@@ -121,17 +148,19 @@ The scan startup mode enables you to specify the starting point for data consump
 
 You can dynamically apply the scan parameters via SQL hints. For instance, the following SQL statement temporarily sets the `scan.startup.mode` to latest when consuming the `log_table` table.
-```sql
+```sql title="Flink SQL"
 SELECT * FROM log_table /*+ OPTIONS('scan.startup.mode' = 'latest') */;
 ```
 
 Also, the following SQL statement temporarily sets the `scan.startup.mode` to timestamp when consuming the `log_table` table.
-```sql
+```sql title="Flink SQL"
 -- timestamp mode with milliseconds.
 SELECT * FROM log_table
 /*+ OPTIONS('scan.startup.mode' = 'timestamp',
 'scan.startup.timestamp' = '1678883047356') */;
+```
 
+```sql title="Flink SQL"
 -- timestamp mode with a time string format
 SELECT * FROM log_table
 /*+ OPTIONS('scan.startup.mode' = 'timestamp',
diff --git a/website/docs/engine-flink/writes.md b/website/docs/engine-flink/writes.md
index 6fef0e45..963ab5a0 100644
--- a/website/docs/engine-flink/writes.md
+++ b/website/docs/engine-flink/writes.md
@@ -15,7 +15,7 @@ They support both streaming and batch modes and are compatible with primary-key
 ### Appending Data to the Log Table
 #### Create a Log table.
-```sql
+```sql title="Flink SQL"
 CREATE TABLE log_table (
   order_id BIGINT,
   item_id BIGINT,
@@ -25,14 +25,16 @@ CREATE TABLE log_table (
 ```
 
 #### Insert data into the Log table.
-```sql
+```sql title="Flink SQL"
 CREATE TEMPORARY TABLE source (
   order_id BIGINT,
   item_id BIGINT,
   amount INT,
   address STRING
 ) WITH ('connector' = 'datagen');
+```
 
+```sql title="Flink SQL"
 INSERT INTO log_table
 SELECT * FROM source;
 ```
@@ -41,7 +43,7 @@ SELECT * FROM source;
 ### Perform Data Upserts to the PrimaryKey Table.
 #### Create a primary key table.
-```sql
+```sql title="Flink SQL"
 CREATE TABLE pk_table (
   shop_id BIGINT,
   user_id BIGINT,
@@ -52,28 +54,32 @@ CREATE TABLE pk_table (
 ```
 
 #### Updates All Columns
-```sql
+```sql title="Flink SQL"
 CREATE TEMPORARY TABLE source (
   shop_id BIGINT,
   user_id BIGINT,
   num_orders INT,
   total_amount INT
 ) WITH ('connector' = 'datagen');
+```
 
+```sql title="Flink SQL"
 INSERT INTO pk_table
 SELECT * FROM source;
 ```
 
 #### Partial Updates
-```sql
+```sql title="Flink SQL"
 CREATE TEMPORARY TABLE source (
   shop_id BIGINT,
   user_id BIGINT,
   num_orders INT,
   total_amount INT
 ) WITH ('connector' = 'datagen');
+```
 
+```sql title="Flink SQL"
 -- only partial-update the num_orders column
 INSERT INTO pk_table (shop_id, user_id, num_orders)
 SELECT shop_id, user_id, num_orders FROM source;
@@ -84,9 +90,12 @@ SELECT shop_id, user_id, num_orders FROM source;
 
 Fluss supports deleting data for primary-key tables in batch mode via the `DELETE FROM` statement. Currently, only single-row deletions based on the primary key are supported. Take the primary key table defined above as an example:
-```sql
+```sql title="Flink SQL"
 -- DELETE statement requires batch mode
 SET 'execution.runtime-mode' = 'batch';
+```
+
+```sql title="Flink SQL"
 -- The condition must include all primary key equality conditions.
 DELETE FROM pk_table WHERE shop_id = 10000 and user_id = 123456;
 ```
 
 ## UPDATE
 Fluss enables data updates for primary-key tables in batch mode using the `UPDATE` statement. Currently, only single-row updates based on the primary key are supported.
-```sql
+```sql title="Flink SQL"
 -- Execute the flink job in batch mode for current session context
 SET 'execution.runtime-mode' = 'batch';
+```
+
+```sql title="Flink SQL"
 -- The condition must include all primary key equality conditions.
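+-- (shop_id, user_id) is the full primary key of pk_table, so this statement updates exactly one row.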
 UPDATE pk_table SET total_amount = 2 WHERE shop_id = 10000 and user_id = 123456;
 ```
\ No newline at end of file
diff --git a/website/docs/install-deploy/deploying-distributed-cluster.md b/website/docs/install-deploy/deploying-distributed-cluster.md
index d316baa2..7e46f3b8 100644
--- a/website/docs/install-deploy/deploying-distributed-cluster.md
+++ b/website/docs/install-deploy/deploying-distributed-cluster.md
@@ -128,14 +128,14 @@ You can start a Flink standalone cluster refer to [Flink Environment Preparation
 
 #### Add catalog
 In the Flink SQL client, a catalog is created and named by executing the following query:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 CREATE CATALOG fluss_catalog WITH (
   'type'='fluss',
   'bootstrap.servers' = '192.168.10.1:9123'
 );
 ```
 
-#### Do more with Fluss
+#### What's next?
 
-After the catalog is created, you can use Flink SQL Client to do more with Fluss, for example, create a table, insert data, query data, etc.
-More details please refer to [Flink Getting Started](/docs/engine-flink/getting-started/).
+After the catalog is created, you can use Flink and the Flink SQL Client to do more with Fluss, for example, create a table, insert data, or query data.
+For more details, please refer to [Getting Started with Flink Engine](/docs/engine-flink/getting-started/) (you can skip the first steps and start [here](/docs/engine-flink/getting-started/#creating-a-table)).
\ No newline at end of file
diff --git a/website/docs/install-deploy/deploying-local-cluster.md b/website/docs/install-deploy/deploying-local-cluster.md
index 0173e12f..4cfaecdf 100644
--- a/website/docs/install-deploy/deploying-local-cluster.md
+++ b/website/docs/install-deploy/deploying-local-cluster.md
@@ -56,14 +56,14 @@ You can start a Flink standalone cluster refer to [Flink Environment Preparation
 
 #### Add catalog
 In the Flink SQL client, a catalog is created and named by executing the following query:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 CREATE CATALOG fluss_catalog WITH (
   'type'='fluss',
   'bootstrap.servers' = 'localhost:9123'
 );
 ```
 
-#### Do more with Fluss
+#### What's next?
 
-After the catalog is created, you can use Flink SQL Client to do more with Fluss, for example, create a table, insert data, query data, etc.
-More details please refer to [Flink Getting started](/docs/engine-flink/getting-started/)
\ No newline at end of file
+After the catalog is created, you can use Flink and the Flink SQL Client to do more with Fluss, for example, create a table, insert data, or query data.
+For more details, please refer to [Getting Started with Flink Engine](/docs/engine-flink/getting-started/) (you can skip the first steps and start [here](/docs/engine-flink/getting-started/#creating-a-table)).
\ No newline at end of file
diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md
index 7f3ad146..adc151ca 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -179,7 +179,7 @@ during the Fluss cluster working.
 
 When you create a table in Fluss via Flink, you can specify the table configuration in the WITH options, like:
-```sql
+```sql title="Flink SQL"
 CREATE TABLE my_table (
   id INT,
   name STRING,
@@ -230,7 +230,7 @@ Currently, we don't support alter table configuration by Flink. This will be sup
 
 1. When you create a table in Fluss via Flink, you can specify the client configuration in the WITH options, like:
-```sql
+```sql title="Flink SQL"
 CREATE TABLE my_table (
   id INT,
   name STRING,
@@ -242,7 +242,7 @@ CREATE TABLE my_table (
 
 2. Also, you can change the client configuration using [Flink SQL Hints](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sql/queries/hints/#dynamic-table-options), like:
 
-```sql
+```sql title="Flink SQL"
 INSERT INTO my_table /*+ OPTIONS('client.writer.acks' = '0') */
 SELECT * FROM my_source
diff --git a/website/docs/quickstart/flink.md b/website/docs/quickstart/flink.md
index cdf14114..39e86847 100644
--- a/website/docs/quickstart/flink.md
+++ b/website/docs/quickstart/flink.md
@@ -120,7 +120,7 @@ This command automatically starts all the containers defined in the Docker Compo
 
 Run
 ```shell
-docker ps
+docker container ls
 ```
 to check whether all containers are running properly.
@@ -143,35 +143,35 @@ docker compose exec jobmanager ./sql-client
 
 To simplify this guide, three temporary tables have been pre-created with the `faker` connector to generate data. You can view their schemas by running the following commands:
 
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 SHOW CREATE TABLE source_customer;
 ```
 
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 SHOW CREATE TABLE source_order;
 ```
 
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 SHOW CREATE TABLE source_nation;
 ```
 
 ## Create Fluss Tables
 ### Create Fluss Catalog
 Use the following SQL to create a Fluss catalog:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 CREATE CATALOG my_fluss WITH (
   'type' = 'fluss',
   'bootstrap.servers' = 'coordinator-server:9123'
 );
 ```
 
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 USE CATALOG my_fluss;
 ```
 
 ### Create Tables
 Run the following SQL to create the Fluss tables used in this guide:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 CREATE TABLE fluss_order (
   `order_key` BIGINT,
   `cust_key` INT NOT NULL,
@@ -184,7 +184,7 @@ CREATE TABLE fluss_order (
   `ptime` AS PROCTIME(),
   PRIMARY KEY (`order_key`) NOT ENFORCED
 );
 ```
 
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 CREATE TABLE fluss_customer (
   `cust_key` INT NOT NULL,
   `name` STRING,
@@ -196,7 +196,7 @@ CREATE TABLE fluss_customer (
   `mktsegment` STRING,
   PRIMARY KEY (`cust_key`) NOT ENFORCED
 );
 ```
 
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 CREATE TABLE `fluss_nation` (
   `nation_key` INT NOT NULL,
   `name` STRING,
@@ -204,7 +204,7 @@ CREATE TABLE `fluss_nation` (
 );
 ```
 
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 CREATE TABLE enriched_orders (
   `order_key` BIGINT,
   `cust_key` INT NOT NULL,
@@ -224,7 +224,7 @@ CREATE TABLE enriched_orders (
 
 ## Streaming into Fluss
 
 First, run the following SQL to sync data from source tables to Fluss tables:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 EXECUTE STATEMENT SET
 BEGIN
  INSERT INTO fluss_nation SELECT * FROM `default_catalog`.`default_database`.source_nation;
@@ -236,7 +236,7 @@ END;
 
 Fluss primary-key tables support high QPS point lookup queries on primary keys. Performing a [lookup join](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sql/queries/joins/#lookup-join) is really efficient and you can use it to enrich the `fluss_order` table with information from the `fluss_customer` and `fluss_nation` primary-key tables.
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 INSERT INTO enriched_orders
 SELECT o.order_key,
        o.cust_key,
@@ -261,17 +261,17 @@ LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
 
 You can now perform real-time analytics directly on Fluss tables. For instance, to calculate the number of orders placed by a specific customer, you can execute the following SQL query to obtain instant, real-time results.
 
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- use tableau result mode
 SET 'sql-client.execution.result-mode' = 'tableau';
 ```
 
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- switch to batch mode
 SET 'execution.runtime-mode' = 'batch';
 ```
 
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- use limit to query the enriched_orders table
 SELECT * FROM enriched_orders LIMIT 2;
 ```
@@ -287,7 +287,7 @@ SELECT * FROM enriched_orders LIMIT 2;
 ```
 
 If you are interested in a specific customer, you can retrieve their details by performing a lookup on the `cust_key`.
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- lookup by primary key
 SELECT * FROM fluss_customer WHERE `cust_key` = 1;
 ```
@@ -305,12 +305,12 @@ SELECT * FROM fluss_customer WHERE `cust_key` = 1;
 
 You can use `UPDATE` and `DELETE` statements to update/delete rows on Fluss tables.
 ### Update
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- update by primary key
 UPDATE fluss_customer SET `name` = 'fluss_updated' WHERE `cust_key` = 1;
 ```
 Then you can `lookup` the specific row:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 SELECT * FROM fluss_customer WHERE `cust_key` = 1;
 ```
 **Sample Output**
@@ -324,11 +324,11 @@ SELECT * FROM fluss_customer WHERE `cust_key` = 1;
 Notice that the `name` column has been updated to `fluss_updated`.
 
 ### Delete
-```sql title="Flink SQL Client
+```sql title="Flink SQL"
 DELETE FROM fluss_customer WHERE `cust_key` = 1;
 ```
 The following SQL query should return an empty result.
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 SELECT * FROM fluss_customer WHERE `cust_key` = 1;
 ```
 
@@ -347,7 +347,7 @@ By default, tables are created with data lake integration disabled, meaning the
 
 To enable lakehouse functionality as a tiered storage solution for a table, you must create the table with the configuration option `table.datalake.enabled = true`.
 Return to the `SQL client` and execute the following SQL statement to create a table with data lake integration enabled:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 CREATE TABLE datalake_enriched_orders (
   `order_key` BIGINT,
   `cust_key` INT NOT NULL,
@@ -365,12 +365,12 @@ CREATE TABLE datalake_enriched_orders (
 
 Next, start streaming writes into the **datalake-enabled** table, `datalake_enriched_orders`:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- switch to streaming mode
 SET 'execution.runtime-mode' = 'streaming';
 ```
 
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- insert tuples into datalake_enriched_orders
 INSERT INTO datalake_enriched_orders
 SELECT o.order_key,
@@ -401,12 +401,12 @@ If you wish to query only the data stored in Paimon—offering high-performance
 
 This approach also enables all the optimizations and features of a Flink Paimon table source, including [system tables](https://paimon.apache.org/docs/master/concepts/system-tables/) such as `datalake_enriched_orders$lake$snapshots`.
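+
+As a sketch of what else this could enable (an assumption, not verified: it presumes the `$lake$<system_table>` suffix maps to other [Paimon system tables](https://paimon.apache.org/docs/master/concepts/system-tables/) the same way `$lake$snapshots` does in your Fluss version), you could inspect the Paimon schema history of the table:
+
+```sql title="Flink SQL"
+-- hypothetical example: list the schema ids Paimon has recorded for this table
+SELECT schema_id, fields FROM datalake_enriched_orders$lake$schemas;
+```
+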
To query the snapshots directly from Paimon, use the following SQL:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- switch to batch mode
 SET 'execution.runtime-mode' = 'batch';
 ```
 
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- query snapshots in paimon
 SELECT snapshot_id, total_record_count FROM datalake_enriched_orders$lake$snapshots;
 ```
@@ -422,7 +422,7 @@ SELECT snapshot_id, total_record_count FROM datalake_enriched_orders$lake$snapsh
 **Note:** Make sure to wait for the checkpoints (~30s) to complete before querying the snapshots; otherwise the result will be empty.
 
 Run the following SQL to do analytics on Paimon data:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- to sum prices of all orders in paimon
 SELECT sum(total_price) as sum_price FROM datalake_enriched_orders$lake;
 ```
@@ -436,7 +436,7 @@ SELECT sum(total_price) as sum_price FROM datalake_enriched_orders$lake;
 ```
 
 To achieve results with sub-second data freshness, you can query the table directly, which seamlessly unifies data from both Fluss and Paimon:
-```sql title="Flink SQL Client"
+```sql title="Flink SQL"
 -- to sum prices of all orders in fluss and paimon
 SELECT sum(total_price) as sum_price FROM datalake_enriched_orders;
 ```
diff --git a/website/docs/streaming-lakehouse/integrate-data-lakes/paimon.md b/website/docs/streaming-lakehouse/integrate-data-lakes/paimon.md
index ce38479c..83a34971 100644
--- a/website/docs/streaming-lakehouse/integrate-data-lakes/paimon.md
+++ b/website/docs/streaming-lakehouse/integrate-data-lakes/paimon.md
@@ -34,10 +34,12 @@ SQL shows how to do that:
 
 ```sql title="Flink SQL"
 -- assume we have a table named `orders`
-
 -- read from paimon
 SELECT COUNT(*) FROM orders$lake;
+```
 
+```sql title="Flink SQL"
+-- assume we have a table named `orders`
 -- we can also query the system tables
 SELECT * FROM orders$lake$snapshots;
 ```
@@ -79,7 +81,9 @@ Then, you can query the `orders` table by StarRocks:
 ```sql title="StarRocks SQL"
 -- the table is in database `fluss`
 SELECT COUNT(*) FROM paimon_catalog.fluss.orders;
+```
 
+```sql title="StarRocks SQL"
 -- query the system tables, to know the snapshots of the table
 SELECT * FROM paimon_catalog.fluss.enriched_orders$snapshots;
 ```
diff --git a/website/docs/table-design/data-distribution/partitioning.md b/website/docs/table-design/data-distribution/partitioning.md
index 1be806b4..3d6b6027 100644
--- a/website/docs/table-design/data-distribution/partitioning.md
+++ b/website/docs/table-design/data-distribution/partitioning.md
@@ -22,7 +22,7 @@ For partitioned tables, Fluss supports auto partitioning creation. Partitions ca
 ## Auto Partitioning Options
 ### Example
 The auto-partitioning rules are configured through table options. The following example demonstrates creating a table named `site_access` that supports automatic partitioning using Flink SQL.
-```sql
+```sql title="Flink SQL"
 CREATE TABLE site_access(
   event_day STRING,
   site_id INT,