Skip to content

Commit

Permalink
Merge branch 'master' into auto-createtable
Browse files Browse the repository at this point in the history
# Conflicts:
#	flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
  • Loading branch information
wudi committed Nov 30, 2023
2 parents 1797f0c + 323b872 commit d7e36a8
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ public String visit(CharType charType) {

@Override
public String visit(VarCharType varCharType) {
int length = varCharType.getLength();
return length * 4 > 65533 ? STRING : String.format("%s(%s)", VARCHAR, length * 4);
//Flink varchar length max value is int, it may overflow after multiplying by 4
long length = varCharType.getLength();
return length * 4 >= 65533 ? STRING : String.format("%s(%s)", VARCHAR, length * 4);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
Expand All @@ -45,7 +46,8 @@
* Doris System Operate
*/
@Public
public class DorisSystem {
public class DorisSystem implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DorisSystem.class);
private final JdbcConnectionProvider jdbcConnectionProvider;
private static final List<String> builtinDatabases = Collections.singletonList("information_schema");
Expand Down Expand Up @@ -81,6 +83,22 @@ public boolean tableExists(String database, String table){
&& listTables(database).contains(table);
}

public boolean columnExists(String database, String table, String columnName){
if(tableExists(database, table)){
List<String> columns = extractColumnValuesBySQL(
"SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?",
1,
null,
database,
table,
columnName);
if(columns != null && !columns.isEmpty()){
return true;
}
}
return false;
}

public List<String> listTables(String databaseName) {
if (!databaseExists(databaseName)) {
throw new DorisRuntimeException("database" + databaseName + " is not exists");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.exception;

/**
* Doris Schema Change run exception.
*/
public class DorisSchemaChangeException extends RuntimeException {
public DorisSchemaChangeException() {
super();
}

public DorisSchemaChangeException(String message) {
super(message);
}

public DorisSchemaChangeException(String message, Throwable cause) {
super(message, cause);
}

public DorisSchemaChangeException(Throwable cause) {
super(cause);
}

protected DorisSchemaChangeException(String message, Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,10 @@ public static Schema getSchema(DorisOptions options, DorisReadOptions readOption

public static boolean isUniqueKeyType(DorisOptions options, DorisReadOptions readOptions, Logger logger)
throws DorisRuntimeException {
//Enable 2pc in multi-table scenario
//disable 2pc in multi-table scenario
if(StringUtils.isBlank(options.getTableIdentifier())){
logger.info("table model verification is skipped in multi-table scenarios.");
return false;
return true;
}
try {
return UNIQUE_KEYS_TYPE.equals(getSchema(options, readOptions, logger).getKeysType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class SchemaChangeHelper {
private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
private static final String RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s %s";
private static final String CHECK_COLUMN_EXISTS = "SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_NAME = '%s'";

public static void compareSchema(Map<String, FieldSchema> updateFiledSchemaMap,
Map<String, FieldSchema> originFieldSchemaMap) {
Expand Down Expand Up @@ -115,6 +116,10 @@ public static String buildRenameColumnDDL(String tableIdentifier, String oldColu
return String.format(RENAME_DDL, tableIdentifier, oldColumnName, newColumnName);
}

public static String buildColumnExistsQuery(String database, String table, String column){
return String.format(CHECK_COLUMN_EXISTS, database, table, column);
}

public static List<DDLSchema> getDdlSchemas() {
return ddlSchemas;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.doris.flink.sink.schema;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.DorisSchemaChangeException;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.HttpGetWithEntity;
Expand Down Expand Up @@ -63,12 +65,20 @@ public boolean createTable(TableSchema table) throws IOException, IllegalArgumen
}

public boolean addColumn(String database, String table, FieldSchema field) throws IOException, IllegalArgumentException {
if(checkColumnExists(database, table, field.getName())){
LOG.warn("The column {} already exists in table {}, no need to add it again", field.getName(), table);
return true;
}
String tableIdentifier = getTableIdentifier(database, table);
String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field);
return schemaChange(database, table, buildRequestParam(false, field.getName()), addColumnDDL);
}

public boolean dropColumn(String database, String table, String columnName) throws IOException, IllegalArgumentException {
if(!checkColumnExists(database, table, columnName)){
LOG.warn("The column {} not exists in table {}, no need to drop", columnName, table);
return true;
}
String tableIdentifier = getTableIdentifier(database, table);
String dropColumnDDL = SchemaChangeHelper.buildDropColumnDDL(tableIdentifier, columnName);
return schemaChange(database, table, buildRequestParam(true, columnName), dropColumnDDL);
Expand Down Expand Up @@ -106,11 +116,7 @@ public boolean checkSchemaChange(String database, String table, Map<String, Obje
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params)));
boolean success = handleResponse(httpGet);
if (!success) {
LOG.warn("schema change can not do table {}.{}", database, table);
}
return success;
return handleResponse(httpGet);
}

/**
Expand All @@ -120,7 +126,12 @@ public boolean execute(String ddl, String database) throws IOException, IllegalA
if(StringUtils.isNullOrWhitespaceOnly(ddl)){
return false;
}
LOG.info("Execute query: {}", ddl);
LOG.info("Execute SQL: {}", ddl);
HttpPost httpPost = buildHttpPost(ddl, database);
return handleResponse(httpPost);
}

public HttpPost buildHttpPost(String ddl, String database) throws IllegalArgumentException, IOException {
Map<String, String> param = new HashMap<>();
param.put("stmt", ddl);
String requestUrl = String.format(SCHEMA_CHANGE_API,
Expand All @@ -129,26 +140,54 @@ public boolean execute(String ddl, String database) throws IOException, IllegalA
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
boolean success = handleResponse(httpPost);
return success;
return httpPost;
}

private boolean handleResponse(HttpUriRequest request) {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
CloseableHttpResponse response = httpclient.execute(request);
final int statusCode = response.getStatusLine().getStatusCode();
final String reasonPhrase = response.getStatusLine().getReasonPhrase();
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class);
String code = responseMap.getOrDefault("code", "-1").toString();
if (code.equals("0")) {
return true;
} else {
LOG.error("schema change response:{}", loadResult);
throw new DorisSchemaChangeException("Failed to schemaChange, response: " + loadResult);
}
} else{
throw new DorisSchemaChangeException("Failed to schemaChange, status: " + statusCode + ", reason: " + reasonPhrase);
}
} catch (Exception e) {
LOG.error("SchemaChange request error,", e);
throw new DorisSchemaChangeException("SchemaChange request error with " + e.getMessage());
}
}

/**
* When processing a column, determine whether it exists and be idempotent.
*/
public boolean checkColumnExists(String database, String table, String columnName) throws IllegalArgumentException, IOException {
String existsQuery = SchemaChangeHelper.buildColumnExistsQuery(database, table, columnName);
HttpPost httpPost = buildHttpPost(existsQuery, database);
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
CloseableHttpResponse response = httpclient.execute(httpPost);
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
JsonNode responseNode = objectMapper.readTree(loadResult);
String code = responseNode.get("code").asText("-1");
if (code.equals("0")) {
JsonNode data = responseNode.get("data").get("data");
if(!data.isEmpty()){
return true;
}
}
}
} catch (Exception e) {
LOG.error("http request error,", e);
LOG.error("check column exist request error {}, default return false", e.getMessage());
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public boolean schemaChange(JsonNode recordRoot) {
return false;
}

boolean doSchemaChange = checkSchemaChange(ddl, tuple.f0, tuple.f1);
boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddl);
status = doSchemaChange && schemaChangeManager.execute(ddl, tuple.f0);
LOG.info("schema change status:{}", status);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.schema;

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.HttpEntityMock;
import org.apache.doris.flink.sink.OptionUtils;
import org.apache.http.ProtocolVersion;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicStatusLine;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;

import java.io.IOException;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

public class SchemaManagerTest {

static String QUERY_RESPONSE = "{\n" +
" \"data\": {\n" +
" \"type\": \"result_set\",\n" +
" \"meta\": [{\"name\":\"COLUMN_NAME\",\"type\":\"CHAR\"}],\n" +
" \"data\": [\n" +
" [\"age\"]\n" +
" ],\n" +
" \"time\": 15\n" +
" },\n" +
" \"msg\": \"success\",\n" +
" \"code\": 0\n" +
"}";

static String QUERY_NO_EXISTS_RESPONSE = "{\n" +
" \"data\": {\n" +
" \"type\": \"result_set\",\n" +
" \"meta\": [{\"name\":\"COLUMN_NAME\",\"type\":\"CHAR\"}],\n" +
" \"data\": [],\n" +
" \"time\": 0\n" +
" },\n" +
" \"msg\": \"success\",\n" +
" \"code\": 0\n" +
"}";

HttpEntityMock entityMock;
SchemaChangeManager schemaChangeManager;
static MockedStatic<HttpClients> httpClientMockedStatic = mockStatic(HttpClients.class);


@Before
public void setUp() throws IOException {
DorisOptions dorisOptions = OptionUtils.buildDorisOptions();
schemaChangeManager = new SchemaChangeManager(dorisOptions);
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
entityMock = new HttpEntityMock();

CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class);
StatusLine normalLine = new BasicStatusLine(new ProtocolVersion("http", 1, 0), 200, "");

when(httpClient.execute(any())).thenReturn(httpResponse);
when(httpResponse.getStatusLine()).thenReturn(normalLine);
when(httpResponse.getEntity()).thenReturn(entityMock);
when(httpClient.execute(any())).thenReturn(httpResponse);
when(httpResponse.getStatusLine()).thenReturn(normalLine);
when(httpResponse.getEntity()).thenReturn(entityMock);

httpClientMockedStatic.when(()-> HttpClients.createDefault())
.thenReturn(httpClient);
}

@Test
public void testColumnExists() throws IOException, IllegalArgumentException {
entityMock.setValue(QUERY_RESPONSE);
boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age");
System.out.println(columnExists);
}

@Test
public void testColumnNotExists() throws IOException, IllegalArgumentException {
entityMock.setValue(QUERY_NO_EXISTS_RESPONSE);
boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age1");
System.out.println(columnExists);
}

}

0 comments on commit d7e36a8

Please sign in to comment.