Skip to content

Commit

Permalink
[hive] HiveCatalog no longer requires the 'warehouse' catalog config
Browse files Browse the repository at this point in the history
  • Loading branch information
monkeyboy123 committed Nov 29, 2023
1 parent 686a6eb commit a0cde26
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.Preconditions;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -40,13 +41,10 @@
@Public
public interface CatalogFactory extends Factory {

Catalog create(FileIO fileIO, Path warehouse, CatalogContext context);
Catalog create(@Nullable FileIO fileIO, @Nullable Path warehouse, CatalogContext context);

static Path warehouse(CatalogContext context) {
String warehouse =
Preconditions.checkNotNull(
context.options().get(WAREHOUSE),
"Paimon '" + WAREHOUSE.key() + "' path must be set");
String warehouse = context.options().get(WAREHOUSE);
return new Path(warehouse);
}

Expand All @@ -59,18 +57,26 @@ static Catalog createCatalog(CatalogContext options) {
}

static Catalog createCatalog(CatalogContext context, ClassLoader classLoader) {
String metastore = context.options().get(METASTORE);
// manual validation
// because different catalog types may have different options
// we can't list them all in the optionalOptions() method
String warehouse = warehouse(context).toUri().toString();

String metastore = context.options().get(METASTORE);
String warehouse = null;
try {
warehouse = warehouse(context).toUri().toString();
} catch (IllegalArgumentException e) {
// do nothing, because catalog warehouse is null
}
CatalogFactory catalogFactory =
FactoryUtil.discoverFactory(classLoader, CatalogFactory.class, metastore);

Path warehousePath = new Path(warehouse);
FileIO fileIO;

Path warehousePath = null;
try {
warehousePath = new Path(warehouse);
} catch (IllegalArgumentException e) {
// do nothing, because warehouse is null
}
FileIO fileIO = null;
try {
fileIO = FileIO.get(warehousePath, context);
if (fileIO.exists(warehousePath)) {
Expand All @@ -84,8 +90,9 @@ static Catalog createCatalog(CatalogContext context, ClassLoader classLoader) {
}
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (NullPointerException e) {
// do nothing, because warehousePath is null
}

return catalogFactory.create(fileIO, warehousePath, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.TableType;
import org.apache.paimon.utils.Preconditions;

import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;

/** Factory to create {@link FileSystemCatalog}. */
public class FileSystemCatalogFactory implements CatalogFactory {
Expand All @@ -36,6 +38,8 @@ public String identifier() {

@Override
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
Preconditions.checkNotNull(
warehouse, "Paimon '" + WAREHOUSE.key() + "' path must be set");
if (!TableType.MANAGED.equals(context.options().get(TABLE_TYPE))) {
throw new IllegalArgumentException(
"Only managed table is supported in File system catalog.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,29 +556,7 @@ public static HiveConf createHiveConf(
if (isNullOrWhitespaceOnly(hiveConfDir)) {
hiveConfDir = possibleHiveConfPath();
}
if (isNullOrWhitespaceOnly(hadoopConfDir)) {
hadoopConfDir = possibleHadoopConfPath();
}

// create HiveConf from hadoop configuration with hadoop conf directory configured.
Configuration hadoopConf = null;
if (!isNullOrWhitespaceOnly(hadoopConfDir)) {
hadoopConf = getHadoopConfiguration(hadoopConfDir);
if (hadoopConf == null) {
String possiableUsedConfFiles =
"core-site.xml | hdfs-site.xml | yarn-site.xml | mapred-site.xml";
throw new RuntimeException(
"Failed to load the hadoop conf from specified path:" + hadoopConfDir,
new FileNotFoundException(
"Please check the path none of the conf files ("
+ possiableUsedConfFiles
+ ") exist in the folder."));
}
}
if (hadoopConf == null) {
hadoopConf = new Configuration();
}

Configuration hadoopConf = createHadoopConfiguration(hadoopConfDir);
LOG.info("Setting hive conf dir as {}", hiveConfDir);
if (hiveConfDir != null) {
// ignore all the static conf file URLs that HiveConf may have set
Expand Down Expand Up @@ -615,6 +593,36 @@ public static HiveConf createHiveConf(
}
}

/**
 * Creates a Hadoop {@link Configuration} from the given conf directory.
 *
 * <p>If {@code hadoopConfDir} is null or blank, well-known default locations are probed via
 * {@code possibleHadoopConfPath()}. If a directory is resolved but none of the expected conf
 * files can be loaded from it, a {@link RuntimeException} is thrown. If no directory can be
 * resolved at all, a plain default {@link Configuration} is returned.
 *
 * @param hadoopConfDir directory containing Hadoop conf files, may be null
 * @return the loaded Hadoop configuration, never null
 * @throws RuntimeException if a conf directory was resolved but loading from it failed
 */
public static Configuration createHadoopConfiguration(@Nullable String hadoopConfDir) {

    LOG.info("Setting hadoop conf dir as {}", hadoopConfDir);

    if (isNullOrWhitespaceOnly(hadoopConfDir)) {
        hadoopConfDir = possibleHadoopConfPath();
    }

    // Create the configuration from the hadoop conf directory when one is available.
    Configuration hadoopConf = null;
    if (!isNullOrWhitespaceOnly(hadoopConfDir)) {
        hadoopConf = getHadoopConfiguration(hadoopConfDir);
        if (hadoopConf == null) {
            // Typo fixed: "possiableUsedConfFiles" -> "possibleUsedConfFiles".
            String possibleUsedConfFiles =
                    "core-site.xml | hdfs-site.xml | yarn-site.xml | mapred-site.xml";
            throw new RuntimeException(
                    "Failed to load the hadoop conf from specified path:" + hadoopConfDir,
                    new FileNotFoundException(
                            "Please check the path none of the conf files ("
                                    + possibleUsedConfFiles
                                    + ") exist in the folder."));
        }
    }
    // Fall back to a default (classpath-based) configuration when no conf dir was usable.
    if (hadoopConf == null) {
        hadoopConf = new Configuration();
    }

    return hadoopConf;
}

/**
 * Returns true if the Hive metastore is embedded, i.e. {@code hive.metastore.uris}
 * ({@link HiveConf.ConfVars#METASTOREURIS}) is unset or blank.
 */
public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
    return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,21 @@
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.utils.StringUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Objects;

import static org.apache.paimon.hive.HiveCatalog.createHadoopConfiguration;
import static org.apache.paimon.hive.HiveCatalog.createHiveConf;
import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
Expand Down Expand Up @@ -61,7 +71,7 @@ public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
String hiveConfDir = context.options().get(HIVE_CONF_DIR);
String hadoopConfDir = context.options().get(HADOOP_CONF_DIR);
HiveConf hiveConf = createHiveConf(hiveConfDir, hadoopConfDir);

Configuration hadoopConfiguration = createHadoopConfiguration(hadoopConfDir);
// always using user-set parameters overwrite hive-site.xml parameters
context.options().toMap().forEach(hiveConf::set);
if (uri != null) {
Expand All @@ -80,8 +90,36 @@ public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
}

String clientClassName = context.options().get(METASTORE_CLIENT_CLASS);

if (Objects.isNull(warehouse)) {
warehouse = getHiveWarehouse(hiveConf, hadoopConfiguration);
}
try {
if (Objects.isNull(fileIO)) {
fileIO = FileIO.get(warehouse, context);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return new HiveCatalog(
fileIO, hiveConf, clientClassName, context.options(), warehouse.toUri().toString());
}

/**
 * Resolves the Hive warehouse path from the Hive configuration.
 *
 * <p>Falls back to {@code /tmp/paimon/warehouse} when {@code hive.metastore.warehouse.dir}
 * is unset, and qualifies a scheme-less path against the default file system of the given
 * Hadoop configuration.
 *
 * @param hiveConf hive configuration to read the warehouse dir from
 * @param hadoopConfiguration hadoop configuration used to resolve the default file system
 * @return the fully qualified warehouse path
 * @throws RuntimeException if the default file system cannot be resolved
 */
private Path getHiveWarehouse(HiveConf hiveConf, Configuration hadoopConfiguration) {
    String warehouse = hiveConf.get(ConfVars.METASTOREWAREHOUSE.varname);
    // Fall back to a local default when hive.metastore.warehouse.dir is not configured.
    // (The previous comment mentioned hive.metastore.uris, which is a different option.)
    if (StringUtils.isNullOrWhitespaceOnly(warehouse)) {
        warehouse = "/tmp/paimon/warehouse";
    }
    Path warehousePath = new Path(warehouse);
    try {
        if (warehousePath.toUri().getScheme() == null) {
            // Qualify the scheme-less path against the default file system. Bug fix: the
            // previous FileSystem.get(uri, conf).getUri().toString() returned only the
            // file system URI and silently dropped the warehouse path component.
            warehousePath = FileSystem.get(hadoopConfiguration).makeQualified(warehousePath);
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    return warehousePath;
}
}

0 comments on commit a0cde26

Please sign in to comment.