From 2bd1c8fb05aa99a6490b158e3f7fb137f43d8814 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 26 Dec 2024 18:29:29 +0800 Subject: [PATCH] [Feature] support catalog table read by arrowflightsql (#530) --- .../doris/flink/rest/PartitionDefinition.java | 5 +- .../apache/doris/flink/rest/RestService.java | 68 ++++++++----------- .../doris/flink/source/DorisSource.java | 28 ++++++-- .../source/reader/DorisFlightValueReader.java | 58 ++++++++++++++-- .../flink/source/reader/ValueReader.java | 11 +-- .../flink/source/split/DorisSourceSplit.java | 2 +- .../doris/flink/rest/TestRestService.java | 3 + 7 files changed, 111 insertions(+), 64 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java index 9f2bd07dd..b747b110e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java @@ -27,7 +27,6 @@ public class PartitionDefinition implements Serializable, Comparable { private final String database; private final String table; - private final String beAddress; private final Set tabletIds; private final String queryPlan; @@ -42,6 +41,10 @@ public PartitionDefinition( this.queryPlan = queryPlan; } + public static PartitionDefinition emptyPartition(String table) { + return new PartitionDefinition("", table, "", new HashSet<>(), ""); + } + public String getBeAddress() { return beAddress; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index 459fbe70d..75c1e27d3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -18,6 +18,7 @@ package org.apache.doris.flink.rest; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonProcessingException; @@ -92,6 +93,7 @@ public class RestService implements Serializable { private static final String FE_LOGIN = "/rest/v1/login"; private static final ObjectMapper objectMapper = new ObjectMapper(); private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema"; + private static final String CATALOG_TABLE_SCHEMA_API = "http://%s/api/%s/%s/%s/_schema"; private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan"; /** @@ -234,38 +236,6 @@ public static String parseResponse(HttpURLConnection connection, Logger logger) } } - @VisibleForTesting - public static String parseFlightSql( - DorisReadOptions readOptions, - DorisOptions options, - PartitionDefinition partition, - Logger logger) - throws IllegalArgumentException { - String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger); - String readFields = - StringUtils.isBlank(readOptions.getReadFields()) - ? "*" - : readOptions.getReadFields(); - String sql = - "select " - + readFields - + " from `" - + tableIdentifiers[0] - + "`.`" - + tableIdentifiers[1] - + "`"; - String tablet = - partition.getTabletIds().stream() - .map(Object::toString) - .collect(Collectors.joining(",")); - sql += " TABLET(" + tablet + ") "; - if (!StringUtils.isEmpty(readOptions.getFilterQuery())) { - sql += " where " + readOptions.getFilterQuery(); - } - logger.info("Query SQL Sending to Doris FE is: '{}'.", sql); - return sql; - } - /** * parse table identifier to array. * @@ -275,7 +245,7 @@ public static String parseFlightSql( * @throws IllegalArgumentException table identifier is illegal */ @VisibleForTesting - static String[] parseIdentifier(String tableIdentifier, Logger logger) + public static String[] parseIdentifier(String tableIdentifier, Logger logger) throws IllegalArgumentException { logger.trace("Parse identifier '{}'.", tableIdentifier); if (StringUtils.isEmpty(tableIdentifier)) { @@ -283,7 +253,8 @@ static String[] parseIdentifier(String tableIdentifier, Logger logger) throw new IllegalArgumentException("table.identifier", tableIdentifier); } String[] identifier = tableIdentifier.split("\\."); - if (identifier.length != 2) { + // db.table or catalog.db.table + if (identifier.length != 2 && identifier.length != 3) { logger.error(ILLEGAL_ARGUMENT_MESSAGE, "table.identifier", tableIdentifier); throw new IllegalArgumentException("table.identifier", tableIdentifier); } @@ -426,12 +397,26 @@ public static Schema getSchema( throws DorisException { logger.trace("Finding schema."); String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger); - String tableSchemaUri = - String.format( - TABLE_SCHEMA_API, - randomEndpoint(options.getFenodes(), logger), - tableIdentifier[0], - tableIdentifier[1]); + String tableSchemaUri; + if (tableIdentifier.length == 2) { + tableSchemaUri = + String.format( + TABLE_SCHEMA_API, + randomEndpoint(options.getFenodes(), logger), + tableIdentifier[0], + tableIdentifier[1]); + } else if (tableIdentifier.length == 3) { + tableSchemaUri = + String.format( + CATALOG_TABLE_SCHEMA_API, + randomEndpoint(options.getFenodes(), logger), + tableIdentifier[0], + tableIdentifier[1], + tableIdentifier[2]); + } else { + throw new IllegalArgumentException( + "table identifier is illegal, should be db.table or catalog.db.table"); + } HttpGet httpGet = new HttpGet(tableSchemaUri); String response = send(options, readOptions, httpGet, logger); logger.debug("Find schema response is '{}'.", response); @@ -561,6 +546,8 @@ public static List findPartitions( DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException { String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger); + Preconditions.checkArgument( + tableIdentifiers.length == 2, "table identifier is illegal, should be db.table"); String readFields = StringUtils.isBlank(readOptions.getReadFields()) ? "*" @@ -578,6 +565,7 @@ public static List findPartitions( } logger.info("Query SQL Sending to Doris FE is: '{}'.", sql); String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger); + String queryPlanUri = String.format( QUERY_PLAN_API, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java index 5cdd406ce..00e08e46b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java @@ -27,6 +27,7 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; @@ -56,6 +57,7 @@ public class DorisSource ResultTypeQueryable { private static final Logger LOG = LoggerFactory.getLogger(DorisSource.class); + private static final String SINGLE_SPLIT = "SingleSplit"; private final DorisOptions options; private final DorisReadOptions readOptions; @@ -95,13 +97,27 @@ public SourceReader createReader(SourceReaderContext read public SplitEnumerator createEnumerator( SplitEnumeratorContext context) throws Exception { List dorisSourceSplits = new ArrayList<>(); - List partitions = - RestService.findPartitions(options, readOptions, LOG); - for (int index = 0; index < partitions.size(); index++) { - PartitionDefinition partitionDef = partitions.get(index); - String splitId = partitionDef.getBeAddress() + "_" + index; - dorisSourceSplits.add(new DorisSourceSplit(splitId, partitionDef)); + String[] tableIdentifiers = RestService.parseIdentifier(options.getTableIdentifier(), LOG); + + if (tableIdentifiers.length == 2) { + List partitions = + RestService.findPartitions(options, readOptions, LOG); + for (int index = 0; index < partitions.size(); index++) { + PartitionDefinition partitionDef = partitions.get(index); + String splitId = partitionDef.getBeAddress() + "_" + index; + dorisSourceSplits.add(new DorisSourceSplit(splitId, partitionDef)); + } + } else { + Preconditions.checkArgument( + readOptions.getUseFlightSql(), + "UseFlightSql must be true when table.identifier is catalog.db.table"); + // catalog query or customer query + dorisSourceSplits.add( + new DorisSourceSplit( + SINGLE_SPLIT, + PartitionDefinition.emptyPartition(options.getTableIdentifier()))); } + DorisSplitAssigner splitAssigner = new SimpleSplitAssigner(dorisSourceSplits); return new DorisSourceEnumerator(context, splitAssigner); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java index a2b9b6327..fbf050501 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisFlightValueReader.java @@ -26,6 +26,8 @@ import org.apache.arrow.flight.Location; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisException; @@ -39,17 +41,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import static org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; public class DorisFlightValueReader extends ValueReader implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DorisFlightValueReader.class); + private static final String PREFIX = "/* ApplicationName=Flink ArrowFlightSQL Query */"; + protected AdbcConnection client; protected Lock clientLock = new ReentrantLock(); @@ -64,15 +70,12 @@ public class DorisFlightValueReader extends ValueReader implements AutoCloseable protected AtomicBoolean eos = new AtomicBoolean(false); public DorisFlightValueReader( - PartitionDefinition partition, - DorisOptions options, - DorisReadOptions readOptions, - Schema schema) { + PartitionDefinition partition, DorisOptions options, DorisReadOptions readOptions) { this.partition = partition; this.options = options; this.readOptions = readOptions; + initSchema(); this.client = openConnection(); - this.schema = schema; init(); } @@ -80,8 +83,7 @@ private void init() { clientLock.lock(); try { this.statement = this.client.createStatement(); - this.statement.setSqlQuery( - RestService.parseFlightSql(readOptions, options, partition, LOG)); + this.statement.setSqlQuery(parseFlightSql(readOptions, options, partition, LOG)); this.queryResult = statement.executeQuery(); this.arrowReader = queryResult.getReader(); } catch (AdbcException | DorisException e) { @@ -92,6 +94,48 @@ private void init() { LOG.debug("Open scan result is, schema: {}.", schema); } + private void initSchema() { + try { + this.schema = RestService.getSchema(options, readOptions, LOG); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + private String parseFlightSql( + DorisReadOptions readOptions, + DorisOptions options, + PartitionDefinition partition, + Logger logger) + throws IllegalArgumentException { + String[] tableIdentifiers = + RestService.parseIdentifier(options.getTableIdentifier(), logger); + String readFields = + StringUtils.isBlank(readOptions.getReadFields()) + ? "*" + : readOptions.getReadFields(); + + String queryTable = + Arrays.stream(tableIdentifiers) + .map(v -> "`" + v + "`") + .collect(Collectors.joining(".")); + + String sql = PREFIX + " SELECT " + readFields + " FROM " + queryTable; + if (CollectionUtils.isNotEmpty(partition.getTabletIds())) { + String tablet = + partition.getTabletIds().stream() + .map(Object::toString) + .collect(Collectors.joining(",")); + sql += " TABLET(" + tablet + ") "; + } + + if (!StringUtils.isEmpty(readOptions.getFilterQuery())) { + sql += " WHERE " + readOptions.getFilterQuery(); + } + logger.info("Query SQL Sending to Doris FE is: '{}'.", sql); + return sql; + } + private AdbcConnection openConnection() { final Map parameters = new HashMap<>(); RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java index 9e1d091eb..cd99f20c7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java @@ -19,9 +19,7 @@ import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; -import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.rest.PartitionDefinition; -import org.apache.doris.flink.rest.RestService; import org.slf4j.Logger; import java.util.List; @@ -32,15 +30,10 @@ public static ValueReader createReader( PartitionDefinition partition, DorisOptions options, DorisReadOptions readOptions, - Logger logger) - throws DorisException { + Logger logger) { logger.info("create reader for partition: {}", partition.toStringWithoutPlan()); if (readOptions.getUseFlightSql()) { - return new DorisFlightValueReader( - partition, - options, - readOptions, - RestService.getSchema(options, readOptions, logger)); + return new DorisFlightValueReader(partition, options, readOptions); } else { return new DorisValueReader(partition, options, readOptions); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java index f80d4165b..7d5138c82 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java @@ -53,7 +53,7 @@ public PartitionDefinition getPartitionDefinition() { @Override public String toString() { return String.format( - "DorisSourceSplit: %s.%s,id=%s,be=%s,tablets=%s", + "DorisSourceSplit: database=%s,table=%s,id=%s,be=%s,tablets=%s", partitionDefinition.getDatabase(), partitionDefinition.getTable(), id, diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java index ef59ef418..42487acf8 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java @@ -114,6 +114,9 @@ public void testParseIdentifierIllegalEmpty() throws IllegalArgumentException { @Test public void testParseIdentifierIllegal() throws Exception { String invalidIdentifier3 = "a.b.c"; + RestService.parseIdentifier(invalidIdentifier3, logger); + + invalidIdentifier3 = "a.b.c.d.e"; thrown.expect(IllegalArgumentException.class); thrown.expectMessage( "argument 'table.identifier' is illegal, value is '" + invalidIdentifier3 + "'.");