From 678821f59d3d98d140077b4145c61f5193e823b8 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 30 Nov 2023 09:55:11 +0800 Subject: [PATCH] [Bug] Fix type overflow bug (#249) Co-authored-by: wudi <> --- .../java/org/apache/doris/flink/catalog/DorisTypeMapper.java | 5 +++-- .../main/java/org/apache/doris/flink/rest/RestService.java | 4 ++-- .../apache/doris/flink/sink/schema/SchemaChangeManager.java | 1 + 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java index fda3c93c8..9c99b4f9b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java @@ -124,8 +124,9 @@ public String visit(CharType charType) { @Override public String visit(VarCharType varCharType) { - int length = varCharType.getLength(); - return length * 4 > 65533 ? STRING : String.format("%s(%s)", VARCHAR, length * 4); + //Flink varchar length max value is int, it may overflow after multiplying by 4 + long length = varCharType.getLength(); + return length * 4 >= 65533 ? STRING : String.format("%s(%s)", VARCHAR, length * 4); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index cd022097e..219806e5b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -458,10 +458,10 @@ public static Schema getSchema(DorisOptions options, DorisReadOptions readOption public static boolean isUniqueKeyType(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisRuntimeException { - //Enable 2pc in multi-table scenario + //disable 2pc in multi-table scenario if(StringUtils.isBlank(options.getTableIdentifier())){ logger.info("table model verification is skipped in multi-table scenarios."); - return false; + return true; } try { return UNIQUE_KEYS_TYPE.equals(getSchema(options, readOptions, logger).getKeysType()); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index d9c3345d5..296fb2f68 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -120,6 +120,7 @@ public boolean execute(String ddl, String database) throws IOException, IllegalA if(StringUtils.isNullOrWhitespaceOnly(ddl)){ return false; } + LOG.info("Execute SQL: {}", ddl); Map param = new HashMap<>(); param.put("stmt", ddl); String requestUrl = String.format(SCHEMA_CHANGE_API,