The OceanBase CDC connector allows for reading snapshot data and incremental data from OceanBase. This document describes how to set up the OceanBase CDC connector to run SQL queries against OceanBase.
To set up the OceanBase CDC connector, use the dependency information below. It covers both projects that use a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles.
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oceanbase-cdc</artifactId>
<!-- The dependency is available only for stable releases; SNAPSHOT dependencies must be built from source. -->
<version>2.5-SNAPSHOT</version>
</dependency>
If you want to use the OceanBase JDBC driver to connect to an Enterprise Edition database, also include the following dependency in your classpath.
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
<version>2.4.2</version>
</dependency>
Download link is available only for stable releases.
Download flink-sql-connector-oceanbase-cdc-2.5-SNAPSHOT.jar and put it under <FLINK_HOME>/lib/.
Note: flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT versions correspond to the development branch. To use one, download the source code and compile the JAR yourself. Otherwise, use a released version such as flink-sql-connector-oceanbase-cdc-2.2.1.jar; released versions are available in the Maven Central repository.
The CDC JAR above already contains MySQL JDBC driver 5.1.47, which is our recommended version. Due to license issues, we cannot include the OceanBase JDBC driver in the CDC JAR. If you need it, you can download it from here and put it under <FLINK_HOME>/lib/. You also need to set the startup option jdbc.driver to com.oceanbase.jdbc.Driver.
- Set up the OceanBase cluster following the doc.
- Create a user with a password in the sys tenant; this user is used by OceanBase LogProxy.
    mysql -h${host} -P${port} -uroot
    mysql> SHOW TENANT;
    mysql> CREATE USER ${sys_username} IDENTIFIED BY '${sys_password}';
    mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;
- Create a user in the tenant you want to monitor; this user is used to read snapshot data and change events.
- If you use OceanBase Community Edition, you need the rootserver-list. You can use the following command to get the value:
    mysql> show parameters like 'rootservice_list';
  If you use OceanBase Enterprise Edition, you need the config-url. You can use the following command to get the value:
    mysql> show parameters like 'obconfig_url';
- Set up OceanBase LogProxy. For users of OceanBase Community Edition, you can follow the quick start.
The OceanBase CDC table can be defined as follows:
-- checkpoint every 3000 milliseconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- register an OceanBase table 'orders' in Flink SQL
Flink SQL> CREATE TABLE orders (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = 'user@test_tenant#cluster_name',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = '^test_db$',
    'table-name' = '^orders$',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'rootserver-list' = '127.0.0.1:2882:2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983',
    'working-mode' = 'memory'
);

-- read the snapshot and change events from the orders table
Flink SQL> SELECT * FROM orders;
To use OceanBase Oracle mode, add the OceanBase JDBC JAR file to Flink and set up the enterprise edition of oblogproxy. You can then create a table in Flink as follows:
Flink SQL> CREATE TABLE orders (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = 'user@test_tenant#cluster_name',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = '^test_db$',
    'table-name' = '^orders$',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'compatible-mode' = 'oracle',
    'jdbc.driver' = 'com.oceanbase.jdbc.Driver',
    'config-url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983',
    'working-mode' = 'memory'
);
You can also try the quickstart tutorial that syncs data from OceanBase to Elasticsearch; please refer to the Flink CDC Tutorial for more information.
The OceanBase CDC connector provides the following options, which apply to both the SQL and DataStream APIs.
Note: The connector supports two ways to specify the list of tables to listen to, and uses the union of the results when both ways are used at the same time.
- Use database-name and table-name to match database and table names by regex. As obcdc (formerly liboblog) only supports fnmatch for now, we can't use regex directly to filter change events, so these two options can only be used in initial startup mode.
- Use table-list to match the exact values of database and table names.
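As a rough illustration of how the two selection methods could combine, the following self-contained Java sketch treats a table as selected when either the regex pair or the exact list matches. The `TableSelector` class and its methods are hypothetical and are not part of the connector; the sketch also assumes that a regex must match the whole name.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

/** Hypothetical helper (not connector code): decides whether a table is selected. */
final class TableSelector {
    private final Pattern databasePattern; // null when the regex options are not set
    private final Pattern tablePattern;    // null when the regex options are not set
    private final Set<String> exactTables = new HashSet<>();

    TableSelector(String databaseName, String tableName, String tableList) {
        // Assumption: regex matching is only active when both regex options are present.
        boolean hasRegex = databaseName != null && tableName != null;
        this.databasePattern = hasRegex ? Pattern.compile(databaseName) : null;
        this.tablePattern = hasRegex ? Pattern.compile(tableName) : null;
        if (tableList != null) {
            // "db1.table1, db2.table2" becomes {"db1.table1", "db2.table2"}
            for (String entry : tableList.split(",")) {
                String trimmed = entry.trim();
                if (!trimmed.isEmpty()) {
                    exactTables.add(trimmed);
                }
            }
        }
    }

    /** Union semantics: a match by either method selects the table. */
    boolean isSelected(String database, String table) {
        boolean byRegex = databasePattern != null
                && databasePattern.matcher(database).matches()
                && tablePattern.matcher(table).matches();
        boolean byList = exactTables.contains(database + "." + table);
        return byRegex || byList;
    }

    public static void main(String[] args) {
        TableSelector selector =
                new TableSelector("^test_db$", "^orders$", "db1.table1, db2.table2");
        System.out.println(selector.isSelected("test_db", "orders"));   // true (regex pair)
        System.out.println(selector.isSelected("db2", "table2"));       // true (exact list)
        System.out.println(selector.isSelected("test_db", "products")); // false
    }
}
```

The design choice shown here is only the union rule; how the connector resolves table names internally may differ.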
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specify what connector to use, here should be 'oceanbase-cdc' . |
scan.startup.mode | required | (none) | String | Specify the startup mode for OceanBase CDC consumer, valid enumerations are 'initial', 'latest-offset' or 'timestamp'. |
scan.startup.timestamp | optional | (none) | Long | Timestamp in seconds of the start point, only used for 'timestamp' startup mode. |
username | required | (none) | String | Username to be used when connecting to OceanBase. |
password | required | (none) | String | Password to be used when connecting to OceanBase. |
tenant-name | required | (none) | String | Tenant name of OceanBase to monitor; must be an exact value. |
database-name | optional | (none) | String | Database name of OceanBase to monitor; must be a regular expression. Can only be used with 'initial' mode. |
table-name | optional | (none) | String | Table name of OceanBase to monitor; must be a regular expression. Can only be used with 'initial' mode. |
table-list | optional | (none) | String | List of full names of tables, separated by commas, e.g. "db1.table1, db2.table2". |
hostname | optional | (none) | String | IP address or hostname of the OceanBase database server or OceanBase Proxy server. |
port | optional | (none) | Integer | Integer port number to connect to OceanBase. It can be the SQL port of OceanBase server, which is 2881 by default, or the port of OceanBase proxy service, which is 2883 by default. |
connect.timeout | optional | 30s | Duration | The maximum time that the connector should wait after trying to connect to the OceanBase database server before timing out. |
server-time-zone | optional | +00:00 | String | The session timezone which controls how temporal types are converted to STRING in OceanBase. Can be UTC offset in format "±hh:mm", or named time zones if the time zone information tables in the mysql database have been created and populated. |
logproxy.host | required | (none) | String | Hostname or IP address of OceanBase log proxy service. |
logproxy.port | required | (none) | Integer | Port number of OceanBase log proxy service. |
logproxy.client.id | optional | By rule. | String | Id of a log proxy client connection, will be in format {flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant} by default. |
rootserver-list | optional | (none) | String | The semicolon-separated list of OceanBase root servers in format `ip:rpc_port:sql_port`, required for OceanBase CE. |
config-url | optional | (none) | String | The url to get the server info from the config server, required for OceanBase EE. |
working-mode | optional | storage | String | Working mode of `obcdc` in LogProxy, can be `storage` or `memory`. |
compatible-mode | optional | mysql | String | Compatible mode of OceanBase, can be `mysql` or `oracle`. |
jdbc.driver | optional | com.mysql.jdbc.Driver | String | JDBC driver class for snapshot reading. |
jdbc.properties.* | optional | (none) | String | Option to pass custom JDBC URL properties. User can pass custom properties like 'jdbc.properties.useSSL' = 'false'. |
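To make the `rootserver-list` format from the table concrete, here is a minimal Java sketch that splits a semicolon-separated value into `ip:rpc_port:sql_port` entries. The `RootServerList` class is a hypothetical illustration, not connector code, and it assumes IPv4 addresses or hostnames (an IPv6 literal would contain extra colons).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for a 'rootserver-list' value (illustration only). */
final class RootServerList {

    /** One entry of the list: ip:rpc_port:sql_port. */
    static final class RootServer {
        final String ip;
        final int rpcPort;
        final int sqlPort;

        RootServer(String ip, int rpcPort, int sqlPort) {
            this.ip = ip;
            this.rpcPort = rpcPort;
            this.sqlPort = sqlPort;
        }
    }

    static List<RootServer> parse(String value) {
        List<RootServer> servers = new ArrayList<>();
        for (String entry : value.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing semicolon
            }
            String[] parts = trimmed.split(":");
            if (parts.length != 3) {
                throw new IllegalArgumentException(
                        "Expected ip:rpc_port:sql_port but got: " + trimmed);
            }
            servers.add(new RootServer(
                    parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2])));
        }
        return servers;
    }

    public static void main(String[] args) {
        List<RootServer> servers = parse("127.0.0.1:2882:2881;10.0.0.2:2882:2881");
        for (RootServer s : servers) {
            System.out.println(s.ip + " rpc=" + s.rpcPort + " sql=" + s.sqlPort);
        }
    }
}
```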
The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
Key | DataType | Description |
---|---|---|
tenant_name | STRING NOT NULL | Name of the tenant that contains the row. |
database_name | STRING NOT NULL | Name of the database that contains the row. |
table_name | STRING NOT NULL | Name of the table that contains the row. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | Time at which the change was made in the database. If the record is read from the snapshot of the table instead of the change stream, the value is always 0. |
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
CREATE TABLE products (
    tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = 'user@test_tenant',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = '^test_db$',
    'table-name' = '^orders$',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'rootserver-list' = '127.0.0.1:2882:2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983',
    'working-mode' = 'memory'
);
The OceanBase CDC connector is a Flink Source connector that reads the database snapshot first and then continues to read change events with at-least-once processing.
OceanBase is a distributed database whose log files are spread across different servers. As there is no position information like the MySQL binlog offset, we can only use a timestamp as the position mark. To ensure that no data is missed, liboblog (a C++ library for reading OceanBase log records) might read some log data from before the given timestamp. As a result, we may read duplicate data whose timestamp is around the start point, and only 'at-least-once' can be guaranteed.
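One common way to live with at-least-once delivery is to apply changes downstream as idempotent upserts keyed by the primary key. The Java sketch below is a self-contained illustration of that idea under stated assumptions: the `UpsertReplay` and `Change` classes are hypothetical, each change carries the full new row, and duplicates arrive as an in-order replay of a recent stretch of the log. It is not how the connector itself works.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sink logic: applies change events as upserts keyed by primary key. */
final class UpsertReplay {

    /** A change to one row; a null row marks a delete. */
    static final class Change {
        final int orderId;
        final String row;

        Change(int orderId, String row) {
            this.orderId = orderId;
            this.row = row;
        }
    }

    /** Applies the changes in order and returns the resulting table contents. */
    static Map<Integer, String> applyAll(List<Change> changes) {
        Map<Integer, String> table = new TreeMap<>();
        for (Change c : changes) {
            if (c.row == null) {
                table.remove(c.orderId);
            } else {
                table.put(c.orderId, c.row);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> once = Arrays.asList(
                new Change(1, "CREATED"), new Change(2, "CREATED"),
                new Change(1, "SHIPPED"), new Change(2, null));
        // Same stream, but the second and third events are delivered again
        // because reading restarted slightly before the start timestamp.
        List<Change> withDuplicates = Arrays.asList(
                new Change(1, "CREATED"), new Change(2, "CREATED"),
                new Change(1, "SHIPPED"),
                new Change(2, "CREATED"), new Change(1, "SHIPPED"),
                new Change(2, null));
        System.out.println(applyAll(once));                                  // {1=SHIPPED}
        System.out.println(applyAll(once).equals(applyAll(withDuplicates))); // true
    }
}
```

Because every event overwrites the row by key, replaying a recent segment of the log in order converges to the same final state as processing each event exactly once.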
The config option scan.startup.mode specifies the startup mode for the OceanBase CDC consumer. The valid enumerations are:
- initial: Performs an initial snapshot of the monitored table upon first startup, and then continues to read the latest commit log.
- latest-offset: Never performs a snapshot of the monitored table upon first startup, and just reads the commit log from the time the connector is started.
- timestamp: Never performs a snapshot of the monitored table upon first startup, and just reads the commit log from the given scan.startup.timestamp.
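The three startup modes can be summarized as two decisions: whether to read a snapshot, and where log reading begins. The Java sketch below encodes that summary. The `StartupPlan` class is hypothetical, and treating "now" as the log start for `initial` and `latest-offset` is a simplifying assumption for illustration; timestamps are in seconds, as for scan.startup.timestamp.

```java
/** Hypothetical summary of what each 'scan.startup.mode' value implies. */
final class StartupPlan {
    final boolean readSnapshot;
    final long logStartSeconds;

    private StartupPlan(boolean readSnapshot, long logStartSeconds) {
        this.readSnapshot = readSnapshot;
        this.logStartSeconds = logStartSeconds;
    }

    static StartupPlan of(String mode, Long startupTimestampSeconds, long nowSeconds) {
        switch (mode) {
            case "initial":
                // Snapshot first, then continue with the commit log.
                return new StartupPlan(true, nowSeconds);
            case "latest-offset":
                // No snapshot; read the commit log from connector start.
                return new StartupPlan(false, nowSeconds);
            case "timestamp":
                // No snapshot; read the commit log from the given timestamp.
                if (startupTimestampSeconds == null) {
                    throw new IllegalArgumentException(
                            "'scan.startup.timestamp' is required for 'timestamp' mode");
                }
                return new StartupPlan(false, startupTimestampSeconds);
            default:
                throw new IllegalArgumentException("Unknown scan.startup.mode: " + mode);
        }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis() / 1000L;
        StartupPlan plan = StartupPlan.of("timestamp", 1700000000L, now);
        System.out.println(plan.readSnapshot + " " + plan.logStartSeconds); // false 1700000000
    }
}
```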
The OceanBase CDC connector uses oblogclient to consume the commit log from OceanBase LogProxy.
The OceanBase CDC connector can also be used as a DataStream source. You can create a SourceFunction as follows:
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;

public class OceanBaseSourceExample {
    public static void main(String[] args) throws Exception {
        ResolvedSchema resolvedSchema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", DataTypes.INT().notNull()),
                                Column.physical("name", DataTypes.STRING().notNull())),
                        Collections.emptyList(),
                        UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));

        RowType physicalDataType =
                (RowType) resolvedSchema.toPhysicalRowDataType().getLogicalType();
        TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(physicalDataType);
        String serverTimeZone = "+00:00";

        OceanBaseDeserializationSchema<RowData> deserializer =
                RowDataOceanBaseDeserializationSchema.newBuilder()
                        .setPhysicalRowType(physicalDataType)
                        .setResultTypeInfo(resultTypeInfo)
                        .setServerTimeZone(ZoneId.of(serverTimeZone))
                        .build();

        SourceFunction<RowData> oceanBaseSource =
                OceanBaseSource.<RowData>builder()
                        .rsList("127.0.0.1:2882:2881")
                        .startupMode(StartupMode.INITIAL)
                        .username("user@test_tenant")
                        .password("pswd")
                        .tenantName("test_tenant")
                        .databaseName("^test_db$")
                        .tableName("^test_table$")
                        .hostname("127.0.0.1")
                        .port(2881)
                        .compatibleMode("mysql")
                        .jdbcDriver("com.mysql.jdbc.Driver")
                        .logProxyHost("127.0.0.1")
                        .logProxyPort(2983)
                        .serverTimeZone(serverTimeZone)
                        .deserializer(deserializer)
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);

        env.addSource(oceanBaseSource).print().setParallelism(1);
        env.execute("Print OceanBase Snapshot + Change Events");
    }
}
OceanBase type (MySQL mode) | Flink SQL type | NOTE |
---|---|---|
BOOLEAN, TINYINT(1), BIT(1) | BOOLEAN | |
TINYINT | TINYINT | |
SMALLINT, TINYINT UNSIGNED | SMALLINT | |
INT, MEDIUMINT, SMALLINT UNSIGNED | INT | |
BIGINT, INT UNSIGNED | BIGINT | |
BIGINT UNSIGNED | DECIMAL(20, 0) | |
REAL, FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
NUMERIC(p, s), DECIMAL(p, s) where p <= 38 | DECIMAL(p, s) | |
NUMERIC(p, s), DECIMAL(p, s) where 38 < p <= 65 | STRING | DECIMAL is equivalent to NUMERIC. The precision for DECIMAL data type is up to 65 in OceanBase, but the precision for DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss. |
DATE | DATE | |
TIME [(p)] | TIME [(p)] | |
DATETIME [(p)] | TIMESTAMP [(p)] | |
TIMESTAMP [(p)] | TIMESTAMP_LTZ [(p)] | |
CHAR(n) | CHAR(n) | |
VARCHAR(n) | VARCHAR(n) | |
BIT(n) | BINARY(⌈n/8⌉) | |
BINARY(n) | BINARY(n) | |
VARBINARY(N) | VARBINARY(N) | |
TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING | |
TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES | |
YEAR | INT | |
ENUM | STRING | |
SET | ARRAY<STRING> | As the SET data type in OceanBase is a string object that can have zero or more values, it should always be mapped to an array of strings. |
JSON | STRING | The JSON data type will be converted into STRING with JSON format in Flink. |
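The DECIMAL and BIT rows in the table above depend on the declared precision or width, so a small worked example may help. The Java sketch below encodes just those two rules; the `MySqlModeTypeMapping` class and its method names are hypothetical and only mirror the table, not the connector's internal code.

```java
/** Hypothetical encoding of two rules from the table above (illustration only). */
final class MySqlModeTypeMapping {

    /** DECIMAL(p, s) / NUMERIC(p, s): Flink DECIMAL up to precision 38, otherwise STRING. */
    static String decimal(int p, int s) {
        if (p < 1 || p > 65) {
            throw new IllegalArgumentException("Precision must be between 1 and 65: " + p);
        }
        return p <= 38 ? "DECIMAL(" + p + ", " + s + ")" : "STRING";
    }

    /** BIT(1) maps to BOOLEAN; wider BIT(n) maps to BINARY(ceil(n / 8)). */
    static String bit(int n) {
        if (n == 1) {
            return "BOOLEAN";
        }
        int bytes = (n + 7) / 8; // integer form of ceil(n / 8)
        return "BINARY(" + bytes + ")";
    }

    public static void main(String[] args) {
        System.out.println(decimal(10, 5)); // DECIMAL(10, 5)
        System.out.println(decimal(50, 2)); // STRING
        System.out.println(bit(1));         // BOOLEAN
        System.out.println(bit(9));         // BINARY(2)
    }
}
```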
OceanBase type (Oracle mode) | Flink SQL type | NOTE |
---|---|---|
NUMBER(1) | BOOLEAN | |
NUMBER(p, s <= 0), p - s < 3 | TINYINT | |
NUMBER(p, s <= 0), p - s < 5 | SMALLINT | |
NUMBER(p, s <= 0), p - s < 10 | INT | |
NUMBER(p, s <= 0), p - s < 19 | BIGINT | |
NUMBER(p, s <= 0), 19 <= p - s <= 38 | DECIMAL(p - s, 0) | |
NUMBER(p, s > 0) | DECIMAL(p, s) | |
NUMBER(p, s <= 0), p - s > 38 | STRING | |
FLOAT, BINARY_FLOAT | FLOAT | |
BINARY_DOUBLE | DOUBLE | |
DATE, TIMESTAMP [(p)] | TIMESTAMP [(p)] | |
CHAR(n), NCHAR(n), VARCHAR(n), VARCHAR2(n), NVARCHAR2(n), CLOB | STRING | |
RAW, BLOB, ROWID | BYTES | |
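The NUMBER(p, s) rows above form a small decision procedure, which the following Java sketch spells out. The `OracleModeNumberMapping` class is a hypothetical illustration that follows the table row by row; it is not taken from the connector's source.

```java
/** Hypothetical encoding of the NUMBER(p, s) rows in the table above. */
final class OracleModeNumberMapping {

    static String flinkType(int p, int s) {
        if (s > 0) {
            return "DECIMAL(" + p + ", " + s + ")";
        }
        if (p == 1 && s == 0) {
            return "BOOLEAN"; // NUMBER(1)
        }
        int digits = p - s; // number of integer digits when the scale is zero or negative
        if (digits < 3) {
            return "TINYINT";
        }
        if (digits < 5) {
            return "SMALLINT";
        }
        if (digits < 10) {
            return "INT";
        }
        if (digits < 19) {
            return "BIGINT";
        }
        if (digits <= 38) {
            return "DECIMAL(" + digits + ", 0)";
        }
        return "STRING";
    }

    public static void main(String[] args) {
        System.out.println(flinkType(1, 0));   // BOOLEAN
        System.out.println(flinkType(9, 0));   // INT
        System.out.println(flinkType(10, -9)); // DECIMAL(19, 0)
        System.out.println(flinkType(38, -2)); // STRING
        System.out.println(flinkType(10, 2));  // DECIMAL(10, 2)
    }
}
```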