Skip to content

Commit

Permalink
[improve] Extract Schema change operation (apache#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Nov 23, 2023
1 parent 1b90428 commit 7e7958c
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public List<String> extractColumnValuesBySQL(
}
}

public String buildCreateTableDDL(TableSchema schema) {
public static String buildCreateTableDDL(TableSchema schema) {
StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
sb.append(identifier(schema.getDatabase()))
.append(".")
Expand Down Expand Up @@ -209,7 +209,7 @@ public String buildCreateTableDDL(TableSchema schema) {
return sb.toString();
}

private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){
private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){
String fieldType = field.getTypeString();
if(isKey && DorisType.STRING.equals(fieldType)){
fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
Expand All @@ -222,24 +222,24 @@ private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){
.append("',");
}

private String quoteComment(String comment){
private static String quoteComment(String comment){
if(comment == null){
return "";
} else {
return comment.replaceAll("'","\\\\'");
}
}

private List<String> identifier(List<String> name) {
private static List<String> identifier(List<String> name) {
List<String> result = name.stream().map(m -> identifier(m)).collect(Collectors.toList());
return result;
}

private String identifier(String name) {
private static String identifier(String name) {
return "`" + name + "`";
}

private String quoteProperties(String name) {
private static String quoteProperties(String name) {
return "'" + name + "'";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.writer;
package org.apache.doris.flink.sink.schema;

import org.apache.doris.flink.catalog.doris.FieldSchema;

Expand Down Expand Up @@ -66,8 +66,7 @@ public static List<String> generateRenameDDLSql(String table, String oldColumnNa
for (Entry<String, FieldSchema> originFieldSchema : originFieldSchemaMap.entrySet()) {
if (originFieldSchema.getKey().equals(oldColumnName)) {
fieldSchema = originFieldSchema.getValue();
String renameSQL = String.format(RENAME_DDL, table, oldColumnName, newColumnName);
ddlList.add(renameSQL);
ddlList.add(buildRenameColumnDDL(table, oldColumnName, newColumnName));
ddlSchemas.add(new DDLSchema(oldColumnName, false));
}
}
Expand All @@ -80,23 +79,11 @@ public static List<String> generateDDLSql(String table) {
ddlSchemas.clear();
List<String> ddlList = Lists.newArrayList();
for (FieldSchema fieldSchema : addFieldSchemas) {
String name = fieldSchema.getName();
String type = fieldSchema.getTypeString();
String defaultValue = fieldSchema.getDefaultValue();
String comment = fieldSchema.getComment();
String addDDL = String.format(ADD_DDL, table, name, type);
if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
addDDL = addDDL + " DEFAULT " + defaultValue;
}
if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
addDDL = addDDL + " COMMENT " + comment;
}
ddlList.add(addDDL);
ddlSchemas.add(new DDLSchema(name, false));
ddlList.add(buildAddColumnDDL(table, fieldSchema));
ddlSchemas.add(new DDLSchema(fieldSchema.getName(), false));
}
for (String columName : dropFieldSchemas) {
String dropDDL = String.format(DROP_DDL, table, columName);
ddlList.add(dropDDL);
ddlList.add(buildDropColumnDDL(table, columName));
ddlSchemas.add(new DDLSchema(columName, true));
}

Expand All @@ -105,6 +92,29 @@ public static List<String> generateDDLSql(String table) {
return ddlList;
}

public static String buildAddColumnDDL(String tableIdentifier, FieldSchema fieldSchema){
String name = fieldSchema.getName();
String type = fieldSchema.getTypeString();
String defaultValue = fieldSchema.getDefaultValue();
String comment = fieldSchema.getComment();
String addDDL = String.format(ADD_DDL, tableIdentifier, name, type);
if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
addDDL = addDDL + " DEFAULT " + defaultValue;
}
if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
addDDL = addDDL + " COMMENT " + comment;
}
return addDDL;
}

public static String buildDropColumnDDL(String tableIdentifier, String columName){
return String.format(DROP_DDL, tableIdentifier, columName);
}

public static String buildRenameColumnDDL(String tableIdentifier, String oldColumnName, String newColumnName){
return String.format(RENAME_DDL, tableIdentifier, oldColumnName, newColumnName);
}

public static List<DDLSchema> getDdlSchemas() {
return ddlSchemas;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.schema;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.HttpGetWithEntity;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SchemaChangeManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class);
private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s";
private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/";
private ObjectMapper objectMapper = new ObjectMapper();
private DorisOptions dorisOptions;

public SchemaChangeManager(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
}

public boolean createTable(TableSchema table) throws IOException, IllegalArgumentException {
String createTableDDL = DorisSystem.buildCreateTableDDL(table);
return execute(createTableDDL);
}

public boolean addColumn(String database, String table, FieldSchema field) throws IOException, IllegalArgumentException {
String tableIdentifier = getTableIdentifier(database, table);
String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field);
return schemaChange(database, table, buildRequestParam(false, field.getName()), addColumnDDL);
}

public boolean dropColumn(String database, String table, String columnName) throws IOException, IllegalArgumentException {
String tableIdentifier = getTableIdentifier(database, table);
String dropColumnDDL = SchemaChangeHelper.buildDropColumnDDL(tableIdentifier, columnName);
return schemaChange(database, table, buildRequestParam(true, columnName), dropColumnDDL);
}

public boolean renameColumn(String database, String table, String oldColumnName, String newColumnName) throws IOException, IllegalArgumentException {
String tableIdentifier = getTableIdentifier(database, table);
String renameColumnDDL = SchemaChangeHelper.buildRenameColumnDDL(tableIdentifier, oldColumnName, newColumnName);
return schemaChange(database, table, buildRequestParam(true, oldColumnName), renameColumnDDL);
}

public boolean schemaChange(String database, String table, Map<String, Object> params, String sql) throws IOException, IllegalArgumentException {
if(checkSchemaChange(database, table, params)){
return execute(sql);
}
return false;
}

public static Map<String, Object> buildRequestParam(boolean dropColumn, String columnName) {
Map<String, Object> params = new HashMap<>();
params.put("isDropColumn", dropColumn);
params.put("columnName", columnName);
return params;
}

/**
* check ddl can do light schema change
*/
public boolean checkSchemaChange(String database, String table, Map<String, Object> params) throws IOException, IllegalArgumentException {
if(CollectionUtil.isNullOrEmpty(params)){
return false;
}
String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params)));
boolean success = handleResponse(httpGet);
if (!success) {
LOG.warn("schema change can not do table {}.{}", database, table);
}
return success;
}

/**
* execute sql in doris
*/
public boolean execute(String ddl) throws IOException, IllegalArgumentException {
if(StringUtils.isNullOrWhitespaceOnly(ddl)){
return false;
}
Map<String, String> param = new HashMap<>();
param.put("stmt", ddl);
String requestUrl = String.format(SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG));
HttpPost httpPost = new HttpPost(requestUrl);
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
boolean success = handleResponse(httpPost);
return success;
}

private boolean handleResponse(HttpUriRequest request) {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
CloseableHttpResponse response = httpclient.execute(request);
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class);
String code = responseMap.getOrDefault("code", "-1").toString();
if (code.equals("0")) {
return true;
} else {
LOG.error("schema change response:{}", loadResult);
}
}
} catch (Exception e) {
LOG.error("http request error,", e);
}
return false;
}

private String authHeader() {
return "Basic " + new String(Base64.encodeBase64(
(dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
}

private String getTableIdentifier(String database, String table){
return String.format("%s.%s", database, table);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.util;

import java.util.Map;

import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;

public class DeleteOperation {
public static void addDeleteSign(Map<String, Object> valueMap, boolean delete) {
if (delete) {
valueMap.put(DORIS_DELETE_SIGN, "1");
} else {
valueMap.put(DORIS_DELETE_SIGN, "0");
}
}
}
Loading

0 comments on commit 7e7958c

Please sign in to comment.