From cd5059297dfa84b3b06d3a919ba3f5c3a452d885 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 12 Jan 2024 10:20:07 +0800 Subject: [PATCH] add array map json type for flink catalog --- .../doris/flink/catalog/DorisCatalog.java | 1 - .../doris/flink/catalog/DorisTypeMapper.java | 10 ++++ .../doris/flink/catalog/doris/DorisType.java | 3 ++ .../doris/flink/catalog/CatalogExample.java | 50 +++++++++++++++++++ 4 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogExample.java diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java index 99ca0a4f5..96518d338 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java @@ -283,7 +283,6 @@ private Schema createTableSchema(String databaseName, String tableName) { String columnType = resultSet.getString("DATA_TYPE"); long columnSize = resultSet.getLong("COLUMN_SIZE"); long columnDigit = resultSet.getLong("DECIMAL_DIGITS"); - DataType flinkType = DorisTypeMapper.toFlinkType( columnName, columnType, (int) columnSize, (int) columnDigit); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java index c38870022..cc5fe4b8a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java @@ -40,6 +40,7 @@ import org.apache.doris.flink.catalog.doris.DorisType; +import static org.apache.doris.flink.catalog.doris.DorisType.ARRAY; import static org.apache.doris.flink.catalog.doris.DorisType.BIGINT; import static org.apache.doris.flink.catalog.doris.DorisType.BOOLEAN; import static org.apache.doris.flink.catalog.doris.DorisType.CHAR; @@ -52,10 +53,13 @@ import static org.apache.doris.flink.catalog.doris.DorisType.DOUBLE; import static org.apache.doris.flink.catalog.doris.DorisType.FLOAT; import static org.apache.doris.flink.catalog.doris.DorisType.INT; +import static org.apache.doris.flink.catalog.doris.DorisType.JSON; import static org.apache.doris.flink.catalog.doris.DorisType.JSONB; import static org.apache.doris.flink.catalog.doris.DorisType.LARGEINT; +import static org.apache.doris.flink.catalog.doris.DorisType.MAP; import static org.apache.doris.flink.catalog.doris.DorisType.SMALLINT; import static org.apache.doris.flink.catalog.doris.DorisType.STRING; +import static org.apache.doris.flink.catalog.doris.DorisType.STRUCT; import static org.apache.doris.flink.catalog.doris.DorisType.TINYINT; import static org.apache.doris.flink.catalog.doris.DorisType.VARCHAR; @@ -101,6 +105,12 @@ public static DataType toFlinkType( case LARGEINT: case STRING: case JSONB: + case JSON: + // Currently, the subtype of the generic cannot be obtained, + // so it is mapped to string + case ARRAY: + case MAP: + case STRUCT: return DataTypes.STRING(); case DATE: case DATE_V2: diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java index d2423202b..3779143fa 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java @@ -39,4 +39,7 @@ public class DorisType { public static final String BITMAP = "BITMAP"; public static final String ARRAY = "ARRAY"; public static final String JSONB = "JSONB"; + public static final String JSON = "JSON"; + public static final String MAP = "MAP"; + public static final String STRUCT = "STRUCT"; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogExample.java new file mode 100644 index 000000000..1d7cf1df2 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogExample.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.catalog; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +public class CatalogExample { + + public static void main(String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + tEnv.executeSql( + "CREATE CATALOG doris_catalog WITH(\n" + + "'type' = 'doris',\n" + + "'default-database' = 'test',\n" + + "'username' = 'root',\n" + + "'password' = '',\n" + + "'fenodes' = '1127.0.0.1:8030',\n" + + "'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',\n" + + "'sink.label-prefix' = 'label'\n" + + ")"); + // define a dynamic aggregating query + final Table result = tEnv.sqlQuery("SELECT * from doris_catalog.test.type_test"); + + // print the result to the console + tEnv.toRetractStream(result, Row.class).print(); + env.execute(); + } +}