Skip to content

Commit

Permalink
[cdc] Support longtext data type mapping to bytes (apache#2281)
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo authored Nov 30, 2023
1 parent c8a9333 commit 28583f5
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 5 deletions.
7 changes: 4 additions & 3 deletions docs/content/cdc-ingestion/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ behaviors of `RENAME TABLE` and `DROP COLUMN` will be ignored, `RENAME COLUMN` w
2. You can use type mapping option `to-nullable` (Use `--type-mapping`) to ignore all NOT NULL constraints (except primary keys).
3. You can use type mapping option `to-string` (Use `--type-mapping`) to map all MySQL data type to STRING.
4. You can use type mapping option `char-to-string` (Use `--type-mapping`) to map MySQL CHAR(length)/VARCHAR(length) types to STRING.
5. MySQL BIT(1) type will be mapped to Boolean.
6. When using Hive catalog, MySQL TIME type will be mapped to STRING.
7. MySQL BINARY will be mapped to Paimon VARBINARY. This is because the binary value is passed as bytes in binlog, so it
5. You can use type mapping option `longtext-to-bytes` (Use `--type-mapping`) to map MySQL LONGTEXT types to BYTES.
6. MySQL BIT(1) type will be mapped to Boolean.
7. When using Hive catalog, MySQL TIME type will be mapped to STRING.
8. MySQL BINARY will be mapped to Paimon VARBINARY. This is because the binary value is passed as bytes in binlog, so it
should be mapped to byte type (BYTES or VARBINARY). We choose VARBINARY because it can retain the length information.

## Setting Custom Job Name
Expand Down
1 change: 1 addition & 0 deletions docs/layouts/shortcodes/generated/mysql_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
</li>
<li>"to-string": maps all MySQL types to STRING.</li>
<li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
<li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
</ul>
</td>
</tr>
Expand Down
1 change: 1 addition & 0 deletions docs/layouts/shortcodes/generated/mysql_sync_table.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
</li>
<li>"to-string": maps all MySQL types to STRING.</li>
<li>"char-to-string": maps MySQL CHAR(length)/VARCHAR(length) types to STRING.</li>
<li>"longtext-to-bytes": maps MySQL LONGTEXT types to BYTES.</li>
</ul>
</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ public static TypeMapping parse(String[] rawOptions) {
* <li>TO_NULLABLE: ignores all NOT NULL constraints (except for primary keys).
* <li>TO_STRING: maps all MySQL types to STRING.
* <li>CHAR_TO_STRING: maps MySQL CHAR(length)/VARCHAR(length) types to STRING.
* <li>LONGTEXT_TO_BYTES: maps MySQL LONGTEXT types to BYTES.
* </ul>
*/
public enum TypeMappingMode {
TINYINT1_NOT_BOOL,
TO_NULLABLE,
TO_STRING,
CHAR_TO_STRING;
CHAR_TO_STRING,
LONGTEXT_TO_BYTES;

private static final Map<String, TypeMappingMode> TYPE_MAPPING_OPTIONS =
Arrays.stream(TypeMappingMode.values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Optional;

import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.CHAR_TO_STRING;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.LONGTEXT_TO_BYTES;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;

Expand Down Expand Up @@ -272,7 +273,6 @@ public static DataType toDataType(
case TINYTEXT:
case TEXT:
case MEDIUMTEXT:
case LONGTEXT:
case JSON:
case ENUM:
case GEOMETRY:
Expand All @@ -291,6 +291,10 @@ public static DataType toDataType(
return length == null || length == 0
? DataTypes.VARBINARY(VarBinaryType.DEFAULT_LENGTH)
: DataTypes.VARBINARY(length);
case LONGTEXT:
return typeMapping.containsMode(LONGTEXT_TO_BYTES)
? DataTypes.BYTES()
: DataTypes.STRING();
case TINYBLOB:
case BLOB:
case MEDIUMBLOB:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.CHAR_TO_STRING;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.LONGTEXT_TO_BYTES;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
Expand Down Expand Up @@ -500,4 +501,71 @@ public void testCharToString() throws Exception {
Collections.singletonList("pk"));
}
}

@Test
@Timeout(60)
public void testLongtextToBytes() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "longtext_to_bytes_test");

MySqlSyncDatabaseAction action =
syncDatabaseActionBuilder(mySqlConfig)
.withMode(COMBINED.configString())
.withTypeMappingModes(LONGTEXT_TO_BYTES.configString())
.build();
runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable("t1");

try (Statement statement = getStatement()) {
statement.executeUpdate("USE longtext_to_bytes_test");

// test schema evolution
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT().notNull(), DataTypes.VARCHAR(10).notNull()
},
new String[] {"pk", "v1"});
waitForResult(
Collections.singletonList("+I[1, 1]"),
table,
rowType,
Collections.singletonList("pk"));

statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v2 LONGTEXT");
statement.executeUpdate(
"INSERT INTO t1 VALUES (2, '2', 'This is an example of a long text string, meant to demonstrate the usage of the LONGTEXT data type in SQL databases.')");

rowType =
RowType.of(
new DataType[] {
DataTypes.INT().notNull(),
DataTypes.VARCHAR(10).notNull(),
DataTypes.BYTES()
},
new String[] {"pk", "v1", "v2"});
waitForResult(
Arrays.asList(
"+I[1, 1, NULL]",
"+I[2, 2, [84, 104, 105, 115, 32, 105, 115, 32, 97, 110, 32, 101, 120, 97, 109, 112, 108, 101, 32, 111, 102, 32, 97, 32, 108, 111, 110, 103, 32, 116, 101, 120, 116, 32, 115, 116, 114, 105, 110, 103, 44, 32, 109, 101, 97, 110, 116, 32, 116, 111, 32, 100, 101, 109, 111, 110, 115, 116, 114, 97, 116, 101, 32, 116, 104, 101, 32, 117, 115, 97, 103, 101, 32, 111, 102, 32, 116, 104, 101, 32, 76, 79, 78, 71, 84, 69, 88, 84, 32, 100, 97, 116, 97, 32, 116, 121, 112, 101, 32, 105, 110, 32, 83, 81, 76, 32, 100, 97, 116, 97, 98, 97, 115, 101, 115, 46]]"),
table,
rowType,
Collections.singletonList("pk"));

// test newly created table
statement.executeUpdate(
"CREATE TABLE _new_table (pk INT, v LONGTEXT, PRIMARY KEY (pk))");
statement.executeUpdate("INSERT INTO _new_table VALUES (1, 'Paimon')");

waitingTables("_new_table");
waitForResult(
Collections.singletonList("+I[1, [80, 97, 105, 109, 111, 110]]"),
getFileStoreTable("_new_table"),
RowType.of(
new DataType[] {DataTypes.INT().notNull(), DataTypes.BYTES()},
new String[] {"pk", "v"}),
Collections.singletonList("pk"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,18 @@ CREATE TABLE t1 (
);

INSERT INTO t1 VALUES (1, '1');

-- ################################################################################
-- testLongtextToBytes
-- ################################################################################

CREATE DATABASE longtext_to_bytes_test;
USE longtext_to_bytes_test;

CREATE TABLE t1 (
pk INT,
v1 VARCHAR(10) NOT NULL,
PRIMARY KEY (pk)
);

INSERT INTO t1 VALUES (1, '1');

0 comments on commit 28583f5

Please sign in to comment.