Skip to content

Commit

Permalink
[Improve](cdc) Column add quote in schema change (apache#296)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Jan 17, 2024
1 parent 800755e commit 46de849
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.flink.catalog.doris;

import org.apache.flink.annotation.Public;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.apache.commons.compress.utils.Lists;
Expand Down Expand Up @@ -253,7 +254,7 @@ private static void buildColumn(StringBuilder sql, FieldSchema field, boolean is
.append("',");
}

private static String quoteComment(String comment) {
public static String quoteComment(String comment) {
if (comment == null) {
return "";
} else {
Expand All @@ -266,10 +267,16 @@ private static List<String> identifier(List<String> name) {
return result;
}

private static String identifier(String name) {
public static String identifier(String name) {
return "`" + name + "`";
}

public static String quoteTableIdentifier(String tableIdentifier) {
String[] dbTable = tableIdentifier.split("\\.");
Preconditions.checkArgument(dbTable.length == 2);
return identifier(dbTable[0]) + "." + identifier(dbTable[1]);
}

private static String quoteProperties(String name) {
return "'" + name + "'";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.FieldSchema;

import java.util.List;
Expand Down Expand Up @@ -103,23 +104,35 @@ public static String buildAddColumnDDL(String tableIdentifier, FieldSchema field
String type = fieldSchema.getTypeString();
String defaultValue = fieldSchema.getDefaultValue();
String comment = fieldSchema.getComment();
String addDDL = String.format(ADD_DDL, tableIdentifier, name, type);
String addDDL =
String.format(
ADD_DDL,
DorisSystem.quoteTableIdentifier(tableIdentifier),
DorisSystem.identifier(name),
type);
if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
addDDL = addDDL + " DEFAULT " + defaultValue;
}
if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
addDDL = addDDL + " COMMENT " + comment;
addDDL = addDDL + " COMMENT '" + DorisSystem.quoteComment(comment) + "'";
}
return addDDL;
}

public static String buildDropColumnDDL(String tableIdentifier, String columName) {
return String.format(DROP_DDL, tableIdentifier, columName);
return String.format(
DROP_DDL,
DorisSystem.quoteTableIdentifier(tableIdentifier),
DorisSystem.identifier(columName));
}

public static String buildRenameColumnDDL(
String tableIdentifier, String oldColumnName, String newColumnName) {
return String.format(RENAME_DDL, tableIdentifier, oldColumnName, newColumnName);
return String.format(
RENAME_DDL,
DorisSystem.quoteTableIdentifier(tableIdentifier),
DorisSystem.identifier(oldColumnName),
DorisSystem.identifier(newColumnName));
}

public static String buildColumnExistsQuery(String database, String table, String column) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private String authHeader() {
.getBytes(StandardCharsets.UTF_8)));
}

private String getTableIdentifier(String database, String table) {
private static String getTableIdentifier(String database, String table) {
return String.format("%s.%s", database, table);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,17 @@ public void testGenerateRenameDDLSql() {
List<String> ddlSqls =
SchemaChangeHelper.generateRenameDDLSql(
table, oldColumnName, newColumnName, originFieldSchemaMap);
Assert.assertEquals(ddlSqls.get(0), "ALTER TABLE test.test_sink RENAME COLUMN c3 c33");
Assert.assertEquals(
ddlSqls.get(0), "ALTER TABLE `test`.`test_sink` RENAME COLUMN `c3` `c33`");
}

@Test
public void testGenerateDDLSql() {
SchemaChangeHelper.compareSchema(updateFieldSchemaMap, originFieldSchemaMap);
List<String> ddlSqls = SchemaChangeHelper.generateDDLSql("test.test_sink");
Assert.assertEquals(ddlSqls.get(0), "ALTER TABLE test.test_sink ADD COLUMN c4 BIGINT");
Assert.assertEquals(
ddlSqls.get(1), "ALTER TABLE test.test_sink ADD COLUMN c5 DATETIMEV2(0)");
ddlSqls.get(0), "ALTER TABLE `test`.`test_sink` ADD COLUMN `c4` BIGINT");
Assert.assertEquals(
ddlSqls.get(1), "ALTER TABLE `test`.`test_sink` ADD COLUMN `c5` DATETIMEV2(0)");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.flink.sink.schema;

import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.HttpEntityMock;
Expand All @@ -27,6 +28,7 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicStatusLine;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
Expand Down Expand Up @@ -94,13 +96,36 @@ public void setUp() throws IOException {
public void testColumnExists() throws IOException, IllegalArgumentException {
entityMock.setValue(queryResponse);
boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age");
System.out.println(columnExists);
Assert.assertEquals(true, columnExists);
}

@Test
public void testColumnNotExists() throws IOException, IllegalArgumentException {
entityMock.setValue(queryNoExistsResponse);
boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age1");
System.out.println(columnExists);
Assert.assertEquals(false, columnExists);
}

@Test
public void testAddColumn() {
FieldSchema field = new FieldSchema("col", "int", "comment \"'sdf'");
String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field);
Assert.assertEquals(
"ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int COMMENT 'comment \"\\'sdf\\''",
addColumnDDL);
}

@Test
public void testDropColumn() {
String dropColumnDDL = SchemaChangeHelper.buildDropColumnDDL("test.test_flink", "col");
Assert.assertEquals("ALTER TABLE `test`.`test_flink` DROP COLUMN `col`", dropColumnDDL);
}

@Test
public void testRenameColumn() {
String renameColumnDDL =
SchemaChangeHelper.buildRenameColumnDDL("test.test_flink", "col", "col_new");
Assert.assertEquals(
"ALTER TABLE `test`.`test_flink` RENAME COLUMN `col` `col_new`", renameColumnDDL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ public void setUp() {

@Test
public void testExtractDDLListMultipleColumns() throws IOException {
String sql0 = "ALTER TABLE test.t1 ADD COLUMN id INT DEFAULT '10000'";
String sql1 = "ALTER TABLE test.t1 ADD COLUMN c199 INT";
String sql2 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'";
String sql3 = "ALTER TABLE test.t1 DROP COLUMN c13";
String sql0 = "ALTER TABLE `test`.`t1` ADD COLUMN `id` INT DEFAULT '10000'";
String sql1 = "ALTER TABLE `test`.`t1` ADD COLUMN `c199` INT";
String sql2 = "ALTER TABLE `test`.`t1` ADD COLUMN `c12` INT DEFAULT '100'";
String sql3 = "ALTER TABLE `test`.`t1` DROP COLUMN `c13`";
List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3);

Map<String, FieldSchema> originFiledSchemaMap = new LinkedHashMap<>();
Expand Down Expand Up @@ -211,7 +211,7 @@ public void testExtractDDLListRename() throws IOException {
schemaChange.setOriginFieldSchemaMap(originFieldSchemaMap);

List<String> ddlList = schemaChange.extractDDLList(record);
Assert.assertEquals("ALTER TABLE test.t1 RENAME COLUMN c3 c333", ddlList.get(0));
Assert.assertEquals("ALTER TABLE `test`.`t1` RENAME COLUMN `c3` `c333`", ddlList.get(0));
}

@Test
Expand Down

0 comments on commit 46de849

Please sign in to comment.