Skip to content

Commit

Permalink
[fix](source)fix timestamp format push down error (apache#528)
Browse files Browse the repository at this point in the history
  • Loading branch information
vinlee19 authored Dec 17, 2024
1 parent b7b5802 commit c4eed25
Show file tree
Hide file tree
Showing 2 changed files with 239 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,17 @@
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.util.StringUtils;

import org.apache.doris.flink.exception.DorisRuntimeException;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

public class DorisExpressionVisitor implements ExpressionVisitor<String> {
private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(DATETIME_PATTERN);
DateTimeFormatter dateTimev2Formatter = DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);

@Override
public String visit(CallExpression call) {
Expand Down Expand Up @@ -94,11 +102,47 @@ private String combineLeftExpression(String operator, ResolvedExpression operand
@Override
public String visit(ValueLiteralExpression valueLiteral) {
LogicalTypeRoot typeRoot = valueLiteral.getOutputDataType().getLogicalType().getTypeRoot();
if (typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
|| typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
|| typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE)
|| typeRoot.equals(LogicalTypeRoot.DATE)) {
return "'" + valueLiteral + "'";

switch (typeRoot) {
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_WITH_TIME_ZONE:
case DATE:
return "'" + valueLiteral + "'";
case TIMESTAMP_WITHOUT_TIME_ZONE:
Class<?> conversionClass = valueLiteral.getOutputDataType().getConversionClass();
if (LocalDateTime.class.isAssignableFrom(conversionClass)) {
try {
LocalDateTime localDateTime =
valueLiteral
.getValueAs(LocalDateTime.class)
.orElseThrow(
() ->
new RuntimeException(
"Failed to get LocalDateTime value"));
int nano = localDateTime.getNano();
if (nano == 0) {
// if nanoseconds equals to zero, the timestamp is in seconds.
return wrapWithQuotes(localDateTime.format(dateTimeFormatter));
} else {
// 1. Even though the datetime precision in Doris is set to 3, the
// microseconds format such as "yyyy-MM-dd HH:mm:ss.SSSSSS" can still
// function properly in the Doris query plan.
// 2. If the timestamp is in nanoseconds, format it like 'yyyy-MM-dd
// HH:mm:ss.SSSSSS'. This will have no impact on the result. Because
// when parsing the imported DATETIME type data on the BE side (for
// example, through Stream load, Spark load, etc.), or when using the FE
// side with Nereids enabled, the decimals that exceed the current
// precision will be rounded.
return wrapWithQuotes(localDateTime.format(dateTimev2Formatter));
}

} catch (Exception e) {
throw new DorisRuntimeException(e.getMessage());
}
}
break;
default:
return valueLiteral.toString();
}
return valueLiteral.toString();
}
Expand All @@ -117,4 +161,8 @@ public String visit(TypeLiteralExpression typeLiteral) {
public String visit(Expression expression) {
return null;
}

private static String wrapWithQuotes(String value) {
return "'" + value + "'";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
Expand Down Expand Up @@ -59,6 +60,8 @@ public class DorisSourceITCase extends AbstractITCaseService {
private static final String TABLE_READ_TBL_OLD_API = "tbl_read_tbl_old_api";
private static final String TABLE_READ_TBL_ALL_OPTIONS = "tbl_read_tbl_all_options";
private static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";
private static final String TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN =
"tbl_read_tbl_timestamp_push_down";
private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL =
"tbl_read_tbl_push_down_with_union_all";
static final String TABLE_CSV_JM = "tbl_csv_jm_source";
Expand Down Expand Up @@ -311,6 +314,138 @@ public void testTableSourceFilterAndProjectionPushDown() throws Exception {
"testTableSourceFilterAndProjectionPushDown", expected, actual.toArray());
}

@Test
public void testTableSourceTimestampFilterAndProjectionPushDown() throws Exception {
initializeTimestampTable(TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

String sourceDDL =
String.format(
"CREATE TABLE doris_source_datetime_filter_and_projection_push_down ("
+ "`id` int ,\n"
+ "`name` timestamp,\n"
+ "`age` int,\n"
+ "`birthday` timestamp,\n"
+ "`brilliant_time` timestamp(6)\n"
+ ") WITH ("
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s'"
+ ")",
getFenodes(),
DATABASE + "." + TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN,
getDorisUsername(),
getDorisPassword());
tEnv.executeSql(sourceDDL);

List<String> actualProjectionResult =
generateExecuteSQLResult(
tEnv,
"SELECT id,birthday,brilliant_time FROM doris_source_datetime_filter_and_projection_push_down order by id");

List<String> actualPushDownDatetimeResult =
generateExecuteSQLResult(
tEnv,
"SELECT id,birthday FROM doris_source_datetime_filter_and_projection_push_down where birthday >= '2023-01-01 00:00:00' order by id");
List<String> actualPushDownMicrosecondResult =
generateExecuteSQLResult(
tEnv,
"SELECT id,brilliant_time FROM doris_source_datetime_filter_and_projection_push_down where brilliant_time > '2023-01-01 00:00:00.000001' order by id");
List<String> actualPushDownNanosecondResult =
generateExecuteSQLResult(
tEnv,
"SELECT id,brilliant_time FROM doris_source_datetime_filter_and_projection_push_down where brilliant_time > '2023-01-01 00:00:00.000009001' order by id");

List<String> actualPushDownNanosecondRoundDownResult =
generateExecuteSQLResult(
tEnv,
"SELECT id,brilliant_time FROM doris_source_datetime_filter_and_projection_push_down where brilliant_time >= '2023-01-01 00:00:00.999999001' order by id");
List<String> actualPushDownNanosecondRoundUpResult =
generateExecuteSQLResult(
tEnv,
"SELECT id,brilliant_time FROM doris_source_datetime_filter_and_projection_push_down where brilliant_time >= '2023-01-01 00:00:00.999999999' order by id");

String[] expectedProjectionResult =
new String[] {
"+I[1, 2023-01-01T00:00, 2023-01-01T00:00:00.000001]",
"+I[2, 2023-01-01T00:00:01, 2023-01-01T00:00:00.005]",
"+I[3, 2023-01-01T00:00:02, 2023-01-01T00:00:00.000009]",
"+I[4, 2023-01-01T00:00:02, 2023-01-01T00:00:00.999999]",
"+I[5, 2023-01-01T00:00:02, 2023-01-01T00:00:00.999999]",
"+I[6, 2023-01-01T00:00:02, 2023-01-01T00:00:01]"
};
String[] expectedPushDownDatetimeResult =
new String[] {
"+I[1, 2023-01-01T00:00]",
"+I[2, 2023-01-01T00:00:01]",
"+I[3, 2023-01-01T00:00:02]",
"+I[4, 2023-01-01T00:00:02]",
"+I[5, 2023-01-01T00:00:02]",
"+I[6, 2023-01-01T00:00:02]"
};
String[] expectedPushDownWithMicrosecondResult =
new String[] {
"+I[2, 2023-01-01T00:00:00.005]",
"+I[3, 2023-01-01T00:00:00.000009]",
"+I[4, 2023-01-01T00:00:00.999999]",
"+I[5, 2023-01-01T00:00:00.999999]",
"+I[6, 2023-01-01T00:00:01]"
};

String[] expectedPushDownWithNanosecondResult =
new String[] {
"+I[2, 2023-01-01T00:00:00.005]",
"+I[4, 2023-01-01T00:00:00.999999]",
"+I[5, 2023-01-01T00:00:00.999999]",
"+I[6, 2023-01-01T00:00:01]"
};

String[] expectedPushDownWithNanosecondRoundDownResult =
new String[] {
"+I[4, 2023-01-01T00:00:00.999999]",
"+I[5, 2023-01-01T00:00:00.999999]",
"+I[6, 2023-01-01T00:00:01]"
};

String[] expectedPushDownWithNanosecondRoundUpResult =
new String[] {
"+I[4, 2023-01-01T00:00:00.999999]",
"+I[5, 2023-01-01T00:00:00.999999]",
"+I[6, 2023-01-01T00:00:01]"
};
checkResultInAnyOrder(
"testTableSourceTimestampFilterAndProjectionPushDown",
expectedProjectionResult,
actualProjectionResult.toArray());
checkResultInAnyOrder(
"testTableSourceTimestampFilterAndProjectionPushDown",
expectedPushDownDatetimeResult,
actualPushDownDatetimeResult.toArray());
checkResultInAnyOrder(
"testTableSourceTimestampFilterAndProjectionPushDown",
expectedPushDownWithMicrosecondResult,
actualPushDownMicrosecondResult.toArray());
checkResultInAnyOrder(
"testTableSourceTimestampFilterAndProjectionPushDown",
expectedPushDownWithNanosecondResult,
actualPushDownNanosecondResult.toArray());
checkResultInAnyOrder(
"testTableSourceTimestampFilterAndProjectionPushDown",
expectedPushDownWithNanosecondRoundDownResult,
actualPushDownNanosecondRoundDownResult.toArray());
checkResultInAnyOrder(
"testTableSourceTimestampFilterAndProjectionPushDown",
expectedPushDownWithNanosecondRoundUpResult,
actualPushDownNanosecondRoundUpResult.toArray());
}

@Test
public void testTableSourceFilterWithUnionAll() throws Exception {
LOG.info("starting to execute testTableSourceFilterWithUnionAll case.");
Expand Down Expand Up @@ -566,6 +701,44 @@ private void initializeTable(String table) {
String.format("insert into %s.%s values ('apache',12)", DATABASE, table));
}

private void initializeTimestampTable(String table) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
LOG,
String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
String.format(
"CREATE TABLE %s.%s ( \n"
+ "`id` int,\n"
+ "`name` varchar(256),\n"
+ "`age` int,\n"
+ "`birthday` datetime,\n"
+ "`brilliant_time` datetime(6),\n"
+ ") DISTRIBUTED BY HASH(`id`) BUCKETS 3\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\"\n"
+ ")\n",
DATABASE, table),
String.format(
"insert into %s.%s values (1,'Kevin',54,'2023-01-01T00:00:00','2023-01-01T00:00:00.000001')",
DATABASE, table),
String.format(
"insert into %s.%s values (2,'Dylan',25,'2023-01-01T00:00:01','2023-01-01T00:00:00.005000')",
DATABASE, table),
String.format(
"insert into %s.%s values (3,'Darren',65,'2023-01-01T00:00:02','2023-01-01T00:00:00.000009')",
DATABASE, table),
String.format(
"insert into %s.%s values (4,'Warren',75,'2023-01-01T00:00:02','2023-01-01T00:00:00.999999')",
DATABASE, table),
String.format(
"insert into %s.%s values (5,'Simba',75,'2023-01-01T00:00:02','2023-01-01T00:00:00.999999001')",
DATABASE, table),
String.format(
"insert into %s.%s values (6,'Jimmy',75,'2023-01-01T00:00:02','2023-01-01T00:00:00.999999999')",
DATABASE, table));
}

private void initializeTableWithData(String table) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
Expand Down Expand Up @@ -610,4 +783,17 @@ private static List<String> fetchRows(Iterator<Row> iter) {
}
return rows;
}

private List<String> generateExecuteSQLResult(StreamTableEnvironment tEnv, String executeSql)
throws Exception {
List<String> actualResultList = new ArrayList<>();
TableResult tableResult = tEnv.executeSql(executeSql);
try (CloseableIterator<Row> iterator = tableResult.collect()) {
while (iterator.hasNext()) {

actualResultList.add(iterator.next().toString());
}
}
return actualResultList;
}
}

0 comments on commit c4eed25

Please sign in to comment.