Skip to content

Commit

Permalink
[spark] Auto fetch metastore uri from hive-site.xml when use SparkGen…
Browse files Browse the repository at this point in the history
…ericCatalog (apache#1720)
  • Loading branch information
leaves12138 authored Aug 16, 2023
1 parent b4fd212 commit dcd43de
Show file tree
Hide file tree
Showing 9 changed files with 1,212 additions and 14 deletions.
4 changes: 4 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,12 @@ paimon-common/src/main/java/org/apache/paimon/options/ConfigOption.java
from http://flink.apache.org/ version 1.17.0

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
from http://iceberg.apache.org/ version 1.3.0

paimon-hive/paimon-hive-common/src/test/resources/hive-schema-3.1.0.derby.sql
from https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-3.1.0.derby.sql version 3.1.0

MIT License
-----------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.utils.Preconditions;

import org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.paimon.hive.HiveCatalog.createHiveConf;
import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
Expand All @@ -38,6 +39,8 @@
/** Factory to create {@link HiveCatalog}. */
public class HiveCatalogFactory implements CatalogFactory {

private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogFactory.class);

private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
ConfigOptions.key("metastore.client.class")
.stringType()
Expand All @@ -54,21 +57,27 @@ public String identifier() {

@Override
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
String uri =
Preconditions.checkNotNull(
context.options().get(CatalogOptions.URI),
CatalogOptions.URI.key()
+ " must be set for paimon "
+ IDENTIFIER
+ " catalog");

String uri = context.options().get(CatalogOptions.URI);
String hiveConfDir = context.options().get(HIVE_CONF_DIR);
String hadoopConfDir = context.options().get(HADOOP_CONF_DIR);
HiveConf hiveConf = createHiveConf(hiveConfDir, hadoopConfDir);

// always using user-set parameters overwrite hive-site.xml parameters
context.options().toMap().forEach(hiveConf::set);
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
if (uri != null) {
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
}

if (hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname) == null) {
LOG.error(
"Can't find hive metastore uri to connect: "
+ " either set "
+ CatalogOptions.URI.key()
+ " for paimon "
+ IDENTIFIER
+ " catalog or set hive.metastore.uris in hive-site.xml or hadoop configurations."
                            + " Will use empty metastore uris, which means we may use an embedded metastore. This may cause unpredictable consensus problems.");
}

String clientClassName = context.options().get(METASTORE_CLIENT_CLASS);

Expand Down
16 changes: 16 additions & 0 deletions paimon-hive/paimon-hive-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,20 @@ under the License.
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */

package org.apache.paimon.hive;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportFactory;
import org.junit.Assert;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.Reader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.attribute.PosixFilePermissions.asFileAttribute;
import static java.nio.file.attribute.PosixFilePermissions.fromString;

/**
* A metastore to run locally.
*
* <p>Mostly copied from iceberg.
*/
/**
 * A Hive metastore that runs locally, in-process, backed by an embedded Derby database and
 * served over a Thrift socket. Intended for tests only.
 *
 * <p>Mostly copied from iceberg.
 */
public class TestHiveMetastore {

    private static final int DEFAULT_POOL_SIZE = 5;

    // It's tricky to clear all static fields in an HMS instance in order to switch derby root dir.
    // Therefore, we reuse the same derby root between tests and remove it after JVM exits.
    private static final File HIVE_LOCAL_DIR;
    private static final String DERBY_PATH;

    static {
        try {
            HIVE_LOCAL_DIR =
                    createTempDirectory("hive", asFileAttribute(fromString("rwxrwxrwx"))).toFile();
            DERBY_PATH = new File(HIVE_LOCAL_DIR, "metastore_db").getPath();
            File derbyLogFile = new File(HIVE_LOCAL_DIR, "derby.log");
            // Redirect derby's noisy log out of the working directory.
            System.setProperty("derby.stream.error.file", derbyLogFile.getAbsolutePath());
            setupMetastoreDB("jdbc:derby:" + DERBY_PATH + ";create=true");
            Runtime.getRuntime()
                    .addShutdownHook(
                            new Thread(
                                    () -> {
                                        Path localDirPath =
                                                new Path(HIVE_LOCAL_DIR.getAbsolutePath());
                                        FileSystem fs;
                                        try {
                                            fs =
                                                    FileSystem.get(
                                                            localDirPath.toUri(),
                                                            new Configuration());
                                        } catch (IOException e) {
                                            throw new RuntimeException(e);
                                        }
                                        String errMsg = "Failed to delete " + localDirPath;
                                        try {
                                            Assert.assertTrue(
                                                    errMsg, fs.delete(localDirPath, true));
                                        } catch (IOException e) {
                                            throw new RuntimeException(errMsg, e);
                                        }
                                    }));
        } catch (Exception e) {
            throw new RuntimeException("Failed to setup local dir for hive metastore", e);
        }
    }

    private HiveConf hiveConf;
    private ExecutorService executorService;
    private TServer server;
    private HiveMetaStore.HMSHandler baseHandler;

    /**
     * Starts a TestHiveMetastore with the default connection pool size (5) and the default
     * HiveConf.
     */
    public void start() {
        start(new HiveConf(new Configuration(), TestHiveMetastore.class), DEFAULT_POOL_SIZE);
    }

    /**
     * Starts a TestHiveMetastore with a provided connection pool size and HiveConf.
     *
     * @param conf The hive configuration to use
     * @param poolSize The number of threads in the executor pool
     */
    public void start(HiveConf conf, int poolSize) {
        try {
            TServerSocket socket = new TServerSocket(9083);
            int port = socket.getServerSocket().getLocalPort();
            initConf(conf, port);

            this.hiveConf = conf;
            this.server = newThriftServer(socket, poolSize, hiveConf);
            this.executorService = Executors.newSingleThreadExecutor();
            // serve() blocks, so run it on its own thread.
            this.executorService.submit(() -> server.serve());

            // In Hive3, setting this as a system prop ensures that it will be picked up
            // whenever a new HiveConf is created.
            System.setProperty(
                    HiveConf.ConfVars.METASTOREURIS.varname,
                    hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
        } catch (Exception e) {
            throw new RuntimeException("Cannot start TestHiveMetastore", e);
        }
    }

    /** Stops the Thrift server, the executor, and the HMS handler; clears the warehouse dir. */
    public void stop() throws Exception {
        reset();
        if (server != null) {
            server.stop();
        }
        if (executorService != null) {
            executorService.shutdown();
        }
        if (baseHandler != null) {
            baseHandler.shutdown();
        }
    }

    /**
     * Removes everything under the warehouse root except the shared derby database and its log,
     * which are reused across tests (see the class-level comment).
     */
    public void reset() throws Exception {
        Path warehouseRoot = new Path(HIVE_LOCAL_DIR.getAbsolutePath());
        FileSystem fs = FileSystem.get(warehouseRoot.toUri(), hiveConf);
        for (FileStatus fileStatus : fs.listStatus(warehouseRoot)) {
            if (!fileStatus.getPath().getName().equals("derby.log")
                    && !fileStatus.getPath().getName().equals("metastore_db")) {
                fs.delete(fileStatus.getPath(), true);
            }
        }
    }

    /** Builds the Thrift thread-pool server hosting a retrying HMS handler. */
    private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf conf)
            throws Exception {
        HiveConf serverConf = new HiveConf(conf);
        serverConf.set(
                HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
                "jdbc:derby:" + DERBY_PATH + ";create=true");
        baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", serverConf);
        IHMSHandler handler = RetryingHMSHandler.getProxy(serverConf, baseHandler, false);

        TThreadPoolServer.Args args =
                new TThreadPoolServer.Args(socket)
                        .processor(new TSetIpAddressProcessor<>(handler))
                        .transportFactory(new TTransportFactory())
                        .protocolFactory(new TBinaryProtocol.Factory())
                        .minWorkerThreads(poolSize)
                        .maxWorkerThreads(poolSize);

        return new TThreadPoolServer(args);
    }

    /** Points the conf at this local metastore and relaxes checks that get in the way of tests. */
    private void initConf(HiveConf conf, int port) {
        conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port);
        conf.set(
                HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
                "file:" + HIVE_LOCAL_DIR.getAbsolutePath());
        conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
        conf.set(
                HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname,
                "false");
        conf.set("iceberg.hive.client-pool-size", "2");
        conf.set(
                HiveConf.ConfVars.HIVE_IN_TEST.varname,
                HiveConf.ConfVars.HIVE_IN_TEST.getDefaultValue());
    }

    /**
     * Creates the metastore schema in Derby by executing the bundled schema script.
     *
     * @param dbURL JDBC URL of the Derby database (created if absent)
     * @throws IOException if the schema resource is missing or cannot be read
     * @throws SQLException if any schema statement fails
     */
    private static void setupMetastoreDB(String dbURL) throws SQLException, IOException {
        ClassLoader classLoader = ClassLoader.getSystemClassLoader();
        InputStream inputStream = classLoader.getResourceAsStream("hive-schema-3.1.0.derby.sql");
        // Fail with a clear message rather than an NPE if the resource is not on the classpath.
        if (inputStream == null) {
            throw new IOException("Schema resource hive-schema-3.1.0.derby.sql not found");
        }
        // try-with-resources: the original leaked the Connection; close both reader and conn.
        try (Connection connection = DriverManager.getConnection(dbURL);
                Reader reader = new InputStreamReader(inputStream)) {
            runScript(connection, reader);
        }
    }

    // This method is copied from iceberg `ScriptRunner`
    private static void runScript(Connection conn, Reader reader) throws SQLException, IOException {
        StringBuilder command = null;
        try {
            LineNumberReader lineReader = new LineNumberReader(reader);
            String line;
            while ((line = lineReader.readLine()) != null) {
                if (command == null) {
                    command = new StringBuilder();
                }
                String trimmedLine = line.trim();
                if (trimmedLine.isEmpty()
                        || trimmedLine.startsWith("--")
                        || trimmedLine.startsWith("//")) {
                    // Skip blank lines and comments. (The original had a duplicated, unreachable
                    // "--" branch here; the three checks are collapsed into one.)
                } else if (trimmedLine.endsWith(";")) {
                    // End of statement: strip the terminator and execute the accumulated command.
                    command.append(line.substring(0, line.lastIndexOf(";")));
                    command.append(" ");
                    Statement statement = conn.createStatement();

                    statement.execute(command.toString());

                    if (!conn.getAutoCommit()) {
                        conn.commit();
                    }
                    command = null;
                    try {
                        statement.close();
                    } catch (Exception e) {
                        // Ignore to workaround a bug in Jakarta DBCP
                    }
                    Thread.yield();
                } else {
                    // Statement continues on the next line.
                    command.append(line);
                    command.append(" ");
                }
            }
        } catch (IOException | SQLException e) {
            e.fillInStackTrace();
            throw e;
        } finally {
            // Any uncommitted partial statement is discarded.
            conn.rollback();
        }
    }
}
Loading

0 comments on commit dcd43de

Please sign in to comment.