From eb90905b82a6e46aeb8b9f0ddea151bcd64554c7 Mon Sep 17 00:00:00 2001 From: xiayang Date: Mon, 12 Aug 2024 15:53:33 +0800 Subject: [PATCH] [Feature] support reader doris using arrow flight driver (#465) --- flink-doris-connector/pom.xml | 21 +- .../doris/flink/cfg/ConfigurationOptions.java | 6 + .../doris/flink/cfg/DorisReadOptions.java | 42 +++- .../doris/flink/cfg/DorisStreamOptions.java | 12 ++ .../apache/doris/flink/rest/RestService.java | 32 +++ .../apache/doris/flink/rest/SchemaUtils.java | 24 +++ .../doris/flink/serialization/RowBatch.java | 102 ++++++++-- .../source/reader/DorisFlightValueReader.java | 182 ++++++++++++++++++ .../source/reader/DorisSourceSplitReader.java | 14 +- .../flink/source/reader/DorisValueReader.java | 2 +- .../flink/source/reader/ValueReader.java | 54 ++++++ .../flink/source/split/DorisSplitRecords.java | 8 +- .../doris/flink/table/DorisConfigOptions.java | 11 +- .../flink/table/DorisDynamicTableFactory.java | 9 +- .../apache/doris/flink/util/FastDateUtil.java | 90 +++++++++ .../doris/flink/rest/SchemaUtilsTest.java | 58 ++++++ .../table/DorisDynamicTableFactoryTest.java | 11 +- .../doris/flink/utils/FastDateUtilTest.java | 53 +++++ 18 files changed, 687 insertions(+), 44 deletions(-) create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/rest/SchemaUtilsTest.java create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FastDateUtilTest.java diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 2d7b28753..4bf34d689 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -73,7 +73,6 @@ under the License. 3.1.1 flink-python 0.16.0 - 13.0.0 3.10.1 3.3.0 3.2.1 @@ -95,6 +94,8 @@ under the License. 4.9 8.0.26 19.3.0.0 + 15.0.2 + 0.12.0 @@ -179,13 +180,16 @@ under the License. commons-codec ${commons-codec.version} - + + org.apache.arrow.adbc + adbc-driver-flight-sql + ${adbc.version} + org.apache.arrow arrow-vector ${arrow.version} - org.apache.arrow arrow-memory-netty @@ -206,7 +210,6 @@ under the License. - com.fasterxml.jackson.core @@ -410,13 +413,13 @@ under the License. org.apache.maven.plugins maven-shade-plugin - 3.2.4 + 3.4.1 - - org.apache.arrow - org.apache.doris.shaded.org.apache.arrow - + + + + io.netty org.apache.doris.shaded.io.netty diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java index 4a3f70b80..c249c2519 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java @@ -51,4 +51,10 @@ public interface ConfigurationOptions { Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false; String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size"; Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64; + + String USE_FLIGHT_SQL = "source.use-flight-sql"; + Boolean USE_FLIGHT_SQL_DEFAULT = false; + + String FLIGHT_SQL_PORT = "source.flight-sql-port"; + Integer FLIGHT_SQL_PORT_DEFAULT = 9040; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java index 3669e740a..2f6cd8a86 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java @@ -37,6 +37,8 @@ public class DorisReadOptions implements Serializable { private Integer deserializeQueueSize; private Boolean deserializeArrowAsync; private boolean useOldApi; + private boolean useFlightSql; + private Integer flightSqlPort; public DorisReadOptions( String readFields, @@ -50,7 +52,9 @@ public DorisReadOptions( Long execMemLimit, Integer deserializeQueueSize, Boolean deserializeArrowAsync, - boolean useOldApi) { + boolean useOldApi, + boolean useFlightSql, + Integer flightSqlPort) { this.readFields = readFields; this.filterQuery = filterQuery; this.requestTabletSize = requestTabletSize; @@ -63,6 +67,8 @@ public DorisReadOptions( this.deserializeQueueSize = deserializeQueueSize; this.deserializeArrowAsync = deserializeArrowAsync; this.useOldApi = useOldApi; + this.useFlightSql = useFlightSql; + this.flightSqlPort = flightSqlPort; } public String getReadFields() { @@ -121,6 +127,14 @@ public void setFilterQuery(String filterQuery) { this.filterQuery = filterQuery; } + public boolean getUseFlightSql() { + return useFlightSql; + } + + public Integer getFlightSqlPort() { + return flightSqlPort; + } + public static Builder builder() { return new Builder(); } @@ -149,7 +163,9 @@ public boolean equals(Object o) { && Objects.equals(requestBatchSize, that.requestBatchSize) && Objects.equals(execMemLimit, that.execMemLimit) && Objects.equals(deserializeQueueSize, that.deserializeQueueSize) - && Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync); + && Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync) + && Objects.equals(useFlightSql, that.useFlightSql) + && Objects.equals(flightSqlPort, that.flightSqlPort); } @Override @@ -166,7 +182,9 @@ public int hashCode() { execMemLimit, deserializeQueueSize, deserializeArrowAsync, - useOldApi); + useOldApi, + useFlightSql, + flightSqlPort); } /** Builder of {@link DorisReadOptions}. */ @@ -184,6 +202,8 @@ public static class Builder { private Integer deserializeQueueSize; private Boolean deserializeArrowAsync; private Boolean useOldApi = false; + private Boolean useFlightSql = false; + private Integer flightSqlPort; public Builder setReadFields(String readFields) { this.readFields = readFields; @@ -240,11 +260,21 @@ public Builder setDeserializeArrowAsync(Boolean deserializeArrowAsync) { return this; } - public Builder setUseOldApi(boolean useOldApi) { + public Builder setUseFlightSql(Boolean useFlightSql) { + this.useFlightSql = useFlightSql; + return this; + } + + public Builder setUseOldApi(Boolean useOldApi) { this.useOldApi = useOldApi; return this; } + public Builder setFlightSqlPort(Integer flightSqlPort) { + this.flightSqlPort = flightSqlPort; + return this; + } + public DorisReadOptions build() { return new DorisReadOptions( readFields, @@ -258,7 +288,9 @@ public DorisReadOptions build() { execMemLimit, deserializeQueueSize, deserializeArrowAsync, - useOldApi); + useOldApi, + useFlightSql, + flightSqlPort); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java index 89e9182e8..e6d75969c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java @@ -106,6 +106,18 @@ private void init() { prop.getProperty( ConfigurationOptions.DORIS_TABLET_SIZE, ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT + .toString()))) + .setUseFlightSql( + Boolean.valueOf( + prop.getProperty( + ConfigurationOptions.USE_FLIGHT_SQL, + ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT + .toString()))) + .setFlightSqlPort( + Integer.valueOf( + prop.getProperty( + ConfigurationOptions.FLIGHT_SQL_PORT, + ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT .toString()))); this.options = optionsBuilder.build(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index 1dbb1fded..1663d4b39 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -234,6 +234,38 @@ public static String parseResponse(HttpURLConnection connection, Logger logger) } } + @VisibleForTesting + public static String parseFlightSql( + DorisReadOptions readOptions, + DorisOptions options, + PartitionDefinition partition, + Logger logger) + throws IllegalArgumentException { + String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger); + String readFields = + StringUtils.isBlank(readOptions.getReadFields()) + ? "*" + : readOptions.getReadFields(); + String sql = + "select " + + readFields + + " from `" + + tableIdentifiers[0] + + "`.`" + + tableIdentifiers[1] + + "`"; + String tablet = + partition.getTabletIds().stream() + .map(Object::toString) + .collect(Collectors.joining(",")); + sql += " TABLET(" + tablet + ") "; + if (!StringUtils.isEmpty(readOptions.getFilterQuery())) { + sql += " where " + readOptions.getFilterQuery(); + } + logger.info("Query SQL Sending to Doris FE is: '{}'.", sql); + return sql; + } + /** * parse table identifier to array. * diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java index f6594b5bc..9f7d7fc64 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java @@ -22,6 +22,9 @@ import org.apache.doris.sdk.thrift.TScanColumnDesc; import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; public class SchemaUtils { @@ -46,4 +49,25 @@ public static Schema convertToSchema(List tscanColumnDescs) { ""))); return schema; } + + public static Schema convertToSchema( + Schema tableSchema, org.apache.arrow.vector.types.pojo.Schema tscanColumnDescs) { + Schema schema = new Schema(tscanColumnDescs.getFields().size()); + Map collect = + tableSchema.getProperties().stream() + .collect(Collectors.toMap(Field::getName, Function.identity())); + tscanColumnDescs + .getFields() + .forEach( + desc -> + schema.put( + new Field( + desc.getName(), + collect.get(desc.getName()).getType(), + "", + 0, + 0, + ""))); + return schema; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index 38c63b779..dee9c1fcb 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -44,11 +44,13 @@ import org.apache.arrow.vector.complex.impl.TimeStampMicroReaderImpl; import org.apache.arrow.vector.complex.impl.UnionMapReader; import org.apache.arrow.vector.complex.reader.FieldReader; +import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.Types; import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.rest.models.Schema; +import org.apache.doris.flink.util.FastDateUtil; import org.apache.doris.flink.util.IPUtils; import org.apache.doris.sdk.thrift.TScanBatchResult; import org.slf4j.Logger; @@ -58,6 +60,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; @@ -96,18 +99,19 @@ public void put(Object o) { private int rowCountInOneBatch = 0; private int readRowCount = 0; private final List rowBatch = new ArrayList<>(); - private final ArrowStreamReader arrowStreamReader; + private final ArrowReader arrowStreamReader; private VectorSchemaRoot root; private List fieldVectors; private RootAllocator rootAllocator; private final Schema schema; private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS"; + private static final String DATE_PATTERN = "yyyy-MM-dd"; private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(DATETIME_PATTERN); private final DateTimeFormatter dateTimeV2Formatter = DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN); - private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern(DATE_PATTERN); private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault(); public List getRowBatch() { @@ -123,6 +127,43 @@ public RowBatch(TScanBatchResult nextResult, Schema schema) { this.offsetInRowBatch = 0; } + public RowBatch(ArrowReader nextResult, Schema schema) { + this.schema = schema; + this.arrowStreamReader = nextResult; + this.offsetInRowBatch = 0; + } + + public RowBatch readFlightArrow() { + try { + this.root = arrowStreamReader.getVectorSchemaRoot(); + fieldVectors = root.getFieldVectors(); + if (fieldVectors.size() > schema.size()) { + logger.error( + "Schema size '{}' is not equal to arrow field size '{}'.", + fieldVectors.size(), + schema.size()); + throw new DorisException( + "Load Doris data failed, schema size of fetch data is wrong."); + } + if (fieldVectors.isEmpty() || root.getRowCount() == 0) { + logger.debug("One batch in arrow has no data."); + return null; + } + rowCountInOneBatch = root.getRowCount(); + for (int i = 0; i < rowCountInOneBatch; ++i) { + rowBatch.add(new RowBatch.Row(fieldVectors.size())); + } + convertArrowToRowBatch(); + readRowCount += root.getRowCount(); + return this; + } catch (DorisException e) { + logger.error("Read Doris Data failed because: ", e); + throw new DorisRuntimeException(e.getMessage()); + } catch (IOException e) { + return this; + } + } + public RowBatch readArrow() { try { this.root = arrowStreamReader.getVectorSchemaRoot(); @@ -297,6 +338,7 @@ public boolean doConvert( case "DECIMAL32": case "DECIMAL64": case "DECIMAL128I": + case "DECIMAL128": if (!minorType.equals(Types.MinorType.DECIMAL)) { return false; } @@ -320,8 +362,8 @@ public boolean doConvert( addValueToRow(rowIndex, null); break; } - String stringValue = new String(date.get(rowIndex)); - LocalDate localDate = LocalDate.parse(stringValue, dateFormatter); + String stringValue = new String(date.get(rowIndex), StandardCharsets.UTF_8); + LocalDate localDate = FastDateUtil.fastParseDate(stringValue, DATE_PATTERN); addValueToRow(rowIndex, localDate); } else { DateDayVector date = (DateDayVector) fieldVector; @@ -340,8 +382,11 @@ public boolean doConvert( addValueToRow(rowIndex, null); break; } - String stringValue = new String(varCharVector.get(rowIndex)); - LocalDateTime parse = LocalDateTime.parse(stringValue, dateTimeFormatter); + String stringValue = + new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8); + stringValue = completeMilliseconds(stringValue); + LocalDateTime parse = + FastDateUtil.fastParseDateTime(stringValue, DATETIME_PATTERN); addValueToRow(rowIndex, parse); } else if (fieldVector instanceof TimeStampVector) { LocalDateTime dateTime = getDateTime(rowIndex, fieldVector); @@ -361,9 +406,11 @@ public boolean doConvert( addValueToRow(rowIndex, null); break; } - String stringValue = new String(varCharVector.get(rowIndex)); + String stringValue = + new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8); stringValue = completeMilliseconds(stringValue); - LocalDateTime parse = LocalDateTime.parse(stringValue, dateTimeV2Formatter); + LocalDateTime parse = + FastDateUtil.fastParseDateTimeV2(stringValue, DATETIMEV2_PATTERN); addValueToRow(rowIndex, parse); } else if (fieldVector instanceof TimeStampVector) { LocalDateTime dateTime = getDateTime(rowIndex, fieldVector); @@ -405,7 +452,8 @@ public boolean doConvert( addValueToRow(rowIndex, null); break; } - String stringValue = new String(largeIntVector.get(rowIndex)); + String stringValue = + new String(largeIntVector.get(rowIndex), StandardCharsets.UTF_8); BigInteger largeInt = new BigInteger(stringValue); addValueToRow(rowIndex, largeInt); break; @@ -423,7 +471,8 @@ public boolean doConvert( addValueToRow(rowIndex, null); break; } - String stringValue = new String(varCharVector.get(rowIndex)); + String stringValue = + new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8); addValueToRow(rowIndex, stringValue); break; case "IPV6": @@ -435,7 +484,8 @@ public boolean doConvert( addValueToRow(rowIndex, null); break; } - String ipv6Str = new String(ipv6VarcharVector.get(rowIndex)); + String ipv6Str = + new String(ipv6VarcharVector.get(rowIndex), StandardCharsets.UTF_8); String ipv6Address = IPUtils.fromBigInteger(new BigInteger(ipv6Str)); addValueToRow(rowIndex, ipv6Address); break; @@ -526,6 +576,14 @@ public static LocalDateTime longToLocalDateTime(long time) { return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID); } + /** + * use case when to replace while "Benchmark","Mode","Threads","Samples","Score","Score Error. + * (99.9%)","Unit" "CaseWhenTest", "thrpt", 1, 5, 40657433.897696, 2515802.067503,"ops/s" + * "WhileTest", "thrpt", 1, 5, 9708130.819491, 1207453.635429,"ops/s" + * + * @param stringValue + * @return + */ @VisibleForTesting public static String completeMilliseconds(String stringValue) { if (stringValue.length() == DATETIMEV2_PATTERN.length()) { @@ -536,14 +594,26 @@ public static String completeMilliseconds(String stringValue) { return stringValue; } - StringBuilder sb = new StringBuilder(stringValue); if (stringValue.length() == DATETIME_PATTERN.length()) { - sb.append("."); + stringValue += "."; } - while (sb.toString().length() < DATETIMEV2_PATTERN.length()) { - sb.append(0); + int s = DATETIMEV2_PATTERN.length() - stringValue.length(); + switch (s) { + case 1: + return stringValue + "0"; + case 2: + return stringValue + "00"; + case 3: + return stringValue + "000"; + case 4: + return stringValue + "0000"; + case 5: + return stringValue + "00000"; + case 6: + return stringValue + "000000"; + default: + return stringValue; } - return sb.toString(); } public List next() { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java new file mode 100644 index 000000000..a2b9b6327 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.source.reader; + +import org.apache.arrow.adbc.core.AdbcConnection; +import org.apache.arrow.adbc.core.AdbcDatabase; +import org.apache.arrow.adbc.core.AdbcDriver; +import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.core.AdbcStatement; +import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver; +import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.exception.DorisException; +import org.apache.doris.flink.exception.IllegalArgumentException; +import org.apache.doris.flink.exception.ShouldNeverHappenException; +import org.apache.doris.flink.rest.PartitionDefinition; +import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.rest.SchemaUtils; +import org.apache.doris.flink.rest.models.Schema; +import org.apache.doris.flink.serialization.RowBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; + +public class DorisFlightValueReader extends ValueReader implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(DorisFlightValueReader.class); + protected AdbcConnection client; + protected Lock clientLock = new ReentrantLock(); + + private final PartitionDefinition partition; + private final DorisOptions options; + private final DorisReadOptions readOptions; + private AdbcStatement statement; + protected RowBatch rowBatch; + protected Schema schema; + AdbcStatement.QueryResult queryResult; + protected ArrowReader arrowReader; + protected AtomicBoolean eos = new AtomicBoolean(false); + + public DorisFlightValueReader( + PartitionDefinition partition, + DorisOptions options, + DorisReadOptions readOptions, + Schema schema) { + this.partition = partition; + this.options = options; + this.readOptions = readOptions; + this.client = openConnection(); + this.schema = schema; + init(); + } + + private void init() { + clientLock.lock(); + try { + this.statement = this.client.createStatement(); + this.statement.setSqlQuery( + RestService.parseFlightSql(readOptions, options, partition, LOG)); + this.queryResult = statement.executeQuery(); + this.arrowReader = queryResult.getReader(); + } catch (AdbcException | DorisException e) { + throw new RuntimeException(e); + } finally { + clientLock.unlock(); + } + LOG.debug("Open scan result is, schema: {}.", schema); + } + + private AdbcConnection openConnection() { + final Map parameters = new HashMap<>(); + RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + FlightSqlDriver driver = new FlightSqlDriver(allocator); + String[] split = null; + try { + split = RestService.randomEndpoint(options.getFenodes(), LOG).split(":"); + } catch (IllegalArgumentException e) { + throw new RuntimeException("Get FENode Error", e); + } + AdbcDriver.PARAM_URI.set( + parameters, + Location.forGrpcInsecure(String.valueOf(split[0]), readOptions.getFlightSqlPort()) + .getUri() + .toString()); + AdbcDriver.PARAM_USERNAME.set(parameters, options.getUsername()); + AdbcDriver.PARAM_PASSWORD.set(parameters, options.getPassword()); + try { + AdbcDatabase adbcDatabase = driver.open(parameters); + return adbcDatabase.connect(); + } catch (AdbcException e) { + LOG.debug("Open Flight Connection error: {}", e.getDetails()); + throw new RuntimeException(e); + } + } + + /** + * read data and cached in rowBatch. + * + * @return true if hax next value + */ + public boolean hasNext() { + boolean hasNext = false; + clientLock.lock(); + try { + // Arrow data was acquired synchronously during the iterative process + if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) { + if (!eos.get()) { + eos.set(!arrowReader.loadNextBatch()); + rowBatch = + new RowBatch( + arrowReader, + SchemaUtils.convertToSchema( + this.schema, + arrowReader.getVectorSchemaRoot().getSchema())) + .readFlightArrow(); + } + } + hasNext = !eos.get(); + return hasNext; + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + clientLock.unlock(); + } + } + + /** + * get next value. + * + * @return next value + */ + public List next() { + if (!hasNext()) { + LOG.error(SHOULD_NOT_HAPPEN_MESSAGE); + throw new ShouldNeverHappenException(); + } + return rowBatch.next(); + } + + @Override + public void close() throws Exception { + clientLock.lock(); + try { + if (rowBatch != null) { + rowBatch.close(); + } + if (statement != null) { + statement.close(); + } + if (client != null) { + client.close(); + } + } finally { + clientLock.unlock(); + } + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java index 01777d43b..c9ed6f9ce 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java @@ -23,6 +23,7 @@ import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.source.split.DorisSourceSplit; import org.apache.doris.flink.source.split.DorisSplitRecords; import org.slf4j.Logger; @@ -41,7 +42,7 @@ public class DorisSourceSplitReader implements SplitReader splits; private final DorisOptions options; private final DorisReadOptions readOptions; - private DorisValueReader valueReader; + private ValueReader valueReader; private String currentSplitId; public DorisSourceSplitReader(DorisOptions options, DorisReadOptions readOptions) { @@ -52,7 +53,11 @@ public DorisSourceSplitReader(DorisOptions options, DorisReadOptions readOptions @Override public RecordsWithSplitIds fetch() throws IOException { - checkSplitOrStartNext(); + try { + checkSplitOrStartNext(); + } catch (DorisException e) { + throw new RuntimeException(e); + } if (!valueReader.hasNext()) { return finishSplit(); @@ -60,7 +65,7 @@ public RecordsWithSplitIds fetch() throws IOException { return DorisSplitRecords.forRecords(currentSplitId, valueReader); } - private void checkSplitOrStartNext() throws IOException { + private void checkSplitOrStartNext() throws IOException, DorisException { if (valueReader != null) { return; } @@ -70,7 +75,8 @@ private void checkSplitOrStartNext() throws IOException { } currentSplitId = nextSplit.splitId(); valueReader = - new DorisValueReader(nextSplit.getPartitionDefinition(), options, readOptions); + ValueReader.createReader( + nextSplit.getPartitionDefinition(), options, readOptions, LOG); } private DorisSplitRecords finishSplit() { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java index 098a7707d..35639e8a9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java @@ -52,7 +52,7 @@ import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT; import static org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; -public class DorisValueReader implements AutoCloseable { +public class DorisValueReader extends ValueReader implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DorisValueReader.class); protected BackendClient client; protected Lock clientLock = new ReentrantLock(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java new file mode 100644 index 000000000..9e4539349 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.source.reader; + +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.exception.DorisException; +import org.apache.doris.flink.rest.PartitionDefinition; +import org.apache.doris.flink.rest.RestService; +import org.slf4j.Logger; + +import java.util.List; + +public abstract class ValueReader { + + public static ValueReader createReader( + PartitionDefinition partition, + DorisOptions options, + DorisReadOptions readOptions, + Logger logger) + throws DorisException { + logger.info("create reader for partition: {}", partition); + if (readOptions.getUseFlightSql()) { + return new DorisFlightValueReader( + partition, + options, + readOptions, + RestService.getSchema(options, readOptions, logger)); + } else { + return new DorisValueReader(partition, options, readOptions); + } + } + + public abstract boolean hasNext(); + + public abstract List next(); + + public abstract void close() throws Exception; +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java index cef967624..24d10569e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java @@ -20,6 +20,7 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.doris.flink.source.reader.DorisValueReader; +import org.apache.doris.flink.source.reader.ValueReader; import javax.annotation.Nullable; @@ -34,18 +35,17 @@ public class DorisSplitRecords implements RecordsWithSplitIds { private final Set finishedSplits; - private final DorisValueReader valueReader; + private final ValueReader valueReader; private String splitId; - public DorisSplitRecords( - String splitId, DorisValueReader valueReader, Set finishedSplits) { + public DorisSplitRecords(String splitId, ValueReader valueReader, Set finishedSplits) { this.splitId = splitId; this.valueReader = valueReader; this.finishedSplits = finishedSplits; } public static DorisSplitRecords forRecords( - final String splitId, final DorisValueReader valueReader) { + final String splitId, final ValueReader valueReader) { return new DorisSplitRecords(splitId, valueReader, Collections.emptySet()); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index 4b0b56c4e..02e59084c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -307,7 +307,16 @@ public class DorisConfigOptions { .booleanType() .defaultValue(false) .withDescription("Whether to use buffer cache for breakpoint resume"); - + public static final ConfigOption USE_FLIGHT_SQL = + ConfigOptions.key("source.use-flight-sql") + .booleanType() + .defaultValue(Boolean.FALSE) + .withDescription("use flight sql flag"); + public static final ConfigOption FLIGHT_SQL_PORT = + ConfigOptions.key("source.flight-sql-port") + .intType() + .defaultValue(9040) + .withDescription("flight sql port"); // Prefix for Doris StreamLoad specific properties. public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index e8ac4dbf6..2559e1f04 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -52,6 +52,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES; import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE; import static org.apache.doris.flink.table.DorisConfigOptions.FENODES; +import static org.apache.doris.flink.table.DorisConfigOptions.FLIGHT_SQL_PORT; import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER; import static org.apache.doris.flink.table.DorisConfigOptions.JDBC_URL; import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_MAX_ROWS; @@ -83,6 +84,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER; import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME; +import static org.apache.doris.flink.table.DorisConfigOptions.USE_FLIGHT_SQL; /** * The {@link DorisDynamicTableFactory} translates the catalog table to a table source. @@ -157,6 +159,9 @@ public Set> optionalOptions() { options.add(SOURCE_USE_OLD_API); options.add(SINK_WRITE_MODE); options.add(SINK_IGNORE_COMMIT_ERROR); + + options.add(USE_FLIGHT_SQL); + options.add(FLIGHT_SQL_PORT); return options; } @@ -216,7 +221,9 @@ private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) { (int) readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS).toMillis()) .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES)) .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE)) - .setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API)); + .setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API)) + .setUseFlightSql(readableConfig.get(USE_FLIGHT_SQL)) + .setFlightSqlPort(readableConfig.get(FLIGHT_SQL_PORT)); return builder.build(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java new file mode 100644 index 000000000..3c24b810e --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.util; + +import java.time.LocalDate; +import java.time.LocalDateTime; + +/** + * idea for this util is from https://bugs.openjdk.org/browse/JDK-8144808 991ms. + * LocalDateTime.parse(...) 246ms : LocalDateTime.of(...) + */ +public final class FastDateUtil { + + public static LocalDateTime fastParseDateTimeV2(String dateTime, String pattern) { + char[] arr = dateTime.toCharArray(); + int[] indexes = + new int[] { + pattern.indexOf("yyyy"), + pattern.indexOf("MM"), + pattern.indexOf("dd"), + pattern.indexOf("HH"), + pattern.indexOf("mm"), + pattern.indexOf("ss"), + pattern.indexOf("SSSSSS") + }; + int year = parseFromIndex(arr, indexes[0], indexes[0] + 4); + int month = parseFromIndex(arr, indexes[1], indexes[1] + 2); + int day = parseFromIndex(arr, indexes[2], indexes[2] + 2); + int hour = parseFromIndex(arr, indexes[3], indexes[3] + 2); + int minute = parseFromIndex(arr, indexes[4], indexes[4] + 2); + int second = parseFromIndex(arr, indexes[5], indexes[5] + 2); + int nanos = parseFromIndex(arr, indexes[6], indexes[6] + 6) * 1000; + return LocalDateTime.of(year, month, day, hour, minute, second, nanos); + } + + public static LocalDateTime fastParseDateTime(String dateTime, String pattern) { + char[] arr = dateTime.toCharArray(); + int[] indexes = + new int[] { + pattern.indexOf("yyyy"), + pattern.indexOf("MM"), + pattern.indexOf("dd"), + pattern.indexOf("HH"), + pattern.indexOf("mm"), + pattern.indexOf("ss") + }; + int year = parseFromIndex(arr, indexes[0], indexes[0] + 4); + int month = parseFromIndex(arr, indexes[1], indexes[1] + 2); + int day = parseFromIndex(arr, indexes[2], indexes[2] + 2); + int hour = parseFromIndex(arr, indexes[3], indexes[3] + 2); + int minute = parseFromIndex(arr, indexes[4], indexes[4] + 2); + int second = parseFromIndex(arr, indexes[5], indexes[5] + 2); + return LocalDateTime.of(year, month, day, hour, minute, second); + } + + public static LocalDate fastParseDate(String dateTime, String pattern) { + char[] arr = dateTime.toCharArray(); + int[] indexes = + new int[] { + pattern.indexOf("yyyy"), pattern.indexOf("MM"), pattern.indexOf("dd"), + }; + int year = parseFromIndex(arr, indexes[0], indexes[0] + 4); + int month = parseFromIndex(arr, indexes[1], indexes[1] + 2); + int day = parseFromIndex(arr, indexes[2], indexes[2] + 2); + return LocalDate.of(year, month, day); + } + + private static int parseFromIndex(char[] arr, int start, int end) { + int value = 0; + for (int i = start; i < end; i++) { + value = value * 10 + (arr[i] - '0'); + } + return value; + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/SchemaUtilsTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/SchemaUtilsTest.java new file mode 100644 index 000000000..2353bd7df --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/SchemaUtilsTest.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.rest; + +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.doris.flink.exception.DorisException; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + +public class SchemaUtilsTest { + private static final Logger logger = LoggerFactory.getLogger(SchemaUtilsTest.class); + + @Test + public void convertToSchema() throws DorisException { + Field field1 = + new Field("field1", FieldType.notNullable(new ArrowType.Int(32, true)), null); + Field field2 = + new Field("field2", FieldType.notNullable(new ArrowType.Int(32, true)), null); + Schema arrowSchema = new Schema(Arrays.asList(field1, field2)); + String schemaStr = + "{\"properties\":[" + + "{\"type\":\"int\",\"name\":\"field1\",\"comment\":\"\"}" + + ",{\"type\":\"int\",\"name\":\"field2\",\"comment\":\"\"}" + + "], \"status\":200}"; + org.apache.doris.flink.rest.models.Schema schema = + RestService.parseSchema(schemaStr, logger); + + org.apache.doris.flink.rest.models.Schema result = + SchemaUtils.convertToSchema(schema, arrowSchema); + + assertEquals(2, result.getProperties().size()); + assertEquals("field1", result.getProperties().get(0).getName()); + assertEquals("field2", result.getProperties().get(1).getName()); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java index 0004af05c..05a93dc5f 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java @@ -65,7 +65,8 @@ public void testDorisSourceProperties() { properties.put("lookup.jdbc.read.batch.size", "16"); properties.put("lookup.jdbc.read.batch.queue-size", "16"); properties.put("lookup.jdbc.read.thread-size", "1"); - + properties.put("source.use-flight-sql", "false"); + properties.put("source.flight-sql-port", "9040"); DynamicTableSource actual = createTableSource(SCHEMA, properties); DorisOptions options = DorisOptions.builder() @@ -98,7 +99,9 @@ public void testDorisSourceProperties() { .setRequestConnectTimeoutMs(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) .setRequestReadTimeoutMs(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) .setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT) - .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT); + .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT) + .setUseFlightSql(false) + .setFlightSqlPort(9040); DorisDynamicTableSource expected = new DorisDynamicTableSource( options, @@ -182,7 +185,9 @@ public void testDorisSinkProperties() { .setRequestConnectTimeoutMs(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) .setRequestReadTimeoutMs(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) .setRequestRetries(DORIS_REQUEST_RETRIES_DEFAULT) - .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT); + .setRequestTabletSize(DORIS_TABLET_SIZE_DEFAULT) + .setUseFlightSql(false) + .setFlightSqlPort(9040); DorisDynamicTableSink expected = new DorisDynamicTableSink( options, diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FastDateUtilTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FastDateUtilTest.java new file mode 100644 index 000000000..a89e50a05 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FastDateUtilTest.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.utils; + +import org.apache.doris.flink.util.FastDateUtil; +import org.junit.jupiter.api.Test; + +import java.time.LocalDate; +import java.time.LocalDateTime; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class FastDateUtilTest { + + @Test + void fastParseDateTimeV2_withValidDateTimeAndPattern_returnsCorrectLocalDateTime() { + String dateTime = "2023-10-05 14:30:45.123456"; + String pattern = "yyyy-MM-dd HH:mm:ss.SSSSSS"; + LocalDateTime result = FastDateUtil.fastParseDateTimeV2(dateTime, pattern); + assertEquals(LocalDateTime.of(2023, 10, 5, 14, 30, 45, 123456000), result); + } + + @Test + void fastParseDateTime_withValidDateTimeAndPattern_returnsCorrectLocalDateTime() { + String dateTime = "2023-10-05 14:30:45"; + String pattern = "yyyy-MM-dd HH:mm:ss"; + LocalDateTime result = FastDateUtil.fastParseDateTime(dateTime, pattern); + assertEquals(LocalDateTime.of(2023, 10, 5, 14, 30, 45), result); + } + + @Test + void fastParseDate_withValidDateAndPattern_returnsCorrectLocalDate() { + String dateTime = "2023-10-05"; + String pattern = "yyyy-MM-dd"; + LocalDate result = FastDateUtil.fastParseDate(dateTime, pattern); + assertEquals(LocalDate.of(2023, 10, 5), result); + } +}