Skip to content

Commit

Permalink
hive concurrent fix
Browse files Browse the repository at this point in the history
  • Loading branch information
aiceflower committed Dec 9, 2024
1 parent f8368d6 commit 63e8f11
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 46 deletions.
2 changes: 1 addition & 1 deletion linkis-commons/linkis-module/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
<version>${springfox.version}</version>
<exclusions>
<exclusion>
<artifactId>classgraph</artifactId>
<groupId>io.github.classgraph</groupId>
<artifactId>classgraph</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ import org.apache.hadoop.security.UserGroupInformation

import java.io.ByteArrayOutputStream
import java.security.PrivilegedExceptionAction
import java.time.Duration
import java.util
import java.util.concurrent.{
Callable,
ConcurrentHashMap,
Future,
LinkedBlockingQueue,
ThreadPoolExecutor,
TimeUnit
Expand All @@ -82,6 +84,7 @@ import java.util.concurrent.{
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Await

import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -135,7 +138,11 @@ class HiveEngineConcurrentConnExecutor(
code: String
): ExecuteResponse = {
LOG.info(s"HiveEngineConcurrentConnExecutor Ready to executeLine: $code")
val taskId: String = engineExecutorContext.getJobId.get
val jobIdOpt: Option[String] = engineExecutorContext.getJobId
var taskId: String = null
jobIdOpt.foreach(jid => {
taskId = jid
})
CSHiveHelper.setContextIDInfoToHiveConf(engineExecutorContext, hiveConf)

val realCode = code.trim()
Expand All @@ -158,40 +165,50 @@ class HiveEngineConcurrentConnExecutor(

val proc = CommandProcessorFactory.get(tokens, hiveConf)
LOG.debug("ugi is " + ugi.getUserName)
ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() {
override def run(): ExecuteResponse = {
proc match {
case any if HiveDriverProxy.isDriver(any) =>
logger.info(s"driver is $any")

val driver = new HiveDriverProxy(any)
driverCache.put(taskId, driver)
executeHQL(
engineExecutorContext.getJobId.get,
engineExecutorContext,
realCode,
driver
)
case _ =>
val resp = proc.run(realCode.substring(tokens(0).length).trim)
val result = new String(baos.toByteArray)
logger.info("RESULT => {}", result)
engineExecutorContext.appendStdout(result)
baos.reset()
if (resp.getResponseCode != 0) {
Utils.tryFinally {
val resp: ExecuteResponse = ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() {
override def run(): ExecuteResponse = {
proc match {
case any if HiveDriverProxy.isDriver(any) =>
logger.info(s"driver is $any")

val driver = new HiveDriverProxy(any)
if (StringUtils.isNotBlank(taskId)) {
driverCache.put(taskId, driver)
}
executeHQL(
engineExecutorContext.getJobId.get,
engineExecutorContext,
realCode,
driver
)
case _ =>
val resp = proc.run(realCode.substring(tokens(0).length).trim)
val result = new String(baos.toByteArray)
logger.info("RESULT => {}", result)
engineExecutorContext.appendStdout(result)
baos.reset()
if (resp.getResponseCode != 0) {
onComplete()
throw resp.getException
}
onComplete()
throw resp.getException
}
onComplete()
SuccessExecuteResponse()
SuccessExecuteResponse()
}
}
}
})
})
logger.info(s"HiveEngineConcurrentConnExecutor response is: ${resp}")
resp
} {
logger.info(s"HiveEngineConcurrentConnExecutor task final execute.")
}
}
}

val future = backgroundOperationPool.submit(operation)
future.get()
Utils.tryAndWarn {
val future: Future[ExecuteResponse] = backgroundOperationPool.submit(operation)
logger.info(s"${future} and status ${future.isDone}")
future.get()
}
}

def logMemoryCache(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class HiveEngineConnExecutor(
this.proc = proc
LOG.debug("ugi is " + ugi.getUserName)
Utils.tryFinally {
ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() {
val resp: ExecuteResponse = ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() {
override def run(): ExecuteResponse = {
proc match {
case any if HiveDriverProxy.isDriver(any) =>
Expand Down Expand Up @@ -222,7 +222,10 @@ class HiveEngineConnExecutor(
}
}
})
logger.info(s"HiveEngineConnExecutor response is: ${resp}")
resp
} {
logger.info(s"HiveEngineConnExecutor task final execute.")
if (this.driver != null) {
Utils.tryQuietly {
driver.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ private static String getRootPath(InputStream inputStream, String folder) throws
while ((entry = tarInput.getNextTarEntry()) != null) {
if (entry.isDirectory() && entry.getName().endsWith(delimiter)) {
rootPathStr = entry.getName().replace(folder + FsPath.SEPARATOR, "");
return rootPathStr;
return rootPathStr;
}
if (entry.getName().contains(delimiter)) {
rootPathStr = entry.getName().substring(0, entry.getName().indexOf(delimiter));
return rootPathStr;
return rootPathStr;
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -158,16 +158,16 @@ public static InputStream getZipInputStreamByTarInputStream(
return createZipFile(file.getInputStream(), packageName, rootPath);
} else {
throw new UdfException(
80038,
"The name directory "
+ packageName
+ " specified by PKG-INFO does not exist. Please confirm that the "
+ packageName
+ " specified by PKG-INFO in the package matches the actual folder name (PKG-INFO指定Name目录"
+ packageName
+ "不存在,请确认包中PKG-INFO指定"
+ packageName
+ "和实际文件夹名称一致)");
80038,
"The name directory "
+ packageName
+ " specified by PKG-INFO does not exist. Please confirm that the "
+ packageName
+ " specified by PKG-INFO in the package matches the actual folder name (PKG-INFO指定Name目录"
+ packageName
+ "不存在,请确认包中PKG-INFO指定"
+ packageName
+ "和实际文件夹名称一致)");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@
<version>${springfox.version}</version>
<exclusions>
<exclusion>
<artifactId>classgraph</artifactId>
<groupId>io.github.classgraph</groupId>
<artifactId>classgraph</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down

0 comments on commit 63e8f11

Please sign in to comment.