diff --git a/linkis-commons/linkis-httpclient/pom.xml b/linkis-commons/linkis-httpclient/pom.xml index 473b591a08..da075dd194 100644 --- a/linkis-commons/linkis-httpclient/pom.xml +++ b/linkis-commons/linkis-httpclient/pom.xml @@ -42,31 +42,6 @@ httpmime ${httpmime.version} - - - org.json4s - json4s-jackson_${scala.binary.version} - ${json4s.version} - - - org.scala-lang - scala-library - - - com.fasterxml.jackson.core - jackson-databind - - - com.fasterxml.jackson.core - jackson-annotations - - - com.fasterxml.jackson.core - jackson-core - - - - diff --git a/linkis-commons/linkis-module/pom.xml b/linkis-commons/linkis-module/pom.xml index b70331e0d1..a660dd7b5c 100644 --- a/linkis-commons/linkis-module/pom.xml +++ b/linkis-commons/linkis-module/pom.xml @@ -399,22 +399,6 @@ io.netty netty-all - - org.json4s - json4s-jackson_${scala.binary.version} - ${json4s.version} - - - org.scala-lang - scala-library - - - com.fasterxml.jackson.core - jackson-databind - - - - redis.clients jedis diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/JavaCollectionSerializer.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/JavaCollectionSerializer.scala deleted file mode 100644 index 5c9e163689..0000000000 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/JavaCollectionSerializer.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.transform - -import org.apache.linkis.server.BDPJettyServerHelper - -import org.json4s.{CustomSerializer, JArray, JObject} -import org.json4s.jackson.JsonMethods.parse -import org.json4s.jackson.Serialization.write - -// TODO is now only the simplest implementation, and there is a need to optimize it later.(TODO 现在只做最简单的实现,后续有需要再优化) - -object JavaCollectionSerializer - extends CustomSerializer[java.util.List[_]](implicit formats => - ( - { case j: JArray => - BDPJettyServerHelper.gson.fromJson(write(j), classOf[java.util.List[_]]) - }, - { case list: java.util.List[_] => - parse(BDPJettyServerHelper.gson.toJson(list)) - } - ) - ) - -object JavaMapSerializer - extends CustomSerializer[java.util.Map[_, _]](implicit formats => - ( - { case j: JObject => - BDPJettyServerHelper.gson.fromJson(write(j), classOf[java.util.Map[_, _]]) - }, - { case map: java.util.Map[_, _] => - parse(BDPJettyServerHelper.gson.toJson(map)) - } - ) - ) diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCFormats.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCFormats.scala deleted file mode 100644 index 4e94584a37..0000000000 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCFormats.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.transform - -import org.json4s.Serializer - -trait RPCFormats { - - def getSerializers: Array[Serializer[_]] - -} diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCProduct.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCProduct.scala index a2b9ae95ec..b4f4bd759d 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCProduct.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCProduct.scala @@ -17,7 +17,6 @@ package org.apache.linkis.rpc.transform -import org.apache.linkis.DataWorkCloudApplication import org.apache.linkis.common.utils.Logging import org.apache.linkis.protocol.message.RequestProtocol import org.apache.linkis.rpc.errorcode.LinkisRpcErrorCodeSummary @@ -28,13 +27,6 @@ import org.apache.linkis.server.{EXCEPTION_MSG, Message} import org.apache.commons.lang3.ClassUtils -import java.lang.reflect.{ParameterizedType, Type} -import java.util - -import scala.collection.JavaConverters.mapAsScalaMapConverter - -import org.json4s.{DefaultFormats, Formats, Serializer} - private[linkis] trait RPCProduct { def toMessage(t: Any): Message @@ -52,21 +44,10 @@ private[linkis] object RPCProduct extends Logging { private[rpc] val CLASS_VALUE = "rpc_object_class" private[rpc] val OBJECT_VALUE = "rpc_object_value" - private[rpc] implicit var formats: Formats = - DefaultFormats + JavaCollectionSerializer + JavaMapSerializer - private var serializerClasses: List[Class[_]] = List.empty private val rpcProduct: RPCProduct = new RPCProduct { - private val rpcFormats = - DataWorkCloudApplication.getApplicationContext.getBeansOfType(classOf[RPCFormats]) - - if (rpcFormats != null && !rpcFormats.isEmpty) { - val serializers = rpcFormats.asScala.map(_._2.getSerializers).toArray.flatMap(_.iterator) - setFormats(serializers) - } - override def toMessage(t: Any): Message = { if (t == null) { throw new DWCURIException( @@ -105,29 +86,6 @@ private[linkis] object RPCProduct extends Logging { } - private[rpc] def setFormats(serializer: Array[Serializer[_]]): Unit = { - this.formats = (serializer :+ JavaCollectionSerializer :+ JavaMapSerializer).foldLeft( - DefaultFormats.asInstanceOf[Formats] - )(_ + _) - serializerClasses = formats.customSerializers - .map(s => getActualTypeClass(s.getClass.getGenericSuperclass)) - .filter(_ != null) ++: List(classOf[util.List[_]], classOf[util.Map[_, _]]) - logger.info( - "RPC Serializers: " + this.formats.customSerializers - .map(_.getClass.getSimpleName) + ", serializerClasses: " + - "" + serializerClasses - ) - } - - private def getActualTypeClass(classType: Type): Class[_] = classType match { - case p: ParameterizedType => - val params = p.getActualTypeArguments - if (params == null || params.isEmpty) null - else getActualTypeClass(params(0)) - case c: Class[_] => c - case _ => null - } - private[rpc] def isRequestProtocol(obj: Any): Boolean = obj.isInstanceOf[RequestProtocol] private[rpc] def isScalaClass(obj: Any): Boolean = diff --git a/linkis-commons/linkis-rpc/src/test/scala/org/apache/linkis/rpc/RPCFormatsTest.scala b/linkis-commons/linkis-rpc/src/test/scala/org/apache/linkis/rpc/RPCFormatsTest.scala deleted file mode 100644 index fbaf1b9b22..0000000000 --- a/linkis-commons/linkis-rpc/src/test/scala/org/apache/linkis/rpc/RPCFormatsTest.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc - -import org.apache.linkis.rpc.transform.{JavaCollectionSerializer, JavaMapSerializer} - -import org.apache.commons.lang3.ClassUtils - -import java.lang.reflect.ParameterizedType -import java.util - -import org.json4s.{CustomSerializer, DefaultFormats, Extraction} -import org.json4s.JsonAST.JObject -import org.json4s.JsonDSL._ -import org.json4s.jackson.Serialization -import org.json4s.reflect.ManifestFactory - -object RPCFormatsTest { - - trait ResultResource - class AvailableResource(val ticketId: String) extends ResultResource - - object ResultResourceSerializer - extends CustomSerializer[ResultResource](implicit formats => - ( - { case JObject(List(("AvailableResource", JObject(List(("ticketId", ticketId)))))) => - new AvailableResource(ticketId.extract[String]) - }, - { case r: AvailableResource => - ("AvailableResource", ("ticketId", Extraction.decompose(r.ticketId))) - } - ) - ) - - def testRPC1(args: Array[String]): Unit = { - implicit val formats = DefaultFormats + ResultResourceSerializer - val serializerClasses = formats.customSerializers - .map(_.getClass.getGenericSuperclass match { - case p: ParameterizedType => - val params = p.getActualTypeArguments - if (params == null || params.isEmpty) null - else params(0).asInstanceOf[Class[_]] - }) - .filter(_ != null) - val a = new AvailableResource("aaa") - val str = Serialization.write(a) - println(str) - val clazz = classOf[AvailableResource] - println(serializerClasses) - val realClass1 = serializerClasses.find(ClassUtils.isAssignable(clazz, _)) - println(realClass1) - val realClass = realClass1.getOrElse(clazz) - val obj = Serialization.read(str)(formats, ManifestFactory.manifestOf(realClass)) - println(obj) - println(classOf[Array[_]].getClass.getName) - } - - case class TestCollection1(a: String, list: java.util.List[String]) - case class TestCollection2(a: String, list: java.util.Map[String, Integer]) - - def testRPC2(args: Array[String]): Unit = { - implicit val formats = DefaultFormats + JavaCollectionSerializer + JavaMapSerializer - // val a = TestCollection1("1", new util.ArrayList[String]()) - val a = TestCollection2("1", new util.HashMap[String, Integer]()) - // a.list.add("1111") - a.list.put("1111", 2) - val str = Serialization.write(a) - println(str) - val realClass = classOf[TestCollection2] - val obj = Serialization.read(str)(formats, ManifestFactory.manifestOf(realClass)) - println(obj) - } - - def main(args: Array[String]): Unit = { - testRPC2(args) - } - -} diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml index bb8f80edce..388f0f9582 100644 --- a/linkis-commons/linkis-storage/pom.xml +++ b/linkis-commons/linkis-storage/pom.xml @@ -56,20 +56,6 @@ protobuf-java ${protobuf.version} - - - org.json4s - json4s-jackson_${scala.binary.version} - ${json4s.version} - provided - - - org.scala-lang - scala-library - - - - org.springframework spring-core diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/MethodEntity.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/MethodEntity.scala index ec9cf4f15f..fac0a2d01b 100644 --- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/MethodEntity.scala +++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/domain/MethodEntity.scala @@ -20,8 +20,6 @@ package org.apache.linkis.storage.domain import java.lang.reflect.Type import com.google.gson.GsonBuilder -import org.json4s.DefaultFormats -import org.json4s.jackson.Serialization.write /** * @param id @@ -58,8 +56,6 @@ case class MethodEntity( object MethodEntitySerializer { - implicit val formats = DefaultFormats - import org.json4s.jackson.JsonMethods._ val gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create /** @@ -67,14 +63,14 @@ object MethodEntitySerializer { * @param code * @return */ - def deserializer(code: String): MethodEntity = parse(code).extract[MethodEntity] + def deserializer(code: String): MethodEntity = gson.fromJson(code, classOf[MethodEntity]) /** * Serialize MethodEntity to code 序列化MethodEntity为code * @param methodEntity * @return */ - def serializer(methodEntity: MethodEntity): String = write(methodEntity) + def serializer(methodEntity: MethodEntity): String = gson.toJson(methodEntity) /** * Serialize a java object as a string 序列化java对象为字符串 diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/pom.xml b/linkis-computation-governance/linkis-client/linkis-computation-client/pom.xml index 36090c566a..612b9bd2a8 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/pom.xml +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/pom.xml @@ -69,10 +69,6 @@ org.apache.linkis linkis-hadoop-common - - org.json4s - json4s-jackson_${scala.binary.version} - diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/JobProgressResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/JobProgressResult.scala index 0cf163c286..79efe20050 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/JobProgressResult.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/JobProgressResult.scala @@ -17,6 +17,7 @@ package org.apache.linkis.ujes.client.response +import org.apache.linkis.common.utils.JsonUtils import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult import org.apache.linkis.protocol.engine.JobProgressInfo @@ -24,9 +25,6 @@ import java.util import scala.collection.JavaConverters._ -import org.json4s._ -import org.json4s.jackson.Serialization._ - @DWSHttpMessageResult("/api/rest_j/v\\d+/entrance/(\\S+)/progress") class JobProgressResult extends UJESJobResult { @@ -34,15 +32,20 @@ class JobProgressResult extends UJESJobResult { private var progressInfo: util.List[util.Map[String, AnyRef]] = _ private var progressInfos: Array[JobProgressInfo] = _ - private implicit val formats = DefaultFormats - def setProgress(progress: Float): Unit = this.progress = progress def getProgress: Float = progress def setProgressInfo(progressInfo: util.List[util.Map[String, AnyRef]]): Unit = { this.progressInfo = progressInfo - progressInfos = - progressInfo.asScala.map(map => read[JobProgressInfo](write(map.asScala.toMap))).toArray + progressInfos = progressInfo.asScala + .map(map => + JsonUtils.jackson + .readValue( + JsonUtils.jackson.writeValueAsString(map, classOf[Map[String, Any]]), + classOf[JobProgressInfo] + ) + ) + .toArray } def getProgressInfo: Array[JobProgressInfo] = progressInfos diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml index 9f46630b66..cf560f270f 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml @@ -73,13 +73,6 @@ provided - - org.json4s - json4s-jackson_${scala.binary.version} - ${json4s.version} - provided - - diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala index eb9206c963..4fcddf4c35 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala @@ -48,7 +48,7 @@ class DefaultECMRegisterService extends ECMRegisterService with ECMEventListener request } - private def getLabelsFromArgs(params: Array[String]): util.Map[String, AnyRef] = { + def getLabelsFromArgs(params: Array[String]): util.Map[String, AnyRef] = { import scala.collection.JavaConverters._ val labelRegex = """label\.(.+)\.(.+)=(.+)""".r val labels = new util.HashMap[String, AnyRef]() @@ -61,7 +61,7 @@ class DefaultECMRegisterService extends ECMRegisterService with ECMEventListener labels } - private def getEMRegiterResourceFromConfiguration: NodeResource = { + def getEMRegiterResourceFromConfiguration: NodeResource = { val maxResource = new LoadInstanceResource( ECMUtils.inferDefaultMemory(), ECM_MAX_CORES_AVAILABLE, diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/test/java/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterServiceTest.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/test/java/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterServiceTest.java new file mode 100644 index 0000000000..baf058d0c8 --- /dev/null +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/test/java/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterServiceTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.ecm.server.service.impl; + +import org.apache.linkis.common.ServiceInstance; +import org.apache.linkis.manager.common.protocol.em.RegisterEMRequest; +import org.apache.linkis.manager.label.constant.LabelKeyConstant; +import org.apache.linkis.rpc.serializer.ProtostuffSerializeUtil; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import static org.apache.linkis.manager.label.conf.LabelCommonConfig.ENGINE_CONN_MANAGER_SPRING_NAME; + +public class DefaultECMRegisterServiceTest { + @Test + void testECM() { + DefaultECMRegisterService defaultECMRegisterService = new DefaultECMRegisterService(); + RegisterEMRequest request = new RegisterEMRequest(); + ServiceInstance instance = new ServiceInstance(); + instance.setInstance("127.0.0.1:9001"); + instance.setApplicationName("ecm"); + request.setUser("hadoop"); + request.setServiceInstance(instance); + request.setAlias(instance.getApplicationName()); + + Map labels = new HashMap<>(); + labels.put( + LabelKeyConstant.SERVER_ALIAS_KEY, + Collections.singletonMap("alias", ENGINE_CONN_MANAGER_SPRING_NAME)); + request.setLabels(defaultECMRegisterService.getLabelsFromArgs(null)); + request.setNodeResource(defaultECMRegisterService.getEMRegiterResourceFromConfiguration()); + String res = ProtostuffSerializeUtil.serialize(request); + ProtostuffSerializeUtil.deserialize(res, RegisterEMRequest.class); + } +} diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/pom.xml b/linkis-computation-governance/linkis-manager/linkis-application-manager/pom.xml index 99f4613974..2cc2788b91 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/pom.xml +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/pom.xml @@ -27,6 +27,10 @@ linkis-application-manager + + com.jayway.jsonpath + json-path + org.apache.linkis @@ -94,14 +98,6 @@ ${project.version} provided - - - org.json4s - json4s-jackson_${scala.binary.version} - ${json4s.version} - provided - - com.google.code.gson gson diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/rpc/ManagerRPCFormats.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/rpc/ManagerRPCFormats.scala deleted file mode 100644 index c710cdd303..0000000000 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/rpc/ManagerRPCFormats.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.manager.am.rpc - -import org.apache.linkis.manager.common.entity.resource.ResourceSerializer -import org.apache.linkis.manager.common.serializer.NodeResourceSerializer -import org.apache.linkis.manager.rm.ResultResourceSerializer -import org.apache.linkis.rpc.transform.RPCFormats - -import org.springframework.stereotype.Component - -import org.json4s.Serializer - -@Component -class ManagerRPCFormats extends RPCFormats { - - override def getSerializers: Array[Serializer[_]] = - Array(ResultResourceSerializer, ResourceSerializer, NodeResourceSerializer) - -} diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala index d6ceccef96..576af4517a 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala @@ -305,11 +305,10 @@ class DefaultEngineCreateService resource, timeout ) match { - case AvailableResource(ticketId) => - (ticketId, resource) + case AvailableResource(ticketId) => (ticketId, resource) case NotEnoughResource(reason) => - logger.warn(s"not engough resource: $reason") - throw new LinkisRetryException(AMConstant.EM_ERROR_CODE, s"not engough resource: : $reason") + logger.warn(s"not enough resource: $reason") + throw new LinkisRetryException(AMConstant.EM_ERROR_CODE, s"not enough resource: : $reason") } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala index aa4aa5ce05..08c71bf31d 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala @@ -23,10 +23,8 @@ import org.apache.linkis.manager.common.entity.node.{EMNode, EngineNode} import org.apache.linkis.manager.common.entity.resource.{ DriverAndYarnResource, Resource, - ResourceSerializer, ResourceType } -import org.apache.linkis.manager.common.serializer.NodeResourceSerializer import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel import org.apache.linkis.server.BDPJettyServerHelper @@ -35,14 +33,11 @@ import java.util import scala.collection.JavaConverters._ import com.google.gson.JsonObject -import org.json4s.DefaultFormats -import org.json4s.jackson.Serialization.write object AMUtils { lazy val GSON = BDPJettyServerHelper.gson - implicit val formats = DefaultFormats + ResourceSerializer + NodeResourceSerializer val mapper = BDPJettyServerHelper.jacksonJson def copyToEMVo(EMNodes: Array[EMNode]): util.ArrayList[EMNodeVo] = { @@ -60,26 +55,32 @@ object AMUtils { if (node.getNodeResource.getMaxResource != null) { EMNodeVo.setMaxResource( mapper - .readValue(write(node.getNodeResource.getMaxResource), classOf[util.Map[String, Any]]) + .readValue( + mapper.writeValueAsString(node.getNodeResource.getMaxResource), + classOf[util.Map[String, Any]] + ) ) } if (node.getNodeResource.getMinResource != null) { EMNodeVo.setMinResource( mapper - .readValue(write(node.getNodeResource.getMinResource), classOf[util.Map[String, Any]]) + .readValue( + mapper.writeValueAsString(node.getNodeResource.getMinResource), + classOf[util.Map[String, Any]] + ) ) } if (node.getNodeResource.getUsedResource != null) { EMNodeVo.setUsedResource( mapper.readValue( - write(node.getNodeResource.getUsedResource), + mapper.writeValueAsString(node.getNodeResource.getUsedResource), classOf[util.Map[String, Any]] ) ) } else { EMNodeVo.setUsedResource( mapper.readValue( - write(Resource.initResource(ResourceType.Default)), + mapper.writeValueAsString(Resource.initResource(ResourceType.Default)), classOf[util.Map[String, Any]] ) ) @@ -87,7 +88,7 @@ object AMUtils { if (node.getNodeResource.getLockedResource != null) { EMNodeVo.setLockedResource( mapper.readValue( - write(node.getNodeResource.getLockedResource), + mapper.writeValueAsString(node.getNodeResource.getLockedResource), classOf[util.Map[String, Any]] ) ) @@ -95,7 +96,7 @@ object AMUtils { if (node.getNodeResource.getExpectedResource != null) { EMNodeVo.setExpectedResource( mapper.readValue( - write(node.getNodeResource.getExpectedResource), + mapper.writeValueAsString(node.getNodeResource.getExpectedResource), classOf[util.Map[String, Any]] ) ) @@ -103,7 +104,7 @@ object AMUtils { if (node.getNodeResource.getLeftResource != null) { EMNodeVo.setLeftResource( mapper.readValue( - write(node.getNodeResource.getLeftResource), + mapper.writeValueAsString(node.getNodeResource.getLeftResource), classOf[util.Map[String, Any]] ) ) @@ -167,7 +168,7 @@ object AMUtils { if (!node.getLabels.isEmpty) { val engineTypeLabel = - node.getLabels.asScala.find(_.isInstanceOf[EngineTypeLabel]).getOrElse(null) + node.getLabels.asScala.find(_.isInstanceOf[EngineTypeLabel]).orNull if (engineTypeLabel != null) { AMEngineNodeVo.setEngineType( engineTypeLabel.asInstanceOf[EngineTypeLabel] getEngineType @@ -197,12 +198,13 @@ object AMUtils { case _ => node.getNodeResource.getUsedResource } AMEngineNodeVo.setUsedResource( - mapper.readValue(write(realResource), classOf[util.Map[String, Any]]) + mapper + .readValue(mapper.writeValueAsString(realResource), classOf[util.Map[String, Any]]) ) } else { AMEngineNodeVo.setUsedResource( mapper.readValue( - write(Resource.initResource(ResourceType.Default)), + mapper.writeValueAsString(Resource.initResource(ResourceType.Default)), classOf[util.Map[String, Any]] ) ) @@ -210,7 +212,7 @@ object AMUtils { if (node.getNodeResource.getLockedResource != null) { AMEngineNodeVo.setLockedResource( mapper.readValue( - write(node.getNodeResource.getLockedResource), + mapper.writeValueAsString(node.getNodeResource.getLockedResource), classOf[util.Map[String, Any]] ) ) @@ -218,7 +220,7 @@ object AMUtils { if (node.getNodeResource.getExpectedResource != null) { AMEngineNodeVo.setExpectedResource( mapper.readValue( - write(node.getNodeResource.getExpectedResource), + mapper.writeValueAsString(node.getNodeResource.getExpectedResource), classOf[util.Map[String, Any]] ) ) @@ -226,7 +228,7 @@ object AMUtils { if (node.getNodeResource.getLeftResource != null) { AMEngineNodeVo.setLeftResource( mapper.readValue( - write(node.getNodeResource.getLeftResource), + mapper.writeValueAsString(node.getNodeResource.getLeftResource), classOf[util.Map[String, Any]] ) ) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala index d7b3349cc7..4ba592d5ba 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala @@ -17,7 +17,7 @@ package org.apache.linkis.manager.rm.external.yarn -import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.common.utils.{JsonUtils, Logging, Utils} import org.apache.linkis.manager.common.conf.RMConfiguration import org.apache.linkis.manager.common.entity.resource.{ CommonNodeResource, @@ -41,6 +41,8 @@ import org.apache.http.client.methods.HttpGet import org.apache.http.impl.client.HttpClients import org.apache.http.util.EntityUtils +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder.json + import java.text.MessageFormat import java.util import java.util.Base64 @@ -49,9 +51,8 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import org.json4s.JsonAST._ -import org.json4s.JValue -import org.json4s.jackson.JsonMethods.parse +import com.jayway.jsonpath.JsonPath +import com.jayway.jsonpath.ReadContext class YarnResourceRequester extends ExternalResourceRequester with Logging { @@ -77,31 +78,27 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { logger.info(s"rmWebAddress: $rmWebAddress") val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName - def getYarnResource(jValue: Option[JValue]) = jValue.map(r => - new YarnResource( - (r \ "memory").asInstanceOf[JInt].values.toLong * 1024L * 1024L, - (r \ "vCores").asInstanceOf[JInt].values.toInt, - 0, - queueName - ) - ) - - def maxEffectiveHandle(queueValue: Option[JValue]): Option[YarnResource] = { + def maxEffectiveHandle(queueValue: Option[Any]): Option[YarnResource] = { val metrics = getResponseByUrl("metrics", rmWebAddress) - val totalResouceInfoResponse = ( - (metrics \ "clusterMetrics" \ "totalMB").asInstanceOf[JInt].values.toLong, - (metrics \ "clusterMetrics" \ "totalVirtualCores").asInstanceOf[JInt].values.toLong - ) + val ctx = JsonPath.parse(metrics) + val totalMB = ctx.read("$.clusterMetrics.totalMB").asInstanceOf[Long] + val totalVirtualCores = + ctx.read("$.clusterMetrics.totalVirtualCores").asInstanceOf[Long] + val totalResouceInfoResponse = (totalMB, totalVirtualCores) + queueValue.map(r => { - val absoluteCapacity = r \ "absoluteCapacity" match { - case jDecimal: JDecimal => - jDecimal.values.toDouble - case jDouble: JDouble => - jDouble.values - case _ => + val absoluteCapacity = JsonPath.read(r, "$.absoluteCapacity") + + val effectiveResource = { + if (absoluteCapacity.isInstanceOf[BigDecimal]) { + absoluteCapacity.asInstanceOf[BigDecimal].toDouble + } else if (absoluteCapacity.isInstanceOf[Double]) { + absoluteCapacity.asInstanceOf[Double] + } else { 0d + } } - val effectiveResource = absoluteCapacity + new YarnResource( math .floor(effectiveResource * totalResouceInfoResponse._1 * 1024L * 1024L / 100) @@ -115,77 +112,96 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { var realQueueName = "root." + queueName - def getQueue(queues: JValue): Option[JValue] = queues match { - case JArray(queue) => - queue.foreach { q => - val yarnQueueName = (q \ "queueName").asInstanceOf[JString].values + def getQueue(queues: Any): Option[Any] = { + if (queues.isInstanceOf[List[Any]]) { + queues.asInstanceOf[List[Any]].foreach { q => + val yarnQueueName = JsonPath.read(q, "$.queueName").asInstanceOf[String] if (yarnQueueName == realQueueName) return Some(q) else if (realQueueName.startsWith(yarnQueueName + ".")) { return getQueue(getChildQueues(q)) } } None - case JObject(queue) => + } else if (queues.isInstanceOf[Map[Any, Any]]) { if ( - queue + queues + .asInstanceOf[Map[Any, Any]] .find(_._1 == "queueName") - .exists(_._2.asInstanceOf[JString].values == realQueueName) + .exists(_._2.toString == realQueueName) ) { Some(queues) } else { - val childQueues = queue.find(_._1 == "childQueues") + val childQueues = queues.asInstanceOf[Map[Any, Any]].find(_._1 == "childQueues") if (childQueues.isEmpty) None else getQueue(childQueues.map(_._2).get) } - case _ => None + } else { + None + } } - def getChildQueues(resp: JValue): JValue = { - val queues = resp \ "childQueues" \ "queue" + def getChildQueues(resp: Any): Any = { + val ctx = JsonPath.parse(resp) + val childQueuesValue = ctx.read("$.childQueues") + val queues = + ctx.read("$.childQueues.queue").asInstanceOf[List[Any]] - if ( - queues != null && queues != JNull && queues != JNothing && null != queues.children && queues.children.nonEmpty - ) { + if (queues != null && queues.nonEmpty) { logger.info(s"queues:$queues") queues - } else resp \ "childQueues" + } else childQueuesValue } - def getQueueOfCapacity(queues: JValue): Option[JValue] = queues match { - case JArray(queue) => - queue.foreach { q => - val yarnQueueName = (q \ "queueName").asInstanceOf[JString].values + def getQueueOfCapacity(queues: Any): Option[Any] = { + if (queues.isInstanceOf[List[Any]]) { + queues.asInstanceOf[List[Any]].foreach { q => + val ctx = JsonPath.parse(q) + val yarnQueueName = ctx.read("$.queueName").asInstanceOf[String] + val queuesValue = ctx.read("$.queues").asInstanceOf[String] + if (yarnQueueName == realQueueName) return Some(q) - else if ((q \ "queues").toOption.nonEmpty) { + else if (queuesValue.nonEmpty) { val matchQueue = getQueueOfCapacity(getChildQueuesOfCapacity(q)) if (matchQueue.nonEmpty) return matchQueue } } None - case JObject(queue) => + } else if (queues.isInstanceOf[Map[Any, Any]]) { + val queuesValue = JsonPath.read(queues, "$.queues").asInstanceOf[String] if ( - queue + queues + .asInstanceOf[Map[Any, Any]] .find(_._1 == "queueName") - .exists(_._2.asInstanceOf[JString].values == realQueueName) + .exists(_._2.toString == realQueueName) ) { return Some(queues) - } else if ((queues \ "queues").toOption.nonEmpty) { - val matchQueue = getQueueOfCapacity(getChildQueuesOfCapacity(queues)) + } else if (queuesValue.nonEmpty) { + val matchQueue = getQueueOfCapacity(getChildQueuesOfCapacity(queues.toString)) if (matchQueue.nonEmpty) return matchQueue } None - case _ => None + } else { + None + } } - def getChildQueuesOfCapacity(resp: JValue) = resp \ "queues" \ "queue" + def getChildQueuesOfCapacity(resp: Any) = { + JsonPath.read(resp, "$.queues.queue").asInstanceOf[String] + } def getResources() = { val resp = getResponseByUrl("scheduler", rmWebAddress) + val ctx = JsonPath.parse(resp) + val schedulerInfoValue = + ctx.read("$.scheduler.schedulerInfo").asInstanceOf[String] val schedulerType = - (resp \ "scheduler" \ "schedulerInfo" \ "type").asInstanceOf[JString].values + ctx.read("$.scheduler.schedulerInfo.type").asInstanceOf[String] + val rootQueueValue = + ctx.read("$.scheduler.schedulerInfo.rootQueue").asInstanceOf[String] + if ("capacityScheduler".equals(schedulerType)) { realQueueName = queueName - val childQueues = getChildQueuesOfCapacity(resp \ "scheduler" \ "schedulerInfo") + val childQueues = getChildQueuesOfCapacity(schedulerInfoValue) val queue = getQueueOfCapacity(childQueues) if (queue.isEmpty) { logger.debug(s"cannot find any information about queue $queueName, response: " + resp) @@ -194,9 +210,15 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { MessageFormat.format(YARN_NOT_EXISTS_QUEUE.getErrorDesc, queueName) ) } - (maxEffectiveHandle(queue).get, getYarnResource(queue.map(_ \ "resourcesUsed")).get) + + val resourceCtx = JsonPath.parse(queue) + val usedMemory = resourceCtx.read("$.resourcesUsed.memory").asInstanceOf[Long] + val usedvCores = resourceCtx.read("$.resourcesUsed.vCores").asInstanceOf[Int] + val resourcesUsed = new YarnResource(usedMemory * 1024L * 1024L, usedvCores, 0, queueName) + + (maxEffectiveHandle(queue).get, resourcesUsed) } else if ("fairScheduler".equals(schedulerType)) { - val childQueues = getChildQueues(resp \ "scheduler" \ "schedulerInfo" \ "rootQueue") + val childQueues = getChildQueues(rootQueueValue) val queue = getQueue(childQueues) if (queue.isEmpty) { logger.debug(s"cannot find any information about queue $queueName, response: " + resp) @@ -205,10 +227,18 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { MessageFormat.format(YARN_NOT_EXISTS_QUEUE.getErrorDesc, queueName) ) } - ( - getYarnResource(queue.map(_ \ "maxResources")).get, - getYarnResource(queue.map(_ \ "usedResources")).get - ) + val resourceCtx = JsonPath.parse(queue) + val maxResourceMemory = resourceCtx.read("$.maxResources.memory").asInstanceOf[Long] + val maxResourcevCores = resourceCtx.read("$.maxResources.vCores").asInstanceOf[Int] + val maxResources = + new YarnResource(maxResourceMemory * 1024L * 1024L, maxResourcevCores, 0, queueName) + + val usedResourceMemory = resourceCtx.read("$.usedResources.memory").asInstanceOf[Long] + val usedResourcevCores = resourceCtx.read("$.usedResources.vCores").asInstanceOf[Int] + val usedResourcesUsed = + new YarnResource(usedResourceMemory * 1024L * 1024L, usedResourcevCores, 0, queueName) + + (maxResources, usedResourcesUsed) } else { logger.debug( s"only support fairScheduler or capacityScheduler, schedulerType: $schedulerType , response: " + resp @@ -245,38 +275,39 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { val queueName = identifier.asInstanceOf[YarnResourceIdentifier].getQueueName - def getYarnResource(jValue: Option[JValue]) = jValue.map(r => - new YarnResource( - (r \ "allocatedMB").asInstanceOf[JInt].values.toLong * 1024L * 1024L, - (r \ "allocatedVCores").asInstanceOf[JInt].values.toInt, - 0, - queueName - ) - ) - val realQueueName = "root." + queueName def getAppInfos(): Array[ExternalAppInfo] = { val resp = getResponseByUrl("apps", rmWebAddress) - resp \ "apps" \ "app" match { - case JArray(apps) => - val appInfoBuffer = new ArrayBuffer[YarnAppInfo]() - apps.foreach { app => - val yarnQueueName = (app \ "queue").asInstanceOf[JString].values - val state = (app \ "state").asInstanceOf[JString].values - if (yarnQueueName == realQueueName && (state == "RUNNING" || state == "ACCEPTED")) { - val appInfo = new YarnAppInfo( - (app \ "id").asInstanceOf[JString].values, - (app \ "user").asInstanceOf[JString].values, - state, - (app \ "applicationType").asInstanceOf[JString].values, - getYarnResource(Some(app)).get - ) - appInfoBuffer.append(appInfo) - } + val ctx = JsonPath.parse(resp) + val apps = ctx.read("$.apps") + val app = ctx.read("$.apps.app") + + if (app.isInstanceOf[List[Any]]) { + val appInfoBuffer = new ArrayBuffer[YarnAppInfo]() + apps.asInstanceOf[List[Any]].foreach { app => + val appCtx = JsonPath.parse(app) + val queueValue = appCtx.read("$.queue").asInstanceOf[String] + val stateValue = appCtx.read("$.state").asInstanceOf[String] + val idValue = appCtx.read("$.id").asInstanceOf[String] + val userValue = appCtx.read("$.user").asInstanceOf[String] + val applicationTypeValue = + appCtx.read("$.applicationType").asInstanceOf[String] + val yarnQueueName = queueValue + val allocatedMB = appCtx.read("$.allocatedMB").asInstanceOf[Long] + val allocatedVCores = appCtx.read("$.allocatedVCores").asInstanceOf[Int] + val yarnResource = + new YarnResource(allocatedMB * 1024L * 1024L, allocatedVCores, 0, queueName) + + val state = stateValue + if (yarnQueueName == realQueueName && (state == "RUNNING" || state == "ACCEPTED")) { + val appInfo = YarnAppInfo(idValue, userValue, state, applicationTypeValue, yarnResource) + appInfoBuffer.append(appInfo) } - appInfoBuffer.toArray - case _ => new ArrayBuffer[YarnAppInfo](0).toArray + } + appInfoBuffer.toArray + } else { + new ArrayBuffer[YarnAppInfo](0).toArray } } @@ -294,7 +325,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { private def getResponseByUrl(url: String, rmWebAddress: String) = { val httpGet = new HttpGet(rmWebAddress + "/ws/v1/cluster/" + url) httpGet.addHeader("Accept", "application/json") - val authorEnable: Any = this.provider.getConfigMap.get("authorEnable"); + val authorEnable: Any = this.provider.getConfigMap.get("authorEnable") var httpResponse: HttpResponse = null authorEnable match { case flag: Boolean => @@ -303,7 +334,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { } case _ => } - val kerberosEnable: Any = this.provider.getConfigMap.get("kerberosEnable"); + val kerberosEnable: Any = this.provider.getConfigMap.get("kerberosEnable") kerberosEnable match { case flag: Boolean => if (flag) { @@ -313,7 +344,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { val requestKuu = new RequestKerberosUrlUtils(principalName, keytabPath, krb5Path, false) val response = requestKuu.callRestUrl(rmWebAddress + "/ws/v1/cluster/" + url, principalName) - httpResponse = response; + httpResponse = response } else { val response = YarnResourceRequester.httpClient.execute(httpGet) httpResponse = response @@ -322,7 +353,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { val response = YarnResourceRequester.httpClient.execute(httpGet) httpResponse = response } - parse(EntityUtils.toString(httpResponse.getEntity())) + JsonUtils.jackson.readValue(EntityUtils.toString(httpResponse.getEntity()), classOf[String]) } def getAndUpdateActiveRmWebAddress(haAddress: String): String = { @@ -340,15 +371,15 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging { .split(RMConfiguration.DEFAULT_YARN_RM_WEB_ADDRESS_DELIMITER.getValue) .foreach(address => { Utils.tryCatch { - val response = getResponseByUrl("info", address) - response \ "clusterInfo" \ "haState" match { - case state: JString => - if (HASTATE_ACTIVE.equalsIgnoreCase(state.s)) { - activeAddress = address - } else { - logger.warn(s"Resourcemanager : ${address} haState : ${state.s}") - } - case _ => + val response = getResponseByUrl("info", address).asInstanceOf[Any] + val haState = JsonPath.read(response, "$.clusterInfo.haState") + + if (haState.isInstanceOf[String]) { + if (HASTATE_ACTIVE.equalsIgnoreCase(haState)) { + activeAddress = address + } else { + logger.warn(s"Resourcemanager : ${address} haState : ${haState}") + } } } { case e: Exception => logger.error("Get Yarn resourcemanager info error, " + e.getMessage, e) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala index 8c59f1b17a..193066995d 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala @@ -25,7 +25,6 @@ import org.apache.linkis.manager.common.entity.node.EngineNode import org.apache.linkis.manager.common.entity.resource._ import org.apache.linkis.manager.common.errorcode.ManagerCommonErrorCodeSummary._ import org.apache.linkis.manager.common.exception.RMErrorException -import org.apache.linkis.manager.common.serializer.NodeResourceSerializer import org.apache.linkis.manager.common.utils.ResourceUtils import org.apache.linkis.manager.label.builder.CombinedLabelBuilder import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext @@ -54,7 +53,6 @@ import org.apache.linkis.server.{toScalaBuffer, BDPJettyServerHelper, Message} import org.apache.linkis.server.security.SecurityFilter import org.apache.linkis.server.utils.ModuleUserUtils -import org.apache.commons.collections4.ListUtils import org.apache.commons.lang3.StringUtils import org.springframework.beans.factory.annotation.Autowired @@ -73,16 +71,13 @@ import scala.collection.mutable.ArrayBuffer import com.fasterxml.jackson.databind.ObjectMapper import com.github.pagehelper.page.PageMethod import com.google.common.collect.Lists -import io.swagger.annotations.{Api, ApiImplicitParams, ApiModel, ApiOperation} -import org.json4s.DefaultFormats -import org.json4s.jackson.Serialization.write +import io.swagger.annotations.{Api, ApiOperation} @RestController @Api(tags = Array("resource management")) @RequestMapping(path = Array("/linkisManager/rm")) class RMMonitorRest extends Logging { - implicit val formats = DefaultFormats + ResourceSerializer + NodeResourceSerializer val mapper = new ObjectMapper() private val dateFormatLocal = new ThreadLocal[SimpleDateFormat]() { @@ -127,7 +122,7 @@ class RMMonitorRest extends Logging { var COMBINED_USERCREATOR_ENGINETYPE: String = _ def appendMessageData(message: Message, key: String, value: AnyRef): Message = - message.data(key, mapper.readTree(write(value))) + message.data(key, mapper.readTree(gson.toJson(value))) @ApiOperation(value = "getApplicationList", notes = "get applicationList") @RequestMapping(path = Array("applicationlist"), method = Array(RequestMethod.POST)) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultReqResourceService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultReqResourceService.scala index 45670e2df4..5211277002 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultReqResourceService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultReqResourceService.scala @@ -17,16 +17,12 @@ package org.apache.linkis.manager.rm.service.impl -import org.apache.linkis.manager.common.entity.resource.{ResourceSerializer, ResourceType} +import org.apache.linkis.manager.common.entity.resource.ResourceType import org.apache.linkis.manager.rm.service.{LabelResourceService, RequestResourceService} -import org.json4s.DefaultFormats - class DefaultReqResourceService(labelResourceService: LabelResourceService) extends RequestResourceService(labelResourceService) { - implicit val formats = DefaultFormats + ResourceSerializer - override val resourceType: ResourceType = ResourceType.Default } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala index 6145f8bc0b..8c6266528c 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala @@ -803,7 +803,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ */ override def getResourceInfo(serviceInstances: Array[ServiceInstance]): ResourceInfo = { - val resourceInfo = new ResourceInfo(Lists.newArrayList()) + val resourceInfo = ResourceInfo(Lists.newArrayList()) serviceInstances.foreach({ serviceInstance => val rmNode = new InfoRMNode var aggregatedResource: NodeResource = null diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala index e82ff8383c..2a06cdf351 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DriverAndYarnReqResourceService.scala @@ -28,15 +28,11 @@ import org.apache.linkis.manager.rm.external.yarn.YarnResourceIdentifier import org.apache.linkis.manager.rm.service.{LabelResourceService, RequestResourceService} import org.apache.linkis.manager.rm.utils.RMUtils -import org.json4s.DefaultFormats - class DriverAndYarnReqResourceService( labelResourceService: LabelResourceService, externalResourceService: ExternalResourceService ) extends RequestResourceService(labelResourceService) { - implicit val formats = DefaultFormats + ResourceSerializer - override val resourceType: ResourceType = DriverAndYarn override def canRequest(labelContainer: RMLabelContainer, resource: NodeResource): Boolean = { diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/RMUtils.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/RMUtils.scala index ca5f420693..bb300ca503 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/RMUtils.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/RMUtils.scala @@ -17,28 +17,22 @@ package org.apache.linkis.manager.rm.utils -import org.apache.linkis.common.conf.{CommonVars, Configuration, TimeType} -import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils} +import org.apache.linkis.common.conf.{CommonVars, TimeType} +import org.apache.linkis.common.utils.{ByteTimeUtils, JsonUtils, Logging, Utils} import org.apache.linkis.manager.common.constant.RMConstant import org.apache.linkis.manager.common.entity.persistence.PersistenceResource import org.apache.linkis.manager.common.entity.resource._ -import org.apache.linkis.manager.common.serializer.NodeResourceSerializer import org.apache.linkis.manager.label.entity.engine.EngineType import org.apache.linkis.manager.rm.conf.ResourceStatus import org.apache.linkis.manager.rm.restful.vo.UserResourceVo -import org.apache.linkis.server.BDPJettyServerHelper import java.util import scala.collection.JavaConverters.asScalaBufferConverter -import org.json4s.DefaultFormats -import org.json4s.jackson.Serialization.{read, write} - object RMUtils extends Logging { - implicit val formats = DefaultFormats + ResourceSerializer + NodeResourceSerializer - val mapper = BDPJettyServerHelper.jacksonJson + val jacksonUtil = JsonUtils.jackson val MANAGER_KILL_ENGINE_EAIT = CommonVars("wds.linkis.manager.rm.kill.engine.wait", new TimeType("30s")) @@ -65,11 +59,11 @@ object RMUtils extends Logging { val RM_RESOURCE_ACTION_RECORD = CommonVars("wds.linkis.manager.rm.resource.action.record", true) def deserializeResource(plainResource: String): Resource = { - read[Resource](plainResource) + jacksonUtil.readValue(plainResource, classOf[Resource]) } def serializeResource(resource: Resource): String = { - write(resource) + jacksonUtil.writeValueAsString(resource) } def toUserResourceVo(userResource: UserResource): UserResourceVo = { @@ -90,27 +84,42 @@ object RMUtils extends Logging { if (userResource.getId != null) userResourceVo.setId(userResource.getId) if (userResource.getUsedResource != null) { userResourceVo.setUsedResource( - mapper.readValue(write(userResource.getUsedResource), classOf[util.Map[String, Any]]) + jacksonUtil.readValue( + jacksonUtil.writeValueAsString(userResource.getUsedResource), + classOf[util.Map[String, Any]] + ) ) } if (userResource.getLeftResource != null) { userResourceVo.setLeftResource( - mapper.readValue(write(userResource.getLeftResource), classOf[util.Map[String, Any]]) + jacksonUtil.readValue( + jacksonUtil.writeValueAsString(userResource.getLeftResource), + classOf[util.Map[String, Any]] + ) ) } if (userResource.getLockedResource != null) { userResourceVo.setLockedResource( - mapper.readValue(write(userResource.getLockedResource), classOf[util.Map[String, Any]]) + jacksonUtil.readValue( + jacksonUtil.writeValueAsString(userResource.getLockedResource), + classOf[util.Map[String, Any]] + ) ) } if (userResource.getMaxResource != null) { userResourceVo.setMaxResource( - mapper.readValue(write(userResource.getMaxResource), classOf[util.Map[String, Any]]) + jacksonUtil.readValue( + jacksonUtil.writeValueAsString(userResource.getMaxResource), + classOf[util.Map[String, Any]] + ) ) } if (userResource.getMinResource != null) { userResourceVo.setMinResource( - mapper.readValue(write(userResource.getMinResource), classOf[util.Map[String, Any]]) + jacksonUtil.readValue( + jacksonUtil.writeValueAsString(userResource.getMinResource), + classOf[util.Map[String, Any]] + ) ) } if (userResource.getResourceType != null) { @@ -190,9 +199,9 @@ object RMUtils extends Logging { return null } if (firstNodeResource == null) { - return secondNodeResource.asInstanceOf[CommonNodeResource] + secondNodeResource.asInstanceOf[CommonNodeResource] } else { - return firstNodeResource.asInstanceOf[CommonNodeResource] + firstNodeResource.asInstanceOf[CommonNodeResource] } } diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/pom.xml b/linkis-computation-governance/linkis-manager/linkis-manager-common/pom.xml index 08822a5c45..a3884163fc 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/pom.xml +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/pom.xml @@ -52,20 +52,6 @@ ${project.version} provided - - - org.json4s - json4s-jackson_${scala.binary.version} - ${json4s.version} - provided - - - org.scala-lang - scala-library - - - - diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/entity/resource/Resource.scala b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/entity/resource/Resource.scala index d1244a9f57..177f4cd071 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/entity/resource/Resource.scala +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/entity/resource/Resource.scala @@ -28,12 +28,10 @@ import java.text.MessageFormat import scala.collection.JavaConverters._ -import org.json4s.{CustomSerializer, DefaultFormats, Extraction} -import org.json4s.JsonAST.JObject -import org.json4s.JsonDSL._ -import org.json4s.jackson.Serialization +import com.fasterxml.jackson.annotation.JsonIgnoreProperties abstract class Resource { + def add(r: Resource): Resource def minus(r: Resource): Resource @@ -131,7 +129,9 @@ object Resource extends Logging { case class UserAvailableResource(moduleName: String, resource: Resource) +@JsonIgnoreProperties(ignoreUnknown = true) class MemoryResource(val memory: Long) extends Resource { + def this() = this(Long.MaxValue) private implicit def toMemoryResource(r: Resource): MemoryResource = r match { case t: MemoryResource => t @@ -170,7 +170,9 @@ class MemoryResource(val memory: Long) extends Resource { } +@JsonIgnoreProperties(ignoreUnknown = true) class CPUResource(val cores: Int) extends Resource { + def this() = this(Integer.MAX_VALUE) private implicit def toCPUResource(r: Resource): CPUResource = r match { case t: CPUResource => t @@ -210,7 +212,9 @@ class CPUResource(val cores: Int) extends Resource { override def toString: String = toJson } +@JsonIgnoreProperties(ignoreUnknown = true) class LoadResource(val memory: Long, val cores: Int) extends Resource { + def this() = this(Long.MaxValue, Integer.MAX_VALUE) private implicit def toLoadResource(r: Resource): LoadResource = r match { case t: LoadResource => t @@ -255,8 +259,11 @@ class LoadResource(val memory: Long, val cores: Int) extends Resource { override def toString: String = toJson } +@JsonIgnoreProperties(ignoreUnknown = true) class LoadInstanceResource(val memory: Long, val cores: Int, val instances: Int) extends Resource { + def this() = this(Long.MaxValue, Integer.MAX_VALUE, Integer.MAX_VALUE) + implicit def toLoadInstanceResource(r: Resource): LoadInstanceResource = r match { case t: LoadInstanceResource => t case l: LoadResource => new LoadInstanceResource(l.memory, l.cores, 0) @@ -308,6 +315,7 @@ class LoadInstanceResource(val memory: Long, val cores: Int, val instances: Int) } +@JsonIgnoreProperties(ignoreUnknown = true) class InstanceResource(val instances: Int) extends CPUResource(instances) { override protected def toResource(cores: Int): Resource = new InstanceResource(cores) @@ -325,6 +333,7 @@ class InstanceResource(val instances: Int) extends CPUResource(instances) { * @param queueCores * @param queueInstances */ +@JsonIgnoreProperties(ignoreUnknown = true) class YarnResource( val queueMemory: Long, val queueCores: Int, @@ -332,6 +341,7 @@ class YarnResource( val queueName: String = "default", val applicationId: String = "" ) extends Resource { + def this() = this(Long.MaxValue, Integer.MAX_VALUE, Integer.MAX_VALUE, "default") implicit def toYarnResource(r: Resource): YarnResource = r match { case t: YarnResource => t @@ -409,12 +419,18 @@ class YarnResource( } +@JsonIgnoreProperties(ignoreUnknown = true) class DriverAndYarnResource( val loadInstanceResource: LoadInstanceResource, val yarnResource: YarnResource ) extends Resource with Logging { + def this() = this( + new LoadInstanceResource(Long.MaxValue, Integer.MAX_VALUE, Integer.MAX_VALUE), + new YarnResource(Long.MaxValue, Integer.MAX_VALUE, Integer.MAX_VALUE) + ) + private implicit def DriverAndYarnResource(r: Resource): DriverAndYarnResource = r match { case t: DriverAndYarnResource => t case y: YarnResource => new DriverAndYarnResource(new LoadInstanceResource(0, 0, 0), y) @@ -556,6 +572,7 @@ class DriverAndYarnResource( } +@JsonIgnoreProperties(ignoreUnknown = true) class SpecialResource(val resources: java.util.Map[String, AnyVal]) extends Resource { def this(resources: Map[String, AnyVal]) = this(resources.asJava) @@ -754,95 +771,3 @@ class SpecialResource(val resources: java.util.Map[String, AnyVal]) extends Reso override def toJson: String = s"Special:$resources" } - -object ResourceSerializer - extends CustomSerializer[Resource](implicit formats => - ( - { - case JObject(List(("memory", memory))) => new MemoryResource(memory.extract[Long]) - case JObject(List(("cores", cores))) => new CPUResource(cores.extract[Int]) - case JObject(List(("instance", instances))) => - new InstanceResource(instances.extract[Int]) - case JObject(List(("memory", memory), ("cores", cores))) => - new LoadResource(memory.extract[Long], cores.extract[Int]) - case JObject(List(("memory", memory), ("cores", cores), ("instance", instances))) => - new LoadInstanceResource( - memory.extract[Long], - cores.extract[Int], - instances.extract[Int] - ) - case JObject( - List( - ("applicationId", applicationId), - ("queueName", queueName), - ("queueMemory", queueMemory), - ("queueCores", queueCores), - ("queueInstances", queueInstances) - ) - ) => - new YarnResource( - queueMemory.extract[Long], - queueCores.extract[Int], - queueInstances.extract[Int], - queueName.extract[String], - applicationId.extract[String] - ) - case JObject( - List( - ( - "DriverAndYarnResource", - JObject( - List( - ("loadInstanceResource", loadInstanceResource), - ("yarnResource", yarnResource) - ) - ) - ) - ) - ) => - implicit val formats = DefaultFormats - new DriverAndYarnResource( - loadInstanceResource.extract[LoadInstanceResource], - yarnResource.extract[YarnResource] - ) - case JObject(List(("resources", resources))) => - new SpecialResource(resources.extract[Map[String, AnyVal]]) - case JObject(list) => - throw new ResourceWarnException( - NOT_RESOURCE_STRING.getErrorCode, - NOT_RESOURCE_STRING.getErrorDesc + list - ) - }, - { - case m: MemoryResource => ("memory", m.memory) - case c: CPUResource => ("cores", c.cores) - case i: InstanceResource => ("instance", i.instances) - case l: LoadResource => ("memory", l.memory) ~ ("cores", l.cores) - case li: LoadInstanceResource => - ("memory", li.memory) ~ ("cores", li.cores) ~ ("instance", li.instances) - case yarn: YarnResource => - ( - "applicationId", - yarn.applicationId - ) ~ ("queueName", yarn.queueName) ~ ("queueMemory", yarn.queueMemory) ~ ("queueCores", yarn.queueCores) ~ ("queueInstances", yarn.queueInstances) - case dy: DriverAndYarnResource => - implicit val formats = DefaultFormats - ( - "DriverAndYarnResource", - new JObject( - List( - ("loadInstanceResource", Extraction.decompose(dy.loadInstanceResource)), - ("yarnResource", Extraction.decompose(dy.yarnResource)) - ) - ) - ) - case s: SpecialResource => - ("resources", Serialization.write(s.resources.asScala.toMap)) - case r: Resource => - throw new ResourceWarnException( - NOT_RESOURCE_TYPE.getErrorCode, - MessageFormat.format(NOT_RESOURCE_TYPE.getErrorDesc, r.getClass) - ) - } - ) - ) diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/serializer/NodeResourceSerializer.scala b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/serializer/NodeResourceSerializer.scala deleted file mode 100644 index e8a54ea73a..0000000000 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/serializer/NodeResourceSerializer.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.manager.common.serializer - -import org.apache.linkis.manager.common.entity.resource._ - -import org.json4s.{CustomSerializer, DefaultFormats, Extraction} -import org.json4s.JsonAST.JObject -import org.json4s.JsonDSL._ - -object NodeResourceSerializer - extends CustomSerializer[NodeResource](implicit formats => - ( - { - case JObject( - List( - ("resourceType", resourceType), - ("maxResource", maxResource), - ("minResource", minResource), - ("usedResource", usedResource), - ("lockedResource", lockedResource), - ("expectedResource", expectedResource), - ("leftResource", leftResource) - ) - ) => - val resource = new CommonNodeResource - resource.setResourceType(ResourceType.valueOf(resourceType.extract[String])) - resource.setMaxResource(maxResource.extract[Resource]) - resource.setMinResource(minResource.extract[Resource]) - resource.setUsedResource(usedResource.extract[Resource]) - resource.setLockedResource(lockedResource.extract[Resource]) - resource.setExpectedResource(expectedResource.extract[Resource]) - resource.setLeftResource(leftResource.extract[Resource]) - resource - }, - { case c: CommonNodeResource => - implicit val formats = DefaultFormats + ResourceSerializer - ("resourceType", c.getResourceType.toString) ~ - ("maxResource", Extraction.decompose(c.getMaxResource)) ~ - ("minResource", Extraction.decompose(c.getMinResource)) ~ - ("usedResource", Extraction.decompose(c.getUsedResource)) ~ - ("lockedResource", Extraction.decompose(c.getLockedResource)) ~ - ("expectedResource", Extraction.decompose(c.getExpectedResource)) ~ - ("leftResource", Extraction.decompose(c.getLeftResource)) - } - ) - ) diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/serializer/RegisterEMRequestSerializer.scala b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/serializer/RegisterEMRequestSerializer.scala deleted file mode 100644 index 223e75b767..0000000000 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/serializer/RegisterEMRequestSerializer.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.manager.common.serializer - -import org.apache.linkis.common.ServiceInstance -import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, ResourceSerializer} -import org.apache.linkis.manager.common.protocol.em.RegisterEMRequest - -import org.json4s.{CustomSerializer, DefaultFormats, Extraction} -import org.json4s.JsonAST.JObject -import org.json4s.JsonDSL._ - -object RegisterEMRequestSerializer - extends CustomSerializer[RegisterEMRequest](implicit formats => - ( - { - case JObject( - List( - ("serviceInstance", serviceInstance), - ("labels", labels), - ("nodeResource", nodeResource), - ("user", user), - ("alias", alias) - ) - ) => - val registerEMRequest = new RegisterEMRequest - registerEMRequest.setServiceInstance(serviceInstance.extract[ServiceInstance]) - registerEMRequest.setAlias(alias.extract[String]) - // registerEMRequest.setLabels(labels.extract[java.util.HashMap[String, Object]]) - registerEMRequest.setUser(user.extract[String]) - registerEMRequest.setNodeResource(nodeResource.extract[CommonNodeResource]) - registerEMRequest - }, - { case c: RegisterEMRequest => - implicit val formats = DefaultFormats + ResourceSerializer + NodeResourceSerializer - ("serviceInstance", Extraction.decompose(c.getServiceInstance)) ~ - ("labels", Extraction.decompose(c.getLabels)) ~ - ("nodeResource", Extraction.decompose(c.getNodeResource)) ~ - ("user", c.getUser) ~ - ("alias", c.getAlias) - } - ) - ) diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/utils/ResourceUtils.scala b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/utils/ResourceUtils.scala index 5968a30d66..6e89039d80 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/utils/ResourceUtils.scala +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/utils/ResourceUtils.scala @@ -17,22 +17,55 @@ package org.apache.linkis.manager.common.utils +import org.apache.linkis.common.utils.JsonUtils import org.apache.linkis.manager.common.entity.persistence.PersistenceResource import org.apache.linkis.manager.common.entity.resource._ -import org.json4s.DefaultFormats -import org.json4s.jackson.Serialization.{read, write} - object ResourceUtils { - implicit val formats = DefaultFormats + ResourceSerializer - - def deserializeResource(plainResource: String): Resource = { - read[Resource](plainResource) + def deserializeResource(plainResource: String, resourceType: ResourceType): Resource = { + if (resourceType.equals(ResourceType.CPU)) { + JsonUtils.jackson.readValue(plainResource, classOf[CPUResource]) + } else if (resourceType.equals(ResourceType.DriverAndYarn)) { + JsonUtils.jackson.readValue(plainResource, classOf[DriverAndYarnResource]) + } else if (resourceType.equals(ResourceType.Instance)) { + JsonUtils.jackson.readValue(plainResource, classOf[InstanceResource]) + } else if (resourceType.equals(ResourceType.LoadInstance)) { + JsonUtils.jackson.readValue(plainResource, classOf[LoadInstanceResource]) + } else if (resourceType.equals(ResourceType.Load)) { + JsonUtils.jackson.readValue(plainResource, classOf[LoadResource]) + } else if (resourceType.equals(ResourceType.Memory)) { + JsonUtils.jackson.readValue(plainResource, classOf[MemoryResource]) + } else if (resourceType.equals(ResourceType.Special)) { + JsonUtils.jackson.readValue(plainResource, classOf[SpecialResource]) + } else if (resourceType.equals(ResourceType.Yarn)) { + JsonUtils.jackson.readValue(plainResource, classOf[YarnResource]) + } else { + JsonUtils.jackson.readValue(plainResource, classOf[Resource]) + } } def serializeResource(resource: Resource): String = { - write(resource) + val resourceType = getResourceTypeByResource(resource) + if (resourceType.equals(ResourceType.CPU)) { + JsonUtils.jackson.writeValueAsString(resource, classOf[CPUResource]) + } else if (resourceType.equals(ResourceType.DriverAndYarn)) { + JsonUtils.jackson.writeValueAsString(resource, classOf[DriverAndYarnResource]) + } else if (resourceType.equals(ResourceType.Instance)) { + JsonUtils.jackson.writeValueAsString(resource, classOf[InstanceResource]) + } else if (resourceType.equals(ResourceType.LoadInstance)) { + JsonUtils.jackson.writeValueAsString(resource, classOf[LoadInstanceResource]) + } else if (resourceType.equals(ResourceType.Load)) { + JsonUtils.jackson.writeValueAsString(resource, classOf[LoadResource]) + } else if (resourceType.equals(ResourceType.Memory)) { + JsonUtils.jackson.writeValueAsString(resource, classOf[MemoryResource]) + } else if (resourceType.equals(ResourceType.Special)) { + JsonUtils.jackson.writeValueAsString(resource, classOf[SpecialResource]) + } else if (resourceType.equals(ResourceType.Yarn)) { + JsonUtils.jackson.writeValueAsString(resource, classOf[YarnResource]) + } else { + JsonUtils.jackson.writeValueAsString(resource, classOf[Resource]) + } } def toPersistenceResource(nodeResource: NodeResource): PersistenceResource = { @@ -62,24 +95,37 @@ object ResourceUtils { def fromPersistenceResource(persistenceResource: PersistenceResource): CommonNodeResource = { if (persistenceResource == null) return null val nodeResource = new CommonNodeResource + val resourceType = ResourceType.valueOf(persistenceResource.getResourceType) nodeResource.setId(persistenceResource.getId) if (persistenceResource.getMaxResource != null) { - nodeResource.setMaxResource(deserializeResource(persistenceResource.getMaxResource)) + nodeResource.setMaxResource( + deserializeResource(persistenceResource.getMaxResource, resourceType) + ) } if (persistenceResource.getMinResource != null) { - nodeResource.setMinResource(deserializeResource(persistenceResource.getMinResource)) + nodeResource.setMinResource( + deserializeResource(persistenceResource.getMinResource, resourceType) + ) } if (persistenceResource.getLockedResource != null) { - nodeResource.setLockedResource(deserializeResource(persistenceResource.getLockedResource)) + nodeResource.setLockedResource( + deserializeResource(persistenceResource.getLockedResource, resourceType) + ) } if (persistenceResource.getExpectedResource != null) { - nodeResource.setExpectedResource(deserializeResource(persistenceResource.getExpectedResource)) + nodeResource.setExpectedResource( + deserializeResource(persistenceResource.getExpectedResource, resourceType) + ) } if (persistenceResource.getLeftResource != null) { - nodeResource.setLeftResource(deserializeResource(persistenceResource.getLeftResource)) + nodeResource.setLeftResource( + deserializeResource(persistenceResource.getLeftResource, resourceType) + ) } if (persistenceResource.getUsedResource != null) { - nodeResource.setUsedResource(deserializeResource(persistenceResource.getUsedResource)) + nodeResource.setUsedResource( + deserializeResource(persistenceResource.getUsedResource, resourceType) + ) } if (persistenceResource.getCreateTime != null) { nodeResource.setCreateTime(persistenceResource.getCreateTime) @@ -87,7 +133,7 @@ object ResourceUtils { if (persistenceResource.getUpdateTime != null) { nodeResource.setUpdateTime(persistenceResource.getUpdateTime) } - nodeResource.setResourceType(ResourceType.valueOf(persistenceResource.getResourceType)) + nodeResource.setResourceType(resourceType) nodeResource } @@ -95,23 +141,37 @@ object ResourceUtils { if (persistenceResource == null) return null val nodeResource = new UserResource nodeResource.setId(persistenceResource.getId) + val resourceType = ResourceType.valueOf(persistenceResource.getResourceType) + if (persistenceResource.getMaxResource != null) { - nodeResource.setMaxResource(deserializeResource(persistenceResource.getMaxResource)) + nodeResource.setMaxResource( + deserializeResource(persistenceResource.getMaxResource, resourceType) + ) } if (persistenceResource.getMinResource != null) { - nodeResource.setMinResource(deserializeResource(persistenceResource.getMinResource)) + nodeResource.setMinResource( + deserializeResource(persistenceResource.getMinResource, resourceType) + ) } if (persistenceResource.getLockedResource != null) { - nodeResource.setLockedResource(deserializeResource(persistenceResource.getLockedResource)) + nodeResource.setLockedResource( + deserializeResource(persistenceResource.getLockedResource, resourceType) + ) } if (persistenceResource.getExpectedResource != null) { - nodeResource.setExpectedResource(deserializeResource(persistenceResource.getExpectedResource)) + nodeResource.setExpectedResource( + deserializeResource(persistenceResource.getExpectedResource, resourceType) + ) } if (persistenceResource.getLeftResource != null) { - nodeResource.setLeftResource(deserializeResource(persistenceResource.getLeftResource)) + nodeResource.setLeftResource( + deserializeResource(persistenceResource.getLeftResource, resourceType) + ) } if (persistenceResource.getUsedResource != null) { - nodeResource.setUsedResource(deserializeResource(persistenceResource.getUsedResource)) + nodeResource.setUsedResource( + deserializeResource(persistenceResource.getUsedResource, resourceType) + ) } if (persistenceResource.getCreateTime != null) { nodeResource.setCreateTime(persistenceResource.getCreateTime) @@ -119,7 +179,7 @@ object ResourceUtils { if (persistenceResource.getUpdateTime != null) { nodeResource.setUpdateTime(persistenceResource.getUpdateTime) } - nodeResource.setResourceType(ResourceType.valueOf(persistenceResource.getResourceType)) + nodeResource.setResourceType(resourceType) nodeResource } @@ -179,7 +239,7 @@ object ResourceUtils { return nodeResource } } - return nodeResource + nodeResource } /** diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/rm/ResultResource.scala b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/rm/ResultResource.scala index 017cb189ac..2818b87f8e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/rm/ResultResource.scala +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/rm/ResultResource.scala @@ -17,30 +17,8 @@ package org.apache.linkis.manager.rm -import org.json4s.{CustomSerializer, Extraction} -import org.json4s.JsonAST.JObject -import org.json4s.JsonDSL._ - trait ResultResource -case class NotEnoughResource(val reason: String = null) extends ResultResource - -case class AvailableResource(val ticketId: String) extends ResultResource +case class NotEnoughResource(var reason: String = null) extends ResultResource -object ResultResourceSerializer - extends CustomSerializer[ResultResource](implicit formats => - ( - { - case JObject(List(("NotEnoughResource", JObject(List(("reason", reason)))))) => - NotEnoughResource(reason.extract[String]) - case JObject(List(("AvailableResource", JObject(List(("ticketId", ticketId)))))) => - AvailableResource(ticketId.extract[String]) - }, - { - case r: NotEnoughResource => - ("NotEnoughResource", ("reason", Extraction.decompose(r.reason))) - case r: AvailableResource => - ("AvailableResource", ("ticketId", Extraction.decompose(r.ticketId))) - } - ) - ) +case class AvailableResource(var ticketId: String) extends ResultResource diff --git a/linkis-dist/docker/licenses/LICENSE-spark-2.4.3-bin-hadoop2.7.txt b/linkis-dist/docker/licenses/LICENSE-spark-2.4.3-bin-hadoop2.7.txt index 1346f0622e..66afaa0155 100644 --- a/linkis-dist/docker/licenses/LICENSE-spark-2.4.3-bin-hadoop2.7.txt +++ b/linkis-dist/docker/licenses/LICENSE-spark-2.4.3-bin-hadoop2.7.txt @@ -287,10 +287,6 @@ org.apache.orc:orc-mapreduce org.mortbay.jetty:jetty org.mortbay.jetty:jetty-util com.jolbox:bonecp -org.json4s:json4s-ast_2.11 -org.json4s:json4s-core_2.11 -org.json4s:json4s-jackson_2.11 -org.json4s:json4s-scalap_2.11 com.carrotsearch:hppc com.fasterxml.jackson.core:jackson-annotations com.fasterxml.jackson.core:jackson-core diff --git a/linkis-dist/release-docs/LICENSE b/linkis-dist/release-docs/LICENSE index dbb0c9f1db..a7fabe58fd 100644 --- a/linkis-dist/release-docs/LICENSE +++ b/linkis-dist/release-docs/LICENSE @@ -446,10 +446,6 @@ See licenses/ for text of these licenses. (Apache License, Version 2.0) jackson-databind (com.fasterxml.jackson.core:jackson-databind:2.13.4.1 - http://github.com/FasterXML/jackson) (Apache License, Version 2.0) jackson-module-scala (com.fasterxml.jackson.module:jackson-module-scala_2.11:2.13.4 - http://wiki.fasterxml.com/JacksonModuleScala) (Apache License, Version 2.0) javax.inject (javax.inject:javax.inject:1 - http://code.google.com/p/atinject/) - (Apache License, Version 2.0) json4s-ast (org.json4s:json4s-ast_2.11:3.7.0-M11 - https://github.com/json4s/json4s) - (Apache License, Version 2.0) json4s-core (org.json4s:json4s-core_2.11:3.7.0-M11 - https://github.com/json4s/json4s) - (Apache License, Version 2.0) json4s-jackson (org.json4s:json4s-jackson_2.11:3.7.0-M11 - https://github.com/json4s/json4s) - (Apache License, Version 2.0) json4s-scalap (org.json4s:json4s-scalap_2.11:3.7.0-M11 - https://github.com/json4s/json4s) (Apache License, Version 2.0) jna (net.java.dev.jna:jna:5.6.0 - https://github.com/java-native-access/jna) (Apache License, Version 2.0) jna-platform (net.java.dev.jna:jna-platform:5.6.0 - https://github.com/java-native-access/jna) (Apache License, Version 2.0) micrometer-core (io.micrometer:micrometer-core:1.3.1 - https://github.com/micrometer-metrics/micrometer) diff --git a/linkis-engineconn-plugins/flink/pom.xml b/linkis-engineconn-plugins/flink/pom.xml index 0499e3536f..7b84a27bc0 100644 --- a/linkis-engineconn-plugins/flink/pom.xml +++ b/linkis-engineconn-plugins/flink/pom.xml @@ -364,11 +364,6 @@ org.apache.hadoop hadoop-auth - - - org.json4s - json4s-jackson_${scala.binary.version} - com.google.protobuf protobuf-java diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml index dd922de87c..4897a0c1b1 100644 --- a/linkis-engineconn-plugins/spark/pom.xml +++ b/linkis-engineconn-plugins/spark/pom.xml @@ -105,11 +105,6 @@ * - - org.json4s - json4s-jackson_${scala.binary.version} - - com.sun.jersey jersey-core diff --git a/linkis-hadoop-hdfs-client-shade/pom.xml b/linkis-hadoop-hdfs-client-shade/pom.xml index 560bbaa698..bd5e474477 100644 --- a/linkis-hadoop-hdfs-client-shade/pom.xml +++ b/linkis-hadoop-hdfs-client-shade/pom.xml @@ -239,6 +239,7 @@ org.codehaus.mojo build-helper-maven-plugin + ${maven-helper-plugin.version} compile diff --git a/pom.xml b/pom.xml index 901499a8ea..244466eece 100644 --- a/pom.xml +++ b/pom.xml @@ -136,7 +136,7 @@ 2.8.9 2.13.4.20221013 - 3.7.0-M11 + 2.7.0 1.19.4 2.23.1 @@ -395,6 +395,11 @@ + + com.jayway.jsonpath + json-path + ${jsonpath.version} + com.google.code.gson gson @@ -407,33 +412,6 @@ pom import - - org.json4s - json4s-core_${scala.binary.version} - ${json4s.version} - - - org.json4s - json4s-jackson_${scala.binary.version} - ${json4s.version} - - - com.fasterxml.jackson.core - * - - - - - org.json4s - json4s-ast_${scala.binary.version} - ${json4s.version} - - - org.json4s - json4s-scalap_${scala.binary.version} - ${json4s.version} - - com.sun.jersey jersey-client @@ -952,7 +930,6 @@ pom import - @@ -1392,7 +1369,6 @@ linkis-hadoop-hdfs-client-shade ${project.version} compile - 3.5.3 2.4.3 2.11.12 2.11 diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt index 367b9a70f4..29c3a4a53f 100644 --- a/tool/dependencies/known-dependencies.txt +++ b/tool/dependencies/known-dependencies.txt @@ -315,10 +315,7 @@ jpam-1.1.jar json-0.191.jar json-0.193.jar json-1.8.jar -json4s-ast_2.11-3.7.0-M11.jar -json4s-core_2.11-3.7.0-M11.jar -json4s-jackson_2.11-3.7.0-M11.jar -json4s-scalap_2.11-3.7.0-M11.jar +json-path-2.7.0.jar jsp-api-2.1.jar jsqlparser-1.0.jar jsqlparser-4.2.jar @@ -699,10 +696,6 @@ jline-3.9.0.jar joda-time-2.10.10.jar joda-time-2.9.9.jar json-smart-2.3.1.jar -json4s-ast_2.12-3.7.0-M11.jar -json4s-core_2.12-3.7.0-M11.jar -json4s-jackson_2.12-3.7.0-M11.jar -json4s-scalap_2.12-3.7.0-M11.jar kerb-admin-1.0.1.jar kerb-client-1.0.1.jar kerb-common-1.0.1.jar