Skip to content

Commit

Permalink
[cdc] Support type mapping option bigint-unsigned-to-bigint to map My…
Browse files Browse the repository at this point in the history
…SQL BIGINT UNSIGNED related types to BIGINT (#2454)
  • Loading branch information
yuzelin authored Dec 5, 2023
1 parent d4a60c1 commit 995947e
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 12 deletions.
10 changes: 7 additions & 3 deletions docs/content/cdc-ingestion/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,13 @@ behaviors of `RENAME TABLE` and `DROP COLUMN` will be ignored, `RENAME COLUMN` w
3. You can use type mapping option `to-string` (Use `--type_mapping`) to map all MySQL data type to STRING.
4. You can use type mapping option `char-to-string` (Use `--type_mapping`) to map MySQL CHAR(length)/VARCHAR(length) types to STRING.
5. You can use type mapping option `longtext-to-bytes` (Use `--type_mapping`) to map MySQL LONGTEXT types to BYTES.
6. MySQL BIT(1) type will be mapped to Boolean.
7. When using Hive catalog, MySQL TIME type will be mapped to STRING.
8. MySQL BINARY will be mapped to Paimon VARBINARY. This is because the binary value is passed as bytes in binlog, so it
6. MySQL `BIGINT UNSIGNED`, `BIGINT UNSIGNED ZEROFILL`, `SERIAL` will be mapped to `DECIMAL(20, 0)` by default. You can
use type mapping option `bigint-unsigned-to-bigint` (Use `--type_mapping`) to map these types to Paimon `BIGINT`, but there
is potential data overflow because `BIGINT UNSIGNED` can store up to 20 digits integer value but Paimon `BIGINT` can only
store up to 19 digits integer value. So you should ensure the overflow won't occur when using this option.
7. MySQL BIT(1) type will be mapped to Boolean.
8. When using Hive catalog, MySQL TIME type will be mapped to STRING.
9. MySQL BINARY will be mapped to Paimon VARBINARY. This is because the binary value is passed as bytes in binlog, so it
should be mapped to byte type (BYTES or VARBINARY). We choose VARBINARY because it can retain the length information.

## Setting Custom Job Name
Expand Down
14 changes: 13 additions & 1 deletion docs/layouts/shortcodes/generated/kafka_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,19 @@
</tr>
<tr>
<td><h5>--type_mapping</h5></td>
<td>It is used to specify how to map MySQL data type to Paimon type. Currently, only support option "to-string": maps all MySQL types to STRING.</td>
<td>It is used to specify how to map MySQL data type to Paimon type.<br />
Supported options:
<ul>
<li>"tinyint1-not-bool": maps MySQL TINYINT(1) to TINYINT instead of BOOLEAN.</li>
<li>"to-nullable": ignores all NOT NULL constraints (except for primary keys).
This is used to solve the problem that Flink cannot accept the MySQL 'ALTER TABLE ADD COLUMN column type NOT NULL DEFAULT x' operation.
</li>
<li>"to-string": maps all MySQL types to STRING.</li>
<li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
<li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
<li>"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.</li>
</ul>
</td>
</tr>
<tr>
<td><h5>--kafka_conf</h5></td>
Expand Down
14 changes: 13 additions & 1 deletion docs/layouts/shortcodes/generated/kafka_sync_table.html
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,19 @@
</tr>
<tr>
<td><h5>--type_mapping</h5></td>
<td>It is used to specify how to map MySQL data type to Paimon type. Currently, only support option "to-string": maps all MySQL types to STRING.</td>
<td>It is used to specify how to map MySQL data type to Paimon type.<br />
Supported options:
<ul>
<li>"tinyint1-not-bool": maps MySQL TINYINT(1) to TINYINT instead of BOOLEAN.</li>
<li>"to-nullable": ignores all NOT NULL constraints (except for primary keys).
This is used to solve the problem that Flink cannot accept the MySQL 'ALTER TABLE ADD COLUMN column type NOT NULL DEFAULT x' operation.
</li>
<li>"to-string": maps all MySQL types to STRING.</li>
<li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
<li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
<li>"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.</li>
</ul>
</td>
</tr>
<tr>
<td><h5>--computed_column</h5></td>
Expand Down
1 change: 1 addition & 0 deletions docs/layouts/shortcodes/generated/mysql_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
<li>"to-string": maps all MySQL types to STRING.</li>
<li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
<li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
<li>"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.</li>
</ul>
</td>
</tr>
Expand Down
1 change: 1 addition & 0 deletions docs/layouts/shortcodes/generated/mysql_sync_table.html
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<li>"to-string": maps all MySQL types to STRING.</li>
<li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
<li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
<li>"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.</li>
</ul>
</td>
</tr>
Expand Down
14 changes: 13 additions & 1 deletion docs/layouts/shortcodes/generated/pulsar_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,19 @@
</tr>
<tr>
<td><h5>--type_mapping</h5></td>
<td>It is used to specify how to map MySQL data type to Paimon type. Currently, only support option "to-string": maps all MySQL types to STRING.</td>
<td>It is used to specify how to map MySQL data type to Paimon type.<br />
Supported options:
<ul>
<li>"tinyint1-not-bool": maps MySQL TINYINT(1) to TINYINT instead of BOOLEAN.</li>
<li>"to-nullable": ignores all NOT NULL constraints (except for primary keys).
This is used to solve the problem that Flink cannot accept the MySQL 'ALTER TABLE ADD COLUMN column type NOT NULL DEFAULT x' operation.
</li>
<li>"to-string": maps all MySQL types to STRING.</li>
<li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
<li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
<li>"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.</li>
</ul>
</td>
</tr>
<tr>
<td><h5>--pulsar_conf</h5></td>
Expand Down
14 changes: 13 additions & 1 deletion docs/layouts/shortcodes/generated/pulsar_sync_table.html
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,19 @@
</tr>
<tr>
<td><h5>--type_mapping</h5></td>
<td>It is used to specify how to map MySQL data type to Paimon type. Currently, only support option "to-string": maps all MySQL types to STRING.</td>
<td>It is used to specify how to map MySQL data type to Paimon type.<br />
Supported options:
<ul>
<li>"tinyint1-not-bool": maps MySQL TINYINT(1) to TINYINT instead of BOOLEAN.</li>
<li>"to-nullable": ignores all NOT NULL constraints (except for primary keys).
This is used to solve the problem that Flink cannot accept the MySQL 'ALTER TABLE ADD COLUMN column type NOT NULL DEFAULT x' operation.
</li>
<li>"to-string": maps all MySQL types to STRING.</li>
<li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
<li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
<li>"bigint-unsigned-to-bigint": maps MySQL BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL to BIGINT. You should ensure overflow won't occur when using this option.</li>
</ul>
</td>
</tr>
<tr>
<td><h5>--computed_column</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,18 @@ public static TypeMapping parse(String[] rawOptions) {
* <li>TO_STRING: maps all MySQL types to STRING.
* <li>CHAR_TO_STRING: maps MySQL CHAR(length)/VARCHAR(length) types to STRING.
* <li>LONGTEXT_TO_BYTES: maps MySQL LONGTEXT types to BYTES.
* <li>BIGINT_UNSIGNED_TO_BIGINT: maps MySQL BIGINT UNSIGNED types to Paimon BIGINT. Notice
* that there is potential overflow risk, and users should ensure the overflow won't
* occur.
* </ul>
*/
public enum TypeMappingMode {
TINYINT1_NOT_BOOL,
TO_NULLABLE,
TO_STRING,
CHAR_TO_STRING,
LONGTEXT_TO_BYTES;
LONGTEXT_TO_BYTES,
BIGINT_UNSIGNED_TO_BIGINT;

private static final Map<String, TypeMappingMode> TYPE_MAPPING_OPTIONS =
Arrays.stream(TypeMappingMode.values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.BIGINT_UNSIGNED_TO_BIGINT;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.CHAR_TO_STRING;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.LONGTEXT_TO_BYTES;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
Expand Down Expand Up @@ -207,7 +208,9 @@ public static DataType toDataType(
case BIGINT_UNSIGNED:
case BIGINT_UNSIGNED_ZEROFILL:
case SERIAL:
return DataTypes.DECIMAL(20, 0);
return typeMapping.containsMode(BIGINT_UNSIGNED_TO_BIGINT)
? DataTypes.BIGINT()
: DataTypes.DECIMAL(20, 0);
case FLOAT:
case FLOAT_UNSIGNED:
case FLOAT_UNSIGNED_ZEROFILL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.stream.IntStream;

import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.BIGINT_UNSIGNED_TO_BIGINT;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.CHAR_TO_STRING;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.LONGTEXT_TO_BYTES;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
Expand Down Expand Up @@ -435,8 +436,7 @@ public void testIgnoreNotNull() throws Exception {
}
}

// --------------------------------------- char-to-string
// ---------------------------------------
// -------------------------------------- char-to-string --------------------------------------

@Test
@Timeout(60)
Expand Down Expand Up @@ -502,6 +502,8 @@ public void testCharToString() throws Exception {
}
}

// ------------------------------------- longtext-to-bytes -------------------------------------

@Test
@Timeout(60)
public void testLongtextToBytes() throws Exception {
Expand Down Expand Up @@ -568,4 +570,77 @@ public void testLongtextToBytes() throws Exception {
Collections.singletonList("pk"));
}
}

// --------------------------------- bigint-unsigned-to-bigint ---------------------------------

@Test
@Timeout(60)
public void testBigintUnsignedToBigint() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "bigint_unsigned_to_bigint_test");

MySqlSyncDatabaseAction action =
syncDatabaseActionBuilder(mySqlConfig)
.withMode(COMBINED.configString())
.withTypeMappingModes(BIGINT_UNSIGNED_TO_BIGINT.configString())
.build();
runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable("t1");
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT().notNull(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT().notNull()
},
new String[] {"pk", "v1", "v2", "v3"});
waitForResult(
Collections.singletonList("+I[1, 12345, 56789, 123456789]"),
table,
rowType,
Collections.singletonList("pk"));

try (Statement statement = getStatement()) {
statement.executeUpdate("USE bigint_unsigned_to_bigint_test");

// test schema evolution
statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v4 BIGINT UNSIGNED");
statement.executeUpdate(
"INSERT INTO t1 VALUES (2, 23456, 67890, 234567890, 1234567890)");

rowType =
RowType.of(
new DataType[] {
DataTypes.INT().notNull(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT().notNull(),
DataTypes.BIGINT(),
},
new String[] {"pk", "v1", "v2", "v3", "v4"});
waitForResult(
Arrays.asList(
"+I[1, 12345, 56789, 123456789, NULL]",
"+I[2, 23456, 67890, 234567890, 1234567890]"),
table,
rowType,
Collections.singletonList("pk"));

// test newly created table
statement.executeUpdate(
"CREATE TABLE _new_table (pk INT, v BIGINT UNSIGNED, PRIMARY KEY (pk))");
statement.executeUpdate("INSERT INTO _new_table VALUES (1, 1234567890)");

waitingTables("_new_table");
waitForResult(
Collections.singletonList("+I[1, 1234567890]"),
getFileStoreTable("_new_table"),
RowType.of(
new DataType[] {DataTypes.INT().notNull(), DataTypes.BIGINT()},
new String[] {"pk", "v"}),
Collections.singletonList("pk"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -296,4 +296,21 @@ CREATE TABLE t1 (
PRIMARY KEY (pk)
);

INSERT INTO t1 VALUES (1, '1');
INSERT INTO t1 VALUES (1, '1');

-- ################################################################################
-- testBigIntUnsignedToBigInt
-- ################################################################################

CREATE DATABASE bigint_unsigned_to_bigint_test;
USE bigint_unsigned_to_bigint_test;

CREATE TABLE t1 (
pk INT,
v1 BIGINT UNSIGNED,
v2 BIGINT UNSIGNED ZEROFILL,
v3 SERIAL,
PRIMARY KEY (pk)
);

INSERT INTO t1 VALUES (1, 12345, 56789, 123456789);

0 comments on commit 995947e

Please sign in to comment.