solve filter timestamp; add pk field is null
askwang committed Jul 12, 2024
1 parent 45cf036 commit 01773e7
Showing 3 changed files with 114 additions and 8 deletions.
@@ -32,10 +32,12 @@

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
@@ -262,10 +264,10 @@ public static Object convertJavaObject(DataType literalType, Object o) {
return (int) ChronoUnit.DAYS.between(epochDay, localDate);
case TIME_WITHOUT_TIME_ZONE:
LocalTime localTime;
- if (o instanceof java.sql.Time) {
-     localTime = ((java.sql.Time) o).toLocalTime();
- } else if (o instanceof java.time.LocalTime) {
-     localTime = (java.time.LocalTime) o;
+ if (o instanceof Time) {
+     localTime = ((Time) o).toLocalTime();
+ } else if (o instanceof LocalTime) {
+     localTime = (LocalTime) o;
} else {
throw new UnsupportedOperationException(
"Unexpected time literal of class " + o.getClass().getName());
@@ -278,6 +280,16 @@ public static Object convertJavaObject(DataType literalType, Object o) {
int scale = decimalType.getScale();
return Decimal.fromBigDecimal((BigDecimal) o, precision, scale);
case TIMESTAMP_WITHOUT_TIME_ZONE:
Timestamp ts;
if (o instanceof Instant) {
    Instant instant = (Instant) o;
    // Interpret the absolute instant as a wall-clock value in the JVM's
    // default zone before building the zone-less timestamp.
    LocalDateTime localDateTime = instant.atZone(ZoneId.systemDefault()).toLocalDateTime();
    ts = Timestamp.fromLocalDateTime(localDateTime);
} else {
    throw new UnsupportedOperationException("Unsupported object: " + o);
}
return ts;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
Timestamp timestamp;
if (o instanceof java.sql.Timestamp) {
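The new TIMESTAMP_WITHOUT_TIME_ZONE branch is inherently zone-sensitive: an Instant is an absolute point on the timeline, so producing a zone-less timestamp from it requires choosing a zone, and the change above chooses the JVM default. A minimal standalone Scala sketch of that dependence (illustrative only, not Paimon code; the object name and zones are made up):

import java.time.{Instant, LocalDateTime, ZoneId}

object InstantToLocal {
  def main(args: Array[String]): Unit = {
    // The same absolute instant renders as different wall-clock values
    // depending on the zone used for the conversion.
    val instant = Instant.parse("2024-04-11T03:01:00Z")

    val inShanghai: LocalDateTime =
      instant.atZone(ZoneId.of("Asia/Shanghai")).toLocalDateTime // 2024-04-11T11:01
    val inUtc: LocalDateTime =
      instant.atZone(ZoneId.of("UTC")).toLocalDateTime // 2024-04-11T03:01

    println(s"Asia/Shanghai: $inShanghai, UTC: $inUtc")
  }
}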
@@ -45,6 +45,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
@@ -171,6 +172,25 @@ public void testTimestamp() {
assertThat(instantExpression).isEqualTo(rawExpression);
}

@Test
public void testInstant() {
    // e.g. Asia/Shanghai
    ZoneId zoneId = ZoneId.systemDefault();
    java.sql.Timestamp timestamp = java.sql.Timestamp.valueOf("2024-04-11 11:01:00");

    // valueOf interprets the string as a wall-clock value in the default zone,
    // so in Asia/Shanghai the corresponding instant is 2024-04-11T03:01:00Z.
    Instant instant1 = timestamp.toInstant();

    // Converting back through the same zone recovers the wall-clock value
    // (2024-04-11T11:01), whichever zone the JVM runs in.
    LocalDateTime localDateTime1 = instant1.atZone(zoneId).toLocalDateTime();
    assertThat(localDateTime1).isEqualTo(timestamp.toLocalDateTime());
}

@Test
public void testDate() {
RowType rowType =
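The converter receives java.time.Instant in the first place because of Spark's spark.sql.datetime.java8API.enabled flag, which the Scala test below toggles: when enabled, Spark represents TIMESTAMP values with java.time.Instant rather than java.sql.Timestamp, and the same class difference reaches filter literals handed to data sources. A hedged sketch, assuming a local SparkSession and Spark 3.x on the classpath:

import org.apache.spark.sql.SparkSession

object Java8ApiDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("java8api-demo")
      .config("spark.sql.datetime.java8API.enabled", "true")
      .getOrCreate()

    // With the flag on, a TIMESTAMP value collects as java.time.Instant;
    // with it off, the same query yields java.sql.Timestamp.
    val v = spark.sql("SELECT timestamp'2024-04-11 11:01:00' AS dt")
      .collect().head.get(0)
    println(v.getClass.getName) // java.time.Instant

    spark.stop()
  }
}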
@@ -20,24 +20,98 @@
package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.execution.SparkPlanner

/**
* @author askwang
* @date 2024/7/6
*/
class AskwangSQLQueryTest extends PaimonSparkTestBase {

test("select timestamp field by where for append table") {
test("sql query with filter timestamp") {
withTable("tb") {
spark.conf.set("spark.sql.planChangeLog.level", "INFO")
spark.sql("SET TIME ZONE 'Asia/Shanghai';")
// spark.conf.set("spark.sql.planChangeLog.level", "INFO")
spark.conf.set("spark.sql.datetime.java8API.enabled", "true")
println("version: " + sparkVersion)
spark.sql(s"CREATE TABLE tb (id INT, dt TIMESTAMP) using paimon TBLPROPERTIES ('file.format'='parquet')")
val ds = sql("INSERT INTO `tb` VALUES (1,cast(\"2024-04-11 11:01:00\" as Timestamp))")
val data = sql("SELECT * FROM `tb` where dt ='2024-04-11 11:01:00' ")
println(data.queryExecution)
println(spark.conf.get("spark.sql.session.timeZone"))
println(data.show())
println("=====")
println(data.explain(true))
}
}

// not ok
test("writ pk table with pk null int type") {
withTable("tb") {
spark.conf.set("spark.sql.datetime.java8API.enabled", "false")
spark.sql(s"CREATE TABLE tb (id INT, dt string) " +
s"using paimon " +
s"TBLPROPERTIES ('file.format'='parquet', 'primary-key'='id', 'bucket'='1')")
val ds = sql("INSERT INTO `tb` VALUES (cast(NULL as int),cast(NULL as string))")
val data = sql("SELECT * FROM `tb`")
println(data.show())
}
}

// ok
test("writ pk table with pk null string type") {
withTable("tb") {
spark.conf.set("spark.sql.datetime.java8API.enabled", "false")
spark.sql(s"CREATE TABLE tb (id string, dt string) " +
s"using paimon " +
s"TBLPROPERTIES ('file.format'='parquet', 'primary-key'='id', 'bucket'='1')")
val ds = sql("INSERT INTO `tb` VALUES (cast(NULL as string),cast(NULL as string))")
val data = sql("SELECT * FROM `tb`")
println(data.show())
}
}

// not ok
test("writ pk table with pk null long type") {
withTable("tb") {
spark.conf.set("spark.sql.datetime.java8API.enabled", "false")
spark.sql(s"CREATE TABLE tb (id long, dt string) " +
s"using paimon " +
s"TBLPROPERTIES ('file.format'='parquet', 'primary-key'='id', 'bucket'='1')")
// equivalent variant with explicit casts:
// val query = "INSERT INTO `tb` VALUES (cast(NULL as long), cast(NULL as string))"
val query = "INSERT INTO `tb` VALUES (NULL, NULL)"

explainPlan(query, spark)
}
}

def explainPlan(query: String, spark: SparkSession): Unit = {
val (parser, analyzer, optimizer, planner) = analysisEntry(spark)
val parsedPlan = parser.parsePlan(query)
val analyzedPlan = analyzer.execute(parsedPlan)
val optimizedPlan = optimizer.execute(analyzedPlan)
val sparkPlan = planner.plan(optimizedPlan).next()
println("[askwang] ================parsedPlan===================")
println(parsedPlan)
println("[askwang] ================analyzedPlan===================")
println(analyzedPlan)
println("[askwang] ================optimizedPlan===================")
println(optimizedPlan)
println("[askwang] ================sparkPlan===================")
println(sparkPlan)
}

def analysisEntry(spark: SparkSession): (ParserInterface, Analyzer, Optimizer, SparkPlanner) = {
val parser = spark.sessionState.sqlParser
val analyzer = spark.sessionState.analyzer
val optimizer = spark.sessionState.optimizer
val planner = spark.sessionState.planner
(parser, analyzer, optimizer, planner)
}


}
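For reference, the explainPlan/analysisEntry helpers above dump each planning stage (parse, analyze, optimize, physical planning) for a single statement. A usage sketch, written as a hypothetical extra test that would live inside the same class:

test("explain plan stages for a query") {
  withTable("tb") {
    spark.sql("CREATE TABLE tb (id INT, dt STRING) using paimon")
    // Prints the parsed, analyzed, optimized, and physical plans in turn.
    explainPlan("SELECT * FROM `tb` WHERE id IS NULL", spark)
  }
}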
