From 83cfd18a41391d66657a9c9bb7d34954feb0e70a Mon Sep 17 00:00:00 2001
From: wudi <676366545@qq.com>
Date: Tue, 2 Apr 2024 15:13:01 +0800
Subject: [PATCH] [Doc](Flink-Connect)update flink connect 1.6.0 doc (#490)
---
docs/ecosystem/flink-doris-connector.md | 34 ++++++++++++-------
.../ecosystem/flink-doris-connector.md | 34 ++++++++++++-------
2 files changed, 42 insertions(+), 26 deletions(-)
diff --git a/docs/ecosystem/flink-doris-connector.md b/docs/ecosystem/flink-doris-connector.md
index 52738e1238c1b..bcc61b637ba29 100644
--- a/docs/ecosystem/flink-doris-connector.md
+++ b/docs/ecosystem/flink-doris-connector.md
@@ -45,6 +45,7 @@ under the License.
| 1.3.0 | 1.16 | 1.0+ | 8 | - |
| 1.4.0 | 1.15,1.16,1.17 | 1.0+ | 8 |- |
| 1.5.2 | 1.15,1.16,1.17,1.18 | 1.0+ | 8 |- |
+| 1.6.0 | 1.15,1.16,1.17,1.18,1.19 | 1.0+ | 8 |- |
## USE
@@ -57,7 +58,7 @@ Add flink-doris-connector
<dependency>
  <groupId>org.apache.doris</groupId>
  <artifactId>flink-doris-connector-1.16</artifactId>
-  <version>1.5.2</version>
+  <version>1.6.0</version>
</dependency>
```
@@ -320,19 +321,24 @@ ON a.city = c.city
| password | -- | Y | Password to access Doris |
| auto-redirect | true | N | Whether to redirect Stream Load requests. When enabled, Stream Load is written through the FE, and BE information is no longer fetched directly. |
| doris.request.retries | 3 | N | Number of retries to send requests to Doris |
-| doris.request.connect.timeout.ms | 30000 | N | Connection timeout for sending requests to Doris |
-| doris.request.read.timeout.ms | 30000 | N | Read timeout for sending requests to Doris |
+| doris.request.connect.timeout | 30s | N | Connection timeout for sending requests to Doris |
+| doris.request.read.timeout | 30s | N | Read timeout for sending requests to Doris |
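
As an illustration of the renamed timeout options (previously doris.request.connect.timeout.ms and doris.request.read.timeout.ms), here is a minimal Flink SQL sketch; the FE address and table name are hypothetical:

```sql
CREATE TABLE doris_users (
    id INT,
    name STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'table.identifier' = 'test.users',
  'username' = 'root',
  'password' = '',
  'doris.request.retries' = '3',
  -- 1.6.0 accepts duration-style values instead of millisecond integers
  'doris.request.connect.timeout' = '60s',
  'doris.request.read.timeout' = '60s'
);
```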
### Source configuration items
| Key | Default Value | Required | Comment |
| ----------------------------- | ------------------ | -------- | ------------------------------------------------------------ |
-| doris.request.query.timeout.s | 3600 | N | Timeout for querying Doris; the default value is 1 hour, and -1 means no timeout limit |
-| doris.request.tablet.size | Integer.MAX_VALUE | N | The number of Doris Tablets corresponding to one Partition. The smaller this value, the more Partitions are generated, which increases parallelism on the Flink side but puts more pressure on Doris. |
+| doris.request.query.timeout | 21600s | N | Timeout for querying Doris; the default value is 6 hours |
+| doris.request.tablet.size | 1 | N | The number of Doris Tablets corresponding to one Partition. The smaller this value, the more Partitions are generated, which increases parallelism on the Flink side but puts more pressure on Doris. |
| doris.batch.size | 1024 | N | The maximum number of rows to read from BE at a time. Increasing this value reduces the number of connections established between Flink and Doris, thereby reducing the extra time overhead caused by network latency. |
-| doris.exec.mem.limit | 2147483648 | N | Memory limit for a single query. The default is 2GB, in bytes |
+| doris.exec.mem.limit | 8192mb | N | Memory limit for a single query. The default is 8GB |
| doris.deserialize.arrow.async | FALSE | N | Whether to support asynchronous conversion of the Arrow format to the RowBatch required for flink-doris-connector iteration |
| doris.deserialize.queue.size | 64 | N | Internal processing queue size for asynchronous conversion of the Arrow format; effective when doris.deserialize.arrow.async is true |
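
To make the new duration- and size-style source defaults concrete, here is a hedged Flink SQL sketch that overrides a few of them; the connection values and table name are hypothetical:

```sql
CREATE TABLE doris_source (
    id INT,
    name STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'table.identifier' = 'test.users',
  'username' = 'root',
  'password' = '',
  'doris.request.query.timeout' = '21600s',  -- duration value, not a bare second count
  'doris.exec.mem.limit' = '8192mb',         -- size value, not a bare byte count
  'doris.batch.size' = '2048'
);
```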
+
+#### DataStream-specific configuration items
+
+| Key | Default Value | Required | Comment |
+| ----------------------------- | ------------------ | -------- | ------------------------------------------------------------ |
| doris.read.field | -- | N | List of column names to read from the Doris table, separated by commas |
| doris.filter.query | -- | N | Expression used to filter the data being read; it is passed through to Doris, which uses it to perform source-side filtering. For example: age=18. |
@@ -387,8 +393,10 @@ ON a.city = c.city
| VARCHAR | STRING |
| STRING | STRING |
| DECIMALV2 | DECIMAL |
-| TIME | DOUBLE |
-| HLL | Unsupported datatype |
+| ARRAY | ARRAY |
+| MAP | MAP |
+| JSON | STRING |
+
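
As a sketch of the newly mapped complex types, the following Flink SQL DDL declares columns matching Doris ARRAY, MAP, and JSON columns; the table and column names are hypothetical:

```sql
CREATE TABLE doris_events (
    id INT,
    tags ARRAY<STRING>,          -- Doris ARRAY -> Flink ARRAY
    attrs MAP<STRING, STRING>,   -- Doris MAP   -> Flink MAP
    payload STRING               -- Doris JSON  -> Flink STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'table.identifier' = 'test.events',
  'username' = 'root',
  'password' = ''
);
```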
## Flink write Metrics
The value of a Counter-type metric is the cumulative value of the import task from its start to the current time. Each metric for each table can be observed in the Flink Web UI metrics.
@@ -526,10 +534,10 @@ insert into doris_sink select id,name,bank,age from cdc_mysql_source;
| --table-suffix | Same as above, the suffix name of the Doris table. |
| --including-tables | MySQL tables that need to be synchronized. Multiple tables can be separated with "|", and regular expressions are supported. For example: --including-tables table1 |
| --excluding-tables | Tables that do not need to be synchronized; usage is the same as above. |
-| --mysql-conf | MySQL CDCSource configuration, for example --mysql-conf hostname=127.0.0.1. All MySQL-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html); hostname/username/password/database-name are required. When the synchronized databases contain a table without a primary key, `scan.incremental.snapshot.chunk.key-column` must be set, and only a single non-null field can be selected.
For example: `scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...`; columns of different tables are separated by `,`. |
-| --oracle-conf | Oracle CDCSource configuration, for example --oracle-conf hostname=127.0.0.1. All Oracle-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html); hostname/username/password/database-name/schema-name are required. |
-| --postgres-conf | Postgres CDCSource configuration, for example --postgres-conf hostname=127.0.0.1. All Postgres-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html); hostname/username/password/database-name/schema-name/slot.name are required. |
-| --sqlserver-conf | SQLServer CDCSource configuration, for example --sqlserver-conf hostname=127.0.0.1. All SQLServer-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/sqlserver-cdc.html); hostname/username/password/database-name/schema-name are required. |
+| --mysql-conf | MySQL CDCSource configuration, for example --mysql-conf hostname=127.0.0.1. All MySQL-CDC configurations can be viewed [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/); hostname/username/password/database-name are required. When the synchronized databases contain a table without a primary key, `scan.incremental.snapshot.chunk.key-column` must be set, and only a single non-null field can be selected.
For example: `scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...`; columns of different tables are separated by `,`. |
+| --oracle-conf | Oracle CDCSource configuration, for example --oracle-conf hostname=127.0.0.1. All Oracle-CDC configurations can be viewed [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/oracle-cdc/); hostname/username/password/database-name/schema-name are required. |
+| --postgres-conf | Postgres CDCSource configuration, for example --postgres-conf hostname=127.0.0.1. All Postgres-CDC configurations can be viewed [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/); hostname/username/password/database-name/schema-name/slot.name are required. |
+| --sqlserver-conf | SQLServer CDCSource configuration, for example --sqlserver-conf hostname=127.0.0.1. All SQLServer-CDC configurations can be viewed [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/sqlserver-cdc/); hostname/username/password/database-name/schema-name are required. |
| --sink-conf | All configurations of the Doris Sink; the complete list of configuration items can be viewed [here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9). |
| --table-conf | Configuration items of the Doris table, i.e. the content contained in properties (table-buckets is an exception, as it is not a properties attribute). For example `--table-conf replication_num=1`; the `--table-conf table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"` option specifies the number of buckets for different tables in the order of the regular expressions, and tables without a match are created with the default BUCKETS AUTO. |
| --ignore-default-value | Turn off synchronizing default values from the MySQL table structure. Suitable for cases where a field has a default value in MySQL but the actually inserted data is null when synchronizing MySQL data to Doris. See [#152](https://github.com/apache/doris-flink-connector/pull/152) |
@@ -779,7 +787,7 @@ You can search for the log `abort transaction response` in TaskManager and deter
14. **org.apache.flink.table.api.SqlParserException when using doris.filter.query: SQL parse failed. Encountered "xx" at line x, column xx**
-This problem is mainly caused by varchar/string types in the condition, which need to be quoted. The correct way to write it is xxx = ''xxx''. The Flink SQL parser interprets two consecutive single quotes as a single quote character rather than the end of the string, and the concatenated string is used as the value of the attribute. For example: `t1 >= '2024-01-01'` can be written as `'doris.filter.query' = 't1 >= ''2024-01-01'''`.
+This problem is mainly caused by varchar/string types in the condition, which need to be quoted. The correct way to write it is xxx = ''xxx''. The Flink SQL parser interprets two consecutive single quotes as a single quote character rather than the end of the string, and the concatenated string is used as the value of the attribute. For example: `t1 >= '2024-01-01'` can be written as `'doris.filter.query' = 't1 >= ''2024-01-01'''`, as in the sketch below. Since Connector 1.6.0, Flink SQL also supports automatic predicate and projection pushdown.
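
A minimal Flink SQL sketch of this quoting rule; the connection values and table name are hypothetical:

```sql
CREATE TABLE doris_source (
    t1 DATE,
    id INT
) WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'table.identifier' = 'test.tbl',
  'username' = 'root',
  'password' = '',
  -- two consecutive single quotes inside the option value escape one quote
  'doris.filter.query' = 't1 >= ''2024-01-01'''
);
```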
15. **Failed to connect to backend: http://host:webserver_port, and BE is still alive**
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector.md
index 578c81c817699..7da42fa5db204 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/flink-doris-connector.md
@@ -47,6 +47,7 @@ under the License.
| 1.3.0 | 1.16 | 1.0+ | 8 | - |
| 1.4.0 | 1.15,1.16,1.17 | 1.0+ | 8 |- |
| 1.5.2 | 1.15,1.16,1.17,1.18 | 1.0+ | 8 |- |
+| 1.6.0 | 1.15,1.16,1.17,1.18,1.19 | 1.0+ | 8 |- |
## Usage
@@ -59,7 +60,7 @@ under the License.
<dependency>
  <groupId>org.apache.doris</groupId>
  <artifactId>flink-doris-connector-1.16</artifactId>
-  <version>1.5.2</version>
+  <version>1.6.0</version>
</dependency>
```
@@ -323,22 +324,27 @@ ON a.city = c.city
| password | -- | Y | Password for accessing Doris |
| auto-redirect | true | N | Whether to redirect Stream Load requests. When enabled, Stream Load is written through the FE, and BE information is no longer fetched directly |
| doris.request.retries | 3 | N | Number of retries for sending requests to Doris |
-| doris.request.connect.timeout.ms | 30000 | N | Connection timeout for sending requests to Doris |
-| doris.request.read.timeout.ms | 30000 | N | Read timeout for sending requests to Doris |
+| doris.request.connect.timeout | 30s | N | Connection timeout for sending requests to Doris |
+| doris.request.read.timeout | 30s | N | Read timeout for sending requests to Doris |
### Source configuration items
| Key | Default Value | Required | Comment |
| ----------------------------- | ------------------ | -------- | ------------------------------------------------------------ |
-| doris.request.query.timeout.s | 3600 | N | Timeout for querying Doris; the default value is 1 hour, and -1 means no timeout limit |
-| doris.request.tablet.size | Integer.MAX_VALUE | N | The number of Doris Tablets corresponding to one Partition. The smaller this value, the more Partitions are generated, which increases parallelism on the Flink side but puts more pressure on Doris. |
+| doris.request.query.timeout | 21600s | N | Timeout for querying Doris; the default value is 6 hours |
+| doris.request.tablet.size | 1 | N | The number of Doris Tablets corresponding to one Partition. The smaller this value, the more Partitions are generated, which increases parallelism on the Flink side but puts more pressure on Doris. |
| doris.batch.size | 1024 | N | The maximum number of rows to read from BE at a time. Increasing this value reduces the number of connections established between Flink and Doris, thereby reducing the extra time overhead caused by network latency. |
-| doris.exec.mem.limit | 2147483648 | N | Memory limit for a single query. The default is 2GB, in bytes |
+| doris.exec.mem.limit | 8192mb | N | Memory limit for a single query. The default is 8GB |
| doris.deserialize.arrow.async | FALSE | N | Whether to support asynchronous conversion of the Arrow format to the RowBatch required for flink-doris-connector iteration |
| doris.deserialize.queue.size | 64 | N | Internal processing queue size for asynchronous conversion of the Arrow format; effective when doris.deserialize.arrow.async is true |
+
+#### DataStream-specific configuration items
+| Key | Default Value | Required | Comment |
+| ----------------------------- | ------------------ | -------- | ------------------------------------------------------------ |
| doris.read.field | -- | N | List of column names to read from the Doris table, separated by commas |
| doris.filter.query | -- | N | Expression used to filter the data being read; it is passed through to Doris, which uses it to perform source-side filtering. For example: age=18. |
+
### Sink configuration items
| Key | Default Value | Required | Comment |
@@ -390,8 +396,10 @@ ON a.city = c.city
| VARCHAR | STRING |
| STRING | STRING |
| DECIMALV2 | DECIMAL |
-| TIME | DOUBLE |
-| HLL | Unsupported datatype |
+| ARRAY | ARRAY |
+| MAP | MAP |
+| JSON | STRING |
+
## Flink write Metrics
The value of a Counter-type metric is the cumulative value of the import task from its start to the current time. Each metric for each table can be observed in the Flink Web UI metrics.
@@ -527,10 +535,10 @@ insert into doris_sink select id,name,bank,age from cdc_mysql_source;
| --table-suffix | Same as above, the suffix name of the Doris table. |
| --including-tables | MySQL tables that need to be synchronized. Multiple tables can be separated with "\|", and regular expressions are supported. For example: --including-tables table1 |
| --excluding-tables | Tables that do not need to be synchronized; usage is the same as above. |
-| --mysql-conf | MySQL CDCSource configuration, for example --mysql-conf hostname=127.0.0.1. All MySQL-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html); hostname/username/password/database-name are required. When the synchronized databases contain a table without a primary key, `scan.incremental.snapshot.chunk.key-column` must be set, and only a single non-null field can be selected.
For example: `scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...`; columns of different tables are separated by `,`. |
-| --oracle-conf | Oracle CDCSource configuration, for example --oracle-conf hostname=127.0.0.1. All Oracle-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html); hostname/username/password/database-name/schema-name are required. |
-| --postgres-conf | Postgres CDCSource configuration, for example --postgres-conf hostname=127.0.0.1. All Postgres-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html); hostname/username/password/database-name/schema-name/slot.name are required. |
-| --sqlserver-conf | SQLServer CDCSource configuration, for example --sqlserver-conf hostname=127.0.0.1. All SQLServer-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/sqlserver-cdc.html); hostname/username/password/database-name/schema-name are required. |
+| --mysql-conf | MySQL CDCSource configuration, for example --mysql-conf hostname=127.0.0.1. All MySQL-CDC configurations can be viewed [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/); hostname/username/password/database-name are required. When the synchronized databases contain a table without a primary key, `scan.incremental.snapshot.chunk.key-column` must be set, and only a single non-null field can be selected.
For example: `scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...`; columns of different tables are separated by `,`. |
+| --oracle-conf | Oracle CDCSource configuration, for example --oracle-conf hostname=127.0.0.1. All Oracle-CDC configurations can be viewed [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/oracle-cdc/); hostname/username/password/database-name/schema-name are required. |
+| --postgres-conf | Postgres CDCSource configuration, for example --postgres-conf hostname=127.0.0.1. All Postgres-CDC configurations can be viewed [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/); hostname/username/password/database-name/schema-name/slot.name are required. |
+| --sqlserver-conf | SQLServer CDCSource configuration, for example --sqlserver-conf hostname=127.0.0.1. All SQLServer-CDC configurations can be viewed [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/sqlserver-cdc/); hostname/username/password/database-name/schema-name are required. |
| --sink-conf | All configurations of the Doris Sink; the complete list of configuration items can be viewed [here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9). |
| --table-conf | Configuration items of the Doris table, i.e. the content contained in properties (table-buckets is an exception, as it is not a properties attribute). For example `--table-conf replication_num=1`; the `--table-conf table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"` option specifies the number of buckets for different tables in the order of the regular expressions, and tables without a match are created with the default BUCKETS AUTO. |
| --ignore-default-value | Turn off synchronizing default values from the MySQL table structure. Suitable for cases where a field has a default value in MySQL but the actually inserted data is null. See [#152](https://github.com/apache/doris-flink-connector/pull/152) |
@@ -779,7 +787,7 @@ When Flink imports data, if there is dirty data, such as field format or length issues
14. **org.apache.flink.table.api.SqlParserException when using doris.filter.query: SQL parse failed. Encountered "xx" at line x, column xx**
-This problem is mainly caused by varchar/string types in the condition, which need to be quoted. The correct way to write it is xxx = ''xxx''. The Flink SQL parser interprets two consecutive single quotes as a single quote character rather than the end of the string, and the concatenated string is used as the value of the attribute. For example: `t1 >= '2024-01-01'` can be written as `'doris.filter.query' = 't1 >= ''2024-01-01'''`.
+This problem is mainly caused by varchar/string types in the condition, which need to be quoted. The correct way to write it is xxx = ''xxx''. The Flink SQL parser interprets two consecutive single quotes as a single quote character rather than the end of the string, and the concatenated string is used as the value of the attribute. For example: `t1 >= '2024-01-01'` can be written as `'doris.filter.query' = 't1 >= ''2024-01-01'''`. Since Connector 1.6.0, Flink SQL also supports automatic predicate and projection pushdown.
15. **Failed to connect to backend: http://host:webserver_port, and BE is still alive**