From 732ccae5a7a9062e942b5ee3f41230b6d8a6bf34 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Thu, 30 Nov 2023 23:20:05 +0800 Subject: [PATCH 1/5] [hive] HiveCatalog infer warehouse from hadoop Conf --- .../apache/paimon/catalog/CatalogFactory.java | 19 +++++-- .../flink/FlinkGenericCatalogFactory.java | 21 -------- .../org/apache/paimon/hive/HiveCatalog.java | 28 ++-------- .../paimon/hive/HiveCatalogFactory.java | 54 ++++++++++++++++--- .../apache/paimon/hive/HiveCatalogTest.java | 18 +++++-- 5 files changed, 80 insertions(+), 60 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java index 8a6bab4b10d8..4184ef54ff9a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java @@ -23,6 +23,7 @@ import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; import org.apache.paimon.utils.Preconditions; import java.io.IOException; @@ -42,6 +43,11 @@ public interface CatalogFactory extends Factory { Catalog create(FileIO fileIO, Path warehouse, CatalogContext context); + default Catalog create(CatalogContext context) { + throw new UnsupportedOperationException( + "Please provide 'warehouse' for catalog: " + this.getClass().getSimpleName()); + } + static Path warehouse(CatalogContext context) { String warehouse = Preconditions.checkNotNull( @@ -59,15 +65,20 @@ static Catalog createCatalog(CatalogContext options) { } static Catalog createCatalog(CatalogContext context, ClassLoader classLoader) { + Options options = context.options(); + String metastore = options.get(METASTORE); + CatalogFactory catalogFactory = + FactoryUtil.discoverFactory(classLoader, CatalogFactory.class, metastore); + + if (!options.contains(WAREHOUSE)) { + return catalogFactory.create(context); + } + // manual validation // because different catalog types may have different options // we can't list them all in the optionalOptions() method String warehouse = warehouse(context).toUri().toString(); - String metastore = context.options().get(METASTORE); - CatalogFactory catalogFactory = - FactoryUtil.discoverFactory(classLoader, CatalogFactory.class, metastore); - Path warehousePath = new Path(warehouse); FileIO fileIO; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java index e32997000f4c..d07779297545 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java @@ -27,9 +27,6 @@ import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.FactoryUtil; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -67,9 +64,7 @@ public FlinkGenericCatalog createCatalog(Context context) { @VisibleForTesting public static FlinkGenericCatalog createCatalog( ClassLoader cl, Map optionMap, String name, Catalog flinkCatalog) { - String warehouse = extractWarehouse(flinkCatalog); Options options = Options.fromMap(optionMap); - options.set(CatalogOptions.WAREHOUSE, warehouse); options.set(CatalogOptions.METASTORE, "hive"); FlinkCatalog paimon = new FlinkCatalog( @@ -86,20 +81,4 @@ public static FlinkGenericCatalog createCatalog( private static CatalogFactory createHiveCatalogFactory(ClassLoader cl) { return FactoryUtil.discoverFactory(cl, CatalogFactory.class, "hive"); } - - private static String extractWarehouse(Catalog catalog) { - try { - Field field = catalog.getClass().getDeclaredField("hiveConf"); - field.setAccessible(true); - Object hiveConf = field.get(catalog); - - Method method = hiveConf.getClass().getMethod("get", String.class); - return (String) method.invoke(hiveConf, "hive.metastore.warehouse.dir"); - } catch (NoSuchFieldException - | IllegalAccessException - | NoSuchMethodException - | InvocationTargetException e) { - throw new RuntimeException(e); - } - } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 60aef7942a4b..31cf8a7cf913 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -551,17 +551,16 @@ static IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName) } public static HiveConf createHiveConf( - @Nullable String hiveConfDir, @Nullable String hadoopConfDir) { + @Nullable String hiveConfDir, + @Nullable String hadoopConfDir, + Configuration defaultHadoopConf) { // try to load from system env. if (isNullOrWhitespaceOnly(hiveConfDir)) { hiveConfDir = possibleHiveConfPath(); } - if (isNullOrWhitespaceOnly(hadoopConfDir)) { - hadoopConfDir = possibleHadoopConfPath(); - } // create HiveConf from hadoop configuration with hadoop conf directory configured. - Configuration hadoopConf = null; + Configuration hadoopConf = defaultHadoopConf; if (!isNullOrWhitespaceOnly(hadoopConfDir)) { hadoopConf = getHadoopConfiguration(hadoopConfDir); if (hadoopConf == null) { @@ -575,9 +574,6 @@ public static HiveConf createHiveConf( + ") exist in the folder.")); } } - if (hadoopConf == null) { - hadoopConf = new Configuration(); - } LOG.info("Setting hive conf dir as {}", hiveConfDir); if (hiveConfDir != null) { @@ -659,22 +655,6 @@ public static Configuration getHadoopConfiguration(String hadoopConfDir) { return null; } - public static String possibleHadoopConfPath() { - String possiblePath = null; - if (System.getenv("HADOOP_CONF_DIR") != null) { - possiblePath = System.getenv("HADOOP_CONF_DIR"); - } else if (System.getenv("HADOOP_HOME") != null) { - String possiblePath1 = System.getenv("HADOOP_HOME") + "/conf"; - String possiblePath2 = System.getenv("HADOOP_HOME") + "/etc/hadoop"; - if (new File(possiblePath1).exists()) { - possiblePath = possiblePath1; - } else if (new File(possiblePath2).exists()) { - possiblePath = possiblePath2; - } - } - return possiblePath; - } - public static String possibleHiveConfPath() { return System.getenv("HIVE_CONF_DIR"); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java index be582fe2b691..4dcfe1629b43 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java @@ -27,10 +27,16 @@ import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.ConfigOptions; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.UncheckedIOException; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE; import static org.apache.paimon.hive.HiveCatalog.createHiveConf; import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR; @@ -55,20 +61,57 @@ public String identifier() { return IDENTIFIER; } + @Override + public Catalog create(CatalogContext context) { + HiveConf hiveConf = createHiveConf(context); + Path warehouse = + new Path( + hiveConf.get(METASTOREWAREHOUSE.varname, METASTOREWAREHOUSE.defaultStrVal)); + FileIO fileIO; + try { + fileIO = FileIO.get(warehouse, context); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return create(fileIO, warehouse, context, hiveConf); + } + @Override public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { + return create(fileIO, warehouse, context, createHiveConf(context)); + } + + private Catalog create( + FileIO fileIO, Path warehouse, CatalogContext context, HiveConf hiveConf) { + if (warehouse.toUri().getScheme() == null) { + try { + fileIO = FileIO.get(new Path(FileSystem.getDefaultUri(hiveConf)), context); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return new HiveCatalog( + fileIO, + hiveConf, + context.options().get(METASTORE_CLIENT_CLASS), + context.options(), + warehouse.toUri().toString()); + } + + private static HiveConf createHiveConf(CatalogContext context) { String uri = context.options().get(CatalogOptions.URI); String hiveConfDir = context.options().get(HIVE_CONF_DIR); String hadoopConfDir = context.options().get(HADOOP_CONF_DIR); - HiveConf hiveConf = createHiveConf(hiveConfDir, hadoopConfDir); + HiveConf hiveConf = + HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, context.hadoopConf()); // always using user-set parameters overwrite hive-site.xml parameters context.options().toMap().forEach(hiveConf::set); if (uri != null) { - hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri); + hiveConf.set(ConfVars.METASTOREURIS.varname, uri); } - if (hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname) == null) { + if (hiveConf.get(ConfVars.METASTOREURIS.varname) == null) { LOG.error( "Can't find hive metastore uri to connect: " + " either set " @@ -79,9 +122,6 @@ public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { + " Will use empty metastore uris, which means we may use a embedded metastore. The may cause unpredictable consensus problem."); } - String clientClassName = context.options().get(METASTORE_CLIENT_CLASS); - - return new HiveCatalog( - fileIO, hiveConf, clientClassName, context.options(), warehouse.toUri().toString()); + return hiveConf; } } diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java index d03b6a50cc03..05da1bccf1ec 100644 --- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java +++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java @@ -20,10 +20,12 @@ import org.apache.paimon.catalog.CatalogTestBase; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.CommonTestUtils; +import org.apache.paimon.utils.HadoopUtils; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; @@ -104,7 +106,9 @@ public void testCheckIdentifierUpperCase() throws Exception { @Test public void testHadoopConfDir() { - HiveConf hiveConf = HiveCatalog.createHiveConf(null, HADOOP_CONF_DIR); + HiveConf hiveConf = + HiveCatalog.createHiveConf( + null, HADOOP_CONF_DIR, HadoopUtils.getHadoopConfiguration(new Options())); assertThat(hiveConf.get("fs.defaultFS")).isEqualTo("dummy-fs"); } @@ -118,7 +122,9 @@ public void testHiveConfDir() { } private void testHiveConfDirImpl() { - HiveConf hiveConf = HiveCatalog.createHiveConf(HIVE_CONF_DIR, null); + HiveConf hiveConf = + HiveCatalog.createHiveConf( + HIVE_CONF_DIR, null, HadoopUtils.getHadoopConfiguration(new Options())); assertThat(hiveConf.get("hive.metastore.uris")).isEqualTo("dummy-hms"); } @@ -134,7 +140,9 @@ public void testHadoopConfDirFromEnv() { // add HADOOP_CONF_DIR to system environment CommonTestUtils.setEnv(newEnv, false); - HiveConf hiveConf = HiveCatalog.createHiveConf(null, null); + HiveConf hiveConf = + HiveCatalog.createHiveConf( + null, null, HadoopUtils.getHadoopConfiguration(new Options())); assertThat(hiveConf.get("fs.defaultFS")).isEqualTo("dummy-fs"); } @@ -153,7 +161,9 @@ private void testHiveConfDirFromEnvImpl() { // add HIVE_CONF_DIR to system environment CommonTestUtils.setEnv(newEnv, false); - HiveConf hiveConf = HiveCatalog.createHiveConf(null, null); + HiveConf hiveConf = + HiveCatalog.createHiveConf( + null, null, HadoopUtils.getHadoopConfiguration(new Options())); assertThat(hiveConf.get("hive.metastore.uris")).isEqualTo("dummy-hms"); } From 77856d6ba34eb3c9ea746f164aba960acc5944a8 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Fri, 1 Dec 2023 09:53:42 +0800 Subject: [PATCH 2/5] fi --- .../java/org/apache/paimon/fs/FileIO.java | 9 +++++ .../apache/paimon/catalog/CatalogFactory.java | 21 +++++------- paimon-flink/paimon-flink-common/pom.xml | 13 +++++++ .../paimon/hive/HiveCatalogFactory.java | 34 +++++++------------ 4 files changed, 42 insertions(+), 35 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index ef5035a66f4c..6b83901e7050 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -48,6 +48,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.fs.FileIOUtils.checkAccess; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** * File IO to read and write file. @@ -179,6 +180,14 @@ default boolean isDir(Path path) throws IOException { return getFileStatus(path).isDir(); } + default void checkOrMkdirs(Path path) throws IOException { + if (exists(path)) { + checkArgument(isDir(path), "The path '%s' should be a directory.", path); + } else { + mkdirs(path); + } + } + /** Read file to UTF_8 decoding. */ default String readFileUtf8(Path path) throws IOException { try (SeekableInputStream in = newInputStream(path)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java index 4184ef54ff9a..c153f0ee39c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java @@ -31,7 +31,6 @@ import static org.apache.paimon.options.CatalogOptions.METASTORE; import static org.apache.paimon.options.CatalogOptions.WAREHOUSE; -import static org.apache.paimon.utils.Preconditions.checkArgument; /** * Factory to create {@link Catalog}. Each factory should have a unique identifier. @@ -41,11 +40,14 @@ @Public public interface CatalogFactory extends Factory { - Catalog create(FileIO fileIO, Path warehouse, CatalogContext context); + default Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { + throw new UnsupportedOperationException( + "Use create(context) for " + this.getClass().getSimpleName()); + } default Catalog create(CatalogContext context) { throw new UnsupportedOperationException( - "Please provide 'warehouse' for catalog: " + this.getClass().getSimpleName()); + "Use create(fileIO, warehouse, context) for " + this.getClass().getSimpleName()); } static Path warehouse(CatalogContext context) { @@ -70,8 +72,9 @@ static Catalog createCatalog(CatalogContext context, ClassLoader classLoader) { CatalogFactory catalogFactory = FactoryUtil.discoverFactory(classLoader, CatalogFactory.class, metastore); - if (!options.contains(WAREHOUSE)) { + try { return catalogFactory.create(context); + } catch (UnsupportedOperationException ignore) { } // manual validation @@ -84,15 +87,7 @@ static Catalog createCatalog(CatalogContext context, ClassLoader classLoader) { try { fileIO = FileIO.get(warehousePath, context); - if (fileIO.exists(warehousePath)) { - checkArgument( - fileIO.isDir(warehousePath), - "The %s path '%s' should be a directory.", - WAREHOUSE.key(), - warehouse); - } else { - fileIO.mkdirs(warehousePath); - } + fileIO.checkOrMkdirs(warehousePath); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index c5a7d32a765c..73482b442244 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -210,6 +210,19 @@ under the License. ${flink.version} test + + + org.apache.hive + hive-exec + ${hive.version} + test + + + * + * + + + diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java index 4dcfe1629b43..0acbc01f17f6 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java @@ -37,7 +37,6 @@ import java.io.UncheckedIOException; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE; -import static org.apache.paimon.hive.HiveCatalog.createHiveConf; import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; @@ -64,32 +63,23 @@ public String identifier() { @Override public Catalog create(CatalogContext context) { HiveConf hiveConf = createHiveConf(context); - Path warehouse = - new Path( - hiveConf.get(METASTOREWAREHOUSE.varname, METASTOREWAREHOUSE.defaultStrVal)); + String warehouseStr = context.options().get(CatalogOptions.WAREHOUSE); + if (warehouseStr == null) { + warehouseStr = + hiveConf.get(METASTOREWAREHOUSE.varname, METASTOREWAREHOUSE.defaultStrVal); + } + Path warehouse = new Path(warehouseStr); + Path uri = + warehouse.toUri().getScheme() == null + ? new Path(FileSystem.getDefaultUri(hiveConf)) + : warehouse; FileIO fileIO; try { - fileIO = FileIO.get(warehouse, context); + fileIO = FileIO.get(uri, context); + fileIO.checkOrMkdirs(warehouse); } catch (IOException e) { throw new UncheckedIOException(e); } - return create(fileIO, warehouse, context, hiveConf); - } - - @Override - public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { - return create(fileIO, warehouse, context, createHiveConf(context)); - } - - private Catalog create( - FileIO fileIO, Path warehouse, CatalogContext context, HiveConf hiveConf) { - if (warehouse.toUri().getScheme() == null) { - try { - fileIO = FileIO.get(new Path(FileSystem.getDefaultUri(hiveConf)), context); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } return new HiveCatalog( fileIO, hiveConf, From 9c10f219ff27054d56c21d7818d727261fb582c8 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Fri, 1 Dec 2023 10:54:20 +0800 Subject: [PATCH 3/5] fix --- docs/content/engines/hive.md | 2 +- docs/content/how-to/creating-catalogs.md | 2 +- docs/content/migration/upsert-to-partitioned.md | 4 ++-- .../test/java/org/apache/paimon/hive/HiveLocationTest.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/content/engines/hive.md b/docs/content/engines/hive.md index 8a3ab9163952..74b164086e4d 100644 --- a/docs/content/engines/hive.md +++ b/docs/content/engines/hive.md @@ -103,7 +103,7 @@ CREATE CATALOG my_hive WITH ( 'uri' = 'thrift://:', -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment - 'warehouse' = 'hdfs:///path/to/table/store/warehouse' + -- 'warehouse' = 'hdfs:///path/to/table/store/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf ); -- Use paimon Hive catalog diff --git a/docs/content/how-to/creating-catalogs.md b/docs/content/how-to/creating-catalogs.md index 7faddc5a5d6a..5bea97850bc4 100644 --- a/docs/content/how-to/creating-catalogs.md +++ b/docs/content/how-to/creating-catalogs.md @@ -106,7 +106,7 @@ CREATE CATALOG my_hive WITH ( 'uri' = 'thrift://:', -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment - 'warehouse' = 'hdfs:///path/to/warehouse' + -- 'warehouse' = 'hdfs:///path/to/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf ); USE CATALOG my_hive; diff --git a/docs/content/migration/upsert-to-partitioned.md b/docs/content/migration/upsert-to-partitioned.md index 8b8830c8f406..74a547f839f3 100644 --- a/docs/content/migration/upsert-to-partitioned.md +++ b/docs/content/migration/upsert-to-partitioned.md @@ -54,7 +54,7 @@ CREATE CATALOG my_hive WITH ( 'uri' = 'thrift://:', -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment - 'warehouse' = 'hdfs:///path/to/warehouse' + -- 'warehouse' = 'hdfs:///path/to/table/store/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf ); USE CATALOG my_hive; @@ -115,7 +115,7 @@ CREATE CATALOG my_hive WITH ( 'uri' = 'thrift://:', -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment - 'warehouse' = 'hdfs:///path/to/warehouse' + -- 'warehouse' = 'hdfs:///path/to/table/store/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf ); USE CATALOG my_hive; diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java index 6a323eabc880..ff8f67a81bad 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java @@ -109,7 +109,7 @@ public void before() throws IOException { fileIO.mkdirs(warehouse); HiveCatalogFactory hiveCatalogFactory = new HiveCatalogFactory(); - catalog = (HiveCatalog) hiveCatalogFactory.create(fileIO, warehouse, catalogContext); + catalog = (HiveCatalog) hiveCatalogFactory.create(catalogContext); hmsClient = catalog.getHmsClient(); From a8198e4c2f1eadeca9ab96eb233a99f1a7a7dc1a Mon Sep 17 00:00:00 2001 From: Jingsong Date: Fri, 1 Dec 2023 11:05:31 +0800 Subject: [PATCH 4/5] fix --- paimon-flink/paimon-flink-common/pom.xml | 13 ---- .../org/apache/paimon/hive/HiveCatalog.java | 63 ++++++++++++++++++ .../paimon/hive/HiveCatalogFactory.java | 66 +------------------ 3 files changed, 65 insertions(+), 77 deletions(-) diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index 73482b442244..c5a7d32a765c 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -210,19 +210,6 @@ under the License. ${flink.version} test - - - org.apache.hive - hive-exec - ${hive.version} - test - - - * - * - - - diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 31cf8a7cf913..731c0935854c 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -21,12 +21,15 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.AbstractCatalog; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogLock; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.Lock; +import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.options.OptionsUtils; import org.apache.paimon.schema.Schema; @@ -39,6 +42,7 @@ import org.apache.flink.table.hive.LegacyHiveClasses; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; @@ -58,6 +62,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.net.URL; import java.util.ArrayList; import java.util.Collections; @@ -71,8 +76,12 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE; import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout; import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep; +import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; +import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR; +import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES; import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE; @@ -615,6 +624,60 @@ public static boolean isEmbeddedMetastore(HiveConf hiveConf) { return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS)); } + public static Catalog createHiveCatalog(CatalogContext context) { + HiveConf hiveConf = createHiveConf(context); + String warehouseStr = context.options().get(CatalogOptions.WAREHOUSE); + if (warehouseStr == null) { + warehouseStr = + hiveConf.get(METASTOREWAREHOUSE.varname, METASTOREWAREHOUSE.defaultStrVal); + } + Path warehouse = new Path(warehouseStr); + Path uri = + warehouse.toUri().getScheme() == null + ? new Path(FileSystem.getDefaultUri(hiveConf)) + : warehouse; + FileIO fileIO; + try { + fileIO = FileIO.get(uri, context); + fileIO.checkOrMkdirs(warehouse); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return new HiveCatalog( + fileIO, + hiveConf, + context.options().get(HiveCatalogFactory.METASTORE_CLIENT_CLASS), + context.options(), + warehouse.toUri().toString()); + } + + public static HiveConf createHiveConf(CatalogContext context) { + String uri = context.options().get(CatalogOptions.URI); + String hiveConfDir = context.options().get(HIVE_CONF_DIR); + String hadoopConfDir = context.options().get(HADOOP_CONF_DIR); + HiveConf hiveConf = + HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, context.hadoopConf()); + + // always using user-set parameters overwrite hive-site.xml parameters + context.options().toMap().forEach(hiveConf::set); + if (uri != null) { + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri); + } + + if (hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname) == null) { + LOG.error( + "Can't find hive metastore uri to connect: " + + " either set " + + CatalogOptions.URI.key() + + " for paimon " + + IDENTIFIER + + " catalog or set hive.metastore.uris in hive-site.xml or hadoop configurations." + + " Will use empty metastore uris, which means we may use a embedded metastore. The may cause unpredictable consensus problem."); + } + + return hiveConf; + } + /** * Returns a new Hadoop Configuration object using the path to the hadoop conf configured. * diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java index 0acbc01f17f6..cc51df265095 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogFactory.java @@ -21,24 +21,12 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; -import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.ConfigOptions; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.UncheckedIOException; - -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE; -import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; -import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; /** Factory to create {@link HiveCatalog}. */ @@ -46,7 +34,7 @@ public class HiveCatalogFactory implements CatalogFactory { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogFactory.class); - private static final ConfigOption METASTORE_CLIENT_CLASS = + public static final ConfigOption METASTORE_CLIENT_CLASS = ConfigOptions.key("metastore.client.class") .stringType() .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient") @@ -62,56 +50,6 @@ public String identifier() { @Override public Catalog create(CatalogContext context) { - HiveConf hiveConf = createHiveConf(context); - String warehouseStr = context.options().get(CatalogOptions.WAREHOUSE); - if (warehouseStr == null) { - warehouseStr = - hiveConf.get(METASTOREWAREHOUSE.varname, METASTOREWAREHOUSE.defaultStrVal); - } - Path warehouse = new Path(warehouseStr); - Path uri = - warehouse.toUri().getScheme() == null - ? new Path(FileSystem.getDefaultUri(hiveConf)) - : warehouse; - FileIO fileIO; - try { - fileIO = FileIO.get(uri, context); - fileIO.checkOrMkdirs(warehouse); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - return new HiveCatalog( - fileIO, - hiveConf, - context.options().get(METASTORE_CLIENT_CLASS), - context.options(), - warehouse.toUri().toString()); - } - - private static HiveConf createHiveConf(CatalogContext context) { - String uri = context.options().get(CatalogOptions.URI); - String hiveConfDir = context.options().get(HIVE_CONF_DIR); - String hadoopConfDir = context.options().get(HADOOP_CONF_DIR); - HiveConf hiveConf = - HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, context.hadoopConf()); - - // always using user-set parameters overwrite hive-site.xml parameters - context.options().toMap().forEach(hiveConf::set); - if (uri != null) { - hiveConf.set(ConfVars.METASTOREURIS.varname, uri); - } - - if (hiveConf.get(ConfVars.METASTOREURIS.varname) == null) { - LOG.error( - "Can't find hive metastore uri to connect: " - + " either set " - + CatalogOptions.URI.key() - + " for paimon " - + IDENTIFIER - + " catalog or set hive.metastore.uris in hive-site.xml or hadoop configurations." - + " Will use empty metastore uris, which means we may use a embedded metastore. The may cause unpredictable consensus problem."); - } - - return hiveConf; + return HiveCatalog.createHiveCatalog(context); } } From 258a2c153623fd83905c27af8c481816ab28f54b Mon Sep 17 00:00:00 2001 From: Jingsong Date: Fri, 1 Dec 2023 11:29:38 +0800 Subject: [PATCH 5/5] doc uri too --- docs/content/engines/hive.md | 2 +- docs/content/how-to/creating-catalogs.md | 2 +- docs/content/migration/upsert-to-partitioned.md | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/content/engines/hive.md b/docs/content/engines/hive.md index 74b164086e4d..a210099e8230 100644 --- a/docs/content/engines/hive.md +++ b/docs/content/engines/hive.md @@ -100,7 +100,7 @@ Execute the following Flink SQL script in Flink SQL client to define a Paimon Hi CREATE CATALOG my_hive WITH ( 'type' = 'paimon', 'metastore' = 'hive', - 'uri' = 'thrift://:', + -- 'uri' = 'thrift://:', default use 'hive.metastore.uris' in HiveConf -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment -- 'warehouse' = 'hdfs:///path/to/table/store/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf diff --git a/docs/content/how-to/creating-catalogs.md b/docs/content/how-to/creating-catalogs.md index 5bea97850bc4..19f11a2a9cf0 100644 --- a/docs/content/how-to/creating-catalogs.md +++ b/docs/content/how-to/creating-catalogs.md @@ -103,7 +103,7 @@ hadoop-conf-dir parameter to the hive-site.xml file path. CREATE CATALOG my_hive WITH ( 'type' = 'paimon', 'metastore' = 'hive', - 'uri' = 'thrift://:', + -- 'uri' = 'thrift://:', default use 'hive.metastore.uris' in HiveConf -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment -- 'warehouse' = 'hdfs:///path/to/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf diff --git a/docs/content/migration/upsert-to-partitioned.md b/docs/content/migration/upsert-to-partitioned.md index 74a547f839f3..6c5ccd10a972 100644 --- a/docs/content/migration/upsert-to-partitioned.md +++ b/docs/content/migration/upsert-to-partitioned.md @@ -51,7 +51,7 @@ fully compatible with Hive. CREATE CATALOG my_hive WITH ( 'type' = 'paimon', 'metastore' = 'hive', - 'uri' = 'thrift://:', + -- 'uri' = 'thrift://:', default use 'hive.metastore.uris' in HiveConf -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment -- 'warehouse' = 'hdfs:///path/to/table/store/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf @@ -112,7 +112,7 @@ need to query the latest data. Therefore, Paimon provides a preview feature: CREATE CATALOG my_hive WITH ( 'type' = 'paimon', 'metastore' = 'hive', - 'uri' = 'thrift://:', + -- 'uri' = 'thrift://:', default use 'hive.metastore.uris' in HiveConf -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment -- 'warehouse' = 'hdfs:///path/to/table/store/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf