Skip to content

Commit

Permalink
once client support submit multiple jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Sep 13, 2024
1 parent fab1cae commit 34f643a
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ object LinkisJobBuilder {
private var threadPool: ScheduledThreadPoolExecutor = Utils.defaultScheduler
private var serverUrl: String = _

private var authTokenValue: String = CommonVars[String](
var authTokenValue: String = CommonVars[String](
"wds.linkis.client.test.common.tokenValue",
"LINKIS_CLI_TEST"
).getValue // This is the default authToken, we usually suggest set different ones for users.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,37 @@

package org.apache.linkis.computation.client

import org.apache.linkis.computation.client.interactive.InteractiveJob
import org.apache.linkis.computation.client.once.OnceJob
import org.apache.linkis.bml.client.BmlClientFactory
import org.apache.linkis.computation.client.interactive.{InteractiveJob, InteractiveJobBuilder}
import org.apache.linkis.computation.client.once.{LinkisManagerClient, OnceJob}
import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder}
import org.apache.linkis.httpclient.dws.config.DWSClientConfig
import org.apache.linkis.ujes.client.UJESClientImpl

import java.io.Closeable

class LinkisJobClient(clientConfig: DWSClientConfig) extends Closeable {

private val ujseClient = new UJESClientImpl(clientConfig)

private lazy val linkisManagerCLient = LinkisManagerClient(ujseClient)

override def close(): Unit = {
if (null != linkisManagerCLient) {
linkisManagerCLient.close()
}
}

def onceJobBuilder(): SimpleOnceJobBuilder =
SimpleOnceJob.builder(SimpleOnceJobBuilder.getBmlClient(clientConfig), linkisManagerCLient)

def interactiveJobBuilder(): InteractiveJobBuilder = {
val builder = InteractiveJob.builder()
builder.setUJESClient(ujseClient)
}

}

/**
* This class is only used to provide a unified entry for user to build a LinkisJob conveniently and
* simply. Please keep this class lightweight enough, do not set too many field to confuse user.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.apache.linkis.computation.client.once.simple

import org.apache.linkis.bml.client.BmlClient
import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobMetrics
import org.apache.linkis.computation.client.job.AbstractSubmittableLinkisJob
import org.apache.linkis.computation.client.once.{LinkisManagerClient, OnceJob, SubmittableOnceJob}
import org.apache.linkis.computation.client.once.action.CreateEngineConnAction
import org.apache.linkis.computation.client.once.result.CreateEngineConnResult
import org.apache.linkis.computation.client.operator.OnceJobOperator

import java.util.Locale
Expand Down Expand Up @@ -109,15 +109,13 @@ class SubmittableSimpleOnceJob(
with AbstractSubmittableLinkisJob {

private var ecmServiceInstance: ServiceInstance = _
private var createEngineConnResult: CreateEngineConnResult = _

def getECMServiceInstance: ServiceInstance = ecmServiceInstance
def getCreateEngineConnResult: CreateEngineConnResult = createEngineConnResult

override protected def doSubmit(): Unit = {
logger.info(s"Ready to create a engineConn: ${createEngineConnAction.getRequestPayload}.")
createEngineConnResult = linkisManagerClient.createEngineConn(createEngineConnAction)
lastNodeInfo = createEngineConnResult.getNodeInfo
val nodeInfo = linkisManagerClient.createEngineConn(createEngineConnAction)
lastNodeInfo = nodeInfo.getNodeInfo
serviceInstance = getServiceInstance(lastNodeInfo)
ticketId = getTicketId(lastNodeInfo)
ecmServiceInstance = getECMServiceInstance(lastNodeInfo)
Expand Down Expand Up @@ -160,6 +158,11 @@ object SimpleOnceJob {

def builder(): SimpleOnceJobBuilder = new SimpleOnceJobBuilder

def builder(
bmlClient: BmlClient,
linkisManagerClient: LinkisManagerClient
): SimpleOnceJobBuilder = new SimpleOnceJobBuilder(bmlClient, linkisManagerClient)

/**
* Build a submitted SimpleOnceJob by id and user.
* @param id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.linkis.computation.client.once.simple
import org.apache.linkis.bml.client.{BmlClient, BmlClientFactory}
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobBuilder
import org.apache.linkis.computation.client.LinkisJobBuilder.clientConfig
import org.apache.linkis.computation.client.once.LinkisManagerClient
import org.apache.linkis.computation.client.once.action.CreateEngineConnAction
import org.apache.linkis.computation.client.once.simple.SimpleOnceJobBuilder._
Expand All @@ -28,6 +29,8 @@ import org.apache.linkis.governance.common.entity.job.OnceExecutorContent
import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils
import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils.BmlResource
import org.apache.linkis.httpclient.dws.DWSHttpClient
import org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder}
import org.apache.linkis.manager.label.constant.LabelKeyConstant
import org.apache.linkis.protocol.utils.TaskUtils
import org.apache.linkis.ujes.client.exception.UJESJobException
Expand All @@ -38,12 +41,19 @@ import java.util
import scala.collection.convert.WrapAsJava._
import scala.collection.convert.WrapAsScala._

class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[SubmittableSimpleOnceJob] {
class SimpleOnceJobBuilder private[simple] (
private val bmlClient: BmlClient,
private val linkisManagerClient: LinkisManagerClient
) extends LinkisJobBuilder[SubmittableSimpleOnceJob] {

private var createService: String = _
private var maxSubmitTime: Long = _
private var description: String = _

def this() = {
this(null, null)
}

def setCreateService(createService: String): this.type = {
this.createService = createService
this
Expand All @@ -69,10 +79,26 @@ class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[Submittab
val contentMap = OnceExecutorContentUtils.contentToMap(onceExecutorContent)
val bytes = DWSHttpClient.jacksonJson.writeValueAsBytes(contentMap)
val response =
getBmlClient.uploadResource(executeUser, getFilePath, new ByteArrayInputStream(bytes))
getThisBMLClient.uploadResource(executeUser, getFilePath, new ByteArrayInputStream(bytes))
OnceExecutorContentUtils.resourceToValue(BmlResource(response.resourceId, response.version))
}

protected def getThisBMLClient(): BmlClient = {
if (null == this.bmlClient) {
getBmlClient(LinkisJobBuilder.getDefaultClientConfig)
} else {
this.bmlClient
}
}

protected def getThisLinkisManagerClient(): LinkisManagerClient = {
if (null == this.linkisManagerClient) {
getLinkisManagerClient
} else {
this.linkisManagerClient
}
}

override def build(): SubmittableSimpleOnceJob = {
ensureNotNull(labels, "labels")
ensureNotNull(jobContent, "jobContent")
Expand All @@ -99,7 +125,7 @@ class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[Submittab
.setMaxSubmitTime(maxSubmitTime)
.setDescription(description)
.build()
new SubmittableSimpleOnceJob(getLinkisManagerClient, createEngineConnAction)
new SubmittableSimpleOnceJob(getThisLinkisManagerClient, createEngineConnAction)
}

implicit def toMap(map: util.Map[String, Any]): util.Map[String, String] = map.map {
Expand Down Expand Up @@ -128,10 +154,27 @@ object SimpleOnceJobBuilder {
private var bmlClient: BmlClient = _
private var linkisManagerClient: LinkisManagerClient = _

def getBmlClient: BmlClient = {
def getBmlClient(clientConfig: DWSClientConfig): BmlClient = {
if (bmlClient == null) synchronized {
if (bmlClient == null) {
bmlClient = BmlClientFactory.createBmlClient(LinkisJobBuilder.getDefaultClientConfig)
val newClientConfig = DWSClientConfigBuilder
.newBuilder()
.addServerUrl(clientConfig.getServerUrl)
.connectionTimeout(clientConfig.getConnectTimeout)
.discoveryEnabled(clientConfig.isDiscoveryEnabled)
.loadbalancerEnabled(clientConfig.isLoadbalancerEnabled)
.maxConnectionSize(clientConfig.getMaxConnection)
.retryEnabled(clientConfig.isRetryEnabled)
.setRetryHandler(clientConfig.getRetryHandler)
.readTimeout(
clientConfig.getReadTimeout
) // We think 90s is enough, if SocketTimeoutException is throw, just set a new clientConfig to modify it.
.setAuthenticationStrategy(new TokenAuthenticationStrategy())
.setAuthTokenKey(TokenAuthenticationStrategy.TOKEN_KEY)
.setAuthTokenValue(LinkisJobBuilder.authTokenValue)
.setDWSVersion(clientConfig.getDWSVersion)
.build()
bmlClient = BmlClientFactory.createBmlClient(newClientConfig)
Utils.addShutdownHook(() => bmlClient.close())
}
}
Expand Down

0 comments on commit 34f643a

Please sign in to comment.