diff --git a/docs/content/engines/hive.md b/docs/content/engines/hive.md index 8a3ab9163952..a210099e8230 100644 --- a/docs/content/engines/hive.md +++ b/docs/content/engines/hive.md @@ -100,10 +100,10 @@ Execute the following Flink SQL script in Flink SQL client to define a Paimon Hi CREATE CATALOG my_hive WITH ( 'type' = 'paimon', 'metastore' = 'hive', - 'uri' = 'thrift://:', + -- 'uri' = 'thrift://:', default use 'hive.metastore.uris' in HiveConf -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment - 'warehouse' = 'hdfs:///path/to/table/store/warehouse' + -- 'warehouse' = 'hdfs:///path/to/table/store/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf ); -- Use paimon Hive catalog diff --git a/docs/content/how-to/creating-catalogs.md b/docs/content/how-to/creating-catalogs.md index 7faddc5a5d6a..19f11a2a9cf0 100644 --- a/docs/content/how-to/creating-catalogs.md +++ b/docs/content/how-to/creating-catalogs.md @@ -103,10 +103,10 @@ hadoop-conf-dir parameter to the hive-site.xml file path. CREATE CATALOG my_hive WITH ( 'type' = 'paimon', 'metastore' = 'hive', - 'uri' = 'thrift://:', + -- 'uri' = 'thrift://:', default use 'hive.metastore.uris' in HiveConf -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment - 'warehouse' = 'hdfs:///path/to/warehouse' + -- 'warehouse' = 'hdfs:///path/to/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf ); USE CATALOG my_hive; diff --git a/docs/content/migration/upsert-to-partitioned.md b/docs/content/migration/upsert-to-partitioned.md index 8b8830c8f406..6c5ccd10a972 100644 --- a/docs/content/migration/upsert-to-partitioned.md +++ b/docs/content/migration/upsert-to-partitioned.md @@ -51,10 +51,10 @@ fully compatible with Hive. 
CREATE CATALOG my_hive WITH ( 'type' = 'paimon', 'metastore' = 'hive', - 'uri' = 'thrift://:', + -- 'uri' = 'thrift://:', default use 'hive.metastore.uris' in HiveConf -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment - 'warehouse' = 'hdfs:///path/to/warehouse' + -- 'warehouse' = 'hdfs:///path/to/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf ); USE CATALOG my_hive; @@ -112,10 +112,10 @@ need to query the latest data. Therefore, Paimon provides a preview feature: CREATE CATALOG my_hive WITH ( 'type' = 'paimon', 'metastore' = 'hive', - 'uri' = 'thrift://:', + -- 'uri' = 'thrift://:', default use 'hive.metastore.uris' in HiveConf -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment - 'warehouse' = 'hdfs:///path/to/warehouse' + -- 'warehouse' = 'hdfs:///path/to/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf ); USE CATALOG my_hive; diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index ef5035a66f4c..6b83901e7050 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -48,6 +48,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.fs.FileIOUtils.checkAccess; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** * File IO to read and write file. @@ -179,6 +180,14 @@ default boolean isDir(Path path) throws IOException { return getFileStatus(path).isDir(); } + default void checkOrMkdirs(Path path) throws IOException { + if (exists(path)) { + checkArgument(isDir(path), "The path '%s' should be a directory.", path); + } else { + mkdirs(path); + } + } + /** Read file to UTF_8 decoding. 
*/ default String readFileUtf8(Path path) throws IOException { try (SeekableInputStream in = newInputStream(path)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java index 8a6bab4b10d8..c153f0ee39c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java @@ -23,6 +23,7 @@ import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; import org.apache.paimon.utils.Preconditions; import java.io.IOException; @@ -30,7 +31,6 @@ import static org.apache.paimon.options.CatalogOptions.METASTORE; import static org.apache.paimon.options.CatalogOptions.WAREHOUSE; -import static org.apache.paimon.utils.Preconditions.checkArgument; /** * Factory to create {@link Catalog}. Each factory should have a unique identifier. 
@@ -40,7 +40,15 @@ @Public public interface CatalogFactory extends Factory { - Catalog create(FileIO fileIO, Path warehouse, CatalogContext context); + default Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { + throw new UnsupportedOperationException( + "Use create(context) for " + this.getClass().getSimpleName()); + } + + default Catalog create(CatalogContext context) { + throw new UnsupportedOperationException( + "Use create(fileIO, warehouse, context) for " + this.getClass().getSimpleName()); + } static Path warehouse(CatalogContext context) { String warehouse = @@ -59,29 +67,27 @@ static Catalog createCatalog(CatalogContext options) { } static Catalog createCatalog(CatalogContext context, ClassLoader classLoader) { + Options options = context.options(); + String metastore = options.get(METASTORE); + CatalogFactory catalogFactory = + FactoryUtil.discoverFactory(classLoader, CatalogFactory.class, metastore); + + try { + return catalogFactory.create(context); + } catch (UnsupportedOperationException ignore) { + } + // manual validation // because different catalog types may have different options // we can't list them all in the optionalOptions() method String warehouse = warehouse(context).toUri().toString(); - String metastore = context.options().get(METASTORE); - CatalogFactory catalogFactory = - FactoryUtil.discoverFactory(classLoader, CatalogFactory.class, metastore); - Path warehousePath = new Path(warehouse); FileIO fileIO; try { fileIO = FileIO.get(warehousePath, context); - if (fileIO.exists(warehousePath)) { - checkArgument( - fileIO.isDir(warehousePath), - "The %s path '%s' should be a directory.", - WAREHOUSE.key(), - warehouse); - } else { - fileIO.mkdirs(warehousePath); - } + fileIO.checkOrMkdirs(warehousePath); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java index e32997000f4c..d07779297545 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java @@ -27,9 +27,6 @@ import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.FactoryUtil; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -67,9 +64,7 @@ public FlinkGenericCatalog createCatalog(Context context) { @VisibleForTesting public static FlinkGenericCatalog createCatalog( ClassLoader cl, Map optionMap, String name, Catalog flinkCatalog) { - String warehouse = extractWarehouse(flinkCatalog); Options options = Options.fromMap(optionMap); - options.set(CatalogOptions.WAREHOUSE, warehouse); options.set(CatalogOptions.METASTORE, "hive"); FlinkCatalog paimon = new FlinkCatalog( @@ -86,20 +81,4 @@ public static FlinkGenericCatalog createCatalog( private static CatalogFactory createHiveCatalogFactory(ClassLoader cl) { return FactoryUtil.discoverFactory(cl, CatalogFactory.class, "hive"); } - - private static String extractWarehouse(Catalog catalog) { - try { - Field field = catalog.getClass().getDeclaredField("hiveConf"); - field.setAccessible(true); - Object hiveConf = field.get(catalog); - - Method method = hiveConf.getClass().getMethod("get", String.class); - return (String) method.invoke(hiveConf, "hive.metastore.warehouse.dir"); - } catch (NoSuchFieldException - | IllegalAccessException - | NoSuchMethodException - | InvocationTargetException e) { - throw new RuntimeException(e); - } - } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 60aef7942a4b..731c0935854c 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -21,12 +21,15 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.AbstractCatalog; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogLock; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.Lock; +import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.options.OptionsUtils; import org.apache.paimon.schema.Schema; @@ -39,6 +42,7 @@ import org.apache.flink.table.hive.LegacyHiveClasses; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; @@ -58,6 +62,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.net.URL; import java.util.ArrayList; import java.util.Collections; @@ -71,8 +76,12 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE; import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout; import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep; +import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; +import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR; 
+import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES; import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE; @@ -551,17 +560,16 @@ static IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName) } public static HiveConf createHiveConf( - @Nullable String hiveConfDir, @Nullable String hadoopConfDir) { + @Nullable String hiveConfDir, + @Nullable String hadoopConfDir, + Configuration defaultHadoopConf) { // try to load from system env. if (isNullOrWhitespaceOnly(hiveConfDir)) { hiveConfDir = possibleHiveConfPath(); } - if (isNullOrWhitespaceOnly(hadoopConfDir)) { - hadoopConfDir = possibleHadoopConfPath(); - } // create HiveConf from hadoop configuration with hadoop conf directory configured. - Configuration hadoopConf = null; + Configuration hadoopConf = defaultHadoopConf; if (!isNullOrWhitespaceOnly(hadoopConfDir)) { hadoopConf = getHadoopConfiguration(hadoopConfDir); if (hadoopConf == null) { @@ -575,9 +583,6 @@ public static HiveConf createHiveConf( + ") exist in the folder.")); } } - if (hadoopConf == null) { - hadoopConf = new Configuration(); - } LOG.info("Setting hive conf dir as {}", hiveConfDir); if (hiveConfDir != null) { @@ -619,6 +624,60 @@ public static boolean isEmbeddedMetastore(HiveConf hiveConf) { return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS)); } + public static Catalog createHiveCatalog(CatalogContext context) { + HiveConf hiveConf = createHiveConf(context); + String warehouseStr = context.options().get(CatalogOptions.WAREHOUSE); + if (warehouseStr == null) { + warehouseStr = + hiveConf.get(METASTOREWAREHOUSE.varname, METASTOREWAREHOUSE.defaultStrVal); + } + Path warehouse = new Path(warehouseStr); + Path uri = + warehouse.toUri().getScheme() == null + ? 
new Path(FileSystem.getDefaultUri(hiveConf)) + : warehouse; + FileIO fileIO; + try { + fileIO = FileIO.get(uri, context); + fileIO.checkOrMkdirs(warehouse); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return new HiveCatalog( + fileIO, + hiveConf, + context.options().get(HiveCatalogFactory.METASTORE_CLIENT_CLASS), + context.options(), + warehouse.toUri().toString()); + } + + public static HiveConf createHiveConf(CatalogContext context) { + String uri = context.options().get(CatalogOptions.URI); + String hiveConfDir = context.options().get(HIVE_CONF_DIR); + String hadoopConfDir = context.options().get(HADOOP_CONF_DIR); + HiveConf hiveConf = + HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, context.hadoopConf()); + + // always use user-set parameters to overwrite hive-site.xml parameters + context.options().toMap().forEach(hiveConf::set); + if (uri != null) { + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri); + } + + if (hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname) == null) { + LOG.error( + "Can't find hive metastore uri to connect: " + + " either set " + + CatalogOptions.URI.key() + + " for paimon " + + IDENTIFIER + + " catalog or set hive.metastore.uris in hive-site.xml or hadoop configurations." + + " Will use empty metastore uris, which means we may use an embedded metastore. This may cause unpredictable consensus problems."); + } + + return hiveConf; + } + + /** * Returns a new Hadoop Configuration object using the path to the hadoop conf configured. 
* @@ -659,22 +718,6 @@ public static Configuration getHadoopConfiguration(String hadoopConfDir) { return null; } - public static String possibleHadoopConfPath() { - String possiblePath = null; - if (System.getenv("HADOOP_CONF_DIR") != null) { - possiblePath = System.getenv("HADOOP_CONF_DIR"); - } else if (System.getenv("HADOOP_HOME") != null) { - String possiblePath1 = System.getenv("HADOOP_HOME") + "/conf"; - String possiblePath2 = System.getenv("HADOOP_HOME") + "/etc/hadoop"; - if (new File(possiblePath1).exists()) { - possiblePath = possiblePath1; - } else if (new File(possiblePath2).exists()) { - possiblePath = possiblePath2; - } - } - return possiblePath; - } - public static String possibleHiveConfPath() { return System.getenv("HIVE_CONF_DIR"); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java index be582fe2b691..cc51df265095 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java @@ -21,19 +21,12 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; -import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.ConfigOptions; -import org.apache.hadoop.hive.conf.HiveConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.paimon.hive.HiveCatalog.createHiveConf; -import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; -import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; /** Factory to create {@link 
HiveCatalog}. */ @@ -41,7 +34,7 @@ public class HiveCatalogFactory implements CatalogFactory { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogFactory.class); - private static final ConfigOption METASTORE_CLIENT_CLASS = + public static final ConfigOption METASTORE_CLIENT_CLASS = ConfigOptions.key("metastore.client.class") .stringType() .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient") @@ -56,32 +49,7 @@ public String identifier() { } @Override - public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { - String uri = context.options().get(CatalogOptions.URI); - String hiveConfDir = context.options().get(HIVE_CONF_DIR); - String hadoopConfDir = context.options().get(HADOOP_CONF_DIR); - HiveConf hiveConf = createHiveConf(hiveConfDir, hadoopConfDir); - - // always using user-set parameters overwrite hive-site.xml parameters - context.options().toMap().forEach(hiveConf::set); - if (uri != null) { - hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri); - } - - if (hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname) == null) { - LOG.error( - "Can't find hive metastore uri to connect: " - + " either set " - + CatalogOptions.URI.key() - + " for paimon " - + IDENTIFIER - + " catalog or set hive.metastore.uris in hive-site.xml or hadoop configurations." - + " Will use empty metastore uris, which means we may use a embedded metastore. 
The may cause unpredictable consensus problem."); - } - - String clientClassName = context.options().get(METASTORE_CLIENT_CLASS); - - return new HiveCatalog( - fileIO, hiveConf, clientClassName, context.options(), warehouse.toUri().toString()); + public Catalog create(CatalogContext context) { + return HiveCatalog.createHiveCatalog(context); } } diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java index d03b6a50cc03..05da1bccf1ec 100644 --- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java +++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java @@ -20,10 +20,12 @@ import org.apache.paimon.catalog.CatalogTestBase; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.CommonTestUtils; +import org.apache.paimon.utils.HadoopUtils; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; @@ -104,7 +106,9 @@ public void testCheckIdentifierUpperCase() throws Exception { @Test public void testHadoopConfDir() { - HiveConf hiveConf = HiveCatalog.createHiveConf(null, HADOOP_CONF_DIR); + HiveConf hiveConf = + HiveCatalog.createHiveConf( + null, HADOOP_CONF_DIR, HadoopUtils.getHadoopConfiguration(new Options())); assertThat(hiveConf.get("fs.defaultFS")).isEqualTo("dummy-fs"); } @@ -118,7 +122,9 @@ public void testHiveConfDir() { } private void testHiveConfDirImpl() { - HiveConf hiveConf = HiveCatalog.createHiveConf(HIVE_CONF_DIR, null); + HiveConf hiveConf = + HiveCatalog.createHiveConf( + HIVE_CONF_DIR, null, HadoopUtils.getHadoopConfiguration(new Options())); assertThat(hiveConf.get("hive.metastore.uris")).isEqualTo("dummy-hms"); } @@ -134,7 +140,9 @@ 
public void testHadoopConfDirFromEnv() { // add HADOOP_CONF_DIR to system environment CommonTestUtils.setEnv(newEnv, false); - HiveConf hiveConf = HiveCatalog.createHiveConf(null, null); + HiveConf hiveConf = + HiveCatalog.createHiveConf( + null, null, HadoopUtils.getHadoopConfiguration(new Options())); assertThat(hiveConf.get("fs.defaultFS")).isEqualTo("dummy-fs"); } @@ -153,7 +161,9 @@ private void testHiveConfDirFromEnvImpl() { // add HIVE_CONF_DIR to system environment CommonTestUtils.setEnv(newEnv, false); - HiveConf hiveConf = HiveCatalog.createHiveConf(null, null); + HiveConf hiveConf = + HiveCatalog.createHiveConf( + null, null, HadoopUtils.getHadoopConfiguration(new Options())); assertThat(hiveConf.get("hive.metastore.uris")).isEqualTo("dummy-hms"); } diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java index 6a323eabc880..ff8f67a81bad 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java @@ -109,7 +109,7 @@ public void before() throws IOException { fileIO.mkdirs(warehouse); HiveCatalogFactory hiveCatalogFactory = new HiveCatalogFactory(); - catalog = (HiveCatalog) hiveCatalogFactory.create(fileIO, warehouse, catalogContext); + catalog = (HiveCatalog) hiveCatalogFactory.create(catalogContext); hmsClient = catalog.getHmsClient();