Skip to content

Commit

Permalink
Support paimon hdfs hive datasource (DataLinkDC#3841)
Browse files Browse the repository at this point in the history
Co-authored-by: gaoyan1998 <[email protected]>
Co-authored-by: GH Action - Upstream Sync <[email protected]>
  • Loading branch information
3 people authored Sep 27, 2024
1 parent dacbb56 commit dcc1b50
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

package org.dinky.metadata.config;

import org.dinky.utils.TextUtil;

public enum CatalogType {
Hive("Hive"),
JDBC("JDBC"),
NONE("None"),
FileSystem("FileSystem");

private final String name;
Expand All @@ -35,6 +38,9 @@ public String getName() {
}

public static CatalogType getByName(String name) {
if (TextUtil.isEmpty(name)) {
return NONE;
}
for (CatalogType catalogType : CatalogType.values()) {
if (catalogType.getName().equals(name)) {
return catalogType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@

package org.dinky.metadata.config;

import org.dinky.utils.TextUtil;

import lombok.Getter;

@Getter
public enum FileSystemType {
LOCAL("local"),
HDFS("hdfs"),
S3("s3"),
OSS("oss"),
NONE("none"),
;

private final String type;
Expand All @@ -32,11 +37,10 @@ public enum FileSystemType {
this.type = type;
}

public String getType() {
return type;
}

public static FileSystemType fromType(String type) {
if (TextUtil.isEmpty(type)) {
return NONE;
}
for (FileSystemType value : FileSystemType.values()) {
if (value.getType().equalsIgnoreCase(type)) {
return value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

package org.dinky.metadata.config;

import org.dinky.data.exception.BusException;
import org.dinky.data.model.CustomConfig;
import org.dinky.data.model.S3Configuration;
import org.dinky.utils.TextUtil;

import org.apache.paimon.options.Options;

import java.util.List;
import java.util.Objects;

import lombok.AllArgsConstructor;
import lombok.Builder;
Expand All @@ -41,22 +41,51 @@ public class PaimonConfig implements IConnectConfig {
private List<CustomConfig> paimonConfig;
private String warehouse;
private S3 s3;
private Hadoop hadoop;
private String fileSystemType;
private String catalogType;

public Options getOptions() {
Options options = new Options();
options.set("warehouse", warehouse);
if (Objects.requireNonNull(FileSystemType.fromType(fileSystemType)) == FileSystemType.S3) {

if (CatalogType.getByName(catalogType) == CatalogType.Hive) {
options.set(PaimonHadoopConfig.METASTORE, "hive");
if (hadoop != null) {
if (!TextUtil.isEmpty(hadoop.getHadoopConfDir())) {
options.set(PaimonHadoopConfig.hadoopConfDir, hadoop.getHadoopConfDir());
}
if (!TextUtil.isEmpty(hadoop.getHiveConfDir())) {
options.set(PaimonHadoopConfig.hiveConfDir, hadoop.getHiveConfDir());
}
if (!TextUtil.isEmpty(hadoop.getUri())) {
options.set(PaimonHadoopConfig.URI, hadoop.getUri());
}
} else {
throw new BusException("Hadoop config is required for Hive catalog");
}
} else {
options.set("warehouse", warehouse);
}

if (FileSystemType.fromType(fileSystemType) == FileSystemType.S3) {
if (s3 != null) {
options.set(S3Configuration.ENDPOINT, s3.getEndpoint());
options.set(S3Configuration.ACCESS_KEY, s3.getAccessKey());
options.set(S3Configuration.SECRET_KEY, s3.getSecretKey());
options.set(S3Configuration.PATH_STYLE_ACCESS, String.valueOf(s3.isPathStyle()));
options.set(PaimonS3Configuration.ENDPOINT, s3.getEndpoint());
options.set(PaimonS3Configuration.ACCESS_KEY, s3.getAccessKey());
options.set(PaimonS3Configuration.SECRET_KEY, s3.getSecretKey());
options.set(PaimonS3Configuration.PATH_STYLE_ACCESS, String.valueOf(s3.isPathStyle()));
} else {
throw new IllegalArgumentException("S3 config is required for S3 file system");
throw new BusException("S3 config is required for S3 file system");
}
} else if (FileSystemType.fromType(fileSystemType) == FileSystemType.HDFS) {
if (hadoop != null) {
if (!TextUtil.isEmpty(hadoop.getHadoopConfDir())) {
options.set(PaimonHadoopConfig.hadoopConfDir, hadoop.getHadoopConfDir());
}
} else {
throw new BusException("Hadoop config is required for hadoop ");
}
}

if (paimonConfig != null) {
for (CustomConfig customConfig : paimonConfig) {
options.set(customConfig.getName(), customConfig.getValue());
Expand All @@ -72,4 +101,11 @@ public static class S3 {
private String secretKey;
private boolean pathStyle;
}

@Data
public static class Hadoop {
private String hiveConfDir;
private String hadoopConfDir;
private String uri;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.metadata.config;

public class PaimonHadoopConfig {
public static String METASTORE = "metastore";
public static String URI = "uri";
public static String hiveConfDir = "hive-conf-dir";
public static String hadoopConfDir = "hadoop-conf-dir";
public static String WAREHOUSE = "warehouse";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.metadata.config;

/**
* 配置类 S3Configuration 用于存储 S3 配置信息
*
*/
public class PaimonS3Configuration {
public static String ACCESS_KEY = "s3.access-key";
public static String SECRET_KEY = "s3.secret-key";
public static String ENDPOINT = "s3.endpoint";
public static String BUCKET_NAME = "s3.bucket-name";
public static String PATH_STYLE_ACCESS = "s3.path.style.access";
public static String REGION = "s3.region";
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,49 +25,68 @@ import {
ProFormText
} from '@ant-design/pro-components';
import { l } from '@/utils/intl';
import React from 'react';
import { Segmented, Space } from 'antd';
import React, { useEffect } from 'react';
import { Space } from 'antd';
import { FormInstance } from 'antd/es/form/hooks/useForm';
import { Values } from 'async-validator';

type DataSourceJdbcProps = {
form: FormInstance<Values>;
};
export const FileSystemType = {
S3: 'S3',
HDFS: 'HDFS',
LOCAL: 'LOCAL'
};

export const CatalogType = {
FileSystem: 'FileSystem',
JDBC: 'JDBC',
Hive: 'Hive'
};

const PaimonSourceForm: React.FC<DataSourceJdbcProps> = (props) => {
const { form } = props;
const [fileSystemType, setFileSystemType] = React.useState<string | number>('S3'); // ['S3', 'HDFS', 'LOCAL']
const [catalogType, setCatalogType] = React.useState<string | number>('FileSystem'); // ['FileSystem', 'JDBC', 'Hive']
const [fileSystemType, setFileSystemType] = React.useState<string>();
const [catalogType, setCatalogType] = React.useState<string>();

useEffect(() => {
setCatalogType(form.getFieldsValue()?.connectConfig?.catalogType ?? CatalogType.FileSystem);
setFileSystemType(form.getFieldsValue()?.connectConfig?.fileSystemType ?? FileSystemType.LOCAL);
});

const renderConfig = () => {
return (
<Space direction={'horizontal'} size={60}>
<ProFormSegmented
name={['connectConfig', 'catalogType']}
label='Catalog Type'
request={async () => [
{ label: 'FileSystem', value: 'FileSystem', disabled: false },
{ label: 'JDBC', value: 'JDBC', disabled: true },
{ label: 'Hive', value: 'Hive', disabled: true }
{ label: CatalogType.FileSystem, value: CatalogType.FileSystem, disabled: false },
{ label: CatalogType.JDBC, value: CatalogType.JDBC, disabled: true },
{ label: CatalogType.Hive, value: CatalogType.Hive, disabled: false }
]}
required
fieldProps={{
onChange: (value) => setCatalogType(value)
onChange: (value) => setCatalogType(value + '')
}}
/>

<ProFormSegmented
name={['connectConfig', 'fileSystemType']}
label='File System Type'
request={async () => [
{ label: 'S3', value: 'S3', disabled: false },
{ label: 'HDFS', value: 'HDFS', disabled: true },
{ label: 'LOCAL', value: 'LOCAL', disabled: false }
]}
required
fieldProps={{
onChange: (value) => setFileSystemType(value)
}}
/>
{catalogType == CatalogType.FileSystem && (
<ProFormSegmented
name={['connectConfig', 'fileSystemType']}
label='File System Type'
request={async () => [
{ label: FileSystemType.LOCAL, value: FileSystemType.LOCAL, disabled: false },
{ label: FileSystemType.S3, value: FileSystemType.S3, disabled: false },
{ label: FileSystemType.HDFS, value: FileSystemType.HDFS, disabled: false }
]}
required
fieldProps={{
onChange: (value) => setFileSystemType(value + '')
}}
/>
)}
</Space>
);
};
Expand Down Expand Up @@ -112,18 +131,57 @@ const PaimonSourceForm: React.FC<DataSourceJdbcProps> = (props) => {
);
};

const renderHiveConfig = () => {
return (
<>
<ProFormText
name={['connectConfig', 'hadoop', 'hiveConfDir']}
label='hive-conf-dir'
width={'md'}
required={true}
/>
<ProFormText
name={['connectConfig', 'hadoop', 'hadoopConfDir']}
label='hadoop-conf-dir'
width={'md'}
/>
<ProFormText name={['connectConfig', 'hadoop', 'uri']} label='Uri' width={'md'} />
</>
);
};
const renderHdfsConfig = () => {
return (
<>
<ProFormText
name={['connectConfig', 'hadoop', 'hadoopConfDir']}
label='hadoop-conf-dir'
width={'md'}
required={true}
/>
</>
);
};

return (
<div>
{renderConfig()}
<br />
<ProFormGroup>
<ProFormText
name={['connectConfig', 'warehouse']}
label='warehouse'
width={'md'}
required={true}
/>
{fileSystemType === 'S3' && renderS3Config()}
{catalogType != CatalogType.Hive && (
<ProFormText
name={['connectConfig', 'warehouse']}
label='warehouse'
width={'md'}
required={true}
/>
)}
{catalogType === CatalogType.FileSystem &&
fileSystemType === FileSystemType.S3 &&
renderS3Config()}
{catalogType === CatalogType.FileSystem &&
fileSystemType === FileSystemType.HDFS &&
renderHdfsConfig()}
{catalogType === CatalogType.Hive && renderHiveConfig()}
</ProFormGroup>

<ProFormList
Expand Down

0 comments on commit dcc1b50

Please sign in to comment.