diff --git a/src/main/java/org/apache/flink/table/store/trino/FieldNameUtils.java b/src/main/java/org/apache/flink/table/store/trino/FieldNameUtils.java new file mode 100644 index 00000000..89d86a22 --- /dev/null +++ b/src/main/java/org/apache/flink/table/store/trino/FieldNameUtils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.trino; + +import org.apache.flink.table.types.logical.RowType; + + +import java.util.List; +import java.util.stream.Collectors; + +public class FieldNameUtils { + + public static List fieldNames(RowType rowType) { + return rowType.getFields().stream() + .map(RowType.RowField::getName) + .map(String::toLowerCase) + .collect(Collectors.toList()); + } +} diff --git a/src/main/java/org/apache/flink/table/store/trino/TrinoFilterConverter.java b/src/main/java/org/apache/flink/table/store/trino/TrinoFilterConverter.java index 8fd0a5b6..46c62de1 100644 --- a/src/main/java/org/apache/flink/table/store/trino/TrinoFilterConverter.java +++ b/src/main/java/org/apache/flink/table/store/trino/TrinoFilterConverter.java @@ -84,10 +84,11 @@ public Optional convert(TupleDomain tupleDomain) { Map domainMap = tupleDomain.getDomains().get(); List conjuncts = new ArrayList<>(); + List fieldNames = FieldNameUtils.fieldNames(rowType); for (Map.Entry entry : domainMap.entrySet()) { TrinoColumnHandle columnHandle = entry.getKey(); Domain domain = entry.getValue(); - int index = rowType.getFieldNames().indexOf(columnHandle.getColumnName()); + int index = fieldNames.indexOf(columnHandle.getColumnName()); if (index != -1) { try { conjuncts.add(toPredicate(index, columnHandle.getTrinoType(), domain)); diff --git a/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceProvider.java b/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceProvider.java index b923a038..451b8515 100644 --- a/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceProvider.java +++ b/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceProvider.java @@ -57,7 +57,7 @@ private ConnectorPageSource createPageSource(TrinoTableHandle tableHandle, Trino Table table = tableHandle.table(); TableRead read = table.newRead(); RowType rowType = table.rowType(); - List fieldNames = rowType.getFieldNames(); + List fieldNames = FieldNameUtils.fieldNames(rowType); List projectedFields = columns.stream() .map(TrinoColumnHandle.class::cast) diff --git a/src/main/java/org/apache/flink/table/store/trino/TrinoTableHandle.java b/src/main/java/org/apache/flink/table/store/trino/TrinoTableHandle.java index 0ab43626..e37a7b19 100644 --- a/src/main/java/org/apache/flink/table/store/trino/TrinoTableHandle.java +++ b/src/main/java/org/apache/flink/table/store/trino/TrinoTableHandle.java @@ -19,6 +19,7 @@ package org.apache.flink.table.store.trino; import org.apache.flink.table.store.table.Table; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.InstantiationUtil; import com.fasterxml.jackson.annotation.JsonCreator; @@ -137,10 +138,10 @@ public List columnMetadatas() { } public TrinoColumnHandle columnHandle(String field) { - int index = table().rowType().getFieldNames().indexOf(field); + List fieldNames = FieldNameUtils.fieldNames(table().rowType()); + int index = fieldNames.indexOf(field); if (index == -1) { - throw new RuntimeException(String.format("Cannot find field %s in schema %s", - field, table().rowType().getFieldNames())); + throw new RuntimeException(String.format("Cannot find field %s in schema %s", field, fieldNames)); } return TrinoColumnHandle.of(field, table().rowType().getTypeAt(index)); } diff --git a/src/test/java/org/apache/flink/table/store/trino/TestTrinoITCase.java b/src/test/java/org/apache/flink/table/store/trino/TestTrinoITCase.java index 12c51cd0..63673d88 100644 --- a/src/test/java/org/apache/flink/table/store/trino/TestTrinoITCase.java +++ b/src/test/java/org/apache/flink/table/store/trino/TestTrinoITCase.java @@ -158,7 +158,8 @@ private static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exc Arrays.asList( new RowType.RowField("a", new IntType()), new RowType.RowField("b", new BigIntType()), - new RowType.RowField("c", new VarCharType()), + // test field name has upper case + new RowType.RowField("aCa", new VarCharType()), new RowType.RowField("d", new CharType(1)) )); return new SimpleTableTestHelper(tablePath, rowType); @@ -172,7 +173,7 @@ public void testComplexTypes() { @Test public void testProjection() { assertThat(sql("SELECT * FROM tablestore.default.t1")).isEqualTo("[[1, 2, 1, 1], [5, 6, 3, 3]]"); - assertThat(sql("SELECT a, c FROM tablestore.default.t1")).isEqualTo("[[1, 1], [5, 3]]"); + assertThat(sql("SELECT a, aCa FROM tablestore.default.t1")).isEqualTo("[[1, 1], [5, 3]]"); assertThat(sql("SELECT SUM(b) FROM tablestore.default.t1")).isEqualTo("[[8]]"); } @@ -183,7 +184,7 @@ public void testSystemTable() { @Test public void testFilter() { - assertThat(sql("SELECT a, c FROM tablestore.default.t2 WHERE a < 4")) + assertThat(sql("SELECT a, aCa FROM tablestore.default.t2 WHERE a < 4")) .isEqualTo("[[1, 1], [3, 2]]"); }