From c1451f1d03866baddcc3e442b1086c8ca6b7c24e Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 7 Mar 2023 14:12:01 +0800 Subject: [PATCH] Add support to trino 409+ --- pom.xml | 8 ++++++++ .../flink/table/store/trino/TrinoFilterConverter.java | 3 +-- .../flink/table/store/trino/TrinoPageSourceBase.java | 7 ------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index 83b012c3..15944cb6 100644 --- a/pom.xml +++ b/pom.xml @@ -209,6 +209,14 @@ under the License. 17 + + trino-409 + + 409 + 388 + 17 + + diff --git a/src/main/java/org/apache/flink/table/store/trino/TrinoFilterConverter.java b/src/main/java/org/apache/flink/table/store/trino/TrinoFilterConverter.java index 46c62de1..d39794f1 100644 --- a/src/main/java/org/apache/flink/table/store/trino/TrinoFilterConverter.java +++ b/src/main/java/org/apache/flink/table/store/trino/TrinoFilterConverter.java @@ -34,7 +34,6 @@ import io.trino.spi.type.BooleanType; import io.trino.spi.type.DateType; import io.trino.spi.type.DecimalType; -import io.trino.spi.type.Decimals; import io.trino.spi.type.DoubleType; import io.trino.spi.type.IntegerType; import io.trino.spi.type.LongTimestampWithTimeZone; @@ -242,7 +241,7 @@ private Object getLiteralValue(Type type, Object trinoNativeValue) { if (type instanceof DecimalType) { DecimalType decimalType = (DecimalType) type; BigDecimal bigDecimal; - if (Decimals.isShortDecimal(decimalType)) { + if (trinoNativeValue instanceof Long) { bigDecimal = BigDecimal.valueOf((long) trinoNativeValue) .movePointLeft(decimalType.getScale()); diff --git a/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceBase.java b/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceBase.java index 1eb246ad..c73845ff 100644 --- a/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceBase.java +++ b/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceBase.java @@ -27,8 +27,6 @@ import org.apache.flink.table.store.file.utils.RecordReader; import org.apache.flink.table.store.file.utils.RecordReader.RecordIterator; import org.apache.flink.table.store.utils.RowDataUtils; -import org.apache.flink.table.types.logical.BinaryType; -import org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.LogicalType; import io.airlift.slice.Slice; @@ -54,15 +52,12 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; -import java.util.Map; import static io.airlift.slice.Slices.wrappedBuffer; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DateType.DATE; import static io.trino.spi.type.Decimals.encodeShortScaledValue; -import static io.trino.spi.type.Decimals.isLongDecimal; -import static io.trino.spi.type.Decimals.isShortDecimal; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.LongTimestampWithTimeZone.fromEpochMillisAndFraction; import static io.trino.spi.type.TimeType.TIME_MICROS; @@ -164,7 +159,6 @@ private void appendTo(Type type, LogicalType logicalType, Object value, BlockBui } else if (type.equals(INTEGER)) { type.writeLong(output, ((Number) value).intValue()); } else if (type instanceof DecimalType) { - verify(isShortDecimal(type), "The type should be short decimal"); DecimalType decimalType = (DecimalType) type; BigDecimal decimal = ((DecimalData) value).toBigDecimal(); type.writeLong(output, encodeShortScaledValue(decimal, decimalType.getScale())); @@ -214,7 +208,6 @@ private static void writeSlice(BlockBuilder output, Type type, Object value) { private static void writeObject(BlockBuilder output, Type type, Object value) { if (type instanceof DecimalType) { - verify(isLongDecimal(type), "The type should be long decimal"); DecimalType decimalType = (DecimalType) type; BigDecimal decimal = ((DecimalData) value).toBigDecimal(); type.writeObject(output, Decimals.encodeScaledValue(decimal, decimalType.getScale()));