[Doc](Flink-Connect)update flink connect 1.6.0 doc (apache#490)
JNSimba authored Apr 2, 2024
1 parent 45ee426 commit 83cfd18
Showing 2 changed files with 42 additions and 26 deletions.
34 changes: 21 additions & 13 deletions docs/ecosystem/flink-doris-connector.md
@@ -45,6 +45,7 @@ under the License.
| 1.3.0 | 1.16 | 1.0+ | 8 | - |
| 1.4.0 | 1.15,1.16,1.17 | 1.0+ | 8 |- |
| 1.5.2 | 1.15,1.16,1.17,1.18 | 1.0+ | 8 |- |
| 1.6.0 | 1.15,1.16,1.17,1.18,1.19 | 1.0+ | 8 |- |

## USE

@@ -57,7 +58,7 @@ Add flink-doris-connector
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.16</artifactId>
    <version>1.5.2</version>
    <version>1.6.0</version>
</dependency>
```

@@ -320,19 +321,24 @@ ON a.city = c.city
| password | -- | Y | Password to access Doris |
| auto-redirect | true | N | Whether to redirect StreamLoad requests. When enabled, StreamLoad is written through the FE and BE information is no longer displayed. |
| doris.request.retries | 3 | N | Number of retries to send requests to Doris |
| doris.request.connect.timeout.ms | 30000 | N | Connection timeout for sending requests to Doris |
| doris.request.read.timeout.ms | 30000 | N | Read timeout for sending requests to Doris |
| doris.request.connect.timeout | 30s | N | Connection timeout for sending requests to Doris |
| doris.request.read.timeout | 30s | N | Read timeout for sending requests to Doris |
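
As a quick illustration, the sketch below wires these common options into a Flink SQL table definition. It is only a sketch: the column list and the `fenodes`/`table.identifier`/`username` values are placeholders (those option names come from the part of the common-options table collapsed above this hunk), and it assumes the 1.6.0 option names `doris.request.connect.timeout`/`doris.request.read.timeout` introduced in this change.

```sql
-- A minimal sketch using the common options above; connection values and
-- columns are placeholders, not taken from this document.
CREATE TABLE doris_example (
    id INT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'db.tbl',
    'username' = 'root',
    'password' = '',
    'auto-redirect' = 'true',
    'doris.request.retries' = '3',
    'doris.request.connect.timeout' = '30s',
    'doris.request.read.timeout' = '30s'
);
```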

### Source configuration items

| Key | Default Value | Required | Comment |
| ----------------------------- | ------------------ | -------- | ------------------------------------------------------------ |
| doris.request.query.timeout.s | 3600 | N | Query timeout for Doris; the default is 1 hour, and -1 means no timeout limit |
| doris.request.tablet.size | Integer.MAX_VALUE | N | The number of Doris tablets corresponding to one partition. The smaller the value, the more partitions are generated, which increases parallelism on the Flink side but puts more pressure on Doris. |
| doris.request.query.timeout | 21600s | N | Query timeout for Doris; the default is 6 hours |
| doris.request.tablet.size | 1 | N | The number of Doris tablets corresponding to one partition. The smaller the value, the more partitions are generated, which increases parallelism on the Flink side but puts more pressure on Doris. |
| doris.batch.size | 1024 | N | The maximum number of rows read from a BE at a time. Increasing this value reduces the number of connections established between Flink and Doris, thereby reducing the extra overhead caused by network latency. |
| doris.exec.mem.limit | 2147483648 | N | Memory limit for a single query, in bytes; the default is 2 GB |
| doris.exec.mem.limit | 8192mb | N | Memory limit for a single query; the default is 8 GB |
| doris.deserialize.arrow.async | FALSE | N | Whether to asynchronously convert the Arrow format into the RowBatch required by flink-doris-connector iteration |
| doris.deserialize.queue.size | 64 | N | Size of the internal processing queue for asynchronous Arrow conversion; effective when doris.deserialize.arrow.async is true |
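
For orientation, here is a hedged sketch of the read-side tuning options above applied to a source table definition. Every value simply restates the documented default, and the connection settings are placeholders, not recommendations.

```sql
-- Illustrative read-side tuning on a Doris source table; all values are
-- examples (mostly the documented defaults), not recommendations.
CREATE TABLE doris_source_tuned (
    id INT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'db.tbl',
    'username' = 'root',
    'password' = '',
    'doris.request.query.timeout' = '21600s',
    'doris.request.tablet.size' = '1',
    'doris.batch.size' = '1024',
    'doris.exec.mem.limit' = '8192mb',
    'doris.deserialize.arrow.async' = 'true',
    'doris.deserialize.queue.size' = '64'
);
```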

#### DataStream proprietary configuration items

| Key | Default Value | Required | Comment |
| ----------------------------- | ------------------ | -------- | ------------------------------------------------------------ |
| doris.read.field | -- | N | List of column names to read from the Doris table, separated by commas |
| doris.filter.query | -- | N | Expression used to filter the data being read; it is passed through to Doris, which uses it to perform source-side filtering. For example, `age=18`. |

@@ -387,8 +393,10 @@
| VARCHAR | STRING |
| STRING | STRING |
| DECIMALV2 | DECIMAL |
| TIME | DOUBLE |
| HLL | Unsupported datatype |
| ARRAY | ARRAY |
| MAP | MAP |
| JSON | STRING |
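
Following this mapping, Doris complex types would be declared on the Flink side as sketched below; the table name, columns, and connection values are invented for illustration.

```sql
-- Hypothetical Flink-side declaration matching the type mapping above:
-- Doris ARRAY -> Flink ARRAY, Doris MAP -> Flink MAP, Doris JSON -> Flink STRING.
CREATE TABLE doris_typed (
    tags    ARRAY<STRING>,
    attrs   MAP<STRING, STRING>,
    payload STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'db.typed_tbl',
    'username' = 'root',
    'password' = ''
);
```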


## Flink write Metrics
The value of a Counter-type metric is the cumulative total for the load task from its start to the current time. Each metric can be observed per table in the Flink web UI metrics page.
@@ -526,10 +534,10 @@ insert into doris_sink select id,name,bank,age from cdc_mysql_source;
| --table-suffix | Same as above, the suffix name of the Doris table. |
| --including-tables | MySQL tables that need to be synchronized; multiple tables can be separated with `\|`, and regular expressions are supported. For example `--including-tables table1` |
| --excluding-tables | For tables that do not need to be synchronized, the usage is the same as above. |
| --mysql-conf | MySQL CDCSource configuration, for example --mysql-conf hostname=127.0.0.1. All MySQL-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html); hostname/username/password/database-name is required. When the synchronized tables include a table without a primary key, `scan.incremental.snapshot.chunk.key-column` must be set, and only one field of non-null type can be selected. <br/>For example: `scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...`; columns of different tables are separated by `,`. |
| --oracle-conf | Oracle CDCSource configuration, for example --oracle-conf hostname=127.0.0.1. All Oracle-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html); hostname/username/password/database-name/schema-name is required. |
| --postgres-conf | Postgres CDCSource configuration, e.g. --postgres-conf hostname=127.0.0.1. All Postgres-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html); hostname/username/password/database-name/schema-name/slot.name is required. |
| --sqlserver-conf | SQLServer CDCSource configuration, for example --sqlserver-conf hostname=127.0.0.1. All SQLServer-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/sqlserver-cdc.html); hostname/username/password/database-name/schema-name is required. |
| --mysql-conf | MySQL CDCSource configuration, for example --mysql-conf hostname=127.0.0.1. All MySQL-CDC configurations can be viewed [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/); hostname/username/password/database-name is required. When the synchronized tables include a table without a primary key, `scan.incremental.snapshot.chunk.key-column` must be set, and only one field of non-null type can be selected. <br/>For example: `scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...`; columns of different tables are separated by `,`. |
| --oracle-conf | Oracle CDCSource configuration, for example --oracle-conf hostname=127.0.0.1. All Oracle-CDC configurations can be viewed [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/oracle-cdc/); hostname/username/password/database-name/schema-name is required. |
| --postgres-conf | Postgres CDCSource configuration, e.g. --postgres-conf hostname=127.0.0.1. All Postgres-CDC configurations can be viewed [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/); hostname/username/password/database-name/schema-name/slot.name is required. |
| --sqlserver-conf | SQLServer CDCSource configuration, for example --sqlserver-conf hostname=127.0.0.1. All SQLServer-CDC configurations can be viewed [here](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/sqlserver-cdc/); hostname/username/password/database-name/schema-name is required. |
| --sink-conf | The complete set of Doris Sink configuration items can be viewed [here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9). |
| --table-conf | Configuration items of the Doris table (the exception is table-buckets, which is not a properties attribute), i.e., the content contained in properties. For example `--table-conf replication_num=1`, while `--table-conf table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"` specifies the number of buckets for different tables in the order of the regular expressions. If there is no match, the table is created with the default BUCKETS AUTO. |
| --ignore-default-value | Turn off synchronizing the default values of the MySQL table schema. Suitable for cases where a field has a default value in MySQL but the actual inserted data is null. Reference [here](https://github.com/apache/doris-flink-connector/pull/152) |
@@ -779,7 +787,7 @@ You can search for the log `abort transaction response` in TaskManager and deter
14. **org.apache.flink.table.api.SqlParserException when using doris.filter.query: SQL parsing failed. "xx" encountered at row x, column xx**
This problem is mainly caused by varchar/string types in the filter condition, which need to be quoted. The correct way to write it is `xxx = ''xxx''`: the Flink SQL parser interprets two consecutive single quotes as one single-quote character rather than the end of the string, and the concatenated string is passed as the value of the attribute. For example: `t1 >= '2024-01-01'` can be written as `'doris.filter.query' = 't1 >=''2024-01-01'''`.
This problem is mainly caused by varchar/string types in the filter condition, which need to be quoted. The correct way to write it is `xxx = ''xxx''`: the Flink SQL parser interprets two consecutive single quotes as one single-quote character rather than the end of the string, and the concatenated string is passed as the value of the attribute. For example: `t1 >= '2024-01-01'` can be written as `'doris.filter.query' = 't1 >=''2024-01-01'''`, as shown in the sketch below. Since connector 1.6.0, Flink SQL supports automatic predicate and projection pushdown.
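
Putting the quoting rule from this answer into a full, hypothetical table definition (the table, columns, and connection values are invented for illustration):

```sql
-- Sketch of the doubled-single-quote escaping described above.
CREATE TABLE doris_filtered (
    id INT,
    t1 DATE
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'db.tbl',
    'username' = 'root',
    'password' = '',
    -- The two consecutive single quotes collapse into one, so the value
    -- passed to Doris is: t1 >= '2024-01-01'
    'doris.filter.query' = 't1 >=''2024-01-01'''
);
```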
15. **Failed to connect to backend: http://host:webserver_port, and BE is still alive**