From d7323aaff5a67bed8c63624738f06109af1cdd89 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Sat, 14 Sep 2024 16:02:11 +0800 Subject: [PATCH 1/3] [Fix] update response code for try connection (#489) --- .../src/main/java/org/apache/doris/flink/sink/BackendUtil.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index 987ec9a1b..26771c9d9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -108,7 +108,8 @@ public static boolean tryHttpConnection(String host) { int responseCode = connection.getResponseCode(); String responseMessage = connection.getResponseMessage(); connection.disconnect(); - if (200 == responseCode) { + if (responseCode < 500) { + // code greater than 500 means a server-side exception. return true; } LOG.warn( From c4ae0512941f98108caaa13fe4cf2dfcece61b6e Mon Sep 17 00:00:00 2001 From: North Lin <37775475+qg-lin@users.noreply.github.com> Date: Fri, 20 Sep 2024 09:57:07 +0800 Subject: [PATCH 2/3] [Improve]Support modify column type without default when column exists default value (#490) --- .../exception/IllegalArgumentException.java | 4 ++ .../flink/sink/schema/SchemaChangeHelper.java | 11 +++ .../sink/schema/SchemaChangeManager.java | 55 +++++++++++++-- .../sink/schema/SchemaManagerITCase.java | 70 ++++++++++++++++--- 4 files changed, 127 insertions(+), 13 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java index 4c0ae0939..7b2428916 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java @@ -25,4 +25,8 @@ public IllegalArgumentException(String msg, Throwable cause) { public IllegalArgumentException(String arg, String value) { super("argument '" + arg + "' is illegal, value is '" + value + "'."); } + + public IllegalArgumentException(String msg) { + super(msg); + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java index 74b574177..d0630b03e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java @@ -44,6 +44,7 @@ public class SchemaChangeHelper { private static final String CREATE_DATABASE_DDL = "CREATE DATABASE IF NOT EXISTS %s"; private static final String MODIFY_TYPE_DDL = "ALTER TABLE %s MODIFY COLUMN %s %s"; private static final String MODIFY_COMMENT_DDL = "ALTER TABLE %s MODIFY COLUMN %s COMMENT '%s'"; + private static final String SHOW_FULL_COLUMN_DDL = "SHOW FULL COLUMNS FROM `%s`.`%s`"; public static void compareSchema( Map updateFiledSchemaMap, @@ -166,6 +167,7 @@ public static String buildModifyColumnDataTypeDDL( String columnName = fieldSchema.getName(); String dataType = fieldSchema.getTypeString(); String comment = fieldSchema.getComment(); + String defaultValue = fieldSchema.getDefaultValue(); StringBuilder modifyDDL = new StringBuilder( String.format( @@ -173,6 +175,11 @@ public static String buildModifyColumnDataTypeDDL( DorisSchemaFactory.quoteTableIdentifier(tableIdentifier), DorisSchemaFactory.identifier(columnName), dataType)); + if (StringUtils.isNotBlank(defaultValue)) { + modifyDDL + .append(" DEFAULT ") + .append(DorisSchemaFactory.quoteDefaultValue(defaultValue)); + } commentColumn(modifyDDL, comment); return modifyDDL.toString(); } @@ -183,6 +190,10 @@ private static void commentColumn(StringBuilder ddl, String comment) { } } + public static String buildShowFullColumnDDL(String database, String table) { + return String.format(SHOW_FULL_COLUMN_DDL, database, table); + } + public static List getDdlSchemas() { return ddlSchemas; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index c946bee70..50ec1d34a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.NullNode; import org.apache.commons.codec.binary.Base64; import org.apache.doris.flink.catalog.doris.DorisSystem; import org.apache.doris.flink.catalog.doris.FieldSchema; @@ -123,11 +124,30 @@ public boolean modifyColumnDataType(String database, String table, FieldSchema f throws IOException, IllegalArgumentException { if (!checkColumnExists(database, table, field.getName())) { LOG.warn( - "The column {} is not exists in table {}, can not modify it type", + "The column {} is not exists in table {}, can not modify it's type", field.getName(), table); return false; } + + String ddl = SchemaChangeHelper.buildShowFullColumnDDL(database, table); + String defaultValue = getDefaultValue(ddl, database, field.getName()); + if (!StringUtils.isNullOrWhitespaceOnly(field.getDefaultValue())) { + // Can not change default value + if (!field.getDefaultValue().equals(defaultValue)) { + LOG.warn( + "Column:{} can not change default value from {} to {}, fallback it", + field.getName(), + defaultValue, + field.getDefaultValue()); + field.setDefaultValue(defaultValue); + } + } else { + // If user does not give a default value, need fill it from + // original table schema to avoid change type failed if default value exists + field.setDefaultValue(defaultValue); + } + // If user does not give a comment, need fill it from // original table schema to avoid miss comment if (StringUtils.isNullOrWhitespaceOnly(field.getComment())) { @@ -214,15 +234,42 @@ private boolean handleSchemaChange(String responseEntity) throws JsonProcessingE } } + private String getDefaultValue(String ddl, String database, String column) + throws IOException, IllegalArgumentException { + String responseEntity = executeThenReturnResponse(ddl, database); + JsonNode responseNode = objectMapper.readTree(responseEntity); + String code = responseNode.get("code").asText("-1"); + if (code.equals("0")) { + JsonNode data = responseNode.get("data").get("data"); + for (JsonNode node : data) { + if (node.get(0).asText().equals(column)) { + JsonNode defaultValueNode = node.get(5); + return (defaultValueNode instanceof NullNode) + ? null + : defaultValueNode.asText(); + } + } + return null; + } else { + throw new DorisSchemaChangeException( + "Failed to get default value, response: " + responseEntity); + } + } + /** execute sql in doris. */ - public boolean execute(String ddl, String database) + private String executeThenReturnResponse(String ddl, String database) throws IOException, IllegalArgumentException { if (StringUtils.isNullOrWhitespaceOnly(ddl)) { - return false; + throw new IllegalArgumentException("ddl can not be null or empty string!"); } LOG.info("Execute SQL: {}", ddl); HttpPost httpPost = buildHttpPost(ddl, database); - String responseEntity = handleResponse(httpPost); + return handleResponse(httpPost); + } + + public boolean execute(String ddl, String database) + throws IOException, IllegalArgumentException { + String responseEntity = executeThenReturnResponse(ddl, database); return handleSchemaChange(responseEntity); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java index 37ca3a2d2..c4f7f282e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java @@ -17,7 +17,9 @@ package org.apache.doris.flink.sink.schema; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.catalog.doris.DataModel; +import org.apache.doris.flink.catalog.doris.DorisSchemaFactory; import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.cfg.DorisOptions; @@ -57,7 +59,7 @@ public void setUp() throws Exception { schemaChangeManager = new SchemaChangeManager(options); } - private void initDorisSchemaChangeTable(String table) { + private void initDorisSchemaChangeTable(String table, String defaultValue) { ContainerUtils.executeSQLStatement( getDorisQueryConnection(), LOG, @@ -67,17 +69,22 @@ private void initDorisSchemaChangeTable(String table) { "CREATE TABLE %s.%s ( \n" + "`id` varchar(32),\n" + "`age` int\n" + + (StringUtils.isNotBlank(defaultValue) + ? " DEFAULT " + + DorisSchemaFactory.quoteDefaultValue(defaultValue) + : "") + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n" + "PROPERTIES (\n" + "\"replication_num\" = \"1\"\n" + ")\n", - DATABASE, table)); + DATABASE, + table)); } @Test public void testAddColumn() throws IOException, IllegalArgumentException { String addColumnTbls = "add_column"; - initDorisSchemaChangeTable(addColumnTbls); + initDorisSchemaChangeTable(addColumnTbls, null); FieldSchema field = new FieldSchema("c1", "int", ""); schemaChangeManager.addColumn(DATABASE, addColumnTbls, field); boolean exists = schemaChangeManager.addColumn(DATABASE, addColumnTbls, field); @@ -91,7 +98,7 @@ public void testAddColumn() throws IOException, IllegalArgumentException { public void testAddColumnWithChineseComment() throws IOException, IllegalArgumentException, InterruptedException { String addColumnTbls = "add_column"; - initDorisSchemaChangeTable(addColumnTbls); + initDorisSchemaChangeTable(addColumnTbls, null); // add a column by UTF-8 encoding String addColumnName = "col_with_comment1"; @@ -147,7 +154,7 @@ private String getColumnType(String table, String columnName) { @Test public void testDropColumn() throws IOException, IllegalArgumentException { String dropColumnTbls = "drop_column"; - initDorisSchemaChangeTable(dropColumnTbls); + initDorisSchemaChangeTable(dropColumnTbls, null); schemaChangeManager.dropColumn(DATABASE, dropColumnTbls, "age"); boolean success = schemaChangeManager.dropColumn(DATABASE, dropColumnTbls, "age"); Assert.assertTrue(success); @@ -159,7 +166,7 @@ public void testDropColumn() throws IOException, IllegalArgumentException { @Test public void testRenameColumn() throws IOException, IllegalArgumentException { String renameColumnTbls = "rename_column"; - initDorisSchemaChangeTable(renameColumnTbls); + initDorisSchemaChangeTable(renameColumnTbls, null); schemaChangeManager.renameColumn(DATABASE, renameColumnTbls, "age", "age1"); boolean exists = schemaChangeManager.checkColumnExists(DATABASE, renameColumnTbls, "age1"); Assert.assertTrue(exists); @@ -171,7 +178,7 @@ public void testRenameColumn() throws IOException, IllegalArgumentException { @Test public void testModifyColumnComment() throws IOException, IllegalArgumentException { String modifyColumnCommentTbls = "modify_column_comment"; - initDorisSchemaChangeTable(modifyColumnCommentTbls); + initDorisSchemaChangeTable(modifyColumnCommentTbls, null); String columnName = "age"; String newComment = "new comment of age"; schemaChangeManager.modifyColumnComment( @@ -187,7 +194,7 @@ public void testOnlyModifyColumnType() String modifyColumnTbls = "modify_column_type"; String columnName = "age"; String newColumnType = "bigint"; - initDorisSchemaChangeTable(modifyColumnTbls); + initDorisSchemaChangeTable(modifyColumnTbls, null); FieldSchema field = new FieldSchema(columnName, newColumnType, ""); schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field); @@ -200,7 +207,7 @@ public void testOnlyModifyColumnType() public void testModifyColumnTypeAndComment() throws IOException, IllegalArgumentException, InterruptedException { String modifyColumnTbls = "modify_column_type_and_comment"; - initDorisSchemaChangeTable(modifyColumnTbls); + initDorisSchemaChangeTable(modifyColumnTbls, null); String columnName = "age"; String newColumnType = "bigint"; String newComment = "new comment of age"; @@ -238,4 +245,49 @@ public void testCreateTableWhenDatabaseNotExists() Thread.sleep(3_000); Assert.assertNotNull(schemaChangeManager.getTableSchema(databaseName, tableName)); } + + @Test + public void testModifyColumnTypeWithoutDefault() + throws IOException, IllegalArgumentException, InterruptedException { + String modifyColumnTbls = "modify_column_type_without_default_value"; + String columnName = "age"; + String newColumnType = "bigint"; + initDorisSchemaChangeTable(modifyColumnTbls, "18"); + FieldSchema field = new FieldSchema(columnName, newColumnType, null, ""); + schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field); + + Thread.sleep(3_000); + String columnType = getColumnType(modifyColumnTbls, columnName); + Assert.assertEquals(newColumnType, columnType.toLowerCase()); + } + + @Test + public void testModifyColumnTypeWithDefault() + throws IOException, IllegalArgumentException, InterruptedException { + String modifyColumnTbls = "modify_column_type_with_default_value"; + String columnName = "age"; + String newColumnType = "bigint"; + initDorisSchemaChangeTable(modifyColumnTbls, "18"); + FieldSchema field = new FieldSchema(columnName, newColumnType, "18", ""); + schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field); + + Thread.sleep(3_000); + String columnType = getColumnType(modifyColumnTbls, columnName); + Assert.assertEquals(newColumnType, columnType.toLowerCase()); + } + + @Test + public void testModifyColumnTypeWithDefaultAndChange() + throws IOException, IllegalArgumentException, InterruptedException { + String modifyColumnTbls = "modify_column_type_with_default_value_and_change"; + String columnName = "age"; + String newColumnType = "bigint"; + initDorisSchemaChangeTable(modifyColumnTbls, "18"); + FieldSchema field = new FieldSchema(columnName, newColumnType, "19", "new comment"); + schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field); + + Thread.sleep(3_000); + String columnType = getColumnType(modifyColumnTbls, columnName); + Assert.assertEquals(newColumnType, columnType.toLowerCase()); + } } From c61342f9651829c72f7828480c831dba52ad8ef3 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 20 Sep 2024 10:16:09 +0800 Subject: [PATCH 3/3] [Improve](case) add customer doris container cluster (#491) --- .../container/AbstractContainerTestBase.java | 8 +- .../container/instance/ContainerService.java | 2 + .../container/instance/DorisContainer.java | 5 + .../instance/DorisCustomerContainer.java | 132 ++++++++++++++++++ .../container/instance/MySQLContainer.java | 5 + .../doris/flink/sink/DorisSinkITCase.java | 51 ++++++- 6 files changed, 196 insertions(+), 7 deletions(-) create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisCustomerContainer.java diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java index 61e0faac8..5c7c151ec 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java @@ -19,6 +19,7 @@ import org.apache.doris.flink.container.instance.ContainerService; import org.apache.doris.flink.container.instance.DorisContainer; +import org.apache.doris.flink.container.instance.DorisCustomerContainer; import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +49,8 @@ private static void initDorisContainer() { LOG.info("The doris container has been started and is running status."); return; } - dorisContainerService = new DorisContainer(); + Boolean customerEnv = Boolean.valueOf(System.getProperty("customer_env", "false")); + dorisContainerService = customerEnv ? new DorisCustomerContainer() : new DorisContainer(); dorisContainerService.startContainer(); LOG.info("Doris container was started."); } @@ -74,9 +76,7 @@ protected String getDorisPassword() { } protected String getDorisQueryUrl() { - return String.format( - "jdbc:mysql://%s:%s", - getDorisInstanceHost(), dorisContainerService.getMappedPort(9030)); + return dorisContainerService.getJdbcUrl(); } protected String getDorisInstanceHost() { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java index 6ad1e3cd0..684de5a0a 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java @@ -29,6 +29,8 @@ public interface ContainerService { Connection getQueryConnection(); + String getJdbcUrl(); + String getInstanceHost(); Integer getMappedPort(int originalPort); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java index 6af827b8d..ef399d0d4 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java @@ -115,6 +115,11 @@ public Connection getQueryConnection() { } } + @Override + public String getJdbcUrl() { + return String.format(JDBC_URL, dorisContainer.getHost()); + } + @Override public String getInstanceHost() { return dorisContainer.getHost(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisCustomerContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisCustomerContainer.java new file mode 100644 index 000000000..3d4173035 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisCustomerContainer.java @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.container.instance; + +import org.apache.flink.util.Preconditions; + +import org.apache.doris.flink.exception.DorisRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +/** Using a custom Doris environment */ +public class DorisCustomerContainer implements ContainerService { + private static final Logger LOG = LoggerFactory.getLogger(DorisCustomerContainer.class); + private static final String JDBC_URL = "jdbc:mysql://%s:%s"; + + @Override + public void startContainer() { + LOG.info("Using doris customer containers env."); + checkParams(); + if (!isRunning()) { + throw new DorisRuntimeException( + "Backend is not alive. Please check the doris cluster."); + } + } + + private void checkParams() { + Preconditions.checkArgument( + System.getProperty("doris_host") != null, "doris_host is required."); + Preconditions.checkArgument( + System.getProperty("doris_query_port") != null, "doris_query_port is required."); + Preconditions.checkArgument( + System.getProperty("doris_http_port") != null, "doris_http_port is required."); + Preconditions.checkArgument( + System.getProperty("doris_user") != null, "doris_user is required."); + Preconditions.checkArgument( + System.getProperty("doris_passwd") != null, "doris_passwd is required."); + } + + @Override + public boolean isRunning() { + try (Connection conn = getQueryConnection(); + Statement stmt = conn.createStatement()) { + ResultSet showBackends = stmt.executeQuery("show backends"); + while (showBackends.next()) { + String isAlive = showBackends.getString("Alive").trim(); + if (Boolean.toString(true).equalsIgnoreCase(isAlive)) { + return true; + } + } + } catch (SQLException e) { + LOG.error("Failed to connect doris cluster.", e); + return false; + } + return false; + } + + @Override + public Connection getQueryConnection() { + LOG.info("Try to get query connection from doris."); + String jdbcUrl = + String.format( + JDBC_URL, + System.getProperty("doris_host"), + System.getProperty("doris_query_port")); + try { + return DriverManager.getConnection(jdbcUrl, getUsername(), getPassword()); + } catch (SQLException e) { + LOG.info("Failed to get doris query connection. jdbcUrl={}", jdbcUrl, e); + throw new DorisRuntimeException(e); + } + } + + @Override + public String getJdbcUrl() { + return String.format( + JDBC_URL, System.getProperty("doris_host"), System.getProperty("doris_query_port")); + } + + @Override + public String getInstanceHost() { + return System.getProperty("doris_host"); + } + + @Override + public Integer getMappedPort(int originalPort) { + return originalPort; + } + + @Override + public String getUsername() { + return System.getProperty("doris_user"); + } + + @Override + public String getPassword() { + return System.getProperty("doris_passwd"); + } + + @Override + public String getFenodes() { + return System.getProperty("doris_host") + ":" + System.getProperty("doris_http_port"); + } + + @Override + public String getBenodes() { + return null; + } + + @Override + public void close() {} +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java index 21b30e814..4e50ac64a 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java @@ -92,6 +92,11 @@ public Connection getQueryConnection() { } } + @Override + public String getJdbcUrl() { + return mysqlcontainer.getJdbcUrl(); + } + @Override public void close() { LOG.info("Stopping MySQL container."); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 877074edf..80986ea3c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.StringUtils; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -63,6 +64,7 @@ public class DorisSinkITCase extends AbstractITCaseService { static final String TABLE_CSV = "tbl_csv"; static final String TABLE_JSON = "tbl_json"; static final String TABLE_JSON_TBL = "tbl_json_tbl"; + static final String TABLE_TBL_AUTO_REDIRECT = "tbl_tbl_auto_redirect"; static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl"; static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS"; static final String TABLE_GROUP_COMMIT = "tbl_group_commit"; @@ -177,8 +179,6 @@ public void testTableSinkJsonFormat() throws Exception { + DorisConfigOptions.IDENTIFIER + "'," + " 'fenodes' = '%s'," - + " 'benodes' = '%s'," - + " 'auto-redirect' = 'false'," + " 'table.identifier' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," @@ -196,7 +196,6 @@ public void testTableSinkJsonFormat() throws Exception { + "'" + ")", getFenodes(), - getBenodes(), DATABASE + "." + TABLE_JSON_TBL, getDorisUsername(), getDorisPassword()); @@ -210,6 +209,52 @@ public void testTableSinkJsonFormat() throws Exception { ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); } + @Test + public void testTableSinkAutoRedirectFalse() throws Exception { + if (StringUtils.isNullOrWhitespaceOnly(getBenodes())) { + LOG.info("benodes is empty, skip the test."); + return; + } + initializeTable(TABLE_TBL_AUTO_REDIRECT); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String sinkDDL = + String.format( + "CREATE TABLE doris_sink (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = '" + + DorisConfigOptions.IDENTIFIER + + "'," + + " 'fenodes' = '%s'," + + " 'benodes' = '%s'," + + " 'auto-redirect' = 'false'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'sink.label-prefix' = 'doris_sink" + + UUID.randomUUID() + + "'" + + ")", + getFenodes(), + getBenodes(), + DATABASE + "." + TABLE_TBL_AUTO_REDIRECT, + getDorisUsername(), + getDorisPassword()); + tEnv.executeSql(sinkDDL); + tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2"); + + Thread.sleep(10000); + List expected = Arrays.asList("doris,1", "flink,2"); + String query = + String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON_TBL); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); + } + @Test public void testTableBatch() throws Exception { initializeTable(TABLE_CSV_BATCH_TBL);