Skip to content

Commit

Permalink
[Fix] Fix flink sql projection pushdown error (apache#428)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Jul 15, 2024
1 parent 2b595a7 commit 38026fd
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
readOptions.setFilterQuery(filterQuery);
}
if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
String[] selectFields =
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
readOptions.setReadFields(
Arrays.stream(selectFields)
.map(item -> String.format("`%s`", item.trim().replace("`", "")))
.collect(Collectors.joining(", ")));
}

if (readOptions.getUseOldApi()) {
List<PartitionDefinition> dorisPartitions;
try {
Expand Down Expand Up @@ -199,14 +208,11 @@ public boolean supportsNestedProjection() {
@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType);
if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
String[] selectFields =
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
this.readOptions.setReadFields(
Arrays.stream(selectFields)
.map(item -> String.format("`%s`", item.trim().replace("`", "")))
.collect(Collectors.joining(", ")));
}
String[] selectFields = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
this.readOptions.setReadFields(
Arrays.stream(selectFields)
.map(item -> String.format("`%s`", item.trim().replace("`", "")))
.collect(Collectors.joining(", ")));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class DorisSourceITCase extends DorisTestBase {
static final String TABLE_READ_TBL = "tbl_read_tbl";
static final String TABLE_READ_TBL_OLD_API = "tbl_read_tbl_old_api";
static final String TABLE_READ_TBL_ALL_OPTIONS = "tbl_read_tbl_all_options";
static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";

@Test
public void testSource() throws Exception {
Expand Down Expand Up @@ -231,6 +232,41 @@ public void testTableSourceAllOptions() throws Exception {
Assert.assertArrayEquals(expected, actual.toArray());
}

@Test
public void testTableSourceFilterAndProjectionPushDown() throws Exception {
initializeTable(TABLE_READ_TBL_PUSH_DOWN);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String sourceDDL =
String.format(
"CREATE TABLE doris_source ("
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s'"
+ ")",
getFenodes(),
DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN,
USERNAME,
PASSWORD);
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("SELECT age FROM doris_source where age = '18'");

List<String> actual = new ArrayList<>();
try (CloseableIterator<Row> iterator = tableResult.collect()) {
while (iterator.hasNext()) {
actual.add(iterator.next().toString());
}
}
String[] expected = new String[] {"+I[18]"};
Assert.assertArrayEquals(expected, actual.toArray());
}

private void initializeTable(String table) throws Exception {
try (Connection connection =
DriverManager.getConnection(
Expand Down

0 comments on commit 38026fd

Please sign in to comment.