From 95cb544b3d856f1beb07b51a5704f634840b6fa3 Mon Sep 17 00:00:00 2001
From: Xin Liao
Date: Tue, 2 Jul 2024 21:44:24 +0800
Subject: [PATCH] [Enhancement](s3-load) Add domain connection and AK/SK
 correctness check for S3 load (#36711)

Add domain connection and AK/SK correctness checks for S3 load before
actual execution, so that a bad endpoint or credential fails fast at
analysis time instead of after the load job is queued.
---
 .../org/apache/doris/analysis/LoadStmt.java   |  88 +++++++++-
 .../property/constants/S3Properties.java      |   1 +
 ...ain_connection_and_ak_sk_correction.groovy | 161 ++++++++++++++++++
 3 files changed, 241 insertions(+), 9 deletions(-)
 create mode 100644 regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index d8d515fe6a49a1..1990078b46c4dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -21,10 +21,14 @@
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB;
 import org.apache.doris.cloud.security.SecurityChecker;
+import org.apache.doris.cloud.storage.RemoteBase;
+import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.TimeUtils;
@@ -500,7 +504,7 @@ public void analyze(Analyzer analyzer) throws UserException {
             }
         } else if (brokerDesc != null) {
             etlJobType = EtlJobType.BROKER;
-            checkWhiteList();
+            checkS3Param();
         } else if (isMysqlLoad) {
             etlJobType = EtlJobType.LOCAL_FILE;
         } else {
@@ -518,6 +522,26 @@ public void analyze(Analyzer analyzer) throws UserException {
         user = ConnectContext.get().getQualifiedUser();
     }
 
+
+    private String getProviderFromEndpoint() {
+        Map<String, String> properties = brokerDesc.getProperties();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().equalsIgnoreCase(S3Properties.PROVIDER)) {
+                return entry.getValue();
+            }
+        }
+        return S3Properties.S3_PROVIDER;
+    }
+
+    private String getBucketFromFilePath(String filePath) throws Exception {
+        String[] parts = filePath.split("\\/\\/");
+        if (parts.length < 2) {
+            throw new Exception("filePath is not valid");
+        }
+        String bucket = parts[1].split("\\/")[0];
+        return bucket;
+    }
+
     public String getComment() {
         return comment;
     }
@@ -597,7 +621,7 @@ private void checkEndpoint(String endpoint) throws UserException {
         }
     }
 
-    public void checkWhiteList() throws UserException {
+    public void checkS3Param() throws UserException {
         Map<String, String> brokerDescProperties = brokerDesc.getProperties();
         if (brokerDescProperties.containsKey(S3Properties.Env.ENDPOINT)
                 && brokerDescProperties.containsKey(S3Properties.Env.ACCESS_KEY)
@@ -606,17 +630,63 @@ public void checkWhiteList() throws UserException {
             String endpoint = brokerDescProperties.get(S3Properties.Env.ENDPOINT);
             endpoint = endpoint.replaceFirst("^http://", "");
             endpoint = endpoint.replaceFirst("^https://", "");
-            List<String> whiteList = new ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
-            whiteList.removeIf(String::isEmpty);
-            if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
-                throw new UserException("endpoint: " + endpoint
-                        + " is not in s3 load endpoint white list: " + String.join(",", whiteList));
-            }
             brokerDescProperties.put(S3Properties.Env.ENDPOINT, endpoint);
-            if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
+            checkWhiteList(endpoint);
+            if (AzureProperties.checkAzureProviderPropertyExist(brokerDescProperties)) {
                 return;
             }
             checkEndpoint(endpoint);
+            checkAkSk();
         }
     }
+
+    public void checkWhiteList(String endpoint) throws UserException {
+        List<String> whiteList = new ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
+        whiteList.removeIf(String::isEmpty);
+        if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
+            throw new UserException("endpoint: " + endpoint
+                    + " is not in s3 load endpoint white list: " + String.join(",", whiteList));
+        }
+    }
+
+    private void checkAkSk() throws UserException {
+        RemoteBase remote = null;
+        ObjectInfo objectInfo = null;
+        try {
+            Map<String, String> brokerDescProperties = brokerDesc.getProperties();
+            String provider = getProviderFromEndpoint();
+            for (DataDescription dataDescription : dataDescriptions) {
+                for (String filePath : dataDescription.getFilePaths()) {
+                    String bucket = getBucketFromFilePath(filePath);
+                    objectInfo = new ObjectInfo(ObjectStoreInfoPB.Provider.valueOf(provider.toUpperCase()),
+                            brokerDescProperties.get(S3Properties.Env.ACCESS_KEY),
+                            brokerDescProperties.get(S3Properties.Env.SECRET_KEY),
+                            bucket, brokerDescProperties.get(S3Properties.Env.ENDPOINT),
+                            brokerDescProperties.get(S3Properties.Env.REGION), "");
+                    remote = RemoteBase.newInstance(objectInfo);
+                    // RemoteBase#headObject does not throw an exception if the key does not
+                    // exist, so probing a dummy key still verifies endpoint and credentials.
+                    remote.headObject("1");
+                    remote.listObjects(null);
+                    remote.close();
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to check object info={}", objectInfo, e);
+            String message = e.getMessage();
+            if (message != null) {
+                int index = message.indexOf("Error message=");
+                if (index != -1) {
+                    message = message.substring(index);
+                }
+            }
+            throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
+                    "Incorrect object storage info, " + message);
+        } finally {
+            if (remote != null) {
+                remote.close();
+            }
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
index d1b3b17e2da307..a0ef74c7a9659a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
@@ -58,6 +58,7 @@ public class S3Properties extends BaseProperties {
     public static final String MAX_CONNECTIONS = "s3.connection.maximum";
     public static final String REQUEST_TIMEOUT_MS = "s3.connection.request.timeout";
     public static final String CONNECTION_TIMEOUT_MS = "s3.connection.timeout";
+    public static final String S3_PROVIDER = "S3";
 
     // required by storage policy
     public static final String ROOT_PATH = "s3.root.path";
diff --git a/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy b/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
new file mode 100644
index 00000000000000..889da246d3b129
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_domain_connection_and_ak_sk_correction", "load_p0") {
+    // create tables
+    def tableName = 'test_domain_connection_and_ak_sk_correction'
+    def tableNameOrders = 'test_domain_connection_and_ak_sk_correction_orders'
+    sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
+    sql """ DROP TABLE IF EXISTS ${tableNameOrders} FORCE"""
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            P_PARTKEY INTEGER NOT NULL,
+            P_NAME VARCHAR(55) NOT NULL,
+            P_MFGR CHAR(25) NOT NULL,
+            P_BRAND CHAR(10) NOT NULL,
+            P_TYPE VARCHAR(25) NOT NULL,
+            P_SIZE INTEGER NOT NULL,
+            P_CONTAINER CHAR(10) NOT NULL,
+            P_RETAILPRICE DECIMAL(15,2) NOT NULL,
+            P_COMMENT VARCHAR(23) NOT NULL
+        )
+        DUPLICATE KEY(P_PARTKEY, P_NAME)
+        DISTRIBUTED BY HASH(P_PARTKEY) BUCKETS 3
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+    """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableNameOrders} (
+            O_ORDERKEY INTEGER NOT NULL,
+            O_CUSTKEY INTEGER NOT NULL,
+            O_ORDERSTATUS CHAR(1) NOT NULL,
+            O_TOTALPRICE DECIMAL(15,2) NOT NULL,
+            O_ORDERDATE DATE NOT NULL,
+            O_ORDERPRIORITY CHAR(15) NOT NULL,
+            O_CLERK CHAR(15) NOT NULL,
+            O_SHIPPRIORITY INTEGER NOT NULL,
+            O_COMMENT VARCHAR(79) NOT NULL
+        )
+        DUPLICATE KEY(O_ORDERKEY, O_CUSTKEY)
+        DISTRIBUTED BY HASH(O_ORDERKEY) BUCKETS 32
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+    """
+
+    def label = UUID.randomUUID().toString().replace("-", "")
+    def result = sql """
+        LOAD LABEL ${label}
+        (
+            DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
+            INTO TABLE ${tableName}
+            COLUMNS TERMINATED BY "|"
+            (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
+        )
+        WITH S3
+        (
+            "AWS_ENDPOINT" = "${getS3Endpoint()}",
+            "AWS_ACCESS_KEY" = "${getS3AK()}",
+            "AWS_SECRET_KEY" = "${getS3SK()}",
+            "AWS_REGION" = "${getS3Region()}"
+        );
+    """
+    logger.info("the first sql result is {}", result)
+
+    label = UUID.randomUUID().toString().replace("-", "")
+    try {
+        result = sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
+                INTO TABLE ${tableName}
+                COLUMNS TERMINATED BY "|"
+                (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
+            )
+            WITH S3
+            (
+                "AWS_ENDPOINT" = "${getS3Endpoint()}1",
+                "AWS_ACCESS_KEY" = "${getS3AK()}",
+                "AWS_SECRET_KEY" = "${getS3SK()}",
+                "AWS_REGION" = "${getS3Region()}"
+            );
+        """
+        logger.info("the second sql result is {}", result)
+        assertTrue(false, "The endpoint is wrong, so the connection test should fail")
"The endpoint is wrong, so the connection test should fale") + } catch (Exception e) { + logger.info("the second sql exception result is {}", e.getMessage()) + assertTrue(e.getMessage().contains("Incorrect object storage info"), e.getMessage()) + } + + label = UUID.randomUUID().toString().replace("-", "") + try { + result = sql """ + LOAD LABEL ${label} + ( + DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "|" + (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp) + ) + WITH S3 + ( + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_ACCESS_KEY" = "${getS3AK()}1", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_REGION" = "${getS3Region()}" + ); + """ + logger.info("the third sql result is {}", result) + assertTrue(false. "AK is wrong, so the correction of AKSK test should fale") + } catch (Exception e) { + logger.info("the third sql exception result is {}", e.getMessage()) + assertTrue(e.getMessage().contains("Incorrect object storage info"), e.getMessage()) + } + + label = UUID.randomUUID().toString().replace("-", "") + try { + result = sql """ + LOAD LABEL ${label} + ( + DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "|" + (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp), + DATA INFILE("s3://${getS3BucketName()}1/regression/tpch/sf1/orders.tbl.1", "s3://${getS3BucketName()}/regression/tpch/sf1/orders.tbl.2") + INTO TABLE ${tableNameOrders} + COLUMNS TERMINATED BY "|" + (o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, temp) + ) + WITH S3 + ( + "AWS_ENDPOINT" = "${getS3Endpoint()}", + "AWS_ACCESS_KEY" = "${getS3AK()}", + "AWS_SECRET_KEY" = "${getS3SK()}", + "AWS_REGION" = "${getS3Region()}" + ); + """ + logger.info("the fourth sql result is {}", result) + assertTrue(false. "in the second DATA INFILE, the first bucket is wrong, so the sql should fail") + } catch (Exception e) { + logger.info("the fourth sql exception result is {}", e.getMessage()) + assertTrue(e.getMessage().contains("Incorrect object storage info"), e.getMessage()) + } + sql """ DROP TABLE IF EXISTS ${tableName} FORCE""" + sql """ DROP TABLE IF EXISTS ${tableNameOrders} FORCE""" +} \ No newline at end of file