diff --git a/linkis-commons/linkis-module/pom.xml b/linkis-commons/linkis-module/pom.xml index dc28af3b1b..81601b5094 100644 --- a/linkis-commons/linkis-module/pom.xml +++ b/linkis-commons/linkis-module/pom.xml @@ -47,8 +47,8 @@ ${springfox.version} - classgraph io.github.classgraph + classgraph diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala index 47496cf17a..9a70d3654b 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala @@ -70,10 +70,12 @@ import org.apache.hadoop.security.UserGroupInformation import java.io.ByteArrayOutputStream import java.security.PrivilegedExceptionAction +import java.time.Duration import java.util import java.util.concurrent.{ Callable, ConcurrentHashMap, + Future, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit @@ -82,6 +84,7 @@ import java.util.concurrent.{ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Await import com.google.common.util.concurrent.ThreadFactoryBuilder import org.slf4j.LoggerFactory @@ -135,7 +138,11 @@ class HiveEngineConcurrentConnExecutor( code: String ): ExecuteResponse = { LOG.info(s"HiveEngineConcurrentConnExecutor Ready to executeLine: $code") - val taskId: String = engineExecutorContext.getJobId.get + val jobIdOpt: Option[String] = engineExecutorContext.getJobId + var taskId: String = null + jobIdOpt.foreach(jid => { + taskId = jid + }) CSHiveHelper.setContextIDInfoToHiveConf(engineExecutorContext, hiveConf) val realCode = code.trim() @@ -158,40 +165,50 @@ class HiveEngineConcurrentConnExecutor( val proc = CommandProcessorFactory.get(tokens, hiveConf) LOG.debug("ugi is " + ugi.getUserName) - ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() { - override def run(): ExecuteResponse = { - proc match { - case any if HiveDriverProxy.isDriver(any) => - logger.info(s"driver is $any") - - val driver = new HiveDriverProxy(any) - driverCache.put(taskId, driver) - executeHQL( - engineExecutorContext.getJobId.get, - engineExecutorContext, - realCode, - driver - ) - case _ => - val resp = proc.run(realCode.substring(tokens(0).length).trim) - val result = new String(baos.toByteArray) - logger.info("RESULT => {}", result) - engineExecutorContext.appendStdout(result) - baos.reset() - if (resp.getResponseCode != 0) { + Utils.tryFinally { + val resp: ExecuteResponse = ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() { + override def run(): ExecuteResponse = { + proc match { + case any if HiveDriverProxy.isDriver(any) => + logger.info(s"driver is $any") + + val driver = new HiveDriverProxy(any) + if (StringUtils.isNotBlank(taskId)) { + driverCache.put(taskId, driver) + } + executeHQL( + engineExecutorContext.getJobId.get, + engineExecutorContext, + realCode, + driver + ) + case _ => + val resp = proc.run(realCode.substring(tokens(0).length).trim) + val result = new String(baos.toByteArray) + logger.info("RESULT => {}", result) + engineExecutorContext.appendStdout(result) + baos.reset() + if (resp.getResponseCode != 0) { + onComplete() + throw resp.getException + } onComplete() - throw resp.getException - } - onComplete() - SuccessExecuteResponse() + SuccessExecuteResponse() + } } - } - }) + }) + logger.info(s"HiveEngineConcurrentConnExecutor response is: ${resp}") + resp + } { + logger.info(s"HiveEngineConcurrentConnExecutor task final execute.") + } } } - - val future = backgroundOperationPool.submit(operation) - future.get() + Utils.tryAndWarn { + val future: Future[ExecuteResponse] = backgroundOperationPool.submit(operation) + logger.info(s"${future} and status ${future.isDone}") + future.get() + } } def logMemoryCache(): Unit = { diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala index 5499cb3d62..c352e2e58f 100644 --- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala @@ -192,7 +192,7 @@ class HiveEngineConnExecutor( this.proc = proc LOG.debug("ugi is " + ugi.getUserName) Utils.tryFinally { - ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() { + val resp: ExecuteResponse = ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() { override def run(): ExecuteResponse = { proc match { case any if HiveDriverProxy.isDriver(any) => @@ -222,7 +222,10 @@ class HiveEngineConnExecutor( } } }) + logger.info(s"HiveEngineConnExecutor response is: ${resp}") + resp } { + logger.info(s"HiveEngineConnExecutor task final execute.") if (this.driver != null) { Utils.tryQuietly { driver.close() diff --git a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java index fb59e6b3a1..f72ff96dfe 100644 --- a/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java +++ b/linkis-public-enhancements/linkis-udf-service/src/main/java/org/apache/linkis/udf/utils/UdfUtils.java @@ -100,11 +100,11 @@ private static String getRootPath(InputStream inputStream, String folder) throws while ((entry = tarInput.getNextTarEntry()) != null) { if (entry.isDirectory() && entry.getName().endsWith(delimiter)) { rootPathStr = entry.getName().replace(folder + FsPath.SEPARATOR, ""); - return rootPathStr; + return rootPathStr; } if (entry.getName().contains(delimiter)) { rootPathStr = entry.getName().substring(0, entry.getName().indexOf(delimiter)); - return rootPathStr; + return rootPathStr; } } } catch (Exception e) { @@ -158,16 +158,16 @@ public static InputStream getZipInputStreamByTarInputStream( return createZipFile(file.getInputStream(), packageName, rootPath); } else { throw new UdfException( - 80038, - "The name directory " - + packageName - + " specified by PKG-INFO does not exist. Please confirm that the " - + packageName - + " specified by PKG-INFO in the package matches the actual folder name (PKG-INFO指定Name目录" - + packageName - + "不存在,请确认包中PKG-INFO指定" - + packageName - + "和实际文件夹名称一致)"); + 80038, + "The name directory " + + packageName + + " specified by PKG-INFO does not exist. Please confirm that the " + + packageName + + " specified by PKG-INFO in the package matches the actual folder name (PKG-INFO指定Name目录" + + packageName + + "不存在,请确认包中PKG-INFO指定" + + packageName + + "和实际文件夹名称一致)"); } } diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/pom.xml b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/pom.xml index ce1e2d5753..e744489338 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/pom.xml +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/pom.xml @@ -76,8 +76,8 @@ ${springfox.version} - classgraph io.github.classgraph + classgraph