From 34f643ae448e4364e426c80401674b56977f5134 Mon Sep 17 00:00:00 2001 From: peacewong Date: Fri, 13 Sep 2024 20:33:10 +0800 Subject: [PATCH] once client support submit multiple jobs --- .../computation/client/LinkisJobBuilder.scala | 2 +- .../computation/client/LinkisJobClient.scala | 30 ++++++++++- .../client/once/simple/SimpleOnceJob.scala | 13 +++-- .../once/simple/SimpleOnceJobBuilder.scala | 53 +++++++++++++++++-- 4 files changed, 85 insertions(+), 13 deletions(-) diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala index 9cc2863559..eff8411603 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala @@ -174,7 +174,7 @@ object LinkisJobBuilder { private var threadPool: ScheduledThreadPoolExecutor = Utils.defaultScheduler private var serverUrl: String = _ - private var authTokenValue: String = CommonVars[String]( + var authTokenValue: String = CommonVars[String]( "wds.linkis.client.test.common.tokenValue", "LINKIS_CLI_TEST" ).getValue // This is the default authToken, we usually suggest set different ones for users. diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobClient.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobClient.scala index d44c479abb..80e8e7ad42 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobClient.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobClient.scala @@ -17,11 +17,37 @@ package org.apache.linkis.computation.client -import org.apache.linkis.computation.client.interactive.InteractiveJob -import org.apache.linkis.computation.client.once.OnceJob +import org.apache.linkis.bml.client.BmlClientFactory +import org.apache.linkis.computation.client.interactive.{InteractiveJob, InteractiveJobBuilder} +import org.apache.linkis.computation.client.once.{LinkisManagerClient, OnceJob} +import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder} +import org.apache.linkis.httpclient.dws.config.DWSClientConfig +import org.apache.linkis.ujes.client.UJESClientImpl import java.io.Closeable +class LinkisJobClient(clientConfig: DWSClientConfig) extends Closeable { + + private val ujseClient = new UJESClientImpl(clientConfig) + + private lazy val linkisManagerCLient = LinkisManagerClient(ujseClient) + + override def close(): Unit = { + if (null != linkisManagerCLient) { + linkisManagerCLient.close() + } + } + + def onceJobBuilder(): SimpleOnceJobBuilder = + SimpleOnceJob.builder(SimpleOnceJobBuilder.getBmlClient(clientConfig), linkisManagerCLient) + + def interactiveJobBuilder(): InteractiveJobBuilder = { + val builder = InteractiveJob.builder() + builder.setUJESClient(ujseClient) + } + +} + /** * This class is only used to provide a unified entry for user to build a LinkisJob conveniently and * simply. Please keep this class lightweight enough, do not set too many field to confuse user. diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala index 4992b17814..13d96c238a 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala @@ -17,13 +17,13 @@ package org.apache.linkis.computation.client.once.simple +import org.apache.linkis.bml.client.BmlClient import org.apache.linkis.common.ServiceInstance import org.apache.linkis.common.utils.Utils import org.apache.linkis.computation.client.LinkisJobMetrics import org.apache.linkis.computation.client.job.AbstractSubmittableLinkisJob import org.apache.linkis.computation.client.once.{LinkisManagerClient, OnceJob, SubmittableOnceJob} import org.apache.linkis.computation.client.once.action.CreateEngineConnAction -import org.apache.linkis.computation.client.once.result.CreateEngineConnResult import org.apache.linkis.computation.client.operator.OnceJobOperator import java.util.Locale @@ -109,15 +109,13 @@ class SubmittableSimpleOnceJob( with AbstractSubmittableLinkisJob { private var ecmServiceInstance: ServiceInstance = _ - private var createEngineConnResult: CreateEngineConnResult = _ def getECMServiceInstance: ServiceInstance = ecmServiceInstance - def getCreateEngineConnResult: CreateEngineConnResult = createEngineConnResult override protected def doSubmit(): Unit = { logger.info(s"Ready to create a engineConn: ${createEngineConnAction.getRequestPayload}.") - createEngineConnResult = linkisManagerClient.createEngineConn(createEngineConnAction) - lastNodeInfo = createEngineConnResult.getNodeInfo + val nodeInfo = linkisManagerClient.createEngineConn(createEngineConnAction) + lastNodeInfo = nodeInfo.getNodeInfo serviceInstance = getServiceInstance(lastNodeInfo) ticketId = getTicketId(lastNodeInfo) ecmServiceInstance = getECMServiceInstance(lastNodeInfo) @@ -160,6 +158,11 @@ object SimpleOnceJob { def builder(): SimpleOnceJobBuilder = new SimpleOnceJobBuilder + def builder( + bmlClient: BmlClient, + linkisManagerClient: LinkisManagerClient + ): SimpleOnceJobBuilder = new SimpleOnceJobBuilder(bmlClient, linkisManagerClient) + /** * Build a submitted SimpleOnceJob by id and user. * @param id diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala index dc4451ff0f..d7c4746188 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala @@ -20,6 +20,7 @@ package org.apache.linkis.computation.client.once.simple import org.apache.linkis.bml.client.{BmlClient, BmlClientFactory} import org.apache.linkis.common.utils.Utils import org.apache.linkis.computation.client.LinkisJobBuilder +import org.apache.linkis.computation.client.LinkisJobBuilder.clientConfig import org.apache.linkis.computation.client.once.LinkisManagerClient import org.apache.linkis.computation.client.once.action.CreateEngineConnAction import org.apache.linkis.computation.client.once.simple.SimpleOnceJobBuilder._ @@ -28,6 +29,8 @@ import org.apache.linkis.governance.common.entity.job.OnceExecutorContent import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils.BmlResource import org.apache.linkis.httpclient.dws.DWSHttpClient +import org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy +import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder} import org.apache.linkis.manager.label.constant.LabelKeyConstant import org.apache.linkis.protocol.utils.TaskUtils import org.apache.linkis.ujes.client.exception.UJESJobException @@ -38,12 +41,19 @@ import java.util import scala.collection.convert.WrapAsJava._ import scala.collection.convert.WrapAsScala._ -class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[SubmittableSimpleOnceJob] { +class SimpleOnceJobBuilder private[simple] ( + private val bmlClient: BmlClient, + private val linkisManagerClient: LinkisManagerClient +) extends LinkisJobBuilder[SubmittableSimpleOnceJob] { private var createService: String = _ private var maxSubmitTime: Long = _ private var description: String = _ + def this() = { + this(null, null) + } + def setCreateService(createService: String): this.type = { this.createService = createService this @@ -69,10 +79,26 @@ class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[Submittab val contentMap = OnceExecutorContentUtils.contentToMap(onceExecutorContent) val bytes = DWSHttpClient.jacksonJson.writeValueAsBytes(contentMap) val response = - getBmlClient.uploadResource(executeUser, getFilePath, new ByteArrayInputStream(bytes)) + getThisBMLClient.uploadResource(executeUser, getFilePath, new ByteArrayInputStream(bytes)) OnceExecutorContentUtils.resourceToValue(BmlResource(response.resourceId, response.version)) } + protected def getThisBMLClient(): BmlClient = { + if (null == this.bmlClient) { + getBmlClient(LinkisJobBuilder.getDefaultClientConfig) + } else { + this.bmlClient + } + } + + protected def getThisLinkisManagerClient(): LinkisManagerClient = { + if (null == this.linkisManagerClient) { + getLinkisManagerClient + } else { + this.linkisManagerClient + } + } + override def build(): SubmittableSimpleOnceJob = { ensureNotNull(labels, "labels") ensureNotNull(jobContent, "jobContent") @@ -99,7 +125,7 @@ class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[Submittab .setMaxSubmitTime(maxSubmitTime) .setDescription(description) .build() - new SubmittableSimpleOnceJob(getLinkisManagerClient, createEngineConnAction) + new SubmittableSimpleOnceJob(getThisLinkisManagerClient, createEngineConnAction) } implicit def toMap(map: util.Map[String, Any]): util.Map[String, String] = map.map { @@ -128,10 +154,27 @@ object SimpleOnceJobBuilder { private var bmlClient: BmlClient = _ private var linkisManagerClient: LinkisManagerClient = _ - def getBmlClient: BmlClient = { + def getBmlClient(clientConfig: DWSClientConfig): BmlClient = { if (bmlClient == null) synchronized { if (bmlClient == null) { - bmlClient = BmlClientFactory.createBmlClient(LinkisJobBuilder.getDefaultClientConfig) + val newClientConfig = DWSClientConfigBuilder + .newBuilder() + .addServerUrl(clientConfig.getServerUrl) + .connectionTimeout(clientConfig.getConnectTimeout) + .discoveryEnabled(clientConfig.isDiscoveryEnabled) + .loadbalancerEnabled(clientConfig.isLoadbalancerEnabled) + .maxConnectionSize(clientConfig.getMaxConnection) + .retryEnabled(clientConfig.isRetryEnabled) + .setRetryHandler(clientConfig.getRetryHandler) + .readTimeout( + clientConfig.getReadTimeout + ) // We think 90s is enough, if SocketTimeoutException is throw, just set a new clientConfig to modify it. + .setAuthenticationStrategy(new TokenAuthenticationStrategy()) + .setAuthTokenKey(TokenAuthenticationStrategy.TOKEN_KEY) + .setAuthTokenValue(LinkisJobBuilder.authTokenValue) + .setDWSVersion(clientConfig.getDWSVersion) + .build() + bmlClient = BmlClientFactory.createBmlClient(newClientConfig) Utils.addShutdownHook(() => bmlClient.close()) } }