diff --git a/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceBase.java b/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceBase.java index e7c24547..1eb246ad 100644 --- a/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceBase.java +++ b/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceBase.java @@ -27,6 +27,8 @@ import org.apache.flink.table.store.file.utils.RecordReader; import org.apache.flink.table.store.file.utils.RecordReader.RecordIterator; import org.apache.flink.table.store.utils.RowDataUtils; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.LogicalType; import io.airlift.slice.Slice; @@ -200,7 +202,7 @@ private void appendTo(Type type, LogicalType logicalType, Object value, BlockBui } private static void writeSlice(BlockBuilder output, Type type, Object value) { - if (type instanceof VarcharType) { + if (type instanceof VarcharType || type instanceof io.trino.spi.type.CharType) { type.writeSlice(output, wrappedBuffer(((BinaryStringData) value).toBytes())); } else if (type instanceof VarbinaryType) { type.writeSlice(output, wrappedBuffer((byte[]) value)); diff --git a/src/test/java/org/apache/flink/table/store/trino/TestTrinoITCase.java b/src/test/java/org/apache/flink/table/store/trino/TestTrinoITCase.java index 4f4c1dab..71394d27 100644 --- a/src/test/java/org/apache/flink/table/store/trino/TestTrinoITCase.java +++ b/src/test/java/org/apache/flink/table/store/trino/TestTrinoITCase.java @@ -21,7 +21,6 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.StringData; import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.schema.UpdateSchema; import org.apache.flink.table.store.table.FileStoreTable; @@ -29,6 +28,7 @@ import org.apache.flink.table.store.table.sink.TableCommit; import org.apache.flink.table.store.table.sink.TableWrite; import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; @@ -50,6 +50,7 @@ import static io.airlift.testing.Closeables.closeAllSuppress; import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.apache.flink.table.data.StringData.fromString; import static org.assertj.core.api.Assertions.assertThat; /** ITCase for trino connector. */ @@ -65,19 +66,19 @@ protected QueryRunner createQueryRunner() throws Exception { // flink sink Path tablePath1 = new Path(warehouse, DB + ".db/t1"); SimpleTableTestHelper testHelper1 = createTestHelper(tablePath1); - testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1"))); - testHelper1.write(GenericRowData.of(3, 4L, StringData.fromString("2"))); - testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3"))); - testHelper1.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L, StringData.fromString("2"))); + testHelper1.write(GenericRowData.of(1, 2L, fromString("1"), fromString("1"))); + testHelper1.write(GenericRowData.of(3, 4L, fromString("2"), fromString("2"))); + testHelper1.write(GenericRowData.of(5, 6L, fromString("3"), fromString("3"))); + testHelper1.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L, fromString("2"), fromString("2"))); testHelper1.commit(); Path tablePath2 = new Path(warehouse, "default.db/t2"); SimpleTableTestHelper testHelper2 = createTestHelper(tablePath2); - testHelper2.write(GenericRowData.of(1, 2L, StringData.fromString("1"))); - testHelper2.write(GenericRowData.of(3, 4L, StringData.fromString("2"))); + testHelper2.write(GenericRowData.of(1, 2L, fromString("1"), fromString("1"))); + testHelper2.write(GenericRowData.of(3, 4L, fromString("2"), fromString("2"))); testHelper2.commit(); - testHelper2.write(GenericRowData.of(5, 6L, StringData.fromString("3"))); - testHelper2.write(GenericRowData.of(7, 8L, StringData.fromString("4"))); + testHelper2.write(GenericRowData.of(5, 6L, fromString("3"), fromString("3"))); + testHelper2.write(GenericRowData.of(7, 8L, fromString("4"), fromString("4"))); testHelper2.commit(); { @@ -101,9 +102,9 @@ protected QueryRunner createQueryRunner() throws Exception { FileStoreTable table = FileStoreTableFactory.create(tablePath3); TableWrite writer = table.newWrite("user"); TableCommit commit = table.newCommit("user"); - writer.write(GenericRowData.of(StringData.fromString("1"), 1, 1L, 1L, 1)); - writer.write(GenericRowData.of(StringData.fromString("1"), 1, 2L, 2L, 2)); - writer.write(GenericRowData.of(StringData.fromString("2"), 3, 3L, 3L, 3)); + writer.write(GenericRowData.of(fromString("1"), 1, 1L, 1L, 1)); + writer.write(GenericRowData.of(fromString("1"), 1, 2L, 2L, 2)); + writer.write(GenericRowData.of(fromString("2"), 3, 3L, 3L, 3)); commit.commit(0, writer.prepareCommit(true, 0)); } @@ -128,7 +129,7 @@ protected QueryRunner createQueryRunner() throws Exception { TableCommit commit = table.newCommit("user"); writer.write(GenericRowData.of(1, new GenericMapData(new HashMap<>() { { - put(StringData.fromString("1"), StringData.fromString("2")); + put(fromString("1"), fromString("2")); } }))); commit.commit(0, writer.prepareCommit(true, 0)); @@ -157,7 +158,9 @@ private static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exc Arrays.asList( new RowType.RowField("a", new IntType()), new RowType.RowField("b", new BigIntType()), - new RowType.RowField("c", new VarCharType()))); + new RowType.RowField("c", new VarCharType()), + new RowType.RowField("d", new CharType(1)) + )); return new SimpleTableTestHelper(tablePath, rowType); } @@ -168,7 +171,7 @@ public void testComplexTypes() { @Test public void testProjection() { - assertThat(sql("SELECT * FROM tablestore.default.t1")).isEqualTo("[[1, 2, 1], [5, 6, 3]]"); + assertThat(sql("SELECT * FROM tablestore.default.t1")).isEqualTo("[[1, 2, 1, 1], [5, 6, 3, 3]]"); assertThat(sql("SELECT a, c FROM tablestore.default.t1")).isEqualTo("[[1, 1], [5, 3]]"); assertThat(sql("SELECT SUM(b) FROM tablestore.default.t1")).isEqualTo("[[8]]"); }