From 25e936889ee75012199431615c7ffb2c272a8708 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Wed, 17 Jan 2024 11:23:55 +0800 Subject: [PATCH] Column increase quote --- .../flink/catalog/doris/DorisSystem.java | 11 +++++-- .../flink/sink/schema/SchemaChangeHelper.java | 21 +++++++++++--- .../sink/schema/SchemaChangeManager.java | 2 +- .../flink/sink/schema/SchemaManagerTest.java | 29 +++++++++++++++++-- 4 files changed, 54 insertions(+), 9 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index 269affa27..45266e5b1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -18,6 +18,7 @@ package org.apache.doris.flink.catalog.doris; import org.apache.flink.annotation.Public; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.apache.commons.compress.utils.Lists; @@ -253,7 +254,7 @@ private static void buildColumn(StringBuilder sql, FieldSchema field, boolean is .append("',"); } - private static String quoteComment(String comment) { + public static String quoteComment(String comment) { if (comment == null) { return ""; } else { @@ -266,10 +267,16 @@ private static List identifier(List name) { return result; } - private static String identifier(String name) { + public static String identifier(String name) { return "`" + name + "`"; } + public static String quoteTableIdentifier(String tableIdentifier) { + String[] dbTable = tableIdentifier.split("\\."); + Preconditions.checkArgument(dbTable.length == 2); + return identifier(dbTable[0]) + "." + identifier(dbTable[1]); + } + private static String quoteProperties(String name) { return "'" + name + "'"; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java index 5b9901f2e..580b9901a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java @@ -21,6 +21,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.compress.utils.Lists; +import org.apache.doris.flink.catalog.doris.DorisSystem; import org.apache.doris.flink.catalog.doris.FieldSchema; import java.util.List; @@ -103,23 +104,35 @@ public static String buildAddColumnDDL(String tableIdentifier, FieldSchema field String type = fieldSchema.getTypeString(); String defaultValue = fieldSchema.getDefaultValue(); String comment = fieldSchema.getComment(); - String addDDL = String.format(ADD_DDL, tableIdentifier, name, type); + String addDDL = + String.format( + ADD_DDL, + DorisSystem.quoteTableIdentifier(tableIdentifier), + DorisSystem.identifier(name), + type); if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) { addDDL = addDDL + " DEFAULT " + defaultValue; } if (!StringUtils.isNullOrWhitespaceOnly(comment)) { - addDDL = addDDL + " COMMENT " + comment; + addDDL = addDDL + " COMMENT '" + DorisSystem.quoteComment(comment) + "'"; } return addDDL; } public static String buildDropColumnDDL(String tableIdentifier, String columName) { - return String.format(DROP_DDL, tableIdentifier, columName); + return String.format( + DROP_DDL, + DorisSystem.quoteTableIdentifier(tableIdentifier), + DorisSystem.identifier(columName)); } public static String buildRenameColumnDDL( String tableIdentifier, String oldColumnName, String newColumnName) { - return String.format(RENAME_DDL, tableIdentifier, oldColumnName, newColumnName); + return String.format( + RENAME_DDL, + DorisSystem.quoteTableIdentifier(tableIdentifier), + DorisSystem.identifier(oldColumnName), + DorisSystem.identifier(newColumnName)); } public static String buildColumnExistsQuery(String database, String table, String column) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index 8860f6a07..979e35307 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -226,7 +226,7 @@ private String authHeader() { .getBytes(StandardCharsets.UTF_8))); } - private String getTableIdentifier(String database, String table) { + private static String getTableIdentifier(String database, String table) { return String.format("%s.%s", database, table); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java index 0f764894f..977f8da74 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.sink.schema; +import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.exception.IllegalArgumentException; import org.apache.doris.flink.sink.HttpEntityMock; @@ -27,6 +28,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicStatusLine; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.MockedStatic; @@ -94,13 +96,36 @@ public void setUp() throws IOException { public void testColumnExists() throws IOException, IllegalArgumentException { entityMock.setValue(queryResponse); boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age"); - System.out.println(columnExists); + Assert.assertEquals(true, columnExists); } @Test public void testColumnNotExists() throws IOException, IllegalArgumentException { entityMock.setValue(queryNoExistsResponse); boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age1"); - System.out.println(columnExists); + Assert.assertEquals(false, columnExists); + } + + @Test + public void testAddColumn() { + FieldSchema field = new FieldSchema("col", "int", "comment \"'sdf'"); + String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field); + Assert.assertEquals( + "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int COMMENT 'comment \"\\'sdf\\''", + addColumnDDL); + } + + @Test + public void testDropColumn() { + String dropColumnDDL = SchemaChangeHelper.buildDropColumnDDL("test.test_flink", "col"); + Assert.assertEquals("ALTER TABLE `test`.`test_flink` DROP COLUMN `col`", dropColumnDDL); + } + + @Test + public void testRenameColumn() { + String renameColumnDDL = + SchemaChangeHelper.buildRenameColumnDDL("test.test_flink", "col", "col_new"); + Assert.assertEquals( + "ALTER TABLE `test`.`test_flink` RENAME COLUMN `col` `col_new`", renameColumnDDL); } }