diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index 7aa314f63..1f0a09fff 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -139,7 +139,7 @@ public List extractColumnValuesBySQL( } } - public String buildCreateTableDDL(TableSchema schema) { + public static String buildCreateTableDDL(TableSchema schema) { StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS "); sb.append(identifier(schema.getDatabase())) .append(".") @@ -209,7 +209,7 @@ public String buildCreateTableDDL(TableSchema schema) { return sb.toString(); } - private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){ + private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){ String fieldType = field.getTypeString(); if(isKey && DorisType.STRING.equals(fieldType)){ fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533); @@ -222,7 +222,7 @@ private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){ .append("',"); } - private String quoteComment(String comment){ + private static String quoteComment(String comment){ if(comment == null){ return ""; } else { @@ -230,16 +230,16 @@ private String quoteComment(String comment){ } } - private List identifier(List name) { + private static List identifier(List name) { List result = name.stream().map(m -> identifier(m)).collect(Collectors.toList()); return result; } - private String identifier(String name) { + private static String identifier(String name) { return "`" + name + "`"; } - private String quoteProperties(String name) { + private static String quoteProperties(String name) { return "'" + name + "'"; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java similarity index 75% rename from flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java rename to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java index 8e6307b5f..c7a338458 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.flink.sink.writer; +package org.apache.doris.flink.sink.schema; import org.apache.doris.flink.catalog.doris.FieldSchema; @@ -66,8 +66,7 @@ public static List generateRenameDDLSql(String table, String oldColumnNa for (Entry originFieldSchema : originFieldSchemaMap.entrySet()) { if (originFieldSchema.getKey().equals(oldColumnName)) { fieldSchema = originFieldSchema.getValue(); - String renameSQL = String.format(RENAME_DDL, table, oldColumnName, newColumnName); - ddlList.add(renameSQL); + ddlList.add(buildRenameColumnDDL(table, oldColumnName, newColumnName)); ddlSchemas.add(new DDLSchema(oldColumnName, false)); } } @@ -80,23 +79,11 @@ public static List generateDDLSql(String table) { ddlSchemas.clear(); List ddlList = Lists.newArrayList(); for (FieldSchema fieldSchema : addFieldSchemas) { - String name = fieldSchema.getName(); - String type = fieldSchema.getTypeString(); - String defaultValue = fieldSchema.getDefaultValue(); - String comment = fieldSchema.getComment(); - String addDDL = String.format(ADD_DDL, table, name, type); - if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) { - addDDL = addDDL + " DEFAULT " + defaultValue; - } - if (!StringUtils.isNullOrWhitespaceOnly(comment)) { - addDDL = addDDL + " COMMENT " + comment; - } - ddlList.add(addDDL); - ddlSchemas.add(new DDLSchema(name, false)); + ddlList.add(buildAddColumnDDL(table, fieldSchema)); + ddlSchemas.add(new DDLSchema(fieldSchema.getName(), false)); } for (String columName : dropFieldSchemas) { - String dropDDL = String.format(DROP_DDL, table, columName); - ddlList.add(dropDDL); + ddlList.add(buildDropColumnDDL(table, columName)); ddlSchemas.add(new DDLSchema(columName, true)); } @@ -105,11 +92,34 @@ public static List generateDDLSql(String table) { return ddlList; } + public static String buildAddColumnDDL(String tableIdentifier, FieldSchema fieldSchema){ + String name = fieldSchema.getName(); + String type = fieldSchema.getTypeString(); + String defaultValue = fieldSchema.getDefaultValue(); + String comment = fieldSchema.getComment(); + String addDDL = String.format(ADD_DDL, tableIdentifier, name, type); + if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) { + addDDL = addDDL + " DEFAULT " + defaultValue; + } + if (!StringUtils.isNullOrWhitespaceOnly(comment)) { + addDDL = addDDL + " COMMENT " + comment; + } + return addDDL; + } + + public static String buildDropColumnDDL(String tableIdentifier, String columName){ + return String.format(DROP_DDL, tableIdentifier, columName); + } + + public static String buildRenameColumnDDL(String tableIdentifier, String oldColumnName, String newColumnName){ + return String.format(RENAME_DDL, tableIdentifier, oldColumnName, newColumnName); + } + public static List getDdlSchemas() { return ddlSchemas; } - static class DDLSchema { + public static class DDLSchema { private final String columnName; private final boolean isDropColumn; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java new file mode 100644 index 000000000..250906058 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.schema; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.codec.binary.Base64; +import org.apache.doris.flink.catalog.doris.DorisSystem; +import org.apache.doris.flink.catalog.doris.FieldSchema; +import org.apache.doris.flink.catalog.doris.TableSchema; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.exception.IllegalArgumentException; +import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.sink.HttpGetWithEntity; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.StringUtils; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +public class SchemaChangeManager { + private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class); + private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s"; + private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/"; + private ObjectMapper objectMapper = new ObjectMapper(); + private DorisOptions dorisOptions; + + public SchemaChangeManager(DorisOptions dorisOptions) { + this.dorisOptions = dorisOptions; + } + + public boolean createTable(TableSchema table) throws IOException, IllegalArgumentException { + String createTableDDL = DorisSystem.buildCreateTableDDL(table); + return execute(createTableDDL); + } + + public boolean addColumn(String database, String table, FieldSchema field) throws IOException, IllegalArgumentException { + String tableIdentifier = getTableIdentifier(database, table); + String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field); + return schemaChange(database, table, buildRequestParam(false, field.getName()), addColumnDDL); + } + + public boolean dropColumn(String database, String table, String columnName) throws IOException, IllegalArgumentException { + String tableIdentifier = getTableIdentifier(database, table); + String dropColumnDDL = SchemaChangeHelper.buildDropColumnDDL(tableIdentifier, columnName); + return schemaChange(database, table, buildRequestParam(true, columnName), dropColumnDDL); + } + + public boolean renameColumn(String database, String table, String oldColumnName, String newColumnName) throws IOException, IllegalArgumentException { + String tableIdentifier = getTableIdentifier(database, table); + String renameColumnDDL = SchemaChangeHelper.buildRenameColumnDDL(tableIdentifier, oldColumnName, newColumnName); + return schemaChange(database, table, buildRequestParam(true, oldColumnName), renameColumnDDL); + } + + public boolean schemaChange(String database, String table, Map params, String sql) throws IOException, IllegalArgumentException { + if(checkSchemaChange(database, table, params)){ + return execute(sql); + } + return false; + } + + private Map buildRequestParam(boolean dropColumn, String columnName) { + Map params = new HashMap<>(); + params.put("isDropColumn", dropColumn); + params.put("columnName", columnName); + return params; + } + + /** + * check ddl can do light schema change + */ + public boolean checkSchemaChange(String database, String table, Map params) throws IOException, IllegalArgumentException { + if(CollectionUtil.isNullOrEmpty(params)){ + return false; + } + String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, + RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table); + HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl); + httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); + httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params))); + boolean success = handleResponse(httpGet); + if (!success) { + LOG.warn("schema change can not do table {}.{}", database, table); + } + return success; + } + + /** + * execute sql in doris + */ + public boolean execute(String ddl) throws IOException, IllegalArgumentException { + if(StringUtils.isNullOrWhitespaceOnly(ddl)){ + return false; + } + Map param = new HashMap<>(); + param.put("stmt", ddl); + String requestUrl = String.format(SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG)); + HttpPost httpPost = new HttpPost(requestUrl); + httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); + httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json"); + httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param))); + boolean success = handleResponse(httpPost); + return success; + } + + private boolean handleResponse(HttpUriRequest request) { + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + CloseableHttpResponse response = httpclient.execute(request); + final int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == 200 && response.getEntity() != null) { + String loadResult = EntityUtils.toString(response.getEntity()); + Map responseMap = objectMapper.readValue(loadResult, Map.class); + String code = responseMap.getOrDefault("code", "-1").toString(); + if (code.equals("0")) { + return true; + } else { + LOG.error("schema change response:{}", loadResult); + } + } + } catch (Exception e) { + LOG.error("http request error,", e); + } + return false; + } + + private String authHeader() { + return "Basic " + new String(Base64.encodeBase64( + (dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8))); + } + + private String getTableIdentifier(String database, String table){ + return String.format("%s.%s", database, table); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java similarity index 98% rename from flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java rename to flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java index 62906df7a..7e4fb47cf 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.flink.sink.writer; +package org.apache.doris.flink.sink.schema; import org.apache.doris.flink.catalog.doris.FieldSchema; - import org.apache.flink.shaded.guava30.com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Before;