Skip to content

Commit

Permalink
[docs] Split up multi-command Flink SQL code listings into separate c…
Browse files Browse the repository at this point in the history
…ode listings

- Split up Flink SQL code blocks
- Harmonized code listing titles
- Harmonized sections across deployment options

Issue alibaba#243
  • Loading branch information
michaelkoepf committed Dec 23, 2024
1 parent 1879ddb commit a8c61c9
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 79 deletions.
28 changes: 19 additions & 9 deletions website/docs/engine-flink/ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ sidebar_position: 2

## Create Catalog
Fluss supports creating and managing tables through the Fluss Catalog.
```sql
```sql title="Flink SQL"
CREATE CATALOG fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = 'fluss-server-1:9123'
);
```

```sql title="Flink SQL"
USE CATALOG fluss_catalog;
```
The following properties can be set if using the Fluss catalog:
Expand All @@ -29,18 +31,24 @@ The following introduced statements assuming the the current catalog is switched

By default, FlussCatalog will use the `fluss` database in Flink. Using the following example to create a separate database in order to avoid creating tables under the default `fluss` database:

```sql
```sql title="Flink SQL"
CREATE DATABASE my_db;
```

```sql title="Flink SQL"
USE my_db;
```

## Drop Database

To delete a database, this will drop all the tables in the database as well:

```sql
```sql title="Flink SQL"
-- Flink doesn't allow drop current database, switch to Fluss default database
USE fluss;
```

```sql title="Flink SQL"
-- drop the database
DROP DATABASE my_db;
```
Expand All @@ -50,7 +58,7 @@ DROP DATABASE my_db;
### PrimaryKey Table

The following SQL statement will create a [PrimaryKey Table](table-design/table-types/pk-table.md) with a primary key consisting of shop_id and user_id.
```sql
```sql title="Flink SQL"
CREATE TABLE my_pk_table (
shop_id BIGINT,
user_id BIGINT,
Expand All @@ -66,7 +74,7 @@ CREATE TABLE my_pk_table (

The following SQL statement creates a [Log Table](table-design/table-types/log-table.md) by not specifying primary key clause.

```sql
```sql title="Flink SQL"
CREATE TABLE my_log_table (
order_id BIGINT,
item_id BIGINT,
Expand All @@ -86,7 +94,7 @@ Currently, Fluss only supports one partitioned field with `STRING` type.
Currently, partitioned table must enable auto partition and set auto partition time unit.
:::

```sql
```sql title="Flink SQL"
CREATE TABLE my_part_pk_table (
dt STRING,
shop_id BIGINT,
Expand All @@ -103,7 +111,7 @@ CREATE TABLE my_part_pk_table (

The following SQL statement creates a Partitioned Log Table in Fluss.

```sql
```sql title="Flink SQL"
CREATE TABLE my_part_log_table (
order_id BIGINT,
item_id BIGINT,
Expand Down Expand Up @@ -131,7 +139,7 @@ The supported option in "with" parameters when creating a table are as follows:

To create a table with the same schema, partitioning, and table properties as another table, use `CREATE TABLE LIKE`.

```sql
```sql title="Flink SQL"
-- there is a temporary datagen table
CREATE TEMPORARY TABLE datagen (
user_id BIGINT,
Expand All @@ -143,7 +151,9 @@ CREATE TEMPORARY TABLE datagen (
'connector' = 'datagen',
'rows-per-second' = '10'
);
```

```sql title="Flink SQL"
-- creates Fluss table which derives the metadata from the temporary table excluding options
CREATE TABLE my_table LIKE datagen (EXCLUDING OPTIONS);
```
Expand All @@ -154,7 +164,7 @@ For more details, refer to the [Flink CREATE TABLE](https://nightlies.apache.org

To delete a table, run:

```sql
```sql title="Flink SQL"
DROP TABLE my_table;
```

Expand Down
26 changes: 18 additions & 8 deletions website/docs/engine-flink/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ To quickly stop the cluster and all running components, you can use the provided

## Creating a Catalog
You can use the following SQL statement to create a catalog.
```sql title="Flink SQL Client"
```sql title="Flink SQL"
CREATE CATALOG fluss_catalog WITH (
'type'='fluss',
'bootstrap.servers' = 'localhost:9123'
Expand All @@ -86,9 +86,11 @@ CREATE CATALOG fluss_catalog WITH (
:::

## Creating a Table
```sql title="Flink SQL Client"
```sql title="Flink SQL"
USE CATALOG `fluss_catalog`;
```

```sql title="Flink SQL"
CREATE TABLE pk_table (
shop_id BIGINT,
user_id BIGINT,
Expand All @@ -102,49 +104,57 @@ CREATE TABLE pk_table (

## Data Writing
To append new data to a table, you can use `INSERT INTO` in batch mode or streaming mode:
```sql title="Flink SQL Client"
```sql title="Flink SQL"
-- Execute the flink job in batch mode for current session context
SET 'execution.runtime-mode' = 'batch';
```

```sql title="Flink SQL"
-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';
```

```sql title="Flink SQL"
INSERT INTO pk_table VALUES
(1234, 1234, 1, 1),
(12345, 12345, 2, 2),
(123456, 123456, 3, 3);
```
To update data record with the primary key (1234, 1234) in a Flink streaming job, use the UPDATE statement as follows:

```sql title="Flink SQL Client"
```sql title="Flink SQL"
-- should run in batch mode
UPDATE pk_table SET total_amount = 4 WHERE shop_id = 1234 and user_id = 1234;
```

To delete the data record with primary key `(12345, 12345)`, use `DELETE FROM`:

```sql title="Flink SQL Client"
```sql title="Flink SQL"
-- should run in batch mode
DELETE FROM pk_table WHERE shop_id = 12345 and user_id = 12345;
```

## Data Reading

To retrieve data with the primary key `(1234, 1234)`, you can perform a point query by applying a filter on the primary key:
```sql title="Flink SQL Client"
```sql title="Flink SQL"
-- should run in batch mode
SELECT * FROM pk_table WHERE shop_id = 1234 and user_id = 1234;
```

To preview a subset of the data in a table, you can use a `LIMIT` clause.
```sql title="Flink SQL Client"
```sql title="Flink SQL"
-- should run in batch mode
SELECT * FROM pk_table LIMIT 10;
```

Fluss supports processing incremental data reading in flink streaming jobs:
```sql title="Flink SQL Client"
```sql title="Flink SQL"
-- Submit the flink job in streaming mode for current session.
SET 'execution.runtime-mode' = 'streaming';
```

```sql title="Flink SQL"
-- reading changelogs from the primary-key table from beginning.
SELECT * FROM pk_table /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
```
Expand Down
18 changes: 13 additions & 5 deletions website/docs/engine-flink/lookups.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Flink lookup joins are important because they enable efficient, real-time enrich

## Examples
1. Create two tables.
```sql
```sql title="Flink SQL"
CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
`o_orderkey` INT NOT NULL,
`o_custkey` INT NOT NULL,
Expand All @@ -26,8 +26,9 @@ CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
`o_comment` STRING NOT NULL,
PRIMARY KEY (o_orderkey) NOT ENFORCED
);
```


```sql title="Flink SQL"
CREATE TABLE `fluss_catalog`.`my_db`.`customer` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
Expand All @@ -42,18 +43,25 @@ CREATE TABLE `fluss_catalog`.`my_db`.`customer` (
```

2. Perform lookup join.
```sql
```sql title="Flink SQL"
USE CATALOG `fluss_catalog`;
```

```sql title="Flink SQL"
USE my_db;
```

```sql title="Flink SQL"
CREATE TEMPORARY TABLE lookup_join_sink
(
order_key INT NOT NULL,
order_totalprice DECIMAL(15, 2) NOT NULL,
customer_name STRING NOT NULL,
customer_address STRING NOT NULL
) WITH ('connector' = 'blackhole');
```

```sql title="Flink SQL"
-- look up join in asynchronous mode.
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
Expand All @@ -62,8 +70,9 @@ FROM
LEFT JOIN `customer`
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;
```


```sql title="Flink SQL"
-- look up join in synchronous mode.
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
Expand All @@ -72,7 +81,6 @@ FROM
LEFT JOIN `customer` /*+ OPTIONS('lookup.async' = 'false') */
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;

```

## Lookup Options
Expand Down
Loading

0 comments on commit a8c61c9

Please sign in to comment.