Skip to content

Commit

Permalink
[Feature] support reader doris using arrow flight driver (apache#465)
Browse files Browse the repository at this point in the history
  • Loading branch information
MaoMiMao authored Aug 12, 2024
1 parent 1869a7d commit eb90905
Show file tree
Hide file tree
Showing 18 changed files with 687 additions and 44 deletions.
21 changes: 12 additions & 9 deletions flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ under the License.
<flink.sql.cdc.version>3.1.1</flink.sql.cdc.version>
<flink.python.id>flink-python</flink.python.id>
<libthrift.version>0.16.0</libthrift.version>
<arrow.version>13.0.0</arrow.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
<maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
Expand All @@ -95,6 +94,8 @@ under the License.
<jsqlparser.version>4.9</jsqlparser.version>
<mysql.driver.version>8.0.26</mysql.driver.version>
<ojdbc.version>19.3.0.0</ojdbc.version>
<arrow.version>15.0.2</arrow.version>
<adbc.version>0.12.0</adbc.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -179,13 +180,16 @@ under the License.
<artifactId>commons-codec</artifactId>
<version>${commons-codec.version}</version>
</dependency>

<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-flight-sql</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>

<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
Expand All @@ -206,7 +210,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>

<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down Expand Up @@ -410,13 +413,13 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<version>3.4.1</version>
<configuration>
<relocations>
<relocation>
<pattern>org.apache.arrow</pattern>
<shadedPattern>org.apache.doris.shaded.org.apache.arrow</shadedPattern>
</relocation>
<!-- <relocation>-->
<!-- <pattern>org.apache.arrow</pattern>-->
<!-- <shadedPattern>org.apache.doris.shaded.org.apache.arrow</shadedPattern>-->
<!-- </relocation>-->
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.doris.shaded.io.netty</shadedPattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,10 @@ public interface ConfigurationOptions {
Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;

String USE_FLIGHT_SQL = "source.use-flight-sql";
Boolean USE_FLIGHT_SQL_DEFAULT = false;

String FLIGHT_SQL_PORT = "source.flight-sql-port";
Integer FLIGHT_SQL_PORT_DEFAULT = 9040;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class DorisReadOptions implements Serializable {
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
private boolean useOldApi;
private boolean useFlightSql;
private Integer flightSqlPort;

public DorisReadOptions(
String readFields,
Expand All @@ -50,7 +52,9 @@ public DorisReadOptions(
Long execMemLimit,
Integer deserializeQueueSize,
Boolean deserializeArrowAsync,
boolean useOldApi) {
boolean useOldApi,
boolean useFlightSql,
Integer flightSqlPort) {
this.readFields = readFields;
this.filterQuery = filterQuery;
this.requestTabletSize = requestTabletSize;
Expand All @@ -63,6 +67,8 @@ public DorisReadOptions(
this.deserializeQueueSize = deserializeQueueSize;
this.deserializeArrowAsync = deserializeArrowAsync;
this.useOldApi = useOldApi;
this.useFlightSql = useFlightSql;
this.flightSqlPort = flightSqlPort;
}

public String getReadFields() {
Expand Down Expand Up @@ -121,6 +127,14 @@ public void setFilterQuery(String filterQuery) {
this.filterQuery = filterQuery;
}

public boolean getUseFlightSql() {
return useFlightSql;
}

public Integer getFlightSqlPort() {
return flightSqlPort;
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -149,7 +163,9 @@ public boolean equals(Object o) {
&& Objects.equals(requestBatchSize, that.requestBatchSize)
&& Objects.equals(execMemLimit, that.execMemLimit)
&& Objects.equals(deserializeQueueSize, that.deserializeQueueSize)
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync);
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync)
&& Objects.equals(useFlightSql, that.useFlightSql)
&& Objects.equals(flightSqlPort, that.flightSqlPort);
}

@Override
Expand All @@ -166,7 +182,9 @@ public int hashCode() {
execMemLimit,
deserializeQueueSize,
deserializeArrowAsync,
useOldApi);
useOldApi,
useFlightSql,
flightSqlPort);
}

/** Builder of {@link DorisReadOptions}. */
Expand All @@ -184,6 +202,8 @@ public static class Builder {
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
private Boolean useOldApi = false;
private Boolean useFlightSql = false;
private Integer flightSqlPort;

public Builder setReadFields(String readFields) {
this.readFields = readFields;
Expand Down Expand Up @@ -240,11 +260,21 @@ public Builder setDeserializeArrowAsync(Boolean deserializeArrowAsync) {
return this;
}

public Builder setUseOldApi(boolean useOldApi) {
public Builder setUseFlightSql(Boolean useFlightSql) {
this.useFlightSql = useFlightSql;
return this;
}

public Builder setUseOldApi(Boolean useOldApi) {
this.useOldApi = useOldApi;
return this;
}

public Builder setFlightSqlPort(Integer flightSqlPort) {
this.flightSqlPort = flightSqlPort;
return this;
}

public DorisReadOptions build() {
return new DorisReadOptions(
readFields,
Expand All @@ -258,7 +288,9 @@ public DorisReadOptions build() {
execMemLimit,
deserializeQueueSize,
deserializeArrowAsync,
useOldApi);
useOldApi,
useFlightSql,
flightSqlPort);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ private void init() {
prop.getProperty(
ConfigurationOptions.DORIS_TABLET_SIZE,
ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT
.toString())))
.setUseFlightSql(
Boolean.valueOf(
prop.getProperty(
ConfigurationOptions.USE_FLIGHT_SQL,
ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT
.toString())))
.setFlightSqlPort(
Integer.valueOf(
prop.getProperty(
ConfigurationOptions.FLIGHT_SQL_PORT,
ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT
.toString())));

this.options = optionsBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,38 @@ public static String parseResponse(HttpURLConnection connection, Logger logger)
}
}

@VisibleForTesting
public static String parseFlightSql(
DorisReadOptions readOptions,
DorisOptions options,
PartitionDefinition partition,
Logger logger)
throws IllegalArgumentException {
String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger);
String readFields =
StringUtils.isBlank(readOptions.getReadFields())
? "*"
: readOptions.getReadFields();
String sql =
"select "
+ readFields
+ " from `"
+ tableIdentifiers[0]
+ "`.`"
+ tableIdentifiers[1]
+ "`";
String tablet =
partition.getTabletIds().stream()
.map(Object::toString)
.collect(Collectors.joining(","));
sql += " TABLET(" + tablet + ") ";
if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
sql += " where " + readOptions.getFilterQuery();
}
logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
return sql;
}

/**
* parse table identifier to array.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.doris.sdk.thrift.TScanColumnDesc;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class SchemaUtils {

Expand All @@ -46,4 +49,25 @@ public static Schema convertToSchema(List<TScanColumnDesc> tscanColumnDescs) {
"")));
return schema;
}

public static Schema convertToSchema(
Schema tableSchema, org.apache.arrow.vector.types.pojo.Schema tscanColumnDescs) {
Schema schema = new Schema(tscanColumnDescs.getFields().size());
Map<String, Field> collect =
tableSchema.getProperties().stream()
.collect(Collectors.toMap(Field::getName, Function.identity()));
tscanColumnDescs
.getFields()
.forEach(
desc ->
schema.put(
new Field(
desc.getName(),
collect.get(desc.getName()).getType(),
"",
0,
0,
"")));
return schema;
}
}
Loading

0 comments on commit eb90905

Please sign in to comment.