Skip to content

Commit

Permalink
6
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Jan 10, 2025
1 parent cd560f4 commit a035d20
Show file tree
Hide file tree
Showing 11 changed files with 280 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public CatalogMgr() {
/**
 * Deserializes a {@code CatalogMgr} from the given stream.
 * The on-disk format is a length-prefixed JSON string (written via {@code Text}),
 * decoded with the shared GSON instance.
 *
 * @param in data input positioned at a serialized CatalogMgr
 * @return the deserialized CatalogMgr
 * @throws IOException if reading the underlying stream fails
 */
public static CatalogMgr read(DataInput in) throws IOException {
    String json = Text.readString(in);
    // Guarded to avoid building the message when debug logging is off.
    // (The duplicated debug statement from the diff residue is collapsed to one.)
    if (LOG.isDebugEnabled()) {
        LOG.debug("Reading catalog configuration from JSON: {}", json);
    }
    return GsonUtils.GSON.fromJson(json, CatalogMgr.class);
}
Expand All @@ -137,7 +137,7 @@ private void addCatalog(CatalogIf catalog) {

private CatalogIf removeCatalog(long catalogId) {
CatalogIf catalog = idToCatalog.remove(catalogId);
LOG.info("Removed catalog with id {}, name {}", catalogId, catalog == null ? "N/A" : catalog.getName());
LOG.info("Removed catalog [id: {}, name: {}]", catalogId, catalog == null ? "N/A" : catalog.getName());
if (catalog != null) {
catalog.onClose();
nameToCatalog.remove(catalog.getName());
Expand Down Expand Up @@ -243,7 +243,7 @@ private void createCatalogImpl(CatalogIf catalog, String catalogName,
try {
if (nameToCatalog.containsKey(catalog.getName())) {
if (ifNotExists) {
LOG.warn("Catalog {} is already exist.", catalogName);
LOG.warn("Catalog '{}' already exists", catalogName);
return;
}
throw new DdlException("Catalog had already exist with name: " + catalogName);
Expand Down Expand Up @@ -280,7 +280,7 @@ public void dropCatalog(String catalogName, boolean ifExists) throws UserExcepti
writeLock();
try {
if (ifExists && !nameToCatalog.containsKey(catalogName)) {
LOG.warn("Non catalog {} is found.", catalogName);
LOG.warn("No catalog found with name '{}'", catalogName);
return;
}
CatalogIf<DatabaseIf<TableIf>> catalog = nameToCatalog.get(catalogName);
Expand Down Expand Up @@ -815,7 +815,7 @@ public void addExternalPartitions(String catalogName, String dbName, String tabl
return;
}
if (!(table instanceof HMSExternalTable)) {
LOG.warn("only support HMSTable");
LOG.warn("Operation only supported for HMS (Hive Metastore) tables");
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.common.io.Writable;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.metastore.MetastoreProperties;
import org.apache.doris.datasource.property.metastore.MetastoreProperties.Type;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.persist.gson.GsonUtils;

Expand Down Expand Up @@ -127,4 +128,9 @@ public static CatalogProperty read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, CatalogProperty.class);
}

// Rebuilds the typed property holders from the raw key/value catalog properties.
// NOTE(review): assumes 'properties', 'metastoreProperties' and 'storageProperties'
// are fields of the enclosing CatalogProperty class (not visible in this chunk) —
// confirm against the full class definition.
public void initialize(Type type) {
    metastoreProperties = new MetastoreProperties(type, properties);
    storageProperties = new StorageProperties(properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.datasource.property.constants.PaimonProperties;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
Expand Down Expand Up @@ -63,7 +62,7 @@ public abstract class PaimonExternalCatalog extends ExternalCatalog {
/**
 * Creates a Paimon external catalog shell.
 *
 * @param catalogId unique id assigned to this catalog
 * @param name      user-visible catalog name
 * @param resource  optional resource name backing this catalog
 * @param props     raw catalog properties as supplied by the user
 * @param comment   user-provided catalog comment
 */
public PaimonExternalCatalog(long catalogId, String name, String resource,
        Map<String, String> props, String comment) {
    super(catalogId, name, InitCatalogLog.Type.PAIMON, comment);
    // NOTE(review): property conversion was intentionally disabled here
    // (previously: props = PropertyConverter.convertToMetaProperties(props));
    // raw props are now passed straight to CatalogProperty. Confirm the new
    // MetastoreProperties/StorageProperties path performs that conversion instead.
    catalogProperty = new CatalogProperty(resource, props);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.datasource.property.constants.PaimonProperties;
import org.apache.doris.datasource.property.metastore.MetastoreProperties.Type;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
Expand All @@ -37,6 +38,7 @@ public class PaimonHMSExternalCatalog extends PaimonExternalCatalog {
/**
 * Creates a Paimon catalog backed by a Hive Metastore.
 * Eagerly initializes the typed metastore/storage property holders for the HMS type.
 *
 * @param catalogId unique id assigned to this catalog
 * @param name      user-visible catalog name
 * @param resource  optional resource name backing this catalog
 * @param props     raw catalog properties as supplied by the user
 * @param comment   user-provided catalog comment
 */
public PaimonHMSExternalCatalog(long catalogId, String name, String resource,
        Map<String, String> props, String comment) {
    super(catalogId, name, resource, props, comment);
    catalogProperty.initialize(Type.HMS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at

//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.property.catalog;

import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.metastore.AWSGlueProperties;
import org.apache.doris.datasource.property.metastore.HMSProperties;
import org.apache.doris.datasource.property.metastore.IcebergRestProperties;
import org.apache.doris.datasource.property.metastore.MetastoreProperties;
import org.apache.doris.datasource.property.storage.HDFSProperties;
import org.apache.doris.datasource.property.storage.S3Properties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.datasource.property.storage.StorageProperties.Type;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.Map;

/**
 * Builds the property map and Hadoop configuration needed to create an Iceberg
 * catalog from Doris metastore/storage properties.
 * Not thread-safe; intended to be constructed and consumed by a single caller.
 */
public class IcebergCatalogProperties {

    // Accumulated Iceberg catalog properties (keys follow Iceberg's
    // CatalogProperties / CatalogUtil conventions, e.g. "type", "uri", "warehouse").
    private Map<String, String> catalogProps = Maps.newHashMap();
    // Hadoop configuration consumed by HadoopFileIO when the warehouse is on HDFS.
    private Configuration hadoopConf = new Configuration(false);

    /**
     * @param warehouse  warehouse location: a local path, hdfs:// URI, or an
     *                   S3-compatible URI (s3/oss/cos/obs/tos/bos/gcs)
     * @param metaProps  metastore connection properties (HMS / Glue / REST / file system)
     * @param storeProps storage properties; must match the warehouse's storage system
     * @throws UserException if the metastore type or warehouse scheme is unsupported,
     *         or storeProps does not match the warehouse scheme
     */
    public IcebergCatalogProperties(String warehouse, MetastoreProperties metaProps, StorageProperties storeProps)
            throws UserException {
        init(warehouse, metaProps, storeProps);
    }

    private void init(String warehouse, MetastoreProperties metaProps, StorageProperties storeProps)
            throws UserException {
        initMetastoreProperties(metaProps);
        initFileIOProperties(warehouse, storeProps);
    }

    // Translates the Doris metastore properties into Iceberg catalog properties.
    private void initMetastoreProperties(MetastoreProperties metaProps)
            throws UserException {
        switch (metaProps.getType()) {
            case HMS:
                ((HMSProperties) metaProps).toIcebergHiveCatalogProperties(catalogProps);
                break;
            case GLUE:
                ((AWSGlueProperties) metaProps).toIcebergGlueCatalogProperties(catalogProps);
                break;
            case ICEBERG_REST:
                ((IcebergRestProperties) metaProps).toIcebergRestCatalogProperties(catalogProps);
                break;
            case FILE_SYSTEM:
                // File-system based catalog needs no metastore-side properties.
                break;
            default:
                throw new UserException("Unsupported metastore type: " + metaProps.getType());
        }
    }

    // Configures FileIO-related properties based on the warehouse location's scheme.
    private void initFileIOProperties(String warehouse, StorageProperties storeProps)
            throws UserException {
        String finalWarehouse = warehouse;
        // BUGFIX: the original used Paths.get(warehouse).toUri(), which treats
        // "hdfs://nn/wh" or "s3://bucket/wh" as a *relative local path* and
        // reports the scheme as "file" (or throws on Windows), so the hdfs/s3
        // branches below could never be taken. Parse the raw string as a URI
        // instead, falling back to a local-path interpretation.
        URI uri = parseWarehouseLocation(warehouse);
        String scheme = Strings.nullToEmpty(uri.getScheme());
        switch (scheme) {
            case "":
            case "file":
                // Local file system: nothing extra to configure.
                break;
            case "hdfs":
                initHadoopFileIOProps(storeProps);
                break;
            case "s3":
            case "oss":
            case "cos":
            case "obs":
            case "tos":
            case "bos":
            case "gcs":
                // Use S3FileIO for all S3-compatible storage; normalize the
                // scheme to s3 so Iceberg picks the S3 implementation.
                // NOTE(review): native GCS usually uses the "gs" scheme —
                // confirm "gcs" is the intended spelling here.
                finalWarehouse = "s3://" + uri.getAuthority() + uri.getPath();
                initS3FileIOProps(storeProps);
                break;
            default:
                throw new UserException("Unsupported warehouse type: " + scheme);
        }
        catalogProps.put("warehouse", finalWarehouse);
    }

    // Parses the warehouse location as a URI; a string that is not a valid URI
    // (e.g. a plain OS path containing spaces) is interpreted as a local path.
    private static URI parseWarehouseLocation(String warehouse) {
        try {
            return new URI(warehouse);
        } catch (URISyntaxException e) {
            return Paths.get(warehouse).toUri();
        }
    }

    private void initHadoopFileIOProps(StorageProperties storeProps) throws UserException {
        if (storeProps.getType() != Type.HDFS) {
            throw new UserException("The warehouse is on HDFS-like storage, but the storage property is not for HDFS: "
                    + storeProps.getType());
        }
        ((HDFSProperties) storeProps).toHadoopConfiguration(hadoopConf);
    }

    private void initS3FileIOProps(StorageProperties storeProps) throws UserException {
        if (storeProps.getType() != Type.S3) {
            throw new UserException("The warehouse is on S3-like storage, but the storage property is not for S3: "
                    + storeProps.getType());
        }
        ((S3Properties) storeProps).toIcebergS3FileIOProperties(catalogProps);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

public class AWSGlueProperties extends MetastoreProperties {

@ConnectorProperty(names = {"glue.endpoint", "aws.region"},
@ConnectorProperty(names = {"glue.endpoint", "aws.endpoint"},
description = "The endpoint of the AWS Glue.")
private String glueEndpoint = "";

Expand Down Expand Up @@ -70,33 +70,23 @@ protected void checkRequiredProperties() {
}
}

public GlueCatalogCredentials getGlueCatalogCredentials() {
return new GlueCatalogCredentials(glueEndpoint, glueAccessKey, glueSecretKey);
}

public AWSCatalogMetastoreClientCredentials getAWSCatalogMetastoreClientCredentials() {
return new AWSCatalogMetastoreClientCredentials(glueEndpoint, glueAccessKey, glueSecretKey);
}

@Getter
public static class GlueCatalogCredentials {
private Map<String, String> credentials = Maps.newHashMap();

// Used for GlueCatalog in IcebergGlueExternalCatalog
public void toIcebergGlueCatalogProperties(Map<String, String> catalogProps) {
// See AwsClientProperties.java for property keys
public GlueCatalogCredentials(String endpoint, String ak, String sk) {
credentials.put("client.credentials-provider",
"com.amazonaws.glue.catalog.credentials.ConfigurationAWSCredentialsProvider2x");
credentials.put("client.credentials-provider.glue.access_key", ak);
credentials.put("client.credentials-provider.glue.secret_key", sk);
credentials.put("client.region", getRegionFromGlueEndpoint(endpoint));
}
catalogProps.put("client.credentials-provider",
"com.amazonaws.glue.catalog.credentials.ConfigurationAWSCredentialsProvider2x");
catalogProps.put("client.credentials-provider.glue.access_key", glueAccessKey);
catalogProps.put("client.credentials-provider.glue.secret_key", glueSecretKey);
catalogProps.put("client.region", getRegionFromGlueEndpoint());
}

private String getRegionFromGlueEndpoint(String endpoint) {
// https://glue.ap-northeast-1.amazonaws.com
// -> ap-northeast-1
return endpoint.split("\\.")[1];
}
private String getRegionFromGlueEndpoint() {
// https://glue.ap-northeast-1.amazonaws.com
// -> ap-northeast-1
return glueEndpoint.split("\\.")[1];
}

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,16 @@ public void toPaimonOptionsAndConf(Options options, Configuration conf) {
conf.set("hive.metastore.client.keytab", hiveMetastoreClientKeytab);
}
}

/**
 * Fills {@code catalogProps} with the properties Iceberg's HiveCatalog expects:
 * the metastore URI, any extra settings loaded from the resource config file,
 * and — when Kerberos authentication is configured — the service/client
 * principals and the client keytab.
 *
 * @param catalogProps target map, mutated in place
 */
public void toIcebergHiveCatalogProperties(Map<String, String> catalogProps) {
    catalogProps.put("uri", hiveMetastoreUri);
    // Merge any additional settings from the resource configuration file.
    Map<String, String> allProps = loadConfigFromFile(getResourceConfigPropName());
    catalogProps.putAll(allProps);
    catalogProps.put("hive.metastore.authentication.type", hiveMetastoreAuthenticationType);
    // BUGFIX: the auth type was compared against the literal "catalogProps"
    // (an apparent variable-rename artifact), so Kerberos credentials were
    // never attached. Principals and keytab only apply when the metastore
    // authentication type is kerberos.
    if ("kerberos".equalsIgnoreCase(hiveMetastoreAuthenticationType)) {
        catalogProps.put("hive.metastore.service.principal", hiveMetastoreServicePrincipal);
        catalogProps.put("hive.metastore.client.principal", hiveMetastoreClientPrincipal);
        catalogProps.put("hive.metastore.client.keytab", hiveMetastoreClientKeytab);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ public class IcebergRestProperties extends MetastoreProperties {
private String icebergRestUri = "";

@ConnectorProperty(names = {"iceberg.rest.security.type"},
required = false,
supported = false,
description = "The security type of the iceberg rest catalog service.")
private String icebergRestSecurityType = "none";

@ConnectorProperty(names = {"iceberg.rest.prefix"},
required = false,
supported = false,
description = "The prefix of the iceberg rest catalog service.")
private String icebergRestPrefix = "";

Expand All @@ -42,4 +46,11 @@ public IcebergRestProperties(Map<String, String> origProps) {
@Override
protected void checkRequiredProperties() {
    // Intentionally empty: no cross-property validation is needed for the REST
    // catalog. NOTE(review): presumably iceberg.rest.uri is enforced via its
    // @ConnectorProperty declaration — confirm against the base class.
}

// Fills catalogProps with the two settings Iceberg needs to build a REST catalog:
// the catalog implementation selector and the REST service endpoint.
// The map is mutated in place.
public void toIcebergRestCatalogProperties(Map<String, String> catalogProps) {
    // "type" = "rest" selects RESTCatalog in Iceberg's CatalogUtil.
    catalogProps.put("type", "rest");
    // "uri" is CatalogProperties.URI — the REST catalog service endpoint.
    catalogProps.put("uri", icebergRestUri);
}
}
Loading

0 comments on commit a035d20

Please sign in to comment.