diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 066c5e7d9..f7801ddfa 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -73,7 +73,6 @@ under the License. 3.1.1 flink-python 0.16.0 - 13.0.0 3.10.1 3.3.0 3.2.1 @@ -96,6 +95,8 @@ under the License. 8.0.26 19.3.0.0 2.17.1 + 15.0.2 + 0.12.0 @@ -180,13 +181,16 @@ under the License. commons-codec ${commons-codec.version} - + + org.apache.arrow.adbc + adbc-driver-flight-sql + ${adbc.version} + org.apache.arrow arrow-vector ${arrow.version} - org.apache.arrow arrow-memory-netty @@ -207,7 +211,6 @@ under the License. - com.fasterxml.jackson.core @@ -429,13 +432,13 @@ under the License. org.apache.maven.plugins maven-shade-plugin - 3.2.4 + 3.4.1 - - org.apache.arrow - org.apache.doris.shaded.org.apache.arrow - + + + + io.netty org.apache.doris.shaded.io.netty diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java index 3f824a1ff..9693d433e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java @@ -18,9 +18,12 @@ package org.apache.doris.flink.catalog.doris; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.apache.doris.flink.exception.CreateTableException; import org.apache.doris.flink.tools.cdc.DorisTableConfig; import java.util.ArrayList; @@ -30,6 +33,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * Factory that creates doris schema. 
@@ -103,4 +107,132 @@ public static Integer parseTableSchemaBuckets(
}
return null;
}
+
+ public static String generateCreateTableDDL(TableSchema schema) {
+ StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
+ sb.append(identifier(schema.getDatabase()))
+ .append(".")
+ .append(identifier(schema.getTable()))
+ .append("(");
+
+ Map<String, FieldSchema> fields = schema.getFields();
+ List<String> keys = schema.getKeys();
+ // append keys
+ for (String key : keys) {
+ if (!fields.containsKey(key)) {
+ throw new CreateTableException("key " + key + " not found in column list");
+ }
+ FieldSchema field = fields.get(key);
+ buildColumn(sb, field, true);
+ }
+
+ // append values
+ for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+ if (keys.contains(entry.getKey())) {
+ continue;
+ }
+ FieldSchema field = entry.getValue();
+ buildColumn(sb, field, false);
+ }
+ sb = sb.deleteCharAt(sb.length() - 1);
+ sb.append(" ) ");
+ // append unique model
+ if (DataModel.UNIQUE.equals(schema.getModel())) {
+ sb.append(schema.getModel().name())
+ .append(" KEY(")
+ .append(String.join(",", identifier(schema.getKeys())))
+ .append(")");
+ }
+
+ // append table comment
+ if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) {
+ sb.append(" COMMENT '").append(quoteComment(schema.getTableComment())).append("' ");
+ }
+
+ // append distribute key
+ sb.append(" DISTRIBUTED BY HASH(")
+ .append(String.join(",", identifier(schema.getDistributeKeys())))
+ .append(")");
+
+ Map<String, String> properties = schema.getProperties();
+ if (schema.getTableBuckets() != null) {
+
+ int bucketsNum = schema.getTableBuckets();
+ if (bucketsNum <= 0) {
+ throw new CreateTableException("The number of buckets must be positive.");
+ }
+ sb.append(" BUCKETS ").append(bucketsNum);
+ } else {
+ sb.append(" BUCKETS AUTO ");
+ }
+ // append properties
+ int index = 0;
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (index == 0) {
+ sb.append(" PROPERTIES (");
+ }
+ if (index > 0) {
+ sb.append(",");
+ }
+ sb.append(quoteProperties(entry.getKey()))
+ .append("=")
+ .append(quoteProperties(entry.getValue()));
+ index++;
+
+ if (index == (schema.getProperties().size())) {
+ sb.append(")");
+ }
+ }
+ return sb.toString();
+ }
+
+ private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) {
+ String fieldType = field.getTypeString();
+ if (isKey && DorisType.STRING.equals(fieldType)) {
+ fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
+ }
+ sql.append(identifier(field.getName())).append(" ").append(fieldType);
+
+ if (field.getDefaultValue() != null) {
+ sql.append(" DEFAULT " + quoteDefaultValue(field.getDefaultValue()));
+ }
+ sql.append(" COMMENT '").append(quoteComment(field.getComment())).append("',");
+ }
+
+ private static String quoteProperties(String name) {
+ return "'" + name + "'";
+ }
+
+ private static List<String> identifier(List<String> names) {
+ return names.stream().map(DorisSchemaFactory::identifier).collect(Collectors.toList());
+ }
+
+ public static String identifier(String name) {
+ if (name.startsWith("`") && name.endsWith("`")) {
+ return name;
+ }
+ return "`" + name + "`";
+ }
+
+ public static String quoteDefaultValue(String defaultValue) {
+ // DEFAULT current_timestamp does not need to be quoted
+ if (defaultValue.equalsIgnoreCase("current_timestamp")) {
+ return defaultValue;
+ }
+ return "'" + defaultValue + "'";
+ }
+
+ public static String quoteComment(String comment) {
+ if (comment == null) {
+ return "";
+ } else {
+ return comment.replaceAll("'", "\\\\'");
+ }
+ }
+
+ public static String
quoteTableIdentifier(String tableIdentifier) { + String[] dbTable = tableIdentifier.split("\\."); + Preconditions.checkArgument(dbTable.length == 2); + return identifier(dbTable[0]) + "." + identifier(dbTable[1]); + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index be6572d31..427eb8b34 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -18,14 +18,12 @@ package org.apache.doris.flink.catalog.doris; import org.apache.flink.annotation.Public; -import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.apache.commons.compress.utils.Lists; import org.apache.doris.flink.cfg.DorisConnectionOptions; import org.apache.doris.flink.connection.JdbcConnectionProvider; import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider; -import org.apache.doris.flink.exception.CreateTableException; import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.exception.DorisSystemException; import org.slf4j.Logger; @@ -41,7 +39,6 @@ import java.util.Map; import java.util.Objects; import java.util.function.Predicate; -import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; @@ -141,81 +138,7 @@ public List extractColumnValuesBySQL( } public static String buildCreateTableDDL(TableSchema schema) { - StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS "); - sb.append(identifier(schema.getDatabase())) - .append(".") - .append(identifier(schema.getTable())) - .append("("); - - Map fields = schema.getFields(); - List keys = schema.getKeys(); - // append keys - for (String key : keys) { - if (!fields.containsKey(key)) { - throw new CreateTableException("key " + key + " not found in column list"); - } - FieldSchema field = fields.get(key); - buildColumn(sb, field, true); - } - - // append values - for (Map.Entry entry : fields.entrySet()) { - if (keys.contains(entry.getKey())) { - continue; - } - FieldSchema field = entry.getValue(); - buildColumn(sb, field, false); - } - sb = sb.deleteCharAt(sb.length() - 1); - sb.append(" ) "); - // append uniq model - if (DataModel.UNIQUE.equals(schema.getModel())) { - sb.append(schema.getModel().name()) - .append(" KEY(") - .append(String.join(",", identifier(schema.getKeys()))) - .append(")"); - } - - // append table comment - if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) { - sb.append(" COMMENT '").append(quoteComment(schema.getTableComment())).append("' "); - } - - // append distribute key - sb.append(" DISTRIBUTED BY HASH(") - .append(String.join(",", identifier(schema.getDistributeKeys()))) - .append(")"); - - Map properties = schema.getProperties(); - if (schema.getTableBuckets() != null) { - - int bucketsNum = schema.getTableBuckets(); - if (bucketsNum <= 0) { - throw new CreateTableException("The number of buckets must be positive."); - } - sb.append(" BUCKETS ").append(bucketsNum); - } else { - sb.append(" BUCKETS AUTO "); - } - // append properties - int index = 0; - for (Map.Entry entry : properties.entrySet()) { - if (index == 0) { - sb.append(" PROPERTIES ("); - } - if (index > 0) { - sb.append(","); - } - sb.append(quoteProperties(entry.getKey())) - .append("=") - .append(quoteProperties(entry.getValue())); - 
index++; - - if (index == (schema.getProperties().size())) { - sb.append(")"); - } - } - return sb.toString(); + return DorisSchemaFactory.generateCreateTableDDL(schema); } public Map getTableFieldNames(String databaseName, String tableName) { @@ -244,53 +167,23 @@ public Map getTableFieldNames(String databaseName, String tableN } } - private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) { - String fieldType = field.getTypeString(); - if (isKey && DorisType.STRING.equals(fieldType)) { - fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533); - } - sql.append(identifier(field.getName())).append(" ").append(fieldType); - - if (field.getDefaultValue() != null) { - sql.append(" DEFAULT " + quoteDefaultValue(field.getDefaultValue())); - } - sql.append(" COMMENT '").append(quoteComment(field.getComment())).append("',"); - } - + @Deprecated public static String quoteDefaultValue(String defaultValue) { - // DEFAULT current_timestamp not need quote - if (defaultValue.equalsIgnoreCase("current_timestamp")) { - return defaultValue; - } - return "'" + defaultValue + "'"; + return DorisSchemaFactory.quoteDefaultValue(defaultValue); } + @Deprecated public static String quoteComment(String comment) { - if (comment == null) { - return ""; - } else { - return comment.replaceAll("'", "\\\\'"); - } - } - - private static List identifier(List name) { - return name.stream().map(DorisSystem::identifier).collect(Collectors.toList()); + return DorisSchemaFactory.quoteComment(comment); } + @Deprecated public static String identifier(String name) { - if (name.startsWith("`") && name.endsWith("`")) { - return name; - } - return "`" + name + "`"; + return DorisSchemaFactory.identifier(name); } + @Deprecated public static String quoteTableIdentifier(String tableIdentifier) { - String[] dbTable = tableIdentifier.split("\\."); - Preconditions.checkArgument(dbTable.length == 2); - return identifier(dbTable[0]) + "." 
+ identifier(dbTable[1]); - } - - private static String quoteProperties(String name) { - return "'" + name + "'"; + return DorisSchemaFactory.quoteTableIdentifier(tableIdentifier); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java index 4a3f70b80..c249c2519 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java @@ -51,4 +51,10 @@ public interface ConfigurationOptions { Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false; String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size"; Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64; + + String USE_FLIGHT_SQL = "source.use-flight-sql"; + Boolean USE_FLIGHT_SQL_DEFAULT = false; + + String FLIGHT_SQL_PORT = "source.flight-sql-port"; + Integer FLIGHT_SQL_PORT_DEFAULT = 9040; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java index 3669e740a..2f6cd8a86 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java @@ -37,6 +37,8 @@ public class DorisReadOptions implements Serializable { private Integer deserializeQueueSize; private Boolean deserializeArrowAsync; private boolean useOldApi; + private boolean useFlightSql; + private Integer flightSqlPort; public DorisReadOptions( String readFields, @@ -50,7 +52,9 @@ public DorisReadOptions( Long execMemLimit, Integer deserializeQueueSize, Boolean deserializeArrowAsync, - boolean useOldApi) { + boolean useOldApi, + boolean useFlightSql, + Integer flightSqlPort) { this.readFields = readFields; this.filterQuery = filterQuery; this.requestTabletSize = requestTabletSize; @@ -63,6 +67,8 @@ public DorisReadOptions( this.deserializeQueueSize = deserializeQueueSize; this.deserializeArrowAsync = deserializeArrowAsync; this.useOldApi = useOldApi; + this.useFlightSql = useFlightSql; + this.flightSqlPort = flightSqlPort; } public String getReadFields() { @@ -121,6 +127,14 @@ public void setFilterQuery(String filterQuery) { this.filterQuery = filterQuery; } + public boolean getUseFlightSql() { + return useFlightSql; + } + + public Integer getFlightSqlPort() { + return flightSqlPort; + } + public static Builder builder() { return new Builder(); } @@ -149,7 +163,9 @@ public boolean equals(Object o) { && Objects.equals(requestBatchSize, that.requestBatchSize) && Objects.equals(execMemLimit, that.execMemLimit) && Objects.equals(deserializeQueueSize, that.deserializeQueueSize) - && Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync); + && Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync) + && Objects.equals(useFlightSql, that.useFlightSql) + && Objects.equals(flightSqlPort, that.flightSqlPort); } @Override @@ -166,7 +182,9 @@ public int hashCode() { execMemLimit, deserializeQueueSize, deserializeArrowAsync, - useOldApi); + useOldApi, + useFlightSql, + flightSqlPort); } /** Builder of {@link DorisReadOptions}. 
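* The builder now also carries {@code useFlightSql} and {@code flightSqlPort} for the Arrow Flight SQL read path.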
*/ @@ -184,6 +202,8 @@ public static class Builder { private Integer deserializeQueueSize; private Boolean deserializeArrowAsync; private Boolean useOldApi = false; + private Boolean useFlightSql = false; + private Integer flightSqlPort; public Builder setReadFields(String readFields) { this.readFields = readFields; @@ -240,11 +260,21 @@ public Builder setDeserializeArrowAsync(Boolean deserializeArrowAsync) { return this; } - public Builder setUseOldApi(boolean useOldApi) { + public Builder setUseFlightSql(Boolean useFlightSql) { + this.useFlightSql = useFlightSql; + return this; + } + + public Builder setUseOldApi(Boolean useOldApi) { this.useOldApi = useOldApi; return this; } + public Builder setFlightSqlPort(Integer flightSqlPort) { + this.flightSqlPort = flightSqlPort; + return this; + } + public DorisReadOptions build() { return new DorisReadOptions( readFields, @@ -258,7 +288,9 @@ public DorisReadOptions build() { execMemLimit, deserializeQueueSize, deserializeArrowAsync, - useOldApi); + useOldApi, + useFlightSql, + flightSqlPort); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java index 89e9182e8..e6d75969c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java @@ -106,6 +106,18 @@ private void init() { prop.getProperty( ConfigurationOptions.DORIS_TABLET_SIZE, ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT + .toString()))) + .setUseFlightSql( + Boolean.valueOf( + prop.getProperty( + ConfigurationOptions.USE_FLIGHT_SQL, + ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT + .toString()))) + .setFlightSqlPort( + Integer.valueOf( + prop.getProperty( + ConfigurationOptions.FLIGHT_SQL_PORT, + ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT .toString()))); this.options = optionsBuilder.build(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index 1dbb1fded..1663d4b39 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -234,6 +234,38 @@ public static String parseResponse(HttpURLConnection connection, Logger logger) } } + @VisibleForTesting + public static String parseFlightSql( + DorisReadOptions readOptions, + DorisOptions options, + PartitionDefinition partition, + Logger logger) + throws IllegalArgumentException { + String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger); + String readFields = + StringUtils.isBlank(readOptions.getReadFields()) + ? "*" + : readOptions.getReadFields(); + String sql = + "select " + + readFields + + " from `" + + tableIdentifiers[0] + + "`.`" + + tableIdentifiers[1] + + "`"; + String tablet = + partition.getTabletIds().stream() + .map(Object::toString) + .collect(Collectors.joining(",")); + sql += " TABLET(" + tablet + ") "; + if (!StringUtils.isEmpty(readOptions.getFilterQuery())) { + sql += " where " + readOptions.getFilterQuery(); + } + logger.info("Query SQL Sending to Doris FE is: '{}'.", sql); + return sql; + } + /** * parse table identifier to array. 
* diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java index f6594b5bc..9f7d7fc64 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java @@ -22,6 +22,9 @@ import org.apache.doris.sdk.thrift.TScanColumnDesc; import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; public class SchemaUtils { @@ -46,4 +49,25 @@ public static Schema convertToSchema(List tscanColumnDescs) { ""))); return schema; } + + public static Schema convertToSchema( + Schema tableSchema, org.apache.arrow.vector.types.pojo.Schema tscanColumnDescs) { + Schema schema = new Schema(tscanColumnDescs.getFields().size()); + Map collect = + tableSchema.getProperties().stream() + .collect(Collectors.toMap(Field::getName, Function.identity())); + tscanColumnDescs + .getFields() + .forEach( + desc -> + schema.put( + new Field( + desc.getName(), + collect.get(desc.getName()).getType(), + "", + 0, + 0, + ""))); + return schema; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index 38c63b779..dee9c1fcb 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -44,11 +44,13 @@ import org.apache.arrow.vector.complex.impl.TimeStampMicroReaderImpl; import org.apache.arrow.vector.complex.impl.UnionMapReader; import org.apache.arrow.vector.complex.reader.FieldReader; +import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.Types; import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.rest.models.Schema; +import org.apache.doris.flink.util.FastDateUtil; import org.apache.doris.flink.util.IPUtils; import org.apache.doris.sdk.thrift.TScanBatchResult; import org.slf4j.Logger; @@ -58,6 +60,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; @@ -96,18 +99,19 @@ public void put(Object o) { private int rowCountInOneBatch = 0; private int readRowCount = 0; private final List rowBatch = new ArrayList<>(); - private final ArrowStreamReader arrowStreamReader; + private final ArrowReader arrowStreamReader; private VectorSchemaRoot root; private List fieldVectors; private RootAllocator rootAllocator; private final Schema schema; private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS"; + private static final String DATE_PATTERN = "yyyy-MM-dd"; private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(DATETIME_PATTERN); private final DateTimeFormatter dateTimeV2Formatter = DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN); - private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern(DATE_PATTERN); private static 
final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault(); public List getRowBatch() { @@ -123,6 +127,43 @@ public RowBatch(TScanBatchResult nextResult, Schema schema) { this.offsetInRowBatch = 0; } + public RowBatch(ArrowReader nextResult, Schema schema) { + this.schema = schema; + this.arrowStreamReader = nextResult; + this.offsetInRowBatch = 0; + } + + public RowBatch readFlightArrow() { + try { + this.root = arrowStreamReader.getVectorSchemaRoot(); + fieldVectors = root.getFieldVectors(); + if (fieldVectors.size() > schema.size()) { + logger.error( + "Schema size '{}' is not equal to arrow field size '{}'.", + fieldVectors.size(), + schema.size()); + throw new DorisException( + "Load Doris data failed, schema size of fetch data is wrong."); + } + if (fieldVectors.isEmpty() || root.getRowCount() == 0) { + logger.debug("One batch in arrow has no data."); + return null; + } + rowCountInOneBatch = root.getRowCount(); + for (int i = 0; i < rowCountInOneBatch; ++i) { + rowBatch.add(new RowBatch.Row(fieldVectors.size())); + } + convertArrowToRowBatch(); + readRowCount += root.getRowCount(); + return this; + } catch (DorisException e) { + logger.error("Read Doris Data failed because: ", e); + throw new DorisRuntimeException(e.getMessage()); + } catch (IOException e) { + return this; + } + } + public RowBatch readArrow() { try { this.root = arrowStreamReader.getVectorSchemaRoot(); @@ -297,6 +338,7 @@ public boolean doConvert( case "DECIMAL32": case "DECIMAL64": case "DECIMAL128I": + case "DECIMAL128": if (!minorType.equals(Types.MinorType.DECIMAL)) { return false; } @@ -320,8 +362,8 @@ public boolean doConvert( addValueToRow(rowIndex, null); break; } - String stringValue = new String(date.get(rowIndex)); - LocalDate localDate = LocalDate.parse(stringValue, dateFormatter); + String stringValue = new String(date.get(rowIndex), StandardCharsets.UTF_8); + LocalDate localDate = FastDateUtil.fastParseDate(stringValue, DATE_PATTERN); addValueToRow(rowIndex, localDate); } else { DateDayVector date = (DateDayVector) fieldVector; @@ -340,8 +382,11 @@ public boolean doConvert( addValueToRow(rowIndex, null); break; } - String stringValue = new String(varCharVector.get(rowIndex)); - LocalDateTime parse = LocalDateTime.parse(stringValue, dateTimeFormatter); + String stringValue = + new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8); + stringValue = completeMilliseconds(stringValue); + LocalDateTime parse = + FastDateUtil.fastParseDateTime(stringValue, DATETIME_PATTERN); addValueToRow(rowIndex, parse); } else if (fieldVector instanceof TimeStampVector) { LocalDateTime dateTime = getDateTime(rowIndex, fieldVector); @@ -361,9 +406,11 @@ public boolean doConvert( addValueToRow(rowIndex, null); break; } - String stringValue = new String(varCharVector.get(rowIndex)); + String stringValue = + new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8); stringValue = completeMilliseconds(stringValue); - LocalDateTime parse = LocalDateTime.parse(stringValue, dateTimeV2Formatter); + LocalDateTime parse = + FastDateUtil.fastParseDateTimeV2(stringValue, DATETIMEV2_PATTERN); addValueToRow(rowIndex, parse); } else if (fieldVector instanceof TimeStampVector) { LocalDateTime dateTime = getDateTime(rowIndex, fieldVector); @@ -405,7 +452,8 @@ public boolean doConvert( addValueToRow(rowIndex, null); break; } - String stringValue = new String(largeIntVector.get(rowIndex)); + String stringValue = + new String(largeIntVector.get(rowIndex), StandardCharsets.UTF_8); BigInteger largeInt = new BigInteger(stringValue); 
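// LARGEINT (int128) exceeds the range of a Java long, so Doris ships it as UTF-8
// decimal text; it is decoded with an explicit charset and parsed into a BigInteger.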
addValueToRow(rowIndex, largeInt);
break;
@@ -423,7 +471,8 @@ public boolean doConvert(
addValueToRow(rowIndex, null);
break;
}
- String stringValue = new String(varCharVector.get(rowIndex));
+ String stringValue =
+ new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8);
addValueToRow(rowIndex, stringValue);
break;
case "IPV6":
@@ -435,7 +484,8 @@ public boolean doConvert(
addValueToRow(rowIndex, null);
break;
}
- String ipv6Str = new String(ipv6VarcharVector.get(rowIndex));
+ String ipv6Str =
+ new String(ipv6VarcharVector.get(rowIndex), StandardCharsets.UTF_8);
String ipv6Address = IPUtils.fromBigInteger(new BigInteger(ipv6Str));
addValueToRow(rowIndex, ipv6Address);
break;
@@ -526,6 +576,14 @@ public static LocalDateTime longToLocalDateTime(long time) {
return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID);
}
+ /**
+ * Pads fractional seconds with a switch ("case when") instead of a StringBuilder loop,
+ * e.g. "2024-01-01 12:00:00" becomes "2024-01-01 12:00:00.000000". JMH results
+ * ("Benchmark", "Mode", "Threads", "Samples", "Score", "Score Error (99.9%)", "Unit"):
+ * "CaseWhenTest", thrpt, 1, 5, 40657433.897696, 2515802.067503, ops/s;
+ * "WhileTest", thrpt, 1, 5, 9708130.819491, 1207453.635429, ops/s.
+ *
+ * @param stringValue the datetime string to pad to microsecond precision
+ * @return the padded datetime string
+ */
@VisibleForTesting
public static String completeMilliseconds(String stringValue) {
if (stringValue.length() == DATETIMEV2_PATTERN.length()) {
return stringValue;
}
@@ -536,14 +594,26 @@ public static String completeMilliseconds(String stringValue) {
return stringValue;
}
- StringBuilder sb = new StringBuilder(stringValue);
if (stringValue.length() == DATETIME_PATTERN.length()) {
- sb.append(".");
+ stringValue += ".";
}
- while (sb.toString().length() < DATETIMEV2_PATTERN.length()) {
- sb.append(0);
+ int s = DATETIMEV2_PATTERN.length() - stringValue.length();
+ switch (s) {
+ case 1:
+ return stringValue + "0";
+ case 2:
+ return stringValue + "00";
+ case 3:
+ return stringValue + "000";
+ case 4:
+ return stringValue + "0000";
+ case 5:
+ return stringValue + "00000";
+ case 6:
+ return stringValue + "000000";
+ default:
+ return stringValue;
}
- return sb.toString();
}
public List<Object> next() {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/CopySQLBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/CopySQLBuilder.java
index 33843a064..8edfba3f1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/CopySQLBuilder.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/CopySQLBuilder.java
@@ -17,7 +17,7 @@ package org.apache.doris.flink.sink.copy;
-import org.apache.doris.flink.catalog.doris.DorisSystem;
+import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import java.util.Arrays;
@@ -53,7 +53,7 @@ public CopySQLBuilder(
public String buildCopySQL() {
StringBuilder sb = new StringBuilder();
sb.append("COPY INTO ")
- .append(DorisSystem.quoteTableIdentifier(tableIdentifier))
+ .append(DorisSchemaFactory.quoteTableIdentifier(tableIdentifier))
.append(" FROM @~('{")
.append(String.join(",", fileList))
.append("}') ")
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
index 9289c8860..67a2ddace 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
@@ -32,6 +32,7 @@ import
net.sf.jsqlparser.statement.create.table.Index; import org.apache.commons.collections.CollectionUtils; import org.apache.doris.flink.catalog.doris.DorisSchemaFactory; +import org.apache.doris.flink.catalog.doris.DorisType; import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils; @@ -42,9 +43,14 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Objects; +import java.util.Set; /** Use {@link CCJSqlParserUtil} to parse SQL statements. */ public class SQLParserSchemaManager implements Serializable { @@ -54,6 +60,16 @@ public class SQLParserSchemaManager implements Serializable { private static final String PRIMARY = "PRIMARY"; private static final String PRIMARY_KEY = "PRIMARY KEY"; private static final String UNIQUE = "UNIQUE"; + private static final String DORIS_CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP"; + private static final Set sourceConnectorTimeValues = + new HashSet<>( + Arrays.asList( + "SYSDATE", + "SYSTIMESTAMP", + "CURRENT_TIMESTAMP", + "NOW()", + "CURRENT TIMESTAMP", + "GETDATE()")); /** * Doris' schema change only supports ADD, DROP, and RENAME operations. This method is only used @@ -126,7 +142,8 @@ public TableSchema parseCreateTableStatement( ColDataType colDataType = column.getColDataType(); String dataType = parseDataType(colDataType, sourceConnector); List columnSpecs = column.getColumnSpecs(); - String defaultValue = extractDefaultValue(columnSpecs); + String defaultValue = + extractDefaultValue(dataType, columnSpecs); String comment = extractComment(columnSpecs); FieldSchema fieldSchema = new FieldSchema( @@ -232,7 +249,7 @@ private List processAddColumnOperation( String datatype = parseDataType(colDataType, sourceConnector); List columnSpecs = columnDataType.getColumnSpecs(); - String defaultValue = extractDefaultValue(columnSpecs); + String defaultValue = extractDefaultValue(datatype, columnSpecs); String comment = extractComment(columnSpecs); FieldSchema fieldSchema = new FieldSchema(columnName, datatype, defaultValue, comment); String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(dorisTable, fieldSchema); @@ -267,11 +284,25 @@ private String processRenameColumnOperation( } @VisibleForTesting - public String extractDefaultValue(List columnSpecs) { + public String extractDefaultValue(String dateType, List columnSpecs) { if (CollectionUtils.isEmpty(columnSpecs)) { return null; } - return extractAdjacentString(columnSpecs, DEFAULT); + String adjacentDefaultValue = extractAdjacentString(columnSpecs, DEFAULT); + return parseDorisDefaultValue(dateType, adjacentDefaultValue); + } + + private String parseDorisDefaultValue(String dateType, String defaultValue) { + if (Objects.isNull(defaultValue)) { + return null; + } + // In doris, DATETIME supports specifying the current time by default through + // CURRENT_TIMESTAMP. 
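+ // For example, a MySQL column declared as DATETIME DEFAULT NOW() maps to
+ // DEFAULT CURRENT_TIMESTAMP in Doris.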
+ if ((dateType.startsWith(DorisType.DATETIME) || dateType.startsWith(DorisType.DATETIME_V2)) + && sourceConnectorTimeValues.contains(defaultValue.toUpperCase(Locale.ROOT))) { + return DORIS_CURRENT_TIMESTAMP; + } + return defaultValue; } private String extractAdjacentString(List columnSpecs, String key) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java index 065468779..74b574177 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java @@ -20,7 +20,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.compress.utils.Lists; import org.apache.commons.lang3.StringUtils; -import org.apache.doris.flink.catalog.doris.DorisSystem; +import org.apache.doris.flink.catalog.doris.DorisSchemaFactory; import org.apache.doris.flink.catalog.doris.FieldSchema; import java.util.List; @@ -114,11 +114,11 @@ public static String buildAddColumnDDL(String tableIdentifier, FieldSchema field new StringBuilder( String.format( ADD_DDL, - DorisSystem.quoteTableIdentifier(tableIdentifier), - DorisSystem.identifier(name), + DorisSchemaFactory.quoteTableIdentifier(tableIdentifier), + DorisSchemaFactory.identifier(name), type)); if (defaultValue != null) { - addDDL.append(" DEFAULT ").append(DorisSystem.quoteDefaultValue(defaultValue)); + addDDL.append(" DEFAULT ").append(DorisSchemaFactory.quoteDefaultValue(defaultValue)); } commentColumn(addDDL, comment); return addDDL.toString(); @@ -127,17 +127,17 @@ public static String buildAddColumnDDL(String tableIdentifier, FieldSchema field public static String buildDropColumnDDL(String tableIdentifier, String columName) { return String.format( DROP_DDL, - DorisSystem.quoteTableIdentifier(tableIdentifier), - DorisSystem.identifier(columName)); + DorisSchemaFactory.quoteTableIdentifier(tableIdentifier), + DorisSchemaFactory.identifier(columName)); } public static String buildRenameColumnDDL( String tableIdentifier, String oldColumnName, String newColumnName) { return String.format( RENAME_DDL, - DorisSystem.quoteTableIdentifier(tableIdentifier), - DorisSystem.identifier(oldColumnName), - DorisSystem.identifier(newColumnName)); + DorisSchemaFactory.quoteTableIdentifier(tableIdentifier), + DorisSchemaFactory.identifier(oldColumnName), + DorisSchemaFactory.identifier(newColumnName)); } public static String buildColumnExistsQuery(String database, String table, String column) { @@ -156,9 +156,9 @@ public static String buildModifyColumnCommentDDL( String tableIdentifier, String columnName, String newComment) { return String.format( MODIFY_COMMENT_DDL, - DorisSystem.quoteTableIdentifier(tableIdentifier), - DorisSystem.identifier(columnName), - DorisSystem.quoteComment(newComment)); + DorisSchemaFactory.quoteTableIdentifier(tableIdentifier), + DorisSchemaFactory.identifier(columnName), + DorisSchemaFactory.quoteComment(newComment)); } public static String buildModifyColumnDataTypeDDL( @@ -170,8 +170,8 @@ public static String buildModifyColumnDataTypeDDL( new StringBuilder( String.format( MODIFY_TYPE_DDL, - DorisSystem.quoteTableIdentifier(tableIdentifier), - DorisSystem.identifier(columnName), + DorisSchemaFactory.quoteTableIdentifier(tableIdentifier), + DorisSchemaFactory.identifier(columnName), dataType)); commentColumn(modifyDDL, comment); return 
modifyDDL.toString(); @@ -179,7 +179,7 @@ public static String buildModifyColumnDataTypeDDL( private static void commentColumn(StringBuilder ddl, String comment) { if (StringUtils.isNotEmpty(comment)) { - ddl.append(" COMMENT '").append(DorisSystem.quoteComment(comment)).append("'"); + ddl.append(" COMMENT '").append(DorisSchemaFactory.quoteComment(comment)).append("'"); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 060bccb5f..44ff573e7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -43,6 +43,7 @@ import java.io.IOException; import java.io.Serializable; +import java.net.NoRouteToHostException; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -287,6 +288,10 @@ public RespContent stopLoad() throws IOException { Preconditions.checkState(pendingLoadFuture != null); try { return handlePreCommitResponse(pendingLoadFuture.get()); + } catch (NoRouteToHostException nex) { + LOG.error("Failed to connect, cause ", nex); + throw new DorisRuntimeException( + "No Route to Host to " + hostPort + ", exception: " + nex); } catch (Exception e) { throw new DorisRuntimeException(e); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java index 83dec58b6..9c89fce39 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java @@ -70,6 +70,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer private Map tableMapping; // create table properties @@ -111,6 +112,7 @@ public JsonDebeziumSchemaSerializer( .getStreamLoadProp() .getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT); this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore(); + this.enableDelete = executionOptions.getDeletable(); } } @@ -149,7 +151,8 @@ private void init() { lineDelimiter, ignoreUpdateBefore, targetTablePrefix, - targetTableSuffix); + targetTableSuffix, + enableDelete); initSchemaChangeInstance(changeContext); this.dataChange = new JsonDebeziumDataChange(changeContext); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java index d1326c722..2f7764f31 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java @@ -41,6 +41,7 @@ public class JsonDebeziumChangeContext implements Serializable { private final Pattern pattern; private final String lineDelimiter; private final boolean ignoreUpdateBefore; + private final boolean enableDelete; private final String targetTablePrefix; private final String targetTableSuffix; @@ -55,7 +56,8 @@ public 
JsonDebeziumChangeContext( String lineDelimiter, boolean ignoreUpdateBefore, String targetTablePrefix, - String targetTableSuffix) { + String targetTableSuffix, + boolean enableDelete) { this.dorisOptions = dorisOptions; this.tableMapping = tableMapping; this.sourceTableName = sourceTableName; @@ -65,6 +67,7 @@ public JsonDebeziumChangeContext( this.pattern = pattern; this.lineDelimiter = lineDelimiter; this.ignoreUpdateBefore = ignoreUpdateBefore; + this.enableDelete = enableDelete; this.targetTablePrefix = targetTablePrefix; this.targetTableSuffix = targetTableSuffix; } @@ -116,6 +119,10 @@ public String getTargetTableSuffix() { return targetTableSuffix; } + public boolean enableDelete() { + return enableDelete; + } + public DorisTableConfig getDorisTableConf() { return dorisTableConfig; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java index 5075adf8c..298cfb95d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java @@ -50,6 +50,7 @@ public class JsonDebeziumDataChange extends CdcDataChange { private final ObjectMapper objectMapper; private final DorisOptions dorisOptions; private final boolean ignoreUpdateBefore; + private final boolean enableDelete; private final String lineDelimiter; private final JsonDebeziumChangeContext changeContext; @@ -59,6 +60,7 @@ public JsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) { this.objectMapper = changeContext.getObjectMapper(); this.ignoreUpdateBefore = changeContext.isIgnoreUpdateBefore(); this.lineDelimiter = changeContext.getLineDelimiter(); + this.enableDelete = changeContext.enableDelete(); } public DorisRecord serialize(String record, JsonNode recordRoot, String op) throws IOException { @@ -87,7 +89,7 @@ public DorisRecord serialize(String record, JsonNode recordRoot, String op) thro return DorisRecord.of(dorisTableIdentifier, extractUpdate(recordRoot)); case OP_DELETE: valueMap = extractBeforeRow(recordRoot); - addDeleteSign(valueMap, true); + addDeleteSign(valueMap, enableDelete); break; default: LOG.error("parse record fail, unknown op {} in {}", op, record); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java index 94605105b..6600dd07e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java @@ -296,7 +296,8 @@ public void fillOriginSchema(String tableName, JsonNode columns) { } } - private void buildFieldSchema(Map filedSchemaMap, JsonNode column) { + @VisibleForTesting + public void buildFieldSchema(Map filedSchemaMap, JsonNode column) { String fieldName = column.get("name").asText(); String dorisTypeName = buildDorisTypeName(column); String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression")); @@ -315,7 +316,7 @@ public String 
buildDorisTypeName(JsonNode column) { } private String handleDefaultValue(String defaultValue) { - if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) { + if (defaultValue == null) { return null; } if (defaultValue.equals("1970-01-01 00:00:00")) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java new file mode 100644 index 000000000..a2b9b6327 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.source.reader; + +import org.apache.arrow.adbc.core.AdbcConnection; +import org.apache.arrow.adbc.core.AdbcDatabase; +import org.apache.arrow.adbc.core.AdbcDriver; +import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.core.AdbcStatement; +import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver; +import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.exception.DorisException; +import org.apache.doris.flink.exception.IllegalArgumentException; +import org.apache.doris.flink.exception.ShouldNeverHappenException; +import org.apache.doris.flink.rest.PartitionDefinition; +import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.rest.SchemaUtils; +import org.apache.doris.flink.rest.models.Schema; +import org.apache.doris.flink.serialization.RowBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; + +public class DorisFlightValueReader extends ValueReader implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(DorisFlightValueReader.class); + protected AdbcConnection client; + protected Lock clientLock = new ReentrantLock(); + + private final PartitionDefinition partition; + private final DorisOptions options; + private final DorisReadOptions readOptions; + private AdbcStatement statement; + protected RowBatch rowBatch; + protected Schema schema; + AdbcStatement.QueryResult queryResult; + protected ArrowReader arrowReader; + protected AtomicBoolean eos = new AtomicBoolean(false); + + public DorisFlightValueReader( + 
PartitionDefinition partition,
+ DorisOptions options,
+ DorisReadOptions readOptions,
+ Schema schema) {
+ this.partition = partition;
+ this.options = options;
+ this.readOptions = readOptions;
+ this.client = openConnection();
+ this.schema = schema;
+ init();
+ }
+
+ private void init() {
+ clientLock.lock();
+ try {
+ this.statement = this.client.createStatement();
+ this.statement.setSqlQuery(
+ RestService.parseFlightSql(readOptions, options, partition, LOG));
+ this.queryResult = statement.executeQuery();
+ this.arrowReader = queryResult.getReader();
+ } catch (AdbcException | DorisException e) {
+ throw new RuntimeException(e);
+ } finally {
+ clientLock.unlock();
+ }
+ LOG.debug("Open scan result, schema: {}.", schema);
+ }
+
+ private AdbcConnection openConnection() {
+ final Map<String, Object> parameters = new HashMap<>();
+ RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+ FlightSqlDriver driver = new FlightSqlDriver(allocator);
+ String[] split = null;
+ try {
+ split = RestService.randomEndpoint(options.getFenodes(), LOG).split(":");
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException("Get FENode Error", e);
+ }
+ AdbcDriver.PARAM_URI.set(
+ parameters,
+ Location.forGrpcInsecure(String.valueOf(split[0]), readOptions.getFlightSqlPort())
+ .getUri()
+ .toString());
+ AdbcDriver.PARAM_USERNAME.set(parameters, options.getUsername());
+ AdbcDriver.PARAM_PASSWORD.set(parameters, options.getPassword());
+ try {
+ AdbcDatabase adbcDatabase = driver.open(parameters);
+ return adbcDatabase.connect();
+ } catch (AdbcException e) {
+ LOG.debug("Open Flight Connection error: {}", e.getDetails());
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Reads data and caches it in the rowBatch.
+ *
+ * @return true if there is a next value
+ */
+ public boolean hasNext() {
+ boolean hasNext = false;
+ clientLock.lock();
+ try {
+ // Arrow data is acquired synchronously during iteration
+ if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) {
+ if (!eos.get()) {
+ eos.set(!arrowReader.loadNextBatch());
+ rowBatch =
+ new RowBatch(
+ arrowReader,
+ SchemaUtils.convertToSchema(
+ this.schema,
+ arrowReader.getVectorSchemaRoot().getSchema()))
+ .readFlightArrow();
+ }
+ }
+ hasNext = !eos.get();
+ return hasNext;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ clientLock.unlock();
+ }
+ }
+
+ /**
+ * Gets the next value.
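+ * Callers should check {@link #hasNext()} first; otherwise a ShouldNeverHappenException is thrown.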
+ * + * @return next value + */ + public List next() { + if (!hasNext()) { + LOG.error(SHOULD_NOT_HAPPEN_MESSAGE); + throw new ShouldNeverHappenException(); + } + return rowBatch.next(); + } + + @Override + public void close() throws Exception { + clientLock.lock(); + try { + if (rowBatch != null) { + rowBatch.close(); + } + if (statement != null) { + statement.close(); + } + if (client != null) { + client.close(); + } + } finally { + clientLock.unlock(); + } + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java index 01777d43b..c9ed6f9ce 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java @@ -23,6 +23,7 @@ import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.source.split.DorisSourceSplit; import org.apache.doris.flink.source.split.DorisSplitRecords; import org.slf4j.Logger; @@ -41,7 +42,7 @@ public class DorisSourceSplitReader implements SplitReader splits; private final DorisOptions options; private final DorisReadOptions readOptions; - private DorisValueReader valueReader; + private ValueReader valueReader; private String currentSplitId; public DorisSourceSplitReader(DorisOptions options, DorisReadOptions readOptions) { @@ -52,7 +53,11 @@ public DorisSourceSplitReader(DorisOptions options, DorisReadOptions readOptions @Override public RecordsWithSplitIds fetch() throws IOException { - checkSplitOrStartNext(); + try { + checkSplitOrStartNext(); + } catch (DorisException e) { + throw new RuntimeException(e); + } if (!valueReader.hasNext()) { return finishSplit(); @@ -60,7 +65,7 @@ public RecordsWithSplitIds fetch() throws IOException { return DorisSplitRecords.forRecords(currentSplitId, valueReader); } - private void checkSplitOrStartNext() throws IOException { + private void checkSplitOrStartNext() throws IOException, DorisException { if (valueReader != null) { return; } @@ -70,7 +75,8 @@ private void checkSplitOrStartNext() throws IOException { } currentSplitId = nextSplit.splitId(); valueReader = - new DorisValueReader(nextSplit.getPartitionDefinition(), options, readOptions); + ValueReader.createReader( + nextSplit.getPartitionDefinition(), options, readOptions, LOG); } private DorisSplitRecords finishSplit() { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java index 098a7707d..35639e8a9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java @@ -52,7 +52,7 @@ import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT; import static org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; -public class DorisValueReader implements AutoCloseable { +public class DorisValueReader extends ValueReader implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DorisValueReader.class); protected BackendClient client; protected Lock clientLock = new 
ReentrantLock(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java new file mode 100644 index 000000000..9e4539349 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.source.reader; + +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.exception.DorisException; +import org.apache.doris.flink.rest.PartitionDefinition; +import org.apache.doris.flink.rest.RestService; +import org.slf4j.Logger; + +import java.util.List; + +public abstract class ValueReader { + + public static ValueReader createReader( + PartitionDefinition partition, + DorisOptions options, + DorisReadOptions readOptions, + Logger logger) + throws DorisException { + logger.info("create reader for partition: {}", partition); + if (readOptions.getUseFlightSql()) { + return new DorisFlightValueReader( + partition, + options, + readOptions, + RestService.getSchema(options, readOptions, logger)); + } else { + return new DorisValueReader(partition, options, readOptions); + } + } + + public abstract boolean hasNext(); + + public abstract List next(); + + public abstract void close() throws Exception; +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java index cef967624..24d10569e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java @@ -20,6 +20,7 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.doris.flink.source.reader.DorisValueReader; +import org.apache.doris.flink.source.reader.ValueReader; import javax.annotation.Nullable; @@ -34,18 +35,17 @@ public class DorisSplitRecords implements RecordsWithSplitIds { private final Set finishedSplits; - private final DorisValueReader valueReader; + private final ValueReader valueReader; private String splitId; - public DorisSplitRecords( - String splitId, DorisValueReader valueReader, Set finishedSplits) { + public DorisSplitRecords(String splitId, ValueReader valueReader, Set finishedSplits) { this.splitId = splitId; this.valueReader = valueReader; this.finishedSplits = finishedSplits; } public static DorisSplitRecords forRecords( - final String splitId, final DorisValueReader valueReader) { + final String splitId, 
final ValueReader valueReader) { return new DorisSplitRecords(splitId, valueReader, Collections.emptySet()); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index 4b0b56c4e..02e59084c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -307,7 +307,16 @@ public class DorisConfigOptions { .booleanType() .defaultValue(false) .withDescription("Whether to use buffer cache for breakpoint resume"); - + public static final ConfigOption USE_FLIGHT_SQL = + ConfigOptions.key("source.use-flight-sql") + .booleanType() + .defaultValue(Boolean.FALSE) + .withDescription("use flight sql flag"); + public static final ConfigOption FLIGHT_SQL_PORT = + ConfigOptions.key("source.flight-sql-port") + .intType() + .defaultValue(9040) + .withDescription("flight sql port"); // Prefix for Doris StreamLoad specific properties. public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index e8ac4dbf6..2559e1f04 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -52,6 +52,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES; import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE; import static org.apache.doris.flink.table.DorisConfigOptions.FENODES; +import static org.apache.doris.flink.table.DorisConfigOptions.FLIGHT_SQL_PORT; import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER; import static org.apache.doris.flink.table.DorisConfigOptions.JDBC_URL; import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_MAX_ROWS; @@ -83,6 +84,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER; import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME; +import static org.apache.doris.flink.table.DorisConfigOptions.USE_FLIGHT_SQL; /** * The {@link DorisDynamicTableFactory} translates the catalog table to a table source. 
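The two new table options wire through `getDorisReadOptions` in the next hunk. As a minimal sketch (not part of the patch), they can also be enabled programmatically through the builder methods this change adds to `DorisReadOptions`; the port value below is the connector's declared default, and the actual value must match the Arrow Flight SQL port exposed by the Doris cluster:

```java
// Sketch: enable the Arrow Flight SQL read path added in this patch.
DorisReadOptions readOptions =
        DorisReadOptions.builder()
                .setUseFlightSql(true) // default is false, i.e. the Thrift scan path
                .setFlightSqlPort(9040) // must match the cluster's Flight SQL port
                .build();
```

Equivalently, a Flink SQL table can set `'source.use-flight-sql' = 'true'` and `'source.flight-sql-port' = '9040'`, the keys and defaults declared in `DorisConfigOptions` below.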
@@ -157,6 +159,9 @@ public Set> optionalOptions() { options.add(SOURCE_USE_OLD_API); options.add(SINK_WRITE_MODE); options.add(SINK_IGNORE_COMMIT_ERROR); + + options.add(USE_FLIGHT_SQL); + options.add(FLIGHT_SQL_PORT); return options; } @@ -216,7 +221,9 @@ private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) { (int) readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS).toMillis()) .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES)) .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE)) - .setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API)); + .setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API)) + .setUseFlightSql(readableConfig.get(USE_FLIGHT_SQL)) + .setFlightSqlPort(readableConfig.get(FLIGHT_SQL_PORT)); return builder.build(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index e565d0a72..2f672adb2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -59,6 +59,8 @@ import java.util.Set; import java.util.regex.Pattern; +import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX; + public abstract class DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class); private static final String TABLE_NAME_OPTIONS = "table-name"; @@ -483,6 +485,25 @@ private void handleTableCreationFailure(Exception ex) throws DorisSystemExceptio } } + protected Properties getJdbcProperties() { + Properties jdbcProps = new Properties(); + for (Map.Entry entry : config.toMap().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith(PROPERTIES_PREFIX)) { + jdbcProps.put(key.substring(PROPERTIES_PREFIX.length()), value); + } + } + return jdbcProps; + } + + protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { + StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl); + jdbcProperties.forEach( + (key, value) -> jdbcUrlBuilder.append("&").append(key).append("=").append(value)); + return jdbcUrlBuilder.toString(); + } + public DatabaseSync setEnv(StreamExecutionEnvironment env) { this.env = env; return this; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java index 2dcd21a9b..3947c1e16 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java @@ -89,9 +89,11 @@ public void registerDriver() throws SQLException { @Override public Connection getConnection() throws SQLException { + Properties jdbcProperties = getJdbcProperties(); + String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties); String jdbcUrl = String.format( - JDBC_URL, + jdbcUrlTemplate, config.get(JdbcSourceOptions.HOSTNAME), config.get(PORT), config.get(JdbcSourceOptions.DATABASE_NAME)); @@ -224,4 +226,21 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { public String getTableListPrefix() { return config.get(JdbcSourceOptions.SCHEMA_NAME); } + + @Override + protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { + StringBuilder 
jdbcUrlBuilder = new StringBuilder(initialJdbcUrl); + boolean firstParam = true; + for (Map.Entry entry : jdbcProperties.entrySet()) { + Object key = entry.getKey(); + Object value = entry.getValue(); + if (firstParam) { + jdbcUrlBuilder.append(":").append(key).append("=").append(value).append(";"); + firstParam = false; + } else { + jdbcUrlBuilder.append(key).append("=").append(value).append(";"); + } + } + return jdbcUrlBuilder.toString(); + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java index 5aaf8cea5..c36777f36 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java @@ -17,9 +17,12 @@ package org.apache.doris.flink.tools.cdc.db2; +import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.tools.cdc.JdbcSourceSchema; import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.util.LinkedHashMap; public class Db2Schema extends JdbcSourceSchema { public Db2Schema( @@ -41,4 +44,11 @@ public String convertToDorisType(String fieldType, Integer precision, Integer sc public String getCdcTableName() { return schemaName + "\\." + tableName; } + + @Override + public LinkedHashMap getColumnInfo( + DatabaseMetaData metaData, String databaseName, String schemaName, String tableName) + throws SQLException { + return super.getColumnInfo(metaData, null, schemaName, tableName); + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java index 08ec45091..984419bcc 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java @@ -22,18 +22,31 @@ import org.apache.doris.flink.catalog.doris.DorisType; import org.apache.doris.flink.catalog.doris.FieldSchema; -import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.tools.cdc.SourceSchema; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; public class MongoDBSchema extends SourceSchema { private static final Logger LOG = LoggerFactory.getLogger(MongoDBSchema.class); + private static final List CONVERT_TYPE = + Arrays.asList(DorisType.BIGINT, DorisType.INT, DorisType.SMALLINT, DorisType.TINYINT); + + public enum DecimalJudgement { + NOT_DECIMAL, + CERTAIN_DECIMAL, + CONVERT_TO_DECIMAL; + + public static boolean needProcessing(DecimalJudgement decimalJudgement) { + return !decimalJudgement.equals(NOT_DECIMAL); + } + } public MongoDBSchema( ArrayList sampleData, @@ -51,21 +64,50 @@ public MongoDBSchema( primaryKeys.add("_id"); } - private void processSampleData(Document sampleData) { + @VisibleForTesting + protected void processSampleData(Document sampleData) { for (Map.Entry entry : sampleData.entrySet()) { String fieldName = entry.getKey(); Object value = entry.getValue(); - String dorisType = MongoDBType.toDorisType(value); - if (isDecimalField(fieldName)) { - dorisType = replaceDecimalTypeIfNeeded(fieldName, dorisType); - } + String 
dorisType = determineDorisType(fieldName, value); fields.put(fieldName, new FieldSchema(fieldName, dorisType, null)); } } - private boolean isDecimalField(String fieldName) { + private String determineDorisType(String fieldName, Object value) { + String dorisType = MongoDBType.toDorisType(value); + // Check if the type is string or if the existing field is a string type + FieldSchema existingField = fields.get(fieldName); + if (dorisType.equals(DorisType.STRING) + || (existingField != null + && existingField.getTypeString().equals(DorisType.STRING))) { + return DorisType.STRING; + } + // Check and process for decimal types + DecimalJudgement decimalJudgement = judgeDecimalField(fieldName, dorisType); + if (DecimalJudgement.needProcessing(decimalJudgement)) { + if (decimalJudgement == DecimalJudgement.CONVERT_TO_DECIMAL) { + int precision = value.toString().length(); + dorisType = MongoDBType.formatDecimalType(precision, 0); + } + dorisType = replaceDecimalTypeIfNeeded(fieldName, dorisType); + } + return dorisType; + } + + private DecimalJudgement judgeDecimalField(String fieldName, String dorisType) { FieldSchema existingField = fields.get(fieldName); - return existingField != null && existingField.getTypeString().startsWith(DorisType.DECIMAL); + if (existingField == null) { + return DecimalJudgement.NOT_DECIMAL; + } + boolean existDecimal = existingField.getTypeString().startsWith(DorisType.DECIMAL); + boolean isDecimal = dorisType.startsWith(DorisType.DECIMAL); + if (existDecimal && isDecimal) { + return DecimalJudgement.CERTAIN_DECIMAL; + } else if (CONVERT_TYPE.contains(dorisType)) { + return DecimalJudgement.CONVERT_TO_DECIMAL; + } + return DecimalJudgement.NOT_DECIMAL; } @VisibleForTesting @@ -76,28 +118,17 @@ protected String replaceDecimalTypeIfNeeded(String fieldName, String newDorisTyp MongoDBType.getDecimalPrecisionAndScale(existingField.getTypeString()); int existingPrecision = existingPrecisionAndScale.f0; int existingScale = existingPrecisionAndScale.f1; + Tuple2 currentPrecisionAndScale = + MongoDBType.getDecimalPrecisionAndScale(newDorisType); + int currentPrecision = currentPrecisionAndScale.f0; + int currentScale = currentPrecisionAndScale.f1; - try { - Tuple2 currentPrecisionAndScale = - MongoDBType.getDecimalPrecisionAndScale(newDorisType); - int currentPrecision = currentPrecisionAndScale.f0; - int currentScale = currentPrecisionAndScale.f1; - - int newScale = Math.max(existingScale, currentScale); - int newIntegerPartSize = - Math.max( - existingPrecision - existingScale, currentPrecision - currentScale); - int newPrecision = newIntegerPartSize + newScale; - - return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")"; - } catch (DorisRuntimeException e) { - LOG.warn( - "Replace decimal type of field:{} failed, the newly type is:{}, rollback to existing type:{}", - fieldName, - newDorisType, - existingField.getTypeString()); - return existingField.getTypeString(); - } + int newScale = Math.max(existingScale, currentScale); + int newIntegerPartSize = + Math.max(existingPrecision - existingScale, currentPrecision - currentScale); + int newPrecision = newIntegerPartSize + newScale; + + return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")"; } return newDorisType; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java index bee85ced3..578a407cd 100644 --- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java @@ -118,9 +118,11 @@ public static String checkAndRebuildBigDecimal(BigDecimal decimal) { decimal = new BigDecimal(decimal.toPlainString()); } return decimal.precision() <= 38 - ? String.format( - "%s(%s,%s)", - DorisType.DECIMAL_V3, decimal.precision(), Math.max(decimal.scale(), 0)) + ? formatDecimalType(decimal.precision(), Math.max(decimal.scale(), 0)) : DorisType.STRING; } + + public static String formatDecimalType(int precision, int scale) { + return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale); + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java index b7faec2fa..d4a87ff87 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java @@ -50,6 +50,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize private final String sourceTableName; private String lineDelimiter = LINE_DELIMITER_DEFAULT; private boolean ignoreUpdateBefore = true; + private boolean enableDelete = true; // private Map tableMapping; // create table properties @@ -90,6 +91,7 @@ public MongoDBJsonDebeziumSchemaSerializer( .getStreamLoadProp() .getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT); this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore(); + this.enableDelete = executionOptions.getDeletable(); } init(); } @@ -107,7 +109,8 @@ private void init() { lineDelimiter, ignoreUpdateBefore, targetTablePrefix, - targetTableSuffix); + targetTableSuffix, + enableDelete); this.dataChange = new MongoJsonDebeziumDataChange(changeContext); this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java index 8048e38a8..9dbe7ffe1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java @@ -61,6 +61,7 @@ public class MongoJsonDebeziumDataChange extends CdcDataChange implements Change public JsonDebeziumChangeContext changeContext; public ObjectMapper objectMapper; public Map tableMapping; + private final boolean enableDelete; public MongoJsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) { this.changeContext = changeContext; @@ -68,6 +69,7 @@ public MongoJsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) { this.objectMapper = changeContext.getObjectMapper(); this.lineDelimiter = changeContext.getLineDelimiter(); this.tableMapping = changeContext.getTableMapping(); + this.enableDelete = changeContext.enableDelete(); } @Override @@ -93,7 +95,7 @@ public DorisRecord serialize(String record, JsonNode recordRoot, String op) thro break; case OP_DELETE: 
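// Editor's note on the change below: addDeleteSign previously hard-coded true;
// it now honors sink.enable-delete. When that option is false, the hidden
// delete-sign column (__DORIS_DELETE_SIGN__) is written as false, so a CDC
// delete event is loaded as an upsert and the row is kept in Doris.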
valueMap = extractDeleteRow(recordRoot); - addDeleteSign(valueMap, true); + addDeleteSign(valueMap, enableDelete); break; default: LOG.error("parse record fail, unknown op {} in {}", op, record); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index a58008cdd..9fdfdcaed 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -55,10 +55,11 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX; + public class MysqlDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class); private static final String JDBC_URL = "jdbc:mysql://%s:%d?useInformationSchema=true"; - private static final String PROPERTIES_PREFIX = "jdbc.properties."; public MysqlDatabaseSync() throws SQLException { super(); @@ -83,12 +84,10 @@ public void registerDriver() throws SQLException { @Override public Connection getConnection() throws SQLException { Properties jdbcProperties = getJdbcProperties(); - StringBuilder jdbcUrlSb = new StringBuilder(JDBC_URL); - jdbcProperties.forEach( - (key, value) -> jdbcUrlSb.append("&").append(key).append("=").append(value)); + String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties); String jdbcUrl = String.format( - jdbcUrlSb.toString(), + jdbcUrlTemplate, config.get(MySqlSourceOptions.HOSTNAME), config.get(MySqlSourceOptions.PORT)); @@ -269,16 +268,4 @@ private Map getChunkColumnMap() { } return chunkMap; } - - private Properties getJdbcProperties() { - Properties jdbcProps = new Properties(); - for (Map.Entry entry : config.toMap().entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - if (key.startsWith(PROPERTIES_PREFIX)) { - jdbcProps.put(key.substring(PROPERTIES_PREFIX.length()), value); - } - } - return jdbcProps; - } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java index 66e26d152..15fc632b4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java @@ -66,7 +66,7 @@ public class PostgresDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(PostgresDatabaseSync.class); - private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s"; + private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s?"; public PostgresDatabaseSync() throws SQLException { super(); @@ -84,9 +84,11 @@ public void registerDriver() throws SQLException { @Override public Connection getConnection() throws SQLException { + Properties jdbcProperties = getJdbcProperties(); + String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties); String jdbcUrl = String.format( - JDBC_URL, + jdbcUrlTemplate, config.get(PostgresSourceOptions.HOSTNAME), config.get(PostgresSourceOptions.PG_PORT), config.get(PostgresSourceOptions.DATABASE_NAME)); @@ -227,7 +229,24 @@ public DataStreamSource 
buildCdcSource(StreamExecutionEnvironment env) { @Override public String getTableListPrefix() { - String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME); - return schemaName; + return config.get(PostgresSourceOptions.SCHEMA_NAME); + } + + @Override + protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { + + if (!initialJdbcUrl.endsWith("?")) { + return super.getJdbcUrlTemplate(initialJdbcUrl, jdbcProperties); + } + StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl); + int recordIndex = 0; + for (Map.Entry entry : jdbcProperties.entrySet()) { + jdbcUrlBuilder.append(entry.getKey()).append("=").append(entry.getValue()); + if (recordIndex < jdbcProperties.size() - 1) { + jdbcUrlBuilder.append("&"); + recordIndex++; + } + } + return jdbcUrlBuilder.toString(); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java index 08c54dd38..cb6b66829 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java @@ -63,7 +63,7 @@ public class SqlServerDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(SqlServerDatabaseSync.class); - private static final String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s"; + private static final String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s;"; private static final String PORT = "port"; public SqlServerDatabaseSync() throws SQLException { super(); @@ -82,9 +82,11 @@ public void registerDriver() throws SQLException { @Override public Connection getConnection() throws SQLException { + Properties jdbcProperties = getJdbcProperties(); + String jdbcUrlTemplate = getJdbcUrlTemplate(JDBC_URL, jdbcProperties); String jdbcUrl = String.format( - JDBC_URL, + jdbcUrlTemplate, config.get(JdbcSourceOptions.HOSTNAME), config.getInteger(PORT, 1433), config.get(JdbcSourceOptions.DATABASE_NAME)); @@ -216,4 +218,12 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { public String getTableListPrefix() { return config.get(JdbcSourceOptions.SCHEMA_NAME); } + + @Override + public String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { + StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl); + jdbcProperties.forEach( + (key, value) -> jdbcUrlBuilder.append(key).append("=").append(value).append(";")); + return jdbcUrlBuilder.toString(); + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java new file mode 100644 index 000000000..3c24b810e --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
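// ---------------------------------------------------------------------------
// Editor's sketch (not part of this patch): the URL shapes the
// getJdbcUrlTemplate overrides in this patch are meant to produce, mimicked
// with plain strings so the per-database separators are easy to compare.
// Property names and values are illustrative; the authoritative assertions
// live in DatabaseSyncTest.getJdbcUrlTemplateTest.
// ---------------------------------------------------------------------------
import java.util.Properties;

class JdbcUrlTemplateSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("ssl", "false");

        // Base rule (DatabaseSync): "&key=value", assuming the URL already has
        // a query string, e.g. MySQL's "...?useInformationSchema=true".
        StringBuilder mysql = new StringBuilder("jdbc:mysql://%s:%d?useInformationSchema=true");
        props.forEach((k, v) -> mysql.append("&").append(k).append("=").append(v));
        System.out.println(mysql); // jdbc:mysql://%s:%d?useInformationSchema=true&ssl=false

        // Postgres override: the URL ends with "?", so "key=value" pairs are
        // joined with "&" and no leading separator is added.
        System.out.println("jdbc:postgresql://%s:%d/%s?" + "ssl=false");

        // SQL Server override: ";key=value;" appended to "...;database=%s;".
        System.out.println("jdbc:sqlserver://%s:%d;database=%s;" + "ssl=false;");

        // DB2 override: properties start with ":" after the database name and
        // each pair ends with ";".
        System.out.println("jdbc:db2://%s:%d/%s" + ":ssl=false;");
    }
}
// ---------------------------------------------------------------------------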
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.util; + +import java.time.LocalDate; + import java.time.LocalDateTime; + +/** + * The idea for this util comes from https://bugs.openjdk.org/browse/JDK-8144808: in that + * benchmark, LocalDateTime.parse(...) took 991ms while extracting the digit groups by hand and + * calling LocalDateTime.of(...) took 246ms. Patterns must contain the literal tokens yyyy, MM, + * dd (and HH, mm, ss, SSSSSS where applicable) and inputs must be zero-padded digits at those + * offsets, e.g. fastParseDateTime("2024-03-14 17:50:36", "yyyy-MM-dd HH:mm:ss"). + */ +public final class FastDateUtil { + + public static LocalDateTime fastParseDateTimeV2(String dateTime, String pattern) { + char[] arr = dateTime.toCharArray(); + int[] indexes = + new int[] { + pattern.indexOf("yyyy"), + pattern.indexOf("MM"), + pattern.indexOf("dd"), + pattern.indexOf("HH"), + pattern.indexOf("mm"), + pattern.indexOf("ss"), + pattern.indexOf("SSSSSS") + }; + int year = parseFromIndex(arr, indexes[0], indexes[0] + 4); + int month = parseFromIndex(arr, indexes[1], indexes[1] + 2); + int day = parseFromIndex(arr, indexes[2], indexes[2] + 2); + int hour = parseFromIndex(arr, indexes[3], indexes[3] + 2); + int minute = parseFromIndex(arr, indexes[4], indexes[4] + 2); + int second = parseFromIndex(arr, indexes[5], indexes[5] + 2); + int nanos = parseFromIndex(arr, indexes[6], indexes[6] + 6) * 1000; // microseconds to nanoseconds + return LocalDateTime.of(year, month, day, hour, minute, second, nanos); + } + + public static LocalDateTime fastParseDateTime(String dateTime, String pattern) { + char[] arr = dateTime.toCharArray(); + int[] indexes = + new int[] { + pattern.indexOf("yyyy"), + pattern.indexOf("MM"), + pattern.indexOf("dd"), + pattern.indexOf("HH"), + pattern.indexOf("mm"), + pattern.indexOf("ss") + }; + int year = parseFromIndex(arr, indexes[0], indexes[0] + 4); + int month = parseFromIndex(arr, indexes[1], indexes[1] + 2); + int day = parseFromIndex(arr, indexes[2], indexes[2] + 2); + int hour = parseFromIndex(arr, indexes[3], indexes[3] + 2); + int minute = parseFromIndex(arr, indexes[4], indexes[4] + 2); + int second = parseFromIndex(arr, indexes[5], indexes[5] + 2); + return LocalDateTime.of(year, month, day, hour, minute, second); + } + + public static LocalDate fastParseDate(String dateTime, String pattern) { + char[] arr = dateTime.toCharArray(); + int[] indexes = + new int[] { + pattern.indexOf("yyyy"), pattern.indexOf("MM"), pattern.indexOf("dd"), + }; + int year = parseFromIndex(arr, indexes[0], indexes[0] + 4); + int month = parseFromIndex(arr, indexes[1], indexes[1] + 2); + int day = parseFromIndex(arr, indexes[2], indexes[2] + 2); + return LocalDate.of(year, month, day); + } + + private static int parseFromIndex(char[] arr, int start, int end) { + int value = 0; + for (int i = start; i < end; i++) { + value = value * 10 + (arr[i] - '0'); + } + return value; + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java index 1bc1f115b..a0f3aaf01 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java @@ -46,6 +46,13 @@ public void testParseTableSchemaBuckets() { @Test public void 
testCreateTableSchema() { + TableSchema tableSchema = buildCreateTableSchema(); + Assert.assertEquals( + "TableSchema{database='doris', table='create_tab', tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, id=FieldSchema{name='id', typeString='INT', defaultValue='100', comment='int_test'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(100)', defaultValue='email@doris.com', comment='e'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={light_schema_change=true}, tableBuckets=null}", + tableSchema.toString()); + } + + private TableSchema buildCreateTableSchema() { String dorisTable = "doris.create_tab"; String[] dbTable = dorisTable.split("\\."); Preconditions.checkArgument(dbTable.length == 2); @@ -58,21 +65,24 @@ public void testCreateTableSchema() { List pkKeys = Collections.singletonList("email"); Map tableProperties = new HashMap<>(); String tableComment = "auto_tab_comment"; - TableSchema tableSchema = - DorisSchemaFactory.createTableSchema( - dbTable[0], - dbTable[1], - columnFields, - pkKeys, - new DorisTableConfig(tableProperties), - tableComment); - Assert.assertEquals( - "TableSchema{database='doris', table='create_tab', tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, id=FieldSchema{name='id', typeString='INT', defaultValue='100', comment='int_test'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(100)', defaultValue='email@doris.com', comment='e'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={light_schema_change=true}, tableBuckets=null}", - tableSchema.toString()); + return DorisSchemaFactory.createTableSchema( + dbTable[0], + dbTable[1], + columnFields, + pkKeys, + new DorisTableConfig(tableProperties), + tableComment); } @Test public void testCreateTableSchemaTableBuckets() { + TableSchema tableSchema = buildCreateTableSchemaTableBuckets(); + Assert.assertEquals( + "TableSchema{database='doris', table='create_tab', tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, id=FieldSchema{name='id', typeString='INT', defaultValue='100', comment='int_test'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(100)', defaultValue='email@doris.com', comment='e'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={replication_num=2, light_schema_change=true}, tableBuckets=40}", + tableSchema.toString()); + } + + private TableSchema buildCreateTableSchemaTableBuckets() { String dorisTable = "doris.create_tab"; String[] dbTable = dorisTable.split("\\."); Preconditions.checkArgument(dbTable.length == 2); @@ -87,21 +97,24 @@ public void testCreateTableSchemaTableBuckets() { tableProperties.put("table-buckets", "create_tab:40, create_taba:10, tabs:12"); tableProperties.put("replication_num", "2"); String tableComment = "auto_tab_comment"; - TableSchema tableSchema = - DorisSchemaFactory.createTableSchema( - dbTable[0], - dbTable[1], - columnFields, - pkKeys, - new DorisTableConfig(tableProperties), - tableComment); - Assert.assertEquals( - "TableSchema{database='doris', table='create_tab', tableComment='auto_tab_comment', 
fields={name=FieldSchema{name='name', typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, id=FieldSchema{name='id', typeString='INT', defaultValue='100', comment='int_test'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(100)', defaultValue='email@doris.com', comment='e'}}, keys=email, model=UNIQUE, distributeKeys=email, properties={replication_num=2, light_schema_change=true}, tableBuckets=40}", - tableSchema.toString()); + return DorisSchemaFactory.createTableSchema( + dbTable[0], + dbTable[1], + columnFields, + pkKeys, + new DorisTableConfig(tableProperties), + tableComment); } @Test public void testCreateDuplicateTableSchema() { + TableSchema tableSchema = buildCreateDuplicateTableSchema(); + Assert.assertEquals( + "TableSchema{database='doris', table='dup_tab', tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, id=FieldSchema{name='id', typeString='INT', defaultValue='100', comment='int_test'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(100)', defaultValue='email@doris.com', comment='e'}}, keys=name, model=DUPLICATE, distributeKeys=name, properties={replication_num=1, light_schema_change=true}, tableBuckets=null}", + tableSchema.toString()); + } + + private TableSchema buildCreateDuplicateTableSchema() { String dorisTable = "doris.dup_tab"; String[] dbTable = dorisTable.split("\\."); Preconditions.checkArgument(dbTable.length == 2); @@ -114,16 +127,50 @@ public void testCreateDuplicateTableSchema() { Map tableProperties = new HashMap<>(); tableProperties.put("replication_num", "1"); String tableComment = "auto_tab_comment"; - TableSchema tableSchema = - DorisSchemaFactory.createTableSchema( - dbTable[0], - dbTable[1], - columnFields, - new ArrayList<>(), - new DorisTableConfig(tableProperties), - tableComment); + return DorisSchemaFactory.createTableSchema( + dbTable[0], + dbTable[1], + columnFields, + new ArrayList<>(), + new DorisTableConfig(tableProperties), + tableComment); + } + + @Test + public void testGenerateCreateTableDDL() { + TableSchema tableSchema = buildCreateTableSchema(); + String ddl = DorisSchemaFactory.generateCreateTableDDL(tableSchema); Assert.assertEquals( - "TableSchema{database='doris', table='dup_tab', tableComment='auto_tab_comment', fields={name=FieldSchema{name='name', typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'}, id=FieldSchema{name='id', typeString='INT', defaultValue='100', comment='int_test'}, age=FieldSchema{name='age', typeString='INT', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(100)', defaultValue='email@doris.com', comment='e'}}, keys=name, model=DUPLICATE, distributeKeys=name, properties={replication_num=1, light_schema_change=true}, tableBuckets=null}", - tableSchema.toString()); + "CREATE TABLE IF NOT EXISTS `doris`.`create_tab`(`email` VARCHAR(100) DEFAULT 'email@doris.com' COMMENT 'e',`name` VARVHAR(100) COMMENT 'Name_test',`id` INT DEFAULT '100' COMMENT 'int_test',`age` INT COMMENT '' ) UNIQUE KEY(`email`) COMMENT 'auto_tab_comment' DISTRIBUTED BY HASH(`email`) BUCKETS AUTO PROPERTIES ('light_schema_change'='true')", + ddl); + } + + @Test + public void testGenerateCreateTableDDLBuckets() { + TableSchema tableSchema = buildCreateTableSchemaTableBuckets(); + String ddl = 
DorisSchemaFactory.generateCreateTableDDL(tableSchema); + Assert.assertEquals( + "CREATE TABLE IF NOT EXISTS `doris`.`create_tab`(`email` VARCHAR(100) DEFAULT 'email@doris.com' COMMENT 'e',`name` VARVHAR(100) COMMENT 'Name_test',`id` INT DEFAULT '100' COMMENT 'int_test',`age` INT COMMENT '' ) UNIQUE KEY(`email`) COMMENT 'auto_tab_comment' DISTRIBUTED BY HASH(`email`) BUCKETS 40 PROPERTIES ('replication_num'='2','light_schema_change'='true')", + ddl); + } + + @Test + public void testGenerateCreateTableDDLDuplicate() { + TableSchema tableSchema = buildCreateDuplicateTableSchema(); + String ddl = DorisSchemaFactory.generateCreateTableDDL(tableSchema); + Assert.assertEquals( + "CREATE TABLE IF NOT EXISTS `doris`.`dup_tab`(`name` VARVHAR(100) COMMENT 'Name_test',`id` INT DEFAULT '100' COMMENT 'int_test',`age` INT COMMENT '',`email` VARCHAR(100) DEFAULT 'email@doris.com' COMMENT 'e' ) COMMENT 'auto_tab_comment' DISTRIBUTED BY HASH(`name`) BUCKETS AUTO PROPERTIES ('replication_num'='1','light_schema_change'='true')", + ddl); + } + + @Test + public void quoteTableIdentifier() { + String quoted = DorisSchemaFactory.quoteTableIdentifier("db.tbl"); + Assert.assertEquals("`db`.`tbl`", quoted); + } + + @Test(expected = IllegalArgumentException.class) + public void quoteTableIdentifierException() { + DorisSchemaFactory.quoteTableIdentifier("db.tbl.sc"); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/SchemaUtilsTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/SchemaUtilsTest.java new file mode 100644 index 000000000..2353bd7df --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/SchemaUtilsTest.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
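// ---------------------------------------------------------------------------
// Editor's sketch (an assumption, not part of this patch): a stand-alone
// illustration of the contract that the two quoteTableIdentifier tests in
// DorisSchemaFactoryTest above pin down; the real implementation lives in
// DorisSchemaFactory.
// ---------------------------------------------------------------------------
class QuoteTableIdentifierSketch {
    static String quoteTableIdentifier(String tableIdentifier) {
        String[] dbTable = tableIdentifier.split("\\.");
        if (dbTable.length != 2) {
            // e.g. "db.tbl.sc" is rejected, as the negative test expects
            throw new IllegalArgumentException("table identifier must be of the form db.tbl");
        }
        return "`" + dbTable[0] + "`.`" + dbTable[1] + "`";
    }

    public static void main(String[] args) {
        System.out.println(quoteTableIdentifier("db.tbl")); // prints `db`.`tbl`
    }
}
// ---------------------------------------------------------------------------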
+ +package org.apache.doris.flink.rest; + +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.doris.flink.exception.DorisException; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + +public class SchemaUtilsTest { + private static final Logger logger = LoggerFactory.getLogger(SchemaUtilsTest.class); + + @Test + public void convertToSchema() throws DorisException { + Field field1 = + new Field("field1", FieldType.notNullable(new ArrowType.Int(32, true)), null); + Field field2 = + new Field("field2", FieldType.notNullable(new ArrowType.Int(32, true)), null); + Schema arrowSchema = new Schema(Arrays.asList(field1, field2)); + String schemaStr = + "{\"properties\":[" + + "{\"type\":\"int\",\"name\":\"field1\",\"comment\":\"\"}" + + ",{\"type\":\"int\",\"name\":\"field2\",\"comment\":\"\"}" + + "], \"status\":200}"; + org.apache.doris.flink.rest.models.Schema schema = + RestService.parseSchema(schemaStr, logger); + + org.apache.doris.flink.rest.models.Schema result = + SchemaUtils.convertToSchema(schema, arrowSchema); + + assertEquals(2, result.getProperties().size()); + assertEquals("field1", result.getProperties().get(0).getName()); + assertEquals("field2", result.getProperties().get(1).getName()); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java index d8cb7c3f2..d65deeb0b 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.sink.schema; +import org.apache.doris.flink.catalog.doris.DorisType; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.tools.cdc.DorisTableConfig; import org.apache.doris.flink.tools.cdc.SourceConnector; @@ -139,7 +140,7 @@ public void testExtractCommentValueA() { public void testExtractDefaultValue() { String expectDefault = "100"; List columnSpecs = Arrays.asList("default", "'100'", "comment", ""); - String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + String actualDefault = schemaManager.extractDefaultValue(DorisType.INT, columnSpecs); Assert.assertEquals(expectDefault, actualDefault); } @@ -147,14 +148,14 @@ public void testExtractDefaultValue() { public void testExtractDefaultValueQuotes() { String expectDefault = "100"; List columnSpecs = Arrays.asList("default", "\"100\"", "comment", ""); - String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + String actualDefault = schemaManager.extractDefaultValue(DorisType.BIGINT, columnSpecs); Assert.assertEquals(expectDefault, actualDefault); } @Test public void testExtractDefaultValueNull() { List columnSpecs = Arrays.asList("Default", null, "comment", null); - String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + String actualDefault = schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs); Assert.assertNull(actualDefault); } @@ -162,7 +163,7 @@ public void testExtractDefaultValueNull() { public void testExtractDefaultValueEmpty() { 
String expectDefault = null; List columnSpecs = Arrays.asList("DEFAULT", "comment", null); - String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + String actualDefault = schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs); Assert.assertEquals(expectDefault, actualDefault); } @@ -170,17 +171,53 @@ public void testExtractDefaultValueEmpty() { public void testExtractDefaultValueA() { String expectDefault = "aaa"; List columnSpecs = Arrays.asList("default", "aaa"); - String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + String actualDefault = schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs); Assert.assertEquals(expectDefault, actualDefault); } @Test public void testExtractDefaultValueNULL() { List columnSpecs = Collections.singletonList("default"); - String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + String actualDefault = schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs); Assert.assertNull(actualDefault); } + @Test + public void testExtractDefaultValueDateTime() { + List columnSpecs = Arrays.asList("default", "SYSTIMESTAMP"); + String actualDefault = schemaManager.extractDefaultValue(DorisType.DATETIME, columnSpecs); + Assert.assertEquals("CURRENT_TIMESTAMP", actualDefault); + } + + @Test + public void testExtractDefaultValueDateTimeV2() { + List columnSpecs = Arrays.asList("default", "GETDATE()"); + String actualDefault = + schemaManager.extractDefaultValue(DorisType.DATETIME_V2, columnSpecs); + Assert.assertEquals("CURRENT_TIMESTAMP", actualDefault); + } + + @Test + public void testExtractDefaultValueDateTimeV2Time() { + List columnSpecs = Arrays.asList("default", "2024-03-14 17:50:36.002"); + String actualDefault = schemaManager.extractDefaultValue("DATETIMEV2(3)", columnSpecs); + Assert.assertEquals("2024-03-14 17:50:36.002", actualDefault); + } + + @Test + public void testExtractDefaultValueDateTimeV2CurrentTime() { + List columnSpecs = Arrays.asList("default", "now()"); + String actualDefault = schemaManager.extractDefaultValue("DATETIMEV2(3)", columnSpecs); + Assert.assertEquals("CURRENT_TIMESTAMP", actualDefault); + } + + @Test + public void testExtractDefaultValueDate() { + List columnSpecs = Arrays.asList("default", "2024-03-14 17:50:36"); + String actualDefault = schemaManager.extractDefaultValue(DorisType.DATE, columnSpecs); + Assert.assertEquals("2024-03-14 17:50:36", actualDefault); + } + @Test public void testRemoveContinuousChar() { // Test removing continuous target characters from both ends @@ -288,7 +325,7 @@ public void testParseOracleTableStatement() { SourceConnector.ORACLE, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, phone_number=FieldSchema{name='phone_number', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, hire_date=FieldSchema{name='hire_date', typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'}, job_id=FieldSchema{name='job_id', typeString='VARCHAR(30)', defaultValue='null', comment='null'}, salary=FieldSchema{name='salary', typeString='DECIMALV3(8,2)', 
defaultValue='null', comment='null'}, commission_pct=FieldSchema{name='commission_pct', typeString='DECIMALV3(2,2)', defaultValue='null', comment='null'}, manager_id=FieldSchema{name='manager_id', typeString='BIGINT', defaultValue='null', comment='null'}, department_id=FieldSchema{name='department_id', typeString='BIGINT', defaultValue='null', comment='null'}}, keys=employee_id, model=UNIQUE, distributeKeys=employee_id, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, phone_number=FieldSchema{name='phone_number', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, hire_date=FieldSchema{name='hire_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, job_id=FieldSchema{name='job_id', typeString='VARCHAR(30)', defaultValue='null', comment='null'}, salary=FieldSchema{name='salary', typeString='DECIMALV3(8,2)', defaultValue='null', comment='null'}, commission_pct=FieldSchema{name='commission_pct', typeString='DECIMALV3(2,2)', defaultValue='null', comment='null'}, manager_id=FieldSchema{name='manager_id', typeString='BIGINT', defaultValue='null', comment='null'}, department_id=FieldSchema{name='department_id', typeString='BIGINT', defaultValue='null', comment='null'}}, keys=employee_id, model=UNIQUE, distributeKeys=employee_id, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -314,7 +351,7 @@ public void testParseOraclePrimaryTableStatement() { SourceConnector.ORACLE, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, phone_number=FieldSchema{name='phone_number', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, hire_date=FieldSchema{name='hire_date', typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'}, job_id=FieldSchema{name='job_id', typeString='VARCHAR(30)', defaultValue='null', comment='null'}, salary=FieldSchema{name='salary', typeString='DECIMALV3(8,2)', defaultValue='null', comment='null'}, commission_pct=FieldSchema{name='commission_pct', typeString='DECIMALV3(2,2)', defaultValue='null', comment='null'}, manager_id=FieldSchema{name='manager_id', typeString='BIGINT', defaultValue='null', comment='null'}, department_id=FieldSchema{name='department_id', typeString='BIGINT', defaultValue='null', comment='null'}}, keys=employee_id, model=UNIQUE, distributeKeys=employee_id, properties={}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', 
typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', comment='null'}, phone_number=FieldSchema{name='phone_number', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, hire_date=FieldSchema{name='hire_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, job_id=FieldSchema{name='job_id', typeString='VARCHAR(30)', defaultValue='null', comment='null'}, salary=FieldSchema{name='salary', typeString='DECIMALV3(8,2)', defaultValue='null', comment='null'}, commission_pct=FieldSchema{name='commission_pct', typeString='DECIMALV3(2,2)', defaultValue='null', comment='null'}, manager_id=FieldSchema{name='manager_id', typeString='BIGINT', defaultValue='null', comment='null'}, department_id=FieldSchema{name='department_id', typeString='BIGINT', defaultValue='null', comment='null'}}, keys=employee_id, model=UNIQUE, distributeKeys=employee_id, properties={}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } @@ -341,7 +378,7 @@ public void testParseOracleDuplicateTableStatement() { dorisTable, new DorisTableConfig(new HashMap<>())); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={order_id=FieldSchema{name='order_id', typeString='BIGINT', defaultValue='null', comment='null'}, customer_id=FieldSchema{name='customer_id', typeString='BIGINT', defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'}, status=FieldSchema{name='status', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, total_amount=FieldSchema{name='total_amount', typeString='DECIMALV3(12,2)', defaultValue='null', comment='null'}, shipping_address=FieldSchema{name='shipping_address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}, delivery_date=FieldSchema{name='delivery_date', typeString='DATETIMEV2', defaultValue='null', comment='null'}}, keys=order_id, model=DUPLICATE, distributeKeys=order_id, properties={light_schema_change=true}, tableBuckets=null}"; + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={order_id=FieldSchema{name='order_id', typeString='BIGINT', defaultValue='null', comment='null'}, customer_id=FieldSchema{name='customer_id', typeString='BIGINT', defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, status=FieldSchema{name='status', typeString='VARCHAR(60)', defaultValue='null', comment='null'}, total_amount=FieldSchema{name='total_amount', typeString='DECIMALV3(12,2)', defaultValue='null', comment='null'}, shipping_address=FieldSchema{name='shipping_address', typeString='VARCHAR(765)', defaultValue='null', comment='null'}, delivery_date=FieldSchema{name='delivery_date', typeString='DATETIMEV2', defaultValue='null', comment='null'}}, keys=order_id, model=DUPLICATE, distributeKeys=order_id, properties={light_schema_change=true}, tableBuckets=null}"; Assert.assertEquals(expected, tableSchema.toString()); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java 
index 4891d820c..f8098ccc7 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java @@ -50,7 +50,8 @@ public void setUp() { lineDelimiter, ignoreUpdateBefore, "", - ""); + "", + true); dataChange = new JsonDebeziumDataChange(changeContext); } @@ -113,7 +114,8 @@ public void testSerializeUpdateBefore() throws IOException { lineDelimiter, false, "", - ""); + "", + true); dataChange = new JsonDebeziumDataChange(changeContext); // update t1 set name='doris-update' WHERE id =1; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java index caf5542c9..e66ecaab8 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java @@ -63,7 +63,8 @@ public void setUp() { lineDelimiter, ignoreUpdateBefore, "", - ""); + "", + true); schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java index 68239618c..39241f949 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java @@ -83,7 +83,8 @@ public void setUp() { lineDelimiter, ignoreUpdateBefore, "", - ""); + "", + true); schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext); } @@ -542,6 +543,44 @@ private Map buildDatetimeFieldSchemaMap() { return filedSchemaMap; } + @Test + public void buildFieldSchemaTest() { + Map result = new HashMap<>(); + String columnInfo = + "{\"name\":\"order_ts\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":\"中文注释\",\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]}\n"; + schemaChange.setSourceConnector("mysql"); + JsonNode columns = null; + try { + columns = objectMapper.readTree(columnInfo); + } catch (IOException e) { + e.printStackTrace(); + } + schemaChange.buildFieldSchema(result, columns); + Assert.assertTrue(result.containsKey("order_ts")); + FieldSchema fieldSchema = result.get("order_ts"); + Assert.assertEquals(fieldSchema.getName().toLowerCase(), "order_ts"); + Assert.assertEquals(fieldSchema.getTypeString().toLowerCase(), "datetimev2(0)"); + Assert.assertEquals(fieldSchema.getDefaultValue().toLowerCase(), "current_timestamp"); + Assert.assertEquals(fieldSchema.getComment(), "中文注释"); + + columnInfo = + 
"{\"name\":\"other_no\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":23,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":\"comment\",\"hasDefaultValue\":true,\"defaultValueExpression\":\"\",\"enumValues\":[]}\n"; + schemaChange.setSourceConnector("mysql"); + columns = null; + try { + columns = objectMapper.readTree(columnInfo); + } catch (IOException e) { + e.printStackTrace(); + } + schemaChange.buildFieldSchema(result, columns); + Assert.assertTrue(result.containsKey("other_no")); + fieldSchema = result.get("other_no"); + Assert.assertEquals(fieldSchema.getName().toLowerCase(), "other_no"); + Assert.assertEquals(fieldSchema.getTypeString().toLowerCase(), "varchar(150)"); + Assert.assertEquals(fieldSchema.getDefaultValue().toLowerCase(), ""); + Assert.assertEquals(fieldSchema.getComment(), "comment"); + } + @After public void after() { mockRestService.close(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java index d3194f811..a7958b70d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java @@ -46,7 +46,8 @@ public void setUp() { lineDelimiter, ignoreUpdateBefore, "", - ""); + "", + true); schemaChange = new SQLParserSchemaChange(changeContext); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java index 0004af05c..05a93dc5f 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java @@ -65,7 +65,8 @@ public void testDorisSourceProperties() { properties.put("lookup.jdbc.read.batch.size", "16"); properties.put("lookup.jdbc.read.batch.queue-size", "16"); properties.put("lookup.jdbc.read.thread-size", "1"); - + properties.put("source.use-flight-sql", "false"); + properties.put("source.flight-sql-port", "9040"); DynamicTableSource actual = createTableSource(SCHEMA, properties); DorisOptions options = DorisOptions.builder() @@ -98,7 +99,9 @@ public void testDorisSourceProperties() { .setRequestConnectTimeoutMs(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) .setRequestReadTimeoutMs(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) .setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT) - .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT); + .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT) + .setUseFlightSql(false) + .setFlightSqlPort(9040); DorisDynamicTableSource expected = new DorisDynamicTableSource( options, @@ -182,7 +185,9 @@ public void testDorisSinkProperties() { .setRequestConnectTimeoutMs(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) .setRequestReadTimeoutMs(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) .setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT) - .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT); + .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT) + .setUseFlightSql(false) + .setFlightSqlPort(9040); DorisDynamicTableSink expected = new DorisDynamicTableSink( options, 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
index 77b8931d9..0666cb9d4 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
@@ -17,9 +17,12 @@
 package org.apache.doris.flink.tools.cdc;
 
+import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions;
+import org.apache.flink.cdc.connectors.base.options.SourceOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import org.apache.doris.flink.table.DorisConfigOptions;
 import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync;
 
 import java.util.HashMap;
@@ -35,42 +38,35 @@ public static void main(String[] args) throws Exception {
         env.disableOperatorChaining();
         env.enableCheckpointing(10000);
 
-        // Map<String, String> flinkMap = new HashMap<>();
-        // flinkMap.put("execution.checkpointing.interval","10s");
-        // flinkMap.put("pipeline.operator-chaining","false");
-        // flinkMap.put("parallelism.default","1");
-
-        // Configuration configuration = Configuration.fromMap(flinkMap);
-        // env.configure(configuration);
-
         String database = "db2_test";
         String tablePrefix = "";
         String tableSuffix = "";
         Map<String, String> sourceConfig = new HashMap<>();
-        sourceConfig.put("database-name", "testdb");
-        sourceConfig.put("schema-name", "DB2INST1");
-        sourceConfig.put("hostname", "127.0.0.1");
-        sourceConfig.put("port", "50000");
-        sourceConfig.put("username", "db2inst1");
-        sourceConfig.put("password", "=doris123456");
-        // sourceConfig.put("debezium.database.tablename.case.insensitive","false");
-        sourceConfig.put("scan.incremental.snapshot.enabled", "true");
-        // sourceConfig.put("debezium.include.schema.changes","false");
+        sourceConfig.put(JdbcSourceOptions.DATABASE_NAME.key(), "testdb");
+        sourceConfig.put(JdbcSourceOptions.SCHEMA_NAME.key(), "DB2INST1");
+        sourceConfig.put(JdbcSourceOptions.HOSTNAME.key(), "127.0.0.1");
+        sourceConfig.put(Db2DatabaseSync.PORT.key(), "50000");
+        sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "db2inst1");
+        sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "=doris123456");
+        sourceConfig.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.key(), "true");
+        // add jdbc properties configuration
+        sourceConfig.put("jdbc.properties.allowNextOnExhaustedResultSet", "1");
+        sourceConfig.put("jdbc.properties.resultSetHoldability", "1");
+        sourceConfig.put("jdbc.properties.SSL", "false");
 
         Configuration config = Configuration.fromMap(sourceConfig);
 
         Map<String, String> sinkConfig = new HashMap<>();
-        sinkConfig.put("fenodes", "127.0.0.1:8030");
-        // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040");
-        sinkConfig.put("username", "root");
-        sinkConfig.put("password", "123456");
-        sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
-        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+        sinkConfig.put(DorisConfigOptions.FENODES.key(), "127.0.0.1:8030");
+        sinkConfig.put(DorisConfigOptions.USERNAME.key(), "root");
+        sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "123456");
+        sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://127.0.0.1:9030");
+        sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString());
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 
         Map<String, String> tableConfig = new HashMap<>();
-        tableConfig.put("replication_num", "1");
-        // tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+        tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
+        tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
         String includingTables = "FULL_TYPES";
         String excludingTables = null;
         String multiToOneOrigin = null;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index e0c0b828e..c430ea87b 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -53,6 +53,8 @@ public static void main(String[] args) throws Exception {
         mysqlConfig.put(MySqlSourceOptions.PORT.key(), "3306");
         mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), "root");
         mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), "12345678");
+        // add jdbc properties for MySQL
+        mysqlConfig.put("jdbc.properties.use_ssl", "false");
         Configuration config = Configuration.fromMap(mysqlConfig);
 
         Map<String, String> sinkConfig = new HashMap<>();
@@ -61,6 +63,7 @@ public static void main(String[] args) throws Exception {
         sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "");
         sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://10.20.30.1:9030");
         sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString());
+        sinkConfig.put("sink.enable-delete", "false");
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 
         Map<String, String> tableConfig = new HashMap<>();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index f1e61e72e..99892e022 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -49,6 +49,8 @@ public static void main(String[] args) throws Exception {
         sourceConfig.put(PostgresSourceOptions.PG_PORT.key(), "5432");
         sourceConfig.put(PostgresSourceOptions.USERNAME.key(), "postgres");
         sourceConfig.put(PostgresSourceOptions.PASSWORD.key(), "123456");
+        // add jdbc properties configuration
+        sourceConfig.put("jdbc.properties.ssl", "false");
         // sourceConfig.put("debezium.database.tablename.case.insensitive","false");
         // sourceConfig.put("scan.incremental.snapshot.enabled","true");
         // sourceConfig.put("debezium.include.schema.changes","false");
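[Editor's example — not part of the PR] All of the sync cases above now route driver-level settings through keys prefixed with jdbc.properties.; the prefix is stripped and the remainder becomes a java.util.Properties entry handed to the JDBC driver, as getJdbcPropertiesTest in the DatabaseSyncTest diff below verifies. A self-contained sketch of that convention; the class and method names here are illustrative, and the production logic is DatabaseSync#getJdbcProperties:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class JdbcPropertiesPassthroughSketch {
    private static final String PROPS_PREFIX = "jdbc.properties.";

    static Properties extractJdbcProperties(Map<String, String> sourceConfig) {
        Properties jdbcProps = new Properties();
        for (Map.Entry<String, String> entry : sourceConfig.entrySet()) {
            if (entry.getKey().startsWith(PROPS_PREFIX)) {
                // "jdbc.properties.SSL" -> driver property "SSL"
                jdbcProps.setProperty(
                        entry.getKey().substring(PROPS_PREFIX.length()), entry.getValue());
            }
        }
        return jdbcProps;
    }

    public static void main(String[] args) {
        Map<String, String> cfg = new HashMap<>();
        cfg.put("jdbc.properties.SSL", "false");
        cfg.put("hostname", "127.0.0.1"); // not prefixed, so never reaches the driver
        System.out.println(extractJdbcProperties(cfg)); // {SSL=false}
    }
}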
sourceConfig.put("jdbc.properties.encrypt", "false"); + sourceConfig.put("jdbc.properties.integratedSecurity", "false"); // sourceConfig.put("debezium.database.tablename.case.insensitive","false"); // sourceConfig.put("scan.incremental.snapshot.enabled","true"); // sourceConfig.put("debezium.include.schema.changes","false"); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java index f0cd0a51f..859a87208 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java @@ -20,16 +20,22 @@ import org.apache.flink.configuration.Configuration; import org.apache.doris.flink.catalog.doris.TableSchema; +import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync; import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; +import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync; +import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync; import org.jetbrains.annotations.NotNull; +import org.junit.Assert; import org.junit.Test; import java.sql.SQLException; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import static org.junit.Assert.assertEquals; @@ -169,4 +175,95 @@ public void singleSinkTablePatternTest() throws SQLException { assertFalse("ssb_test.dates".matches(syncTableListPattern)); assertFalse("ssb_test.lineorder".matches(syncTableListPattern)); } + + @Test + public void getJdbcPropertiesTest() throws Exception { + DatabaseSync databaseSync = new MysqlDatabaseSync(); + Map mysqlConfig = new HashMap<>(); + mysqlConfig.put("jdbc.properties.use_ssl", "false"); + + Configuration config = Configuration.fromMap(mysqlConfig); + databaseSync.setConfig(config); + Properties jdbcProperties = databaseSync.getJdbcProperties(); + Assert.assertEquals(1, jdbcProperties.size()); + Assert.assertEquals("false", jdbcProperties.getProperty("use_ssl")); + } + + @Test + public void getJdbcUrlTemplateTest() throws SQLException { + String mysqlJdbcTemplate = "jdbc:mysql://%s:%d?useInformationSchema=true"; + String postgresJdbcTemplate = "jdbc:postgresql://%s:%d/%s?"; + String sqlServerJdbcTemplate = "jdbc:sqlserver://%s:%d;database=%s;"; + String db2JdbcTemplate = "jdbc:db2://%s:%d/%s"; + + // mysql jdbc properties configuration + DatabaseSync mysqlDatabaseSync = new MysqlDatabaseSync(); + Map mysqlJdbcConfig = new LinkedHashMap<>(); + mysqlJdbcConfig.put("jdbc.properties.use_ssl", "false"); + + DatabaseSync postgresDatabaseSync = new PostgresDatabaseSync(); + Map postgresJdbcConfig = new LinkedHashMap<>(); + postgresJdbcConfig.put("jdbc.properties.ssl", "false"); + + DatabaseSync sqlServerDatabaseSync = new SqlServerDatabaseSync(); + Map sqlServerJdbcConfig = new LinkedHashMap<>(); + sqlServerJdbcConfig.put("jdbc.properties.encrypt", "false"); + sqlServerJdbcConfig.put("jdbc.properties.integratedSecurity", "false"); + + DatabaseSync db2DatabaseSync = new Db2DatabaseSync(); + Map db2JdbcConfig = new LinkedHashMap<>(); + db2JdbcConfig.put("jdbc.properties.ssl", "false"); + db2JdbcConfig.put("jdbc.properties.allowNextOnExhaustedResultSet", "1"); + db2JdbcConfig.put("jdbc.properties.resultSetHoldability", "1"); + + Configuration mysqlConfig = 
Configuration.fromMap(mysqlJdbcConfig); + mysqlDatabaseSync.setConfig(mysqlConfig); + + Configuration postgresConfig = Configuration.fromMap(postgresJdbcConfig); + postgresDatabaseSync.setConfig(postgresConfig); + + Configuration sqlServerConfig = Configuration.fromMap(sqlServerJdbcConfig); + sqlServerDatabaseSync.setConfig(sqlServerConfig); + + Configuration db2Config = Configuration.fromMap(db2JdbcConfig); + db2DatabaseSync.setConfig(db2Config); + + Properties mysqlJdbcProperties = mysqlDatabaseSync.getJdbcProperties(); + Assert.assertEquals(1, mysqlJdbcProperties.size()); + Assert.assertEquals("false", mysqlJdbcProperties.getProperty("use_ssl")); + String mysqlJdbcUrlTemplate = + mysqlDatabaseSync.getJdbcUrlTemplate(mysqlJdbcTemplate, mysqlJdbcProperties); + Assert.assertEquals(mysqlJdbcTemplate + "&use_ssl=false", mysqlJdbcUrlTemplate); + + Properties postgresJdbcProperties = postgresDatabaseSync.getJdbcProperties(); + Assert.assertEquals(1, postgresJdbcProperties.size()); + Assert.assertEquals("false", postgresJdbcProperties.getProperty("ssl")); + String postgresJdbcUrlTemplate = + postgresDatabaseSync.getJdbcUrlTemplate( + postgresJdbcTemplate, postgresJdbcProperties); + Assert.assertEquals(postgresJdbcTemplate + "&ssl=false", postgresJdbcUrlTemplate); + + Properties sqlServerJdbcProperties = sqlServerDatabaseSync.getJdbcProperties(); + Assert.assertEquals(2, sqlServerJdbcProperties.size()); + Assert.assertEquals("false", sqlServerJdbcProperties.getProperty("encrypt")); + Assert.assertEquals("false", sqlServerJdbcProperties.getProperty("integratedSecurity")); + String sqlServerJdbcUrlTemplate = + sqlServerDatabaseSync.getJdbcUrlTemplate( + sqlServerJdbcTemplate, sqlServerJdbcProperties); + Assert.assertEquals( + sqlServerJdbcTemplate + "encrypt=false;integratedSecurity=false;", + sqlServerJdbcUrlTemplate); + + Properties db2JdbcProperties = db2DatabaseSync.getJdbcProperties(); + Assert.assertEquals(3, db2JdbcProperties.size()); + Assert.assertEquals("false", db2JdbcProperties.getProperty("ssl")); + Assert.assertEquals("1", db2JdbcProperties.getProperty("allowNextOnExhaustedResultSet")); + Assert.assertEquals("1", db2JdbcProperties.getProperty("resultSetHoldability")); + String db2JdbcUrlTemplate = + db2DatabaseSync.getJdbcUrlTemplate(db2JdbcTemplate, db2JdbcProperties); + Assert.assertEquals( + db2JdbcTemplate + + ":allowNextOnExhaustedResultSet=1;ssl=false;resultSetHoldability=1;", + db2JdbcUrlTemplate); + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java index 6a6138416..aeb17c29d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java @@ -503,6 +503,122 @@ public void testMySQL2DorisByDefault() throws Exception { jobClient.cancel().get(); } + @Test + public void testMySQL2DorisEnableDelete() throws Exception { + printClusterStatus(); + initializeMySQLTable(); + initializeDorisTable(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + Map flinkMap = new HashMap<>(); + flinkMap.put("execution.checkpointing.interval", "10s"); + flinkMap.put("pipeline.operator-chaining", "false"); + flinkMap.put("parallelism.default", "1"); + + Configuration configuration = Configuration.fromMap(flinkMap); + 
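[Editor's example — not part of the PR] getJdbcUrlTemplateTest above pins down how each database's URL template absorbs the JDBC properties: MySQL and Postgres append &key=value onto a template that already carries a query string, SQL Server appends key=value; onto its semicolon-terminated template, and DB2 opens with : and separates with ;. A sketch of the ampersand variant only; the helper name is illustrative and the separators are read off the assertions, not the production code:

import java.util.Properties;

public class JdbcUrlTemplateSketch {
    static String appendAmpersandStyle(String template, Properties props) {
        StringBuilder sb = new StringBuilder(template);
        for (String name : props.stringPropertyNames()) {
            sb.append("&").append(name).append("=").append(props.getProperty(name));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("use_ssl", "false");
        // prints the value the MySQL assertion expects:
        // jdbc:mysql://%s:%d?useInformationSchema=true&use_ssl=false
        System.out.println(
                appendAmpersandStyle("jdbc:mysql://%s:%d?useInformationSchema=true", props));
    }
}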
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 6a6138416..aeb17c29d 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -503,6 +503,122 @@ public void testMySQL2DorisByDefault() throws Exception {
         jobClient.cancel().get();
     }
 
+    @Test
+    public void testMySQL2DorisEnableDelete() throws Exception {
+        printClusterStatus();
+        initializeMySQLTable();
+        initializeDorisTable();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        Map<String, String> flinkMap = new HashMap<>();
+        flinkMap.put("execution.checkpointing.interval", "10s");
+        flinkMap.put("pipeline.operator-chaining", "false");
+        flinkMap.put("parallelism.default", "1");
+
+        Configuration configuration = Configuration.fromMap(flinkMap);
+        env.configure(configuration);
+
+        String database = DATABASE;
+        Map<String, String> mysqlConfig = new HashMap<>();
+        mysqlConfig.put("database-name", DATABASE);
+        mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
+        mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
+        mysqlConfig.put("username", MYSQL_USER);
+        mysqlConfig.put("password", MYSQL_PASSWD);
+        mysqlConfig.put("server-time-zone", "Asia/Shanghai");
+        Configuration config = Configuration.fromMap(mysqlConfig);
+
+        Map<String, String> sinkConfig = new HashMap<>();
+        sinkConfig.put("fenodes", getFenodes());
+        sinkConfig.put("username", USERNAME);
+        sinkConfig.put("password", PASSWORD);
+        sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
+        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+        sinkConfig.put("sink.check-interval", "5000");
+        sinkConfig.put("sink.enable-delete", "false");
+        Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("replication_num", "1");
+
+        String includingTables = "tbl1|tbl2|tbl3|tbl5";
+        String excludingTables = "";
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        databaseSync
+                .setEnv(env)
+                .setDatabase(database)
+                .setConfig(config)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setIgnoreDefaultValue(false)
+                .setSinkConfig(sinkConf)
+                .setTableConfig(tableConfig)
+                .setCreateTableOnly(false)
+                .setNewSchemaChange(true)
+                // no single sink
+                .setSingleSink(false)
+                .create();
+        databaseSync.build();
+        JobClient jobClient = env.executeAsync();
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(RUNNING),
+                Deadline.fromNow(Duration.ofSeconds(10)));
+
+        // wait 2 times checkpoint
+        Thread.sleep(20000);
+        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
+        String sql =
+                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
+        String query1 =
+                String.format(
+                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
+                        TABLE_5);
+        checkResult(expected, query1, 2);
+
+        // add incremental data
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format("insert into %s.%s values ('doris_1_1',10)", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("insert into %s.%s values ('doris_2_1',11)", DATABASE, TABLE_2));
+            statement.execute(
+                    String.format("insert into %s.%s values ('doris_3_1',12)", DATABASE, TABLE_3));
+
+            statement.execute(
+                    String.format(
+                            "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
+            statement.execute(
+                    String.format("delete from %s.%s where name='doris_3'", DATABASE, TABLE_3));
+            statement.execute(
+                    String.format("delete from %s.%s where name='doris_5'", DATABASE, TABLE_5));
+        }
+
+        Thread.sleep(20000);
+        List<String> expected2 =
+                Arrays.asList(
+                        "doris_1,18",
+                        "doris_1_1,10",
+                        "doris_2,2",
+                        "doris_2_1,11",
+                        "doris_3,3",
+                        "doris_3_1,12",
+                        "doris_5,5");
+        sql =
+                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
+        String query2 =
+                String.format(
+                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
+                        TABLE_5);
+        checkResult(expected2, query2, 2);
+        jobClient.cancel().get();
+    }
+
     private void initializeDorisTable() throws Exception {
         try (Connection connection =
                 DriverManager.getConnection(
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
index 75dadde87..a9cc8bbc3 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
@@ -17,10 +17,14 @@
 package org.apache.doris.flink.tools.cdc.mongodb;
 
+import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.bson.Document;
+import org.bson.types.Decimal128;
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -37,22 +41,119 @@ public void getCdcTableName() throws Exception {
     }
 
     @Test
-    public void replaceDecimalTypeIfNeeded() throws Exception {
+    public void replaceDecimalTypeIfNeededTest1() throws Exception {
         ArrayList<Document> documents = new ArrayList<>();
         documents.add(new Document("fields1", 1234567.666666));
+        documents.add(new Document("fields1", 123456789.88888888));
+
         MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
-        String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1", "DECIMALV3(12,8)");
-        assertEquals("DECIMAL(15,8)", d);
+        Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema fieldSchema = entry.getValue();
+            String fieldName = entry.getKey();
+            if (fieldName.equals("fields1")) {
+                assertEquals("DECIMAL(17,8)", fieldSchema.getTypeString());
+            }
+        }
     }
 
     @Test
-    public void replaceDecimalTypeIfNeededWhenContainsNonDecimalType() throws Exception {
+    public void replaceDecimalTypeIfNeededTest2() throws Exception {
         ArrayList<Document> documents = new ArrayList<>();
         documents.add(new Document("fields1", 1234567.666666));
-        documents.add(new Document("fields1", 1234567));
+        documents.add(new Document("fields1", 123456789));
+
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
+        Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema fieldSchema = entry.getValue();
+            String fieldName = entry.getKey();
+            if (fieldName.equals("fields1")) {
+                assertEquals("DECIMAL(15,6)", fieldSchema.getTypeString());
+            }
+        }
+    }
+
+    @Test
+    public void replaceDecimalTypeIfNeededTest3() throws Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", 1234567.666666));
+        documents.add(new Document("fields1", 123456789));
         documents.add(new Document("fields1", 1234567.7777777));
+        documents.add(
+                new Document("fields1", new Decimal128(new BigDecimal("12345679012.999999999"))));
+
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
+        Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema fieldSchema = entry.getValue();
+            String fieldName = entry.getKey();
+            if (fieldName.equals("fields1")) {
+                assertEquals("DECIMAL(20,9)", fieldSchema.getTypeString());
+            }
+        }
+    }
+
+    @Test
+    public void replaceDecimalTypeIfNeededTest4() throws Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", "yes"));
+        documents.add(new Document("fields1", 1234567.666666));
+        documents.add(new Document("fields1", 123456789));
+        documents.add(new Document("fields1", 1234567.7777777));
+        documents.add(
+                new Document("fields1", new Decimal128(new BigDecimal("12345679012.999999999"))));
+
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
+        Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema fieldSchema = entry.getValue();
+            String fieldName = entry.getKey();
+            if (fieldName.equals("fields1")) {
+                assertEquals("STRING", fieldSchema.getTypeString());
+            }
+        }
+    }
+
+    @Test
+    public void replaceDecimalTypeIfNeededTest5() throws Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", 1234567.666666));
+        documents.add(new Document("fields1", 123456789));
+        documents.add(new Document("fields1", 1234567.7777777));
+        documents.add(new Document("fields1", "yes"));
+        documents.add(
+                new Document("fields1", new Decimal128(new BigDecimal("12345679012.999999999"))));
+
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
+        Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema fieldSchema = entry.getValue();
+            String fieldName = entry.getKey();
+            if (fieldName.equals("fields1")) {
+                assertEquals("STRING", fieldSchema.getTypeString());
+            }
+        }
+    }
+
+    @Test
+    public void replaceDecimalTypeIfNeededTest6() throws Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", 1234567.666666));
+        documents.add(new Document("fields1", 123456789));
+        documents.add(new Document("fields1", 1234567.7777777));
+        documents.add(new Document("fields1", 123444555433445L));
+        documents.add(
+                new Document("fields1", new Decimal128(new BigDecimal("12345679012.999999999"))));
+
         MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
-        String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1", "DECIMALV3(12,8)");
-        assertEquals("DECIMAL(15,8)", d);
+        Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema fieldSchema = entry.getValue();
+            String fieldName = entry.getKey();
+            if (fieldName.equals("fields1")) {
+                assertEquals("DECIMAL(24,9)", fieldSchema.getTypeString());
+            }
+        }
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FastDateUtilTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FastDateUtilTest.java
new file mode 100644
index 000000000..a89e50a05
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FastDateUtilTest.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.utils;
+
+import org.apache.doris.flink.util.FastDateUtil;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class FastDateUtilTest {
+
+    @Test
+    void fastParseDateTimeV2_withValidDateTimeAndPattern_returnsCorrectLocalDateTime() {
+        String dateTime = "2023-10-05 14:30:45.123456";
+        String pattern = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+        LocalDateTime result = FastDateUtil.fastParseDateTimeV2(dateTime, pattern);
+        assertEquals(LocalDateTime.of(2023, 10, 5, 14, 30, 45, 123456000), result);
+    }
+
+    @Test
+    void fastParseDateTime_withValidDateTimeAndPattern_returnsCorrectLocalDateTime() {
+        String dateTime = "2023-10-05 14:30:45";
+        String pattern = "yyyy-MM-dd HH:mm:ss";
+        LocalDateTime result = FastDateUtil.fastParseDateTime(dateTime, pattern);
+        assertEquals(LocalDateTime.of(2023, 10, 5, 14, 30, 45), result);
+    }
+
+    @Test
+    void fastParseDate_withValidDateAndPattern_returnsCorrectLocalDate() {
+        String dateTime = "2023-10-05";
+        String pattern = "yyyy-MM-dd";
+        LocalDate result = FastDateUtil.fastParseDate(dateTime, pattern);
+        assertEquals(LocalDate.of(2023, 10, 5), result);
+    }
+}
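[Editor's example — not part of the PR] As a cross-check when extending FastDateUtilTest, the plain-JDK equivalent of the first test's parse is shown below. DateTimeFormatter here is standard java.time, independent of FastDateUtil, and it confirms that a 123456-microsecond fraction is 123456000 nanoseconds, exactly the nano value the test's expected LocalDateTime carries:

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class JavaTimeCrossCheck {
    public static void main(String[] args) {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
        LocalDateTime parsed = LocalDateTime.parse("2023-10-05 14:30:45.123456", f);
        // 123456 micros == 123456000 nanos, matching the expected value in the test
        System.out.println(parsed.getNano()); // 123456000
    }
}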