[hive] HiveCatalog infer warehouse from hadoop Conf #2432

Merged: 5 commits, Dec 1, 2023 (changes shown from all commits)
4 changes: 2 additions & 2 deletions docs/content/engines/hive.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ Execute the following Flink SQL script in Flink SQL client to define a Paimon Hi
CREATE CATALOG my_hive WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://<hive-metastore-host-name>:<port>',
-- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', defaults to 'hive.metastore.uris' in HiveConf
-- 'hive-conf-dir' = '...', recommended in Kerberos environments
-- 'hadoop-conf-dir' = '...', recommended in Kerberos environments
'warehouse' = 'hdfs:///path/to/table/store/warehouse'
-- 'warehouse' = 'hdfs:///path/to/table/store/warehouse', defaults to 'hive.metastore.warehouse.dir' in HiveConf
);

-- Use paimon Hive catalog
Expand Down
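With this change, both 'uri' and 'warehouse' become optional whenever the Hive/Hadoop configuration already carries them. A minimal sketch of the equivalent programmatic path, using APIs visible in this diff (CatalogFactory.createCatalog, Options.fromMap); the hive-conf-dir path is illustrative:

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;

import java.util.HashMap;
import java.util.Map;

public class InferredWarehouseExample {
    public static void main(String[] args) throws Exception {
        Map<String, String> conf = new HashMap<>();
        conf.put("metastore", "hive");
        // 'uri' and 'warehouse' are deliberately omitted: with this PR they fall
        // back to 'hive.metastore.uris' and 'hive.metastore.warehouse.dir' from
        // the HiveConf built out of hive-conf-dir / hadoop-conf-dir.
        conf.put("hive-conf-dir", "/etc/hive/conf"); // illustrative path
        CatalogContext context = CatalogContext.create(Options.fromMap(conf));
        try (Catalog catalog = CatalogFactory.createCatalog(context)) {
            System.out.println(catalog.listDatabases());
        }
    }
}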
4 changes: 2 additions & 2 deletions docs/content/how-to/creating-catalogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ hadoop-conf-dir parameter to the hive-site.xml file path.
CREATE CATALOG my_hive WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://<hive-metastore-host-name>:<port>',
-- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', defaults to 'hive.metastore.uris' in HiveConf
-- 'hive-conf-dir' = '...', recommended in Kerberos environments
-- 'hadoop-conf-dir' = '...', recommended in Kerberos environments
'warehouse' = 'hdfs:///path/to/warehouse'
-- 'warehouse' = 'hdfs:///path/to/warehouse', defaults to 'hive.metastore.warehouse.dir' in HiveConf
);

USE CATALOG my_hive;
Expand Down
8 changes: 4 additions & 4 deletions docs/content/migration/upsert-to-partitioned.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ fully compatible with Hive.
CREATE CATALOG my_hive WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://<hive-metastore-host-name>:<port>',
-- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', defaults to 'hive.metastore.uris' in HiveConf
-- 'hive-conf-dir' = '...', recommended in Kerberos environments
-- 'hadoop-conf-dir' = '...', recommended in Kerberos environments
'warehouse' = 'hdfs:///path/to/warehouse'
-- 'warehouse' = 'hdfs:///path/to/warehouse', defaults to 'hive.metastore.warehouse.dir' in HiveConf
);

USE CATALOG my_hive;
Expand Down Expand Up @@ -112,10 +112,10 @@ need to query the latest data. Therefore, Paimon provides a preview feature:
CREATE CATALOG my_hive WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://<hive-metastore-host-name>:<port>',
-- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', defaults to 'hive.metastore.uris' in HiveConf
-- 'hive-conf-dir' = '...', recommended in Kerberos environments
-- 'hadoop-conf-dir' = '...', recommended in Kerberos environments
'warehouse' = 'hdfs:///path/to/warehouse'
-- 'warehouse' = 'hdfs:///path/to/warehouse', defaults to 'hive.metastore.warehouse.dir' in HiveConf
);

USE CATALOG my_hive;
Expand Down
9 changes: 9 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.stream.Collectors;

import static org.apache.paimon.fs.FileIOUtils.checkAccess;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* File IO to read and write files.
Expand Down Expand Up @@ -179,6 +180,14 @@ default boolean isDir(Path path) throws IOException {
return getFileStatus(path).isDir();
}

default void checkOrMkdirs(Path path) throws IOException {
if (exists(path)) {
checkArgument(isDir(path), "The path '%s' should be a directory.", path);
} else {
mkdirs(path);
}
}

/** Read file to UTF_8 decoding. */
default String readFileUtf8(Path path) throws IOException {
try (SeekableInputStream in = newInputStream(path)) {
Expand Down
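The new checkOrMkdirs default consolidates the exists/isDir/mkdirs dance that CatalogFactory.createCatalog previously inlined (see the next file). A small usage sketch, assuming the LocalFileIO implementation in paimon-common and an illustrative path:

import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;

import java.io.IOException;

public class CheckOrMkdirsExample {
    public static void main(String[] args) throws IOException {
        LocalFileIO fileIO = LocalFileIO.create();
        Path warehouse = new Path("/tmp/paimon-warehouse");
        // Creates the directory if absent; throws IllegalArgumentException if
        // the path already exists but is a regular file.
        fileIO.checkOrMkdirs(warehouse);
    }
}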
org/apache/paimon/catalog/CatalogFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.Preconditions;

import java.io.IOException;
import java.io.UncheckedIOException;

import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* Factory to create {@link Catalog}. Each factory should have a unique identifier.
Expand All @@ -40,7 +40,15 @@
@Public
public interface CatalogFactory extends Factory {

Catalog create(FileIO fileIO, Path warehouse, CatalogContext context);
default Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
throw new UnsupportedOperationException(
"Use create(context) for " + this.getClass().getSimpleName());
}

default Catalog create(CatalogContext context) {
throw new UnsupportedOperationException(
"Use create(fileIO, warehouse, context) for " + this.getClass().getSimpleName());
}

static Path warehouse(CatalogContext context) {
String warehouse =
Expand All @@ -59,29 +67,27 @@ static Catalog createCatalog(CatalogContext options) {
}

static Catalog createCatalog(CatalogContext context, ClassLoader classLoader) {
Options options = context.options();
String metastore = options.get(METASTORE);
CatalogFactory catalogFactory =
FactoryUtil.discoverFactory(classLoader, CatalogFactory.class, metastore);

try {
return catalogFactory.create(context);
} catch (UnsupportedOperationException ignore) {
}

// manual validation
// because different catalog types may have different options
// we can't list them all in the optionalOptions() method
String warehouse = warehouse(context).toUri().toString();

String metastore = context.options().get(METASTORE);
CatalogFactory catalogFactory =
FactoryUtil.discoverFactory(classLoader, CatalogFactory.class, metastore);

Path warehousePath = new Path(warehouse);
FileIO fileIO;

try {
fileIO = FileIO.get(warehousePath, context);
if (fileIO.exists(warehousePath)) {
checkArgument(
fileIO.isDir(warehousePath),
"The %s path '%s' should be a directory.",
WAREHOUSE.key(),
warehouse);
} else {
fileIO.mkdirs(warehousePath);
}
fileIO.checkOrMkdirs(warehousePath);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
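Splitting the interface into two defaults lets a metastore-backed factory take over catalog creation entirely, while filesystem-style factories keep the old (fileIO, warehouse, context) signature. A hedged sketch of how the hive factory can plug in — the HiveCatalogFactory wiring is not part of this diff, so this is an assumption modeled on the HiveCatalog.createHiveCatalog method added further below:

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.hive.HiveCatalog;

public class HiveCatalogFactorySketch implements CatalogFactory {
    @Override
    public String identifier() {
        return "hive";
    }

    @Override
    public Catalog create(CatalogContext context) {
        // Full control over uri/warehouse inference; createCatalog(...) calls
        // this first and only falls back to create(fileIO, warehouse, context)
        // when UnsupportedOperationException is thrown.
        return HiveCatalog.createHiveCatalog(context);
    }
}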
org/apache/paimon/flink/FlinkGenericCatalogFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -67,9 +64,7 @@ public FlinkGenericCatalog createCatalog(Context context) {
@VisibleForTesting
public static FlinkGenericCatalog createCatalog(
ClassLoader cl, Map<String, String> optionMap, String name, Catalog flinkCatalog) {
String warehouse = extractWarehouse(flinkCatalog);
Options options = Options.fromMap(optionMap);
options.set(CatalogOptions.WAREHOUSE, warehouse);
options.set(CatalogOptions.METASTORE, "hive");
FlinkCatalog paimon =
new FlinkCatalog(
Expand All @@ -86,20 +81,4 @@ public static FlinkGenericCatalog createCatalog(
private static CatalogFactory createHiveCatalogFactory(ClassLoader cl) {
return FactoryUtil.discoverFactory(cl, CatalogFactory.class, "hive");
}

private static String extractWarehouse(Catalog catalog) {
try {
Field field = catalog.getClass().getDeclaredField("hiveConf");
field.setAccessible(true);
Object hiveConf = field.get(catalog);

Method method = hiveConf.getClass().getMethod("get", String.class);
return (String) method.invoke(hiveConf, "hive.metastore.warehouse.dir");
} catch (NoSuchFieldException
| IllegalAccessException
| NoSuchMethodException
| InvocationTargetException e) {
throw new RuntimeException(e);
}
}
}
org/apache/paimon/hive/HiveCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.schema.Schema;
Expand All @@ -39,6 +42,7 @@

import org.apache.flink.table.hive.LegacyHiveClasses;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
Expand All @@ -58,6 +62,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -71,8 +76,12 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
Expand Down Expand Up @@ -551,17 +560,16 @@ static IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName)
}

public static HiveConf createHiveConf(
@Nullable String hiveConfDir, @Nullable String hadoopConfDir) {
@Nullable String hiveConfDir,
@Nullable String hadoopConfDir,
Configuration defaultHadoopConf) {
// try to load from system env.
if (isNullOrWhitespaceOnly(hiveConfDir)) {
hiveConfDir = possibleHiveConfPath();
}
if (isNullOrWhitespaceOnly(hadoopConfDir)) {
hadoopConfDir = possibleHadoopConfPath();
}

// create HiveConf from the Hadoop configuration, with the hadoop conf directory applied if configured.
Configuration hadoopConf = null;
Configuration hadoopConf = defaultHadoopConf;
if (!isNullOrWhitespaceOnly(hadoopConfDir)) {
hadoopConf = getHadoopConfiguration(hadoopConfDir);
if (hadoopConf == null) {
Expand All @@ -575,9 +583,6 @@ public static HiveConf createHiveConf(
+ ") exist in the folder."));
}
}
if (hadoopConf == null) {
hadoopConf = new Configuration();
}

LOG.info("Setting hive conf dir as {}", hiveConfDir);
if (hiveConfDir != null) {
Expand Down Expand Up @@ -619,6 +624,60 @@ public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
}

public static Catalog createHiveCatalog(CatalogContext context) {
HiveConf hiveConf = createHiveConf(context);
String warehouseStr = context.options().get(CatalogOptions.WAREHOUSE);
if (warehouseStr == null) {
warehouseStr =
hiveConf.get(METASTOREWAREHOUSE.varname, METASTOREWAREHOUSE.defaultStrVal);
}
Path warehouse = new Path(warehouseStr);
Path uri =
warehouse.toUri().getScheme() == null
? new Path(FileSystem.getDefaultUri(hiveConf))
: warehouse;
FileIO fileIO;
try {
fileIO = FileIO.get(uri, context);
fileIO.checkOrMkdirs(warehouse);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return new HiveCatalog(
fileIO,
hiveConf,
context.options().get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
context.options(),
warehouse.toUri().toString());
}

public static HiveConf createHiveConf(CatalogContext context) {
String uri = context.options().get(CatalogOptions.URI);
String hiveConfDir = context.options().get(HIVE_CONF_DIR);
String hadoopConfDir = context.options().get(HADOOP_CONF_DIR);
HiveConf hiveConf =
HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, context.hadoopConf());

// user-set parameters always overwrite hive-site.xml parameters
context.options().toMap().forEach(hiveConf::set);
if (uri != null) {
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
}

if (hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname) == null) {
LOG.error(
"Can't find hive metastore uri to connect: "
+ " either set "
+ CatalogOptions.URI.key()
+ " for paimon "
+ IDENTIFIER
+ " catalog or set hive.metastore.uris in hive-site.xml or hadoop configurations."
+ " Will use empty metastore uris, which means we may use a embedded metastore. The may cause unpredictable consensus problem.");
}

return hiveConf;
}

/**
* Returns a new Hadoop Configuration object using the path to the hadoop conf configured.
*
Expand Down Expand Up @@ -659,22 +718,6 @@ public static Configuration getHadoopConfiguration(String hadoopConfDir) {
return null;
}

public static String possibleHadoopConfPath() {
String possiblePath = null;
if (System.getenv("HADOOP_CONF_DIR") != null) {
possiblePath = System.getenv("HADOOP_CONF_DIR");
} else if (System.getenv("HADOOP_HOME") != null) {
String possiblePath1 = System.getenv("HADOOP_HOME") + "/conf";
String possiblePath2 = System.getenv("HADOOP_HOME") + "/etc/hadoop";
if (new File(possiblePath1).exists()) {
possiblePath = possiblePath1;
} else if (new File(possiblePath2).exists()) {
possiblePath = possiblePath2;
}
}
return possiblePath;
}

public static String possibleHiveConfPath() {
return System.getenv("HIVE_CONF_DIR");
}
Expand Down
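One subtlety in createHiveCatalog above: hive.metastore.warehouse.dir is often scheme-less (e.g. /user/hive/warehouse), while FileIO.get dispatches on the URI scheme. A sketch of the resolution rule, mirroring the diff, assuming a HiveConf and CatalogContext are in scope:

import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;

import java.io.IOException;

public class WarehouseResolution {
    // Pick the FileIO by the warehouse scheme, falling back to fs.defaultFS
    // (e.g. hdfs://namenode:8020) when the warehouse path is scheme-less.
    static FileIO resolveFileIO(HiveConf hiveConf, Path warehouse, CatalogContext context)
            throws IOException {
        Path uri =
                warehouse.toUri().getScheme() == null
                        ? new Path(FileSystem.getDefaultUri(hiveConf))
                        : warehouse;
        FileIO fileIO = FileIO.get(uri, context);
        fileIO.checkOrMkdirs(warehouse); // validate or create the warehouse directory
        return fileIO;
    }
}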