From 7e7958c16927db235b61dc586aa29f2c9bf5f944 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 23 Nov 2023 09:50:08 +0800 Subject: [PATCH] [improve] Extract Schema change operation (#233) --- .../flink/catalog/doris/DorisSystem.java | 12 +- .../SchemaChangeHelper.java | 46 +++-- .../sink/schema/SchemaChangeManager.java | 160 ++++++++++++++++++ .../flink/sink/util/DeleteOperation.java | 32 ++++ .../JsonDebeziumSchemaSerializer.java | 111 ++---------- .../SchemaChangeHelperTest.java | 3 +- 6 files changed, 241 insertions(+), 123 deletions(-) rename flink-doris-connector/src/main/java/org/apache/doris/flink/sink/{writer => schema}/SchemaChangeHelper.java (76%) create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/util/DeleteOperation.java rename flink-doris-connector/src/test/java/org/apache/doris/flink/sink/{writer => schema}/SchemaChangeHelperTest.java (98%) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index 7aa314f63..1f0a09fff 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -139,7 +139,7 @@ public List extractColumnValuesBySQL( } } - public String buildCreateTableDDL(TableSchema schema) { + public static String buildCreateTableDDL(TableSchema schema) { StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS "); sb.append(identifier(schema.getDatabase())) .append(".") @@ -209,7 +209,7 @@ public String buildCreateTableDDL(TableSchema schema) { return sb.toString(); } - private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){ + private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){ String fieldType = field.getTypeString(); if(isKey && DorisType.STRING.equals(fieldType)){ fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533); @@ -222,7 +222,7 @@ private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){ .append("',"); } - private String quoteComment(String comment){ + private static String quoteComment(String comment){ if(comment == null){ return ""; } else { @@ -230,16 +230,16 @@ private String quoteComment(String comment){ } } - private List identifier(List name) { + private static List identifier(List name) { List result = name.stream().map(m -> identifier(m)).collect(Collectors.toList()); return result; } - private String identifier(String name) { + private static String identifier(String name) { return "`" + name + "`"; } - private String quoteProperties(String name) { + private static String quoteProperties(String name) { return "'" + name + "'"; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java similarity index 76% rename from flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java rename to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java index c1380c7db..c7a338458 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.flink.sink.writer; +package org.apache.doris.flink.sink.schema; import org.apache.doris.flink.catalog.doris.FieldSchema; @@ -66,8 +66,7 @@ public static List generateRenameDDLSql(String table, String oldColumnNa for (Entry originFieldSchema : originFieldSchemaMap.entrySet()) { if (originFieldSchema.getKey().equals(oldColumnName)) { fieldSchema = originFieldSchema.getValue(); - String renameSQL = String.format(RENAME_DDL, table, oldColumnName, newColumnName); - ddlList.add(renameSQL); + ddlList.add(buildRenameColumnDDL(table, oldColumnName, newColumnName)); ddlSchemas.add(new DDLSchema(oldColumnName, false)); } } @@ -80,23 +79,11 @@ public static List generateDDLSql(String table) { ddlSchemas.clear(); List ddlList = Lists.newArrayList(); for (FieldSchema fieldSchema : addFieldSchemas) { - String name = fieldSchema.getName(); - String type = fieldSchema.getTypeString(); - String defaultValue = fieldSchema.getDefaultValue(); - String comment = fieldSchema.getComment(); - String addDDL = String.format(ADD_DDL, table, name, type); - if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) { - addDDL = addDDL + " DEFAULT " + defaultValue; - } - if (!StringUtils.isNullOrWhitespaceOnly(comment)) { - addDDL = addDDL + " COMMENT " + comment; - } - ddlList.add(addDDL); - ddlSchemas.add(new DDLSchema(name, false)); + ddlList.add(buildAddColumnDDL(table, fieldSchema)); + ddlSchemas.add(new DDLSchema(fieldSchema.getName(), false)); } for (String columName : dropFieldSchemas) { - String dropDDL = String.format(DROP_DDL, table, columName); - ddlList.add(dropDDL); + ddlList.add(buildDropColumnDDL(table, columName)); ddlSchemas.add(new DDLSchema(columName, true)); } @@ -105,6 +92,29 @@ public static List generateDDLSql(String table) { return ddlList; } + public static String buildAddColumnDDL(String tableIdentifier, FieldSchema fieldSchema){ + String name = fieldSchema.getName(); + String type = fieldSchema.getTypeString(); + String defaultValue = fieldSchema.getDefaultValue(); + String comment = fieldSchema.getComment(); + String addDDL = String.format(ADD_DDL, tableIdentifier, name, type); + if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) { + addDDL = addDDL + " DEFAULT " + defaultValue; + } + if (!StringUtils.isNullOrWhitespaceOnly(comment)) { + addDDL = addDDL + " COMMENT " + comment; + } + return addDDL; + } + + public static String buildDropColumnDDL(String tableIdentifier, String columName){ + return String.format(DROP_DDL, tableIdentifier, columName); + } + + public static String buildRenameColumnDDL(String tableIdentifier, String oldColumnName, String newColumnName){ + return String.format(RENAME_DDL, tableIdentifier, oldColumnName, newColumnName); + } + public static List getDdlSchemas() { return ddlSchemas; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java new file mode 100644 index 000000000..26e43cf81 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.schema; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.codec.binary.Base64; +import org.apache.doris.flink.catalog.doris.DorisSystem; +import org.apache.doris.flink.catalog.doris.FieldSchema; +import org.apache.doris.flink.catalog.doris.TableSchema; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.exception.IllegalArgumentException; +import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.sink.HttpGetWithEntity; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.StringUtils; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +public class SchemaChangeManager { + private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class); + private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s"; + private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/"; + private ObjectMapper objectMapper = new ObjectMapper(); + private DorisOptions dorisOptions; + + public SchemaChangeManager(DorisOptions dorisOptions) { + this.dorisOptions = dorisOptions; + } + + public boolean createTable(TableSchema table) throws IOException, IllegalArgumentException { + String createTableDDL = DorisSystem.buildCreateTableDDL(table); + return execute(createTableDDL); + } + + public boolean addColumn(String database, String table, FieldSchema field) throws IOException, IllegalArgumentException { + String tableIdentifier = getTableIdentifier(database, table); + String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field); + return schemaChange(database, table, buildRequestParam(false, field.getName()), addColumnDDL); + } + + public boolean dropColumn(String database, String table, String columnName) throws IOException, IllegalArgumentException { + String tableIdentifier = getTableIdentifier(database, table); + String dropColumnDDL = SchemaChangeHelper.buildDropColumnDDL(tableIdentifier, columnName); + return schemaChange(database, table, buildRequestParam(true, columnName), dropColumnDDL); + } + + public boolean renameColumn(String database, String table, String oldColumnName, String newColumnName) throws IOException, IllegalArgumentException { + String tableIdentifier = getTableIdentifier(database, table); + String renameColumnDDL = SchemaChangeHelper.buildRenameColumnDDL(tableIdentifier, oldColumnName, newColumnName); + return schemaChange(database, table, buildRequestParam(true, oldColumnName), renameColumnDDL); + } + + public boolean schemaChange(String database, String table, Map params, String sql) throws IOException, IllegalArgumentException { + if(checkSchemaChange(database, table, params)){ + return execute(sql); + } + return false; + } + + public static Map buildRequestParam(boolean dropColumn, String columnName) { + Map params = new HashMap<>(); + params.put("isDropColumn", dropColumn); + params.put("columnName", columnName); + return params; + } + + /** + * check ddl can do light schema change + */ + public boolean checkSchemaChange(String database, String table, Map params) throws IOException, IllegalArgumentException { + if(CollectionUtil.isNullOrEmpty(params)){ + return false; + } + String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, + RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table); + HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl); + httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); + httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params))); + boolean success = handleResponse(httpGet); + if (!success) { + LOG.warn("schema change can not do table {}.{}", database, table); + } + return success; + } + + /** + * execute sql in doris + */ + public boolean execute(String ddl) throws IOException, IllegalArgumentException { + if(StringUtils.isNullOrWhitespaceOnly(ddl)){ + return false; + } + Map param = new HashMap<>(); + param.put("stmt", ddl); + String requestUrl = String.format(SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG)); + HttpPost httpPost = new HttpPost(requestUrl); + httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); + httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json"); + httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param))); + boolean success = handleResponse(httpPost); + return success; + } + + private boolean handleResponse(HttpUriRequest request) { + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + CloseableHttpResponse response = httpclient.execute(request); + final int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == 200 && response.getEntity() != null) { + String loadResult = EntityUtils.toString(response.getEntity()); + Map responseMap = objectMapper.readValue(loadResult, Map.class); + String code = responseMap.getOrDefault("code", "-1").toString(); + if (code.equals("0")) { + return true; + } else { + LOG.error("schema change response:{}", loadResult); + } + } + } catch (Exception e) { + LOG.error("http request error,", e); + } + return false; + } + + private String authHeader() { + return "Basic " + new String(Base64.encodeBase64( + (dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8))); + } + + private String getTableIdentifier(String database, String table){ + return String.format("%s.%s", database, table); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/util/DeleteOperation.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/util/DeleteOperation.java new file mode 100644 index 000000000..85ef3d795 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/util/DeleteOperation.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.util; + +import java.util.Map; + +import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN; + +public class DeleteOperation { + public static void addDeleteSign(Map valueMap, boolean delete) { + if (delete) { + valueMap.put(DORIS_DELETE_SIGN, "1"); + } else { + valueMap.put(DORIS_DELETE_SIGN, "0"); + } + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java index 2171e842e..ee4cad3c7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java @@ -24,34 +24,22 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.NullNode; -import org.apache.commons.codec.binary.Base64; import org.apache.commons.collections.CollectionUtils; - import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.exception.IllegalArgumentException; -import org.apache.doris.flink.rest.RestService; -import org.apache.doris.flink.sink.HttpGetWithEntity; -import org.apache.doris.flink.sink.writer.SchemaChangeHelper; -import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema; +import org.apache.doris.flink.sink.schema.SchemaChangeHelper; +import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema; +import org.apache.doris.flink.sink.schema.SchemaChangeManager; import org.apache.doris.flink.tools.cdc.SourceConnector; import org.apache.doris.flink.tools.cdc.mysql.MysqlType; import org.apache.doris.flink.tools.cdc.oracle.OracleType; import org.apache.doris.flink.tools.cdc.postgres.PostgresType; import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType; - import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.StringUtils; -import org.apache.http.HttpHeaders; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.methods.HttpUriRequest; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,15 +55,12 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN; +import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer { - private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaSerializer.class); - private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s"; - private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; private static final String OP_READ = "r"; // snapshot read private static final String OP_CREATE = "c"; // insert private static final String OP_UPDATE = "u"; // update @@ -100,6 +85,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer param = buildRequestParam(ddlSchema); - HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl); - httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); - httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param))); - boolean success = handleResponse(httpGet); - if (!success) { - LOG.warn("schema change can not do table {}.{}", database, table); - } - return success; - } - @VisibleForTesting public List extractDDLList(JsonNode record) throws JsonProcessingException { JsonNode historyRecord = extractHistoryRecord(record); @@ -295,7 +268,7 @@ public boolean schemaChange(JsonNode recordRoot) { return false; } boolean doSchemaChange = checkSchemaChange(ddl); - status = doSchemaChange && execSchemaChange(ddl); + status = doSchemaChange && schemaChangeManager.execute(ddl); LOG.info("schema change status:{}", status); } catch (Exception ex) { LOG.warn("schema change error :", ex); @@ -313,36 +286,14 @@ protected boolean checkTable(JsonNode recordRoot) { return sourceTableName.equals(dbTbl); } - private void addDeleteSign(Map valueMap, boolean delete) { - if (delete) { - valueMap.put(DORIS_DELETE_SIGN, "1"); - } else { - valueMap.put(DORIS_DELETE_SIGN, "0"); - } - } - private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumentException { - String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, - RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table); Map param = buildRequestParam(ddl); - if (param.size() != 2) { - return false; - } - HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl); - httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); - httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param))); - boolean success = handleResponse(httpGet); - if (!success) { - LOG.warn("schema change can not do table {}.{}", database, table); - } - return success; + return schemaChangeManager.checkSchemaChange(database, table, param); } - protected Map buildRequestParam(DDLSchema ddlSchema) { - Map params = new HashMap<>(); - params.put("isDropColumn", ddlSchema.isDropColumn()); - params.put("columnName", ddlSchema.getColumnName()); - return params; + private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException { + Map param = SchemaChangeManager.buildRequestParam(ddlSchema.isDropColumn(), ddlSchema.getColumnName()); + return schemaChangeManager.checkSchemaChange(database, table, param); } /** @@ -364,19 +315,6 @@ protected Map buildRequestParam(String ddl) { return params; } - private boolean execSchemaChange(String ddl) throws IOException, IllegalArgumentException { - Map param = new HashMap<>(); - param.put("stmt", ddl); - String requestUrl = String.format(SCHEMA_CHANGE_API, - RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database); - HttpPost httpPost = new HttpPost(requestUrl); - httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); - httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json"); - httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param))); - boolean success = handleResponse(httpPost); - return success; - } - protected String extractDatabase(JsonNode record) { if (record.get("source").has("schema")) { // compatible with schema @@ -390,25 +328,7 @@ protected String extractTable(JsonNode record) { return extractJsonNode(record.get("source"), "table"); } - private boolean handleResponse(HttpUriRequest request) { - try (CloseableHttpClient httpclient = HttpClients.createDefault()) { - CloseableHttpResponse response = httpclient.execute(request); - final int statusCode = response.getStatusLine().getStatusCode(); - if (statusCode == 200 && response.getEntity() != null) { - String loadResult = EntityUtils.toString(response.getEntity()); - Map responseMap = objectMapper.readValue(loadResult, Map.class); - String code = responseMap.getOrDefault("code", "-1").toString(); - if (code.equals("0")) { - return true; - } else { - LOG.error("schema change response:{}", loadResult); - } - } - } catch (Exception e) { - LOG.error("http request error,", e); - } - return false; - } + private String extractJsonNode(JsonNode record, String key) { return record != null && record.get(key) != null && @@ -457,10 +377,7 @@ public String extractDDL(JsonNode record) throws JsonProcessingException { return null; } - private String authHeader() { - return "Basic " + new String(Base64.encodeBase64( - (dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8))); - } + @VisibleForTesting public void fillOriginSchema(JsonNode columns) { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java similarity index 98% rename from flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java rename to flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java index 62906df7a..7e4fb47cf 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.flink.sink.writer; +package org.apache.doris.flink.sink.schema; import org.apache.doris.flink.catalog.doris.FieldSchema; - import org.apache.flink.shaded.guava30.com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Before;