Commit
[Enhancement](s3-load) Add domain connection and aksk correction check for S3 load (apache#36711)

Add a domain connection and AK/SK correctness check for S3 load, performed before the actual load execution.
liaoxin01 authored Jul 2, 2024
1 parent 6969ad0 commit 95cb544
Showing 3 changed files with 241 additions and 9 deletions.
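
For reference, this is the shape of LOAD statement the new pre-check guards (adapted from the regression test added below; the label, bucket, endpoint, and credentials here are placeholders). When the endpoint cannot be reached or the AK/SK pair is rejected, analysis now fails with an "Incorrect object storage info" error instead of creating a load job that fails later:

LOAD LABEL example_label
(
    DATA INFILE("s3://example_bucket/path/part.tbl")
    INTO TABLE example_table
    COLUMNS TERMINATED BY "|"
)
WITH S3
(
    "AWS_ENDPOINT" = "s3.example.com",
    "AWS_ACCESS_KEY" = "<access_key>",
    "AWS_SECRET_KEY" = "<secret_key>",
    "AWS_REGION" = "<region>"
);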
88 changes: 79 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -21,10 +21,14 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB;
import org.apache.doris.cloud.security.SecurityChecker;
import org.apache.doris.cloud.storage.RemoteBase;
import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
@@ -500,7 +504,7 @@ public void analyze(Analyzer analyzer) throws UserException {
}
} else if (brokerDesc != null) {
etlJobType = EtlJobType.BROKER;
checkWhiteList();
checkS3Param();
} else if (isMysqlLoad) {
etlJobType = EtlJobType.LOCAL_FILE;
} else {
@@ -518,6 +522,26 @@
user = ConnectContext.get().getQualifiedUser();
}


private String getProviderFromEndpoint() {
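// Read the provider from the broker properties (matched case-insensitively); fall back to "S3" when it is not set.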
Map<String, String> properties = brokerDesc.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().equalsIgnoreCase(S3Properties.PROVIDER)) {
return entry.getValue();
}
}
return S3Properties.S3_PROVIDER;
}

private String getBucketFromFilePath(String filePath) throws Exception {
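// A load file path looks like s3://bucket/path/to/file: the segment after "//" and before the next "/" is the bucket name.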
String[] parts = filePath.split("\\/\\/");
if (parts.length < 2) {
throw new Exception("filePath is not valid");
}
String bucket = parts[1].split("\\/")[0];
return bucket;
}

public String getComment() {
return comment;
}
@@ -597,7 +621,7 @@ private void checkEndpoint(String endpoint) throws UserException {
}
}

public void checkWhiteList() throws UserException {
public void checkS3Param() throws UserException {
Map<String, String> brokerDescProperties = brokerDesc.getProperties();
if (brokerDescProperties.containsKey(S3Properties.Env.ENDPOINT)
&& brokerDescProperties.containsKey(S3Properties.Env.ACCESS_KEY)
@@ -606,17 +630,63 @@ public void checkWhiteList() throws UserException {
String endpoint = brokerDescProperties.get(S3Properties.Env.ENDPOINT);
endpoint = endpoint.replaceFirst("^http://", "");
endpoint = endpoint.replaceFirst("^https://", "");
List<String> whiteList = new ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
whiteList.removeIf(String::isEmpty);
if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
throw new UserException("endpoint: " + endpoint
+ " is not in s3 load endpoint white list: " + String.join(",", whiteList));
}
brokerDescProperties.put(S3Properties.Env.ENDPOINT, endpoint);
if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
checkWhiteList(endpoint);
if (AzureProperties.checkAzureProviderPropertyExist(brokerDescProperties)) {
return;
}
checkEndpoint(endpoint);
checkAkSk();
}
}

public void checkWhiteList(String endpoint) throws UserException {
List<String> whiteList = new ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
whiteList.removeIf(String::isEmpty);
if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
throw new UserException("endpoint: " + endpoint
+ " is not in s3 load endpoint white list: " + String.join(",", whiteList));
}
}

private void checkAkSk() throws UserException {
RemoteBase remote = null;
ObjectInfo objectInfo = null;
try {
Map<String, String> brokerDescProperties = brokerDesc.getProperties();
String provider = getProviderFromEndpoint();
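// Build an ObjectInfo from the statement's endpoint, region, and AK/SK for every file path in the load,
// then probe the bucket with headObject/listObjects so bad credentials or endpoints fail at analysis time.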
for (DataDescription dataDescription : dataDescriptions) {
for (String filePath : dataDescription.getFilePaths()) {
String bucket = getBucketFromFilePath(filePath);
objectInfo = new ObjectInfo(ObjectStoreInfoPB.Provider.valueOf(provider.toUpperCase()),
brokerDescProperties.get(S3Properties.Env.ACCESS_KEY),
brokerDescProperties.get(S3Properties.Env.SECRET_KEY),
bucket, brokerDescProperties.get(S3Properties.Env.ENDPOINT),
brokerDescProperties.get(S3Properties.Env.REGION), "");
remote = RemoteBase.newInstance(objectInfo);
// RemoteBase#headObject does not throw exception if key does not exist.
remote.headObject("1");
remote.listObjects(null);
remote.close();
}
}
} catch (Exception e) {
LOG.warn("Failed to check object info={}", objectInfo, e);
String message = e.getMessage();
if (message != null) {
int index = message.indexOf("Error message=");
if (index != -1) {
message = message.substring(index);
}
}
throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
"Incorrect object storage info, " + message);
} finally {
if (remote != null) {
remote.close();
}
}

}

}
S3Properties.java
@@ -58,6 +58,7 @@ public class S3Properties extends BaseProperties {
public static final String MAX_CONNECTIONS = "s3.connection.maximum";
public static final String REQUEST_TIMEOUT_MS = "s3.connection.request.timeout";
public static final String CONNECTION_TIMEOUT_MS = "s3.connection.timeout";
public static final String S3_PROVIDER = "S3";

// required by storage policy
public static final String ROOT_PATH = "s3.root.path";
@@ -0,0 +1,161 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_domain_connection_and_ak_sk_correction", "load_p0") {
// create table
def tableName = 'test_domain_connection_and_ak_sk_correction'
def tableNameOrders = 'test_domain_connection_and_ak_sk_correction_orders'
sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
sql """ DROP TABLE IF EXISTS ${tableNameOrders} FORCE"""
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
P_PARTKEY INTEGER NOT NULL,
P_NAME VARCHAR(55) NOT NULL,
P_MFGR CHAR(25) NOT NULL,
P_BRAND CHAR(10) NOT NULL,
P_TYPE VARCHAR(25) NOT NULL,
P_SIZE INTEGER NOT NULL,
P_CONTAINER CHAR(10) NOT NULL,
P_RETAILPRICE DECIMAL(15,2) NOT NULL,
P_COMMENT VARCHAR(23) NOT NULL
)
DUPLICATE KEY(P_PARTKEY, P_NAME)
DISTRIBUTED BY HASH(P_PARTKEY) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
);
"""
sql """
CREATE TABLE IF NOT EXISTS ${tableNameOrders} (
O_ORDERKEY INTEGER NOT NULL,
O_CUSTKEY INTEGER NOT NULL,
O_ORDERSTATUS CHAR(1) NOT NULL,
O_TOTALPRICE DECIMAL(15,2) NOT NULL,
O_ORDERDATE DATE NOT NULL,
O_ORDERPRIORITY CHAR(15) NOT NULL,
O_CLERK CHAR(15) NOT NULL,
O_SHIPPRIORITY INTEGER NOT NULL,
O_COMMENT VARCHAR(79) NOT NULL
)
DUPLICATE KEY(O_ORDERKEY, O_CUSTKEY)
DISTRIBUTED BY HASH(O_ORDERKEY) BUCKETS 32
PROPERTIES (
"replication_num" = "1"
);
"""


def label = UUID.randomUUID().toString().replace("-", "")
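// Case 1: correct endpoint and AK/SK, so the pre-check should accept the statement.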
def result = sql """
LOAD LABEL ${label}
(
DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
INTO TABLE ${tableName}
COLUMNS TERMINATED BY "|"
(p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
)
WITH S3
(
"AWS_ENDPOINT" = "${getS3Endpoint()}",
"AWS_ACCESS_KEY" = "${getS3AK()}",
"AWS_SECRET_KEY" = "${getS3SK()}",
"AWS_REGION" = "${getS3Region()}"
);
"""
logger.info("the first sql result is {}", result)

label = UUID.randomUUID().toString().replace("-", "")
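// Case 2: a "1" is appended to the endpoint, so the domain connection check should reject the statement.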
try {
result = sql """
LOAD LABEL ${label}
(
DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
INTO TABLE ${tableName}
COLUMNS TERMINATED BY "|"
(p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
)
WITH S3
(
"AWS_ENDPOINT" = "${getS3Endpoint()}1",
"AWS_ACCESS_KEY" = "${getS3AK()}",
"AWS_SECRET_KEY" = "${getS3SK()}",
"AWS_REGION" = "${getS3Region()}"
);
"""
logger.info("the second sql result is {}", result)
assertTrue(false, "The endpoint is wrong, so the connection check should fail")
} catch (Exception e) {
logger.info("the second sql exception result is {}", e.getMessage())
assertTrue(e.getMessage().contains("Incorrect object storage info"), e.getMessage())
}

label = UUID.randomUUID().toString().replace("-", "")
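// Case 3: a "1" is appended to the access key, so the AK/SK correctness check should reject the statement.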
try {
result = sql """
LOAD LABEL ${label}
(
DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
INTO TABLE ${tableName}
COLUMNS TERMINATED BY "|"
(p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
)
WITH S3
(
"AWS_ENDPOINT" = "${getS3Endpoint()}",
"AWS_ACCESS_KEY" = "${getS3AK()}1",
"AWS_SECRET_KEY" = "${getS3SK()}",
"AWS_REGION" = "${getS3Region()}"
);
"""
logger.info("the third sql result is {}", result)
assertTrue(false, "The AK is wrong, so the AK/SK correctness check should fail")
} catch (Exception e) {
logger.info("the third sql exception result is {}", e.getMessage())
assertTrue(e.getMessage().contains("Incorrect object storage info"), e.getMessage())
}

label = UUID.randomUUID().toString().replace("-", "")
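// Case 4: the first path of the second DATA INFILE points at a non-existent bucket; every file path is checked, so the statement should be rejected.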
try {
result = sql """
LOAD LABEL ${label}
(
DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
INTO TABLE ${tableName}
COLUMNS TERMINATED BY "|"
(p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp),
DATA INFILE("s3://${getS3BucketName()}1/regression/tpch/sf1/orders.tbl.1", "s3://${getS3BucketName()}/regression/tpch/sf1/orders.tbl.2")
INTO TABLE ${tableNameOrders}
COLUMNS TERMINATED BY "|"
(o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, temp)
)
WITH S3
(
"AWS_ENDPOINT" = "${getS3Endpoint()}",
"AWS_ACCESS_KEY" = "${getS3AK()}",
"AWS_SECRET_KEY" = "${getS3SK()}",
"AWS_REGION" = "${getS3Region()}"
);
"""
logger.info("the fourth sql result is {}", result)
assertTrue(false, "In the second DATA INFILE, the first bucket is wrong, so the load should fail")
} catch (Exception e) {
logger.info("the fourth sql exception result is {}", e.getMessage())
assertTrue(e.getMessage().contains("Incorrect object storage info"), e.getMessage())
}
sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
sql """ DROP TABLE IF EXISTS ${tableNameOrders} FORCE"""
}
