diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java index fda3c93c8..9c99b4f9b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java @@ -124,8 +124,9 @@ public String visit(CharType charType) { @Override public String visit(VarCharType varCharType) { - int length = varCharType.getLength(); - return length * 4 > 65533 ? STRING : String.format("%s(%s)", VARCHAR, length * 4); + // A Flink VARCHAR length can be as large as Integer.MAX_VALUE, so hold it in a long: multiplying an int by 4 may overflow + long length = varCharType.getLength(); + return length * 4 >= 65533 ? STRING : String.format("%s(%s)", VARCHAR, length * 4); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index 1f0a09fff..7be9a5f9f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.Statement; @@ -45,7 +46,8 @@ * Doris System Operate */ @Public -public class DorisSystem { +public class DorisSystem implements Serializable { + private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(DorisSystem.class); private final JdbcConnectionProvider jdbcConnectionProvider; private static final List builtinDatabases = Collections.singletonList("information_schema"); @@ -81,6 +83,22 @@ public boolean tableExists(String database, String table){ && 
listTables(database).contains(table); } + public boolean columnExists(String database, String table, String columnName){ + if(tableExists(database, table)){ + List columns = extractColumnValuesBySQL( + "SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?", + 1, + null, + database, + table, + columnName); + if(columns != null && !columns.isEmpty()){ + return true; + } + } + return false; + } + public List listTables(String databaseName) { if (!databaseExists(databaseName)) { throw new DorisRuntimeException("database" + databaseName + " is not exists"); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSchemaChangeException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSchemaChangeException.java new file mode 100644 index 000000000..52c67b1c0 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisSchemaChangeException.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.exception; + +/** + * Runtime exception raised for failures during a Doris schema change. 
+ */ +public class DorisSchemaChangeException extends RuntimeException { + public DorisSchemaChangeException() { + super(); + } + + public DorisSchemaChangeException(String message) { + super(message); + } + + public DorisSchemaChangeException(String message, Throwable cause) { + super(message, cause); + } + + public DorisSchemaChangeException(Throwable cause) { + super(cause); + } + + protected DorisSchemaChangeException(String message, Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index cd022097e..219806e5b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -458,10 +458,10 @@ public static Schema getSchema(DorisOptions options, DorisReadOptions readOption public static boolean isUniqueKeyType(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisRuntimeException { - //Enable 2pc in multi-table scenario + //disable 2pc in multi-table scenario if(StringUtils.isBlank(options.getTableIdentifier())){ logger.info("table model verification is skipped in multi-table scenarios."); - return false; + return true; } try { return UNIQUE_KEYS_TYPE.equals(getSchema(options, readOptions, logger).getKeysType()); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java index c7a338458..6f1f2a91e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java @@ 
-35,6 +35,7 @@ public class SchemaChangeHelper { private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s"; private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s"; private static final String RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s %s"; + private static final String CHECK_COLUMN_EXISTS = "SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_NAME = '%s'"; public static void compareSchema(Map updateFiledSchemaMap, Map originFieldSchemaMap) { @@ -115,6 +116,10 @@ public static String buildRenameColumnDDL(String tableIdentifier, String oldColu return String.format(RENAME_DDL, tableIdentifier, oldColumnName, newColumnName); } + public static String buildColumnExistsQuery(String database, String table, String column){ + return String.format(CHECK_COLUMN_EXISTS, database, table, column); + } + public static List getDdlSchemas() { return ddlSchemas; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index 66d6d99ae..569d44046 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -17,12 +17,14 @@ package org.apache.doris.flink.sink.schema; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.codec.binary.Base64; import org.apache.doris.flink.catalog.doris.DorisSystem; import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.exception.DorisSchemaChangeException; import org.apache.doris.flink.exception.IllegalArgumentException; import 
org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.sink.HttpGetWithEntity; @@ -63,12 +65,20 @@ public boolean createTable(TableSchema table) throws IOException, IllegalArgumen } public boolean addColumn(String database, String table, FieldSchema field) throws IOException, IllegalArgumentException { + if(checkColumnExists(database, table, field.getName())){ + LOG.warn("The column {} already exists in table {}, no need to add it again", field.getName(), table); + return true; + } String tableIdentifier = getTableIdentifier(database, table); String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field); return schemaChange(database, table, buildRequestParam(false, field.getName()), addColumnDDL); } public boolean dropColumn(String database, String table, String columnName) throws IOException, IllegalArgumentException { + if(!checkColumnExists(database, table, columnName)){ + LOG.warn("The column {} not exists in table {}, no need to drop", columnName, table); + return true; + } String tableIdentifier = getTableIdentifier(database, table); String dropColumnDDL = SchemaChangeHelper.buildDropColumnDDL(tableIdentifier, columnName); return schemaChange(database, table, buildRequestParam(true, columnName), dropColumnDDL); @@ -106,11 +116,7 @@ public boolean checkSchemaChange(String database, String table, Map param = new HashMap<>(); param.put("stmt", ddl); String requestUrl = String.format(SCHEMA_CHANGE_API, @@ -129,14 +140,14 @@ public boolean execute(String ddl, String database) throws IOException, IllegalA httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json"); httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param))); - boolean success = handleResponse(httpPost); - return success; + return httpPost; } private boolean handleResponse(HttpUriRequest request) { try (CloseableHttpClient httpclient = HttpClients.createDefault()) { 
CloseableHttpResponse response = httpclient.execute(request); final int statusCode = response.getStatusLine().getStatusCode(); + final String reasonPhrase = response.getStatusLine().getReasonPhrase(); if (statusCode == 200 && response.getEntity() != null) { String loadResult = EntityUtils.toString(response.getEntity()); Map responseMap = objectMapper.readValue(loadResult, Map.class); @@ -144,11 +155,39 @@ private boolean handleResponse(HttpUriRequest request) { if (code.equals("0")) { return true; } else { - LOG.error("schema change response:{}", loadResult); + throw new DorisSchemaChangeException("Failed to schemaChange, response: " + loadResult); + } + } else{ + throw new DorisSchemaChangeException("Failed to schemaChange, status: " + statusCode + ", reason: " + reasonPhrase); + } + } catch (Exception e) { + LOG.error("SchemaChange request error,", e); + throw new DorisSchemaChangeException("SchemaChange request error with " + e.getMessage()); + } + } + + /** + * Checks whether the given column already exists in the table, so that adding or dropping a column can be made idempotent. 
+ */ + public boolean checkColumnExists(String database, String table, String columnName) throws IllegalArgumentException, IOException { + String existsQuery = SchemaChangeHelper.buildColumnExistsQuery(database, table, columnName); + HttpPost httpPost = buildHttpPost(existsQuery, database); + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + CloseableHttpResponse response = httpclient.execute(httpPost); + final int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == 200 && response.getEntity() != null) { + String loadResult = EntityUtils.toString(response.getEntity()); + JsonNode responseNode = objectMapper.readTree(loadResult); + String code = responseNode.get("code").asText("-1"); + if (code.equals("0")) { + JsonNode data = responseNode.get("data").get("data"); + if(!data.isEmpty()){ + return true; + } } } } catch (Exception e) { - LOG.error("http request error,", e); + LOG.error("check column exist request error {}, default return false", e.getMessage()); } return false; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java index 77d57c6c7..29d9ceec1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java @@ -375,7 +375,7 @@ public boolean schemaChange(JsonNode recordRoot) { return false; } - boolean doSchemaChange = checkSchemaChange(ddl, tuple.f0, tuple.f1); + boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddl); status = doSchemaChange && schemaChangeManager.execute(ddl, tuple.f0); LOG.info("schema change status:{}", status); } catch (Exception ex) { diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java new file mode 100644 index 000000000..ec80e9260 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.sink.schema; + +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.exception.IllegalArgumentException; +import org.apache.doris.flink.sink.HttpEntityMock; +import org.apache.doris.flink.sink.OptionUtils; +import org.apache.http.ProtocolVersion; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicStatusLine; +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockedStatic; + +import java.io.IOException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +public class SchemaManagerTest { + + static String QUERY_RESPONSE = "{\n" + + " \"data\": {\n" + + " \"type\": \"result_set\",\n" + + " \"meta\": [{\"name\":\"COLUMN_NAME\",\"type\":\"CHAR\"}],\n" + + " \"data\": [\n" + + " [\"age\"]\n" + + " ],\n" + + " \"time\": 15\n" + + " },\n" + + " \"msg\": \"success\",\n" + + " \"code\": 0\n" + + "}"; + + static String QUERY_NO_EXISTS_RESPONSE = "{\n" + + " \"data\": {\n" + + " \"type\": \"result_set\",\n" + + " \"meta\": [{\"name\":\"COLUMN_NAME\",\"type\":\"CHAR\"}],\n" + + " \"data\": [],\n" + + " \"time\": 0\n" + + " },\n" + + " \"msg\": \"success\",\n" + + " \"code\": 0\n" + + "}"; + + HttpEntityMock entityMock; + SchemaChangeManager schemaChangeManager; + static MockedStatic httpClientMockedStatic = mockStatic(HttpClients.class); + + + @Before + public void setUp() throws IOException { + DorisOptions dorisOptions = OptionUtils.buildDorisOptions(); + schemaChangeManager = new SchemaChangeManager(dorisOptions); + CloseableHttpClient httpClient = mock(CloseableHttpClient.class); + entityMock = new HttpEntityMock(); + + CloseableHttpResponse httpResponse = 
mock(CloseableHttpResponse.class); + StatusLine normalLine = new BasicStatusLine(new ProtocolVersion("http", 1, 0), 200, ""); + + when(httpClient.execute(any())).thenReturn(httpResponse); + when(httpResponse.getStatusLine()).thenReturn(normalLine); + when(httpResponse.getEntity()).thenReturn(entityMock); + when(httpClient.execute(any())).thenReturn(httpResponse); + when(httpResponse.getStatusLine()).thenReturn(normalLine); + when(httpResponse.getEntity()).thenReturn(entityMock); + + httpClientMockedStatic.when(()-> HttpClients.createDefault()) + .thenReturn(httpClient); + } + + @Test + public void testColumnExists() throws IOException, IllegalArgumentException { + entityMock.setValue(QUERY_RESPONSE); + boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age"); + System.out.println(columnExists); + } + + @Test + public void testColumnNotExists() throws IOException, IllegalArgumentException { + entityMock.setValue(QUERY_NO_EXISTS_RESPONSE); + boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age1"); + System.out.println(columnExists); + } + +}