Skip to content

Commit

Permalink
Column increase quote
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Jan 17, 2024
1 parent 800755e commit 25e9368
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.flink.catalog.doris;

import org.apache.flink.annotation.Public;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.apache.commons.compress.utils.Lists;
Expand Down Expand Up @@ -253,7 +254,7 @@ private static void buildColumn(StringBuilder sql, FieldSchema field, boolean is
.append("',");
}

private static String quoteComment(String comment) {
public static String quoteComment(String comment) {
if (comment == null) {
return "";
} else {
Expand All @@ -266,10 +267,16 @@ private static List<String> identifier(List<String> name) {
return result;
}

private static String identifier(String name) {
public static String identifier(String name) {
return "`" + name + "`";
}

public static String quoteTableIdentifier(String tableIdentifier) {
String[] dbTable = tableIdentifier.split("\\.");
Preconditions.checkArgument(dbTable.length == 2);
return identifier(dbTable[0]) + "." + identifier(dbTable[1]);
}

private static String quoteProperties(String name) {
return "'" + name + "'";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.FieldSchema;

import java.util.List;
Expand Down Expand Up @@ -103,23 +104,35 @@ public static String buildAddColumnDDL(String tableIdentifier, FieldSchema field
String type = fieldSchema.getTypeString();
String defaultValue = fieldSchema.getDefaultValue();
String comment = fieldSchema.getComment();
String addDDL = String.format(ADD_DDL, tableIdentifier, name, type);
String addDDL =
String.format(
ADD_DDL,
DorisSystem.quoteTableIdentifier(tableIdentifier),
DorisSystem.identifier(name),
type);
if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
addDDL = addDDL + " DEFAULT " + defaultValue;
}
if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
addDDL = addDDL + " COMMENT " + comment;
addDDL = addDDL + " COMMENT '" + DorisSystem.quoteComment(comment) + "'";
}
return addDDL;
}

public static String buildDropColumnDDL(String tableIdentifier, String columName) {
return String.format(DROP_DDL, tableIdentifier, columName);
return String.format(
DROP_DDL,
DorisSystem.quoteTableIdentifier(tableIdentifier),
DorisSystem.identifier(columName));
}

public static String buildRenameColumnDDL(
String tableIdentifier, String oldColumnName, String newColumnName) {
return String.format(RENAME_DDL, tableIdentifier, oldColumnName, newColumnName);
return String.format(
RENAME_DDL,
DorisSystem.quoteTableIdentifier(tableIdentifier),
DorisSystem.identifier(oldColumnName),
DorisSystem.identifier(newColumnName));
}

public static String buildColumnExistsQuery(String database, String table, String column) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private String authHeader() {
.getBytes(StandardCharsets.UTF_8)));
}

private String getTableIdentifier(String database, String table) {
private static String getTableIdentifier(String database, String table) {
return String.format("%s.%s", database, table);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.flink.sink.schema;

import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.HttpEntityMock;
Expand All @@ -27,6 +28,7 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicStatusLine;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
Expand Down Expand Up @@ -94,13 +96,36 @@ public void setUp() throws IOException {
public void testColumnExists() throws IOException, IllegalArgumentException {
entityMock.setValue(queryResponse);
boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age");
System.out.println(columnExists);
Assert.assertEquals(true, columnExists);
}

@Test
public void testColumnNotExists() throws IOException, IllegalArgumentException {
entityMock.setValue(queryNoExistsResponse);
boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age1");
System.out.println(columnExists);
Assert.assertEquals(false, columnExists);
}

@Test
public void testAddColumn() {
FieldSchema field = new FieldSchema("col", "int", "comment \"'sdf'");
String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field);
Assert.assertEquals(
"ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int COMMENT 'comment \"\\'sdf\\''",
addColumnDDL);
}

@Test
public void testDropColumn() {
String dropColumnDDL = SchemaChangeHelper.buildDropColumnDDL("test.test_flink", "col");
Assert.assertEquals("ALTER TABLE `test`.`test_flink` DROP COLUMN `col`", dropColumnDDL);
}

@Test
public void testRenameColumn() {
String renameColumnDDL =
SchemaChangeHelper.buildRenameColumnDDL("test.test_flink", "col", "col_new");
Assert.assertEquals(
"ALTER TABLE `test`.`test_flink` RENAME COLUMN `col` `col_new`", renameColumnDDL);
}
}

0 comments on commit 25e9368

Please sign in to comment.