Skip to content

Commit

Permalink
support catalog table read
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Dec 26, 2024
1 parent b7b5802 commit 29a4954
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
public class PartitionDefinition implements Serializable, Comparable<PartitionDefinition> {
private final String database;
private final String table;

private final String beAddress;
private final Set<Long> tabletIds;
private final String queryPlan;
Expand All @@ -42,6 +41,10 @@ public PartitionDefinition(
this.queryPlan = queryPlan;
}

public static PartitionDefinition emptyPartition(String table) {
return new PartitionDefinition("", table, "", new HashSet<>(), "");
}

public String getBeAddress() {
return beAddress;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.flink.rest;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -92,6 +93,7 @@ public class RestService implements Serializable {
private static final String FE_LOGIN = "/rest/v1/login";
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
private static final String CATALOG_TABLE_SCHEMA_API = "http://%s/api/%s/%s/%s/_schema";
private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan";

/**
Expand Down Expand Up @@ -234,38 +236,6 @@ public static String parseResponse(HttpURLConnection connection, Logger logger)
}
}

@VisibleForTesting
public static String parseFlightSql(
DorisReadOptions readOptions,
DorisOptions options,
PartitionDefinition partition,
Logger logger)
throws IllegalArgumentException {
String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger);
String readFields =
StringUtils.isBlank(readOptions.getReadFields())
? "*"
: readOptions.getReadFields();
String sql =
"select "
+ readFields
+ " from `"
+ tableIdentifiers[0]
+ "`.`"
+ tableIdentifiers[1]
+ "`";
String tablet =
partition.getTabletIds().stream()
.map(Object::toString)
.collect(Collectors.joining(","));
sql += " TABLET(" + tablet + ") ";
if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
sql += " where " + readOptions.getFilterQuery();
}
logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
return sql;
}

/**
* parse table identifier to array.
*
Expand All @@ -275,15 +245,16 @@ public static String parseFlightSql(
* @throws IllegalArgumentException table identifier is illegal
*/
@VisibleForTesting
static String[] parseIdentifier(String tableIdentifier, Logger logger)
public static String[] parseIdentifier(String tableIdentifier, Logger logger)
throws IllegalArgumentException {
logger.trace("Parse identifier '{}'.", tableIdentifier);
if (StringUtils.isEmpty(tableIdentifier)) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "table.identifier", tableIdentifier);
throw new IllegalArgumentException("table.identifier", tableIdentifier);
}
String[] identifier = tableIdentifier.split("\\.");
if (identifier.length != 2) {
// db.table or catalog.db.table
if (identifier.length != 2 && identifier.length != 3) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "table.identifier", tableIdentifier);
throw new IllegalArgumentException("table.identifier", tableIdentifier);
}
Expand Down Expand Up @@ -426,12 +397,26 @@ public static Schema getSchema(
throws DorisException {
logger.trace("Finding schema.");
String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger);
String tableSchemaUri =
String.format(
TABLE_SCHEMA_API,
randomEndpoint(options.getFenodes(), logger),
tableIdentifier[0],
tableIdentifier[1]);
String tableSchemaUri;
if (tableIdentifier.length == 2) {
tableSchemaUri =
String.format(
TABLE_SCHEMA_API,
randomEndpoint(options.getFenodes(), logger),
tableIdentifier[0],
tableIdentifier[1]);
} else if (tableIdentifier.length == 3) {
tableSchemaUri =
String.format(
CATALOG_TABLE_SCHEMA_API,
randomEndpoint(options.getFenodes(), logger),
tableIdentifier[0],
tableIdentifier[1],
tableIdentifier[2]);
} else {
throw new IllegalArgumentException(
"table identifier is illegal, should be db.table or catalog.db.table");
}
HttpGet httpGet = new HttpGet(tableSchemaUri);
String response = send(options, readOptions, httpGet, logger);
logger.debug("Find schema response is '{}'.", response);
Expand Down Expand Up @@ -561,6 +546,8 @@ public static List<PartitionDefinition> findPartitions(
DorisOptions options, DorisReadOptions readOptions, Logger logger)
throws DorisException {
String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger);
Preconditions.checkArgument(
tableIdentifiers.length == 2, "table identifier is illegal, should be db.table");
String readFields =
StringUtils.isBlank(readOptions.getReadFields())
? "*"
Expand All @@ -578,6 +565,7 @@ public static List<PartitionDefinition> findPartitions(
}
logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger);

String queryPlanUri =
String.format(
QUERY_PLAN_API,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.Preconditions;

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
Expand Down Expand Up @@ -56,6 +57,7 @@ public class DorisSource<OUT>
ResultTypeQueryable<OUT> {

private static final Logger LOG = LoggerFactory.getLogger(DorisSource.class);
private static final String SINGLE_SPLIT = "SingleSplit";

private final DorisOptions options;
private final DorisReadOptions readOptions;
Expand Down Expand Up @@ -95,13 +97,27 @@ public SourceReader<OUT, DorisSourceSplit> createReader(SourceReaderContext read
public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> createEnumerator(
SplitEnumeratorContext<DorisSourceSplit> context) throws Exception {
List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>();
List<PartitionDefinition> partitions =
RestService.findPartitions(options, readOptions, LOG);
for (int index = 0; index < partitions.size(); index++) {
PartitionDefinition partitionDef = partitions.get(index);
String splitId = partitionDef.getBeAddress() + "_" + index;
dorisSourceSplits.add(new DorisSourceSplit(splitId, partitionDef));
String[] tableIdentifiers = RestService.parseIdentifier(options.getTableIdentifier(), LOG);

if (tableIdentifiers.length == 2) {
List<PartitionDefinition> partitions =
RestService.findPartitions(options, readOptions, LOG);
for (int index = 0; index < partitions.size(); index++) {
PartitionDefinition partitionDef = partitions.get(index);
String splitId = partitionDef.getBeAddress() + "_" + index;
dorisSourceSplits.add(new DorisSourceSplit(splitId, partitionDef));
}
} else {
Preconditions.checkArgument(
readOptions.getUseFlightSql(),
"UseFlightSql must be true when table.identifier is catalog.db.table");
// catalog query or customer query
dorisSourceSplits.add(
new DorisSourceSplit(
SINGLE_SPLIT,
PartitionDefinition.emptyPartition(options.getTableIdentifier())));
}

DorisSplitAssigner splitAssigner = new SimpleSplitAssigner(dorisSourceSplits);
return new DorisSourceEnumerator(context, splitAssigner);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisException;
Expand All @@ -39,12 +41,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;

Expand All @@ -64,24 +68,20 @@ public class DorisFlightValueReader extends ValueReader implements AutoCloseable
protected AtomicBoolean eos = new AtomicBoolean(false);

public DorisFlightValueReader(
PartitionDefinition partition,
DorisOptions options,
DorisReadOptions readOptions,
Schema schema) {
PartitionDefinition partition, DorisOptions options, DorisReadOptions readOptions) {
this.partition = partition;
this.options = options;
this.readOptions = readOptions;
initSchema();
this.client = openConnection();
this.schema = schema;
init();
}

private void init() {
clientLock.lock();
try {
this.statement = this.client.createStatement();
this.statement.setSqlQuery(
RestService.parseFlightSql(readOptions, options, partition, LOG));
this.statement.setSqlQuery(parseFlightSql(readOptions, options, partition, LOG));
this.queryResult = statement.executeQuery();
this.arrowReader = queryResult.getReader();
} catch (AdbcException | DorisException e) {
Expand All @@ -92,6 +92,47 @@ private void init() {
LOG.debug("Open scan result is, schema: {}.", schema);
}

private void initSchema() {
try {
this.schema = RestService.getSchema(options, readOptions, LOG);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

private String parseFlightSql(
DorisReadOptions readOptions,
DorisOptions options,
PartitionDefinition partition,
Logger logger)
throws IllegalArgumentException {
String[] tableIdentifiers =
RestService.parseIdentifier(options.getTableIdentifier(), logger);
String readFields =
StringUtils.isBlank(readOptions.getReadFields())
? "*"
: readOptions.getReadFields();

String queryTable =
Arrays.stream(tableIdentifiers)
.map(v -> "`" + v + "`")
.collect(Collectors.joining("."));
String sql = "SELECT " + readFields + " FROM " + queryTable;
if (CollectionUtils.isNotEmpty(partition.getTabletIds())) {
String tablet =
partition.getTabletIds().stream()
.map(Object::toString)
.collect(Collectors.joining(","));
sql += " TABLET(" + tablet + ") ";
}

if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
sql += " WHERE " + readOptions.getFilterQuery();
}
logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
return sql;
}

private AdbcConnection openConnection() {
final Map<String, Object> parameters = new HashMap<>();
RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.PartitionDefinition;
import org.apache.doris.flink.rest.RestService;
import org.slf4j.Logger;

import java.util.List;
Expand All @@ -32,15 +30,10 @@ public static ValueReader createReader(
PartitionDefinition partition,
DorisOptions options,
DorisReadOptions readOptions,
Logger logger)
throws DorisException {
Logger logger) {
logger.info("create reader for partition: {}", partition.toStringWithoutPlan());
if (readOptions.getUseFlightSql()) {
return new DorisFlightValueReader(
partition,
options,
readOptions,
RestService.getSchema(options, readOptions, logger));
return new DorisFlightValueReader(partition, options, readOptions);
} else {
return new DorisValueReader(partition, options, readOptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public PartitionDefinition getPartitionDefinition() {
@Override
public String toString() {
return String.format(
"DorisSourceSplit: %s.%s,id=%s,be=%s,tablets=%s",
"DorisSourceSplit: database=%s,table=%s,id=%s,be=%s,tablets=%s",
partitionDefinition.getDatabase(),
partitionDefinition.getTable(),
id,
Expand Down

0 comments on commit 29a4954

Please sign in to comment.