From 545f139d73f4b04ad0498807cd5d4da3d886f3ab Mon Sep 17 00:00:00 2001 From: peacewong Date: Thu, 9 May 2024 22:30:59 +0800 Subject: [PATCH] Support automatic retry for important RPC requests --- .../protocol/AbstractRetryableProtocol.java | 6 +- .../linkis/protocol/engine/EngineInfo.java | 47 ----- .../protocol/IRServiceGroupProtocol.scala | 27 --- .../linkis/protocol/RetryableProtocol.scala | 6 +- .../callback/LogCallbackProtocol.scala | 24 --- .../protocol/engine/EngineCallback.scala | 35 ---- .../engine/EngineStateTransitionRequest.scala | 27 --- .../protocol/engine/RequestEngineStatus.scala | 32 --- .../engine/RequestUserEngineKill.scala | 34 ---- .../linkis/protocol/utils/ProtocolUtils.scala | 44 ----- .../engine/RequestEngineStatusTest.scala | 44 ----- .../engine/ResponseUserEngineKillTest.scala | 35 ---- .../protocol/utils/ProtocolUtilsTest.scala | 45 ----- .../linkis/rpc/conf/CacheManualRefresher.java | 22 --- .../linkis/rpc/conf/DynamicFeignClient.java | 126 ------------ .../EurekaClientCacheManualRefresher.java | 118 ------------ .../rpc/conf/FeignRequestInterceptor.java | 58 ------ .../conf/NacosClientCacheManualRefresher.java | 33 ---- .../linkis/rpc/constant/RpcConstant.java | 4 - ...LinkisLoadBalancerClientConfiguration.java | 33 ---- .../ServiceInstancePriorityLoadBalancer.java | 182 ------------------ .../common/RetryableRPCInterceptor.scala | 10 - .../rpc/sender/SpringMVCRPCSender.scala | 5 + .../common/protocol/job/JobReqProcotol.scala | 3 +- .../common/protocol/task/RequestTask.scala | 3 +- .../protocol/task/ResponseEngineConnPid.scala | 3 +- .../protocol/task/ResponseTaskExecute.scala | 7 +- .../service/TaskExecutionServiceImpl.scala | 3 +- .../resource/EngineResourceRequest.scala | 2 +- .../http/GatewayAuthorizationFilter.java | 6 +- .../http/IpPriorityLoadBalancer.java | 4 +- 31 files changed, 28 insertions(+), 1000 deletions(-) delete mode 100644 linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/engine/EngineInfo.java delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/IRServiceGroupProtocol.scala delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/callback/LogCallbackProtocol.scala delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineCallback.scala delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineStateTransitionRequest.scala delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestEngineStatus.scala delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestUserEngineKill.scala delete mode 100644 linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ProtocolUtils.scala delete mode 100644 linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/RequestEngineStatusTest.scala delete mode 100644 linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/ResponseUserEngineKillTest.scala delete mode 100644 linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/utils/ProtocolUtilsTest.scala delete mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/CacheManualRefresher.java delete mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java delete mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java delete mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignRequestInterceptor.java delete mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/NacosClientCacheManualRefresher.java delete mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/LinkisLoadBalancerClientConfiguration.java delete mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/ServiceInstancePriorityLoadBalancer.java diff --git a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java index 3dfd166846..aa7ddece50 100644 --- a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java +++ b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java @@ -21,7 +21,7 @@ public class AbstractRetryableProtocol implements RetryableProtocol { @Override public long maxPeriod() { - return 3000L; + return 30000L; } @Override @@ -31,11 +31,11 @@ public Class[] retryExceptions() { @Override public int retryNum() { - return 2; + return 5; } @Override public long period() { - return 1000L; + return 10000L; } } diff --git a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/engine/EngineInfo.java b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/engine/EngineInfo.java deleted file mode 100644 index 0504ee2113..0000000000 --- a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/engine/EngineInfo.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine; - -public class EngineInfo { - - private Long id; - private EngineState engineState; - - public EngineInfo() {} - - public EngineInfo(Long id, EngineState state) { - this.id = id; - this.engineState = state; - } - - public Long getId() { - return id; - } - - public void setId(Long id) { - this.id = id; - } - - public EngineState getEngineState() { - return engineState; - } - - public void setEngineState(EngineState engineState) { - this.engineState = engineState; - } -} diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/IRServiceGroupProtocol.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/IRServiceGroupProtocol.scala deleted file mode 100644 index 675dc0c830..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/IRServiceGroupProtocol.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol - -trait IRServiceGroupProtocol extends IRProtocol with InstanceProtocol { - val userWithCreator: UserWithCreator - - def user: String = userWithCreator.user - def creator: String = userWithCreator.creator -} - -case class UserWithCreator(user: String, creator: String) diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala index 51509d6883..6ebee7d0e2 100644 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala +++ b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala @@ -18,8 +18,8 @@ package org.apache.linkis.protocol trait RetryableProtocol extends Protocol { - def retryNum: Int = 2 - def period: Long = 1000L - def maxPeriod: Long = 3000L + def retryNum: Int = 5 + def period: Long = 10000L + def maxPeriod: Long = 30000L def retryExceptions: Array[Class[_ <: Throwable]] = Array.empty[Class[_ <: Throwable]] } diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/callback/LogCallbackProtocol.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/callback/LogCallbackProtocol.scala deleted file mode 100644 index 0109472a90..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/callback/LogCallbackProtocol.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.callback - -import org.apache.linkis.protocol.message.RequestProtocol - -case class YarnAPPIdCallbackProtocol(nodeId: String, applicationId: String) extends RequestProtocol - -case class YarnInfoCallbackProtocol(nodeId: String, uri: String) extends RequestProtocol diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineCallback.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineCallback.scala deleted file mode 100644 index 8856d3a927..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineCallback.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine - -object EngineCallback { - private val DWC_APPLICATION_NAME = "dwc.application.name" - private val DWC_INSTANCE = "dwc.application.instance" - - def mapToEngineCallback(options: Map[String, String]): EngineCallback = - EngineCallback(options(DWC_APPLICATION_NAME), options(DWC_INSTANCE)) - - def callbackToMap(engineCallback: EngineCallback): Map[String, String] = - Map( - DWC_APPLICATION_NAME -> engineCallback.applicationName, - DWC_INSTANCE -> engineCallback.instance - ) - -} - -case class EngineCallback(applicationName: String, instance: String) diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineStateTransitionRequest.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineStateTransitionRequest.scala deleted file mode 100644 index 9137001c14..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/EngineStateTransitionRequest.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine - -case class EngineStateTransitionRequest(engineInstance: String, state: String) - -case class EngineStateTransitionResponse( - engineInstance: String, - state: String, - result: Boolean, - message: String -) diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestEngineStatus.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestEngineStatus.scala deleted file mode 100644 index a4672aa4e5..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestEngineStatus.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine - -import org.apache.linkis.protocol.RetryableProtocol -import org.apache.linkis.protocol.message.RequestProtocol - -case class RequestEngineStatus(messageType: Int) extends RetryableProtocol with RequestProtocol - -object RequestEngineStatus { - val Status_Only = 1 - val Status_Overload = 2 - val Status_Concurrent = 3 - val Status_Overload_Concurrent = 4 - val Status_BasicInfo = 5 - val ALL = 6 -} diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestUserEngineKill.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestUserEngineKill.scala deleted file mode 100644 index beb7987b01..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/RequestUserEngineKill.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine - -import org.apache.linkis.protocol.message.RequestProtocol - -case class RequestUserEngineKill( - ticketId: String, - creator: String, - user: String, - properties: Map[String, String] -) extends RequestProtocol - -case class ResponseUserEngineKill(ticketId: String, status: String, message: String) - -object ResponseUserEngineKill { - val Success = "Success" - val Error = "Error" -} diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ProtocolUtils.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ProtocolUtils.scala deleted file mode 100644 index 1bb0791be3..0000000000 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ProtocolUtils.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.utils - -import org.apache.linkis.common.conf.CommonVars - -object ProtocolUtils { - - val SERVICE_SUFFIX = CommonVars("wds.linkis.service.suffix", "engineManager,entrance,engine") - val suffixs = SERVICE_SUFFIX.getValue.split(",") - - /** - * Pass in moduleName to return the corresponding appName 传入moduleName返回对应的appName - * @param moduleName - * module's name - * @return - * application's name - */ - def getAppName(moduleName: String): Option[String] = { - val moduleNameLower = moduleName.toLowerCase() - for (suffix <- suffixs) { - if (moduleNameLower.contains(suffix.toLowerCase())) { - return Some(moduleNameLower.replace(suffix.toLowerCase(), "")) - } - } - None - } - -} diff --git a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/RequestEngineStatusTest.scala b/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/RequestEngineStatusTest.scala deleted file mode 100644 index d9fc07b6c0..0000000000 --- a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/RequestEngineStatusTest.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine - -import org.junit.jupiter.api.{Assertions, DisplayName, Test} - -class RequestEngineStatusTest { - - @Test - @DisplayName("constTest") - def constTest(): Unit = { - - val statusOnly = RequestEngineStatus.Status_Only - val statusOverload = RequestEngineStatus.Status_Overload - val statusConcurrent = RequestEngineStatus.Status_Concurrent - val statusOverloadConcurrent = RequestEngineStatus.Status_Overload_Concurrent - val statusBasicInfo = RequestEngineStatus.Status_BasicInfo - val all = RequestEngineStatus.ALL - - Assertions.assertTrue(1 == statusOnly) - Assertions.assertTrue(2 == statusOverload) - Assertions.assertTrue(3 == statusConcurrent) - Assertions.assertTrue(4 == statusOverloadConcurrent) - Assertions.assertTrue(5 == statusBasicInfo) - Assertions.assertTrue(6 == all) - - } - -} diff --git a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/ResponseUserEngineKillTest.scala b/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/ResponseUserEngineKillTest.scala deleted file mode 100644 index dbf3f5e3b5..0000000000 --- a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/engine/ResponseUserEngineKillTest.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.engine - -import org.junit.jupiter.api.{Assertions, DisplayName, Test} - -class ResponseUserEngineKillTest { - - @Test - @DisplayName("constTest") - def constTest(): Unit = { - - val success = ResponseUserEngineKill.Success - val error = ResponseUserEngineKill.Error - - Assertions.assertEquals("Success", success) - Assertions.assertEquals("Error", error) - } - -} diff --git a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/utils/ProtocolUtilsTest.scala b/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/utils/ProtocolUtilsTest.scala deleted file mode 100644 index 2435f51497..0000000000 --- a/linkis-commons/linkis-protocol/src/test/scala/org/apache/linkis/protocol/utils/ProtocolUtilsTest.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.protocol.utils - -import org.junit.jupiter.api.{Assertions, DisplayName, Test} - -class ProtocolUtilsTest { - - @Test - @DisplayName("constTest") - def constTest(): Unit = { - - val serviceSuffix = ProtocolUtils.SERVICE_SUFFIX.getValue - val suffixs = ProtocolUtils.suffixs - - Assertions.assertNotNull(serviceSuffix) - Assertions.assertTrue(suffixs.length == 3) - } - - @Test - @DisplayName("getAppNameTest") - def getAppNameTest(): Unit = { - - val modeleName = "engineManager" - val appNameOption = ProtocolUtils.getAppName(modeleName) - Assertions.assertNotNull(appNameOption.get) - - } - -} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/CacheManualRefresher.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/CacheManualRefresher.java deleted file mode 100644 index dbdf52a1fc..0000000000 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/CacheManualRefresher.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.conf; - -public interface CacheManualRefresher { - void refresh(); -} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java deleted file mode 100644 index 89eb6083b9..0000000000 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.conf; - -import org.apache.linkis.DataWorkCloudApplication; - -import org.apache.commons.lang3.StringUtils; - -import org.springframework.cloud.openfeign.FeignClientBuilder; -import org.springframework.cloud.openfeign.FeignClientFactoryBean; -import org.springframework.stereotype.Component; - -import java.util.concurrent.ConcurrentHashMap; - -@Component -public class DynamicFeignClient { - - private FeignClientBuilder feignClientBuilder; - - private final ConcurrentHashMap CACHE_BEAN = new ConcurrentHashMap(); - - public DynamicFeignClient() { - this.feignClientBuilder = - new FeignClientBuilder(DataWorkCloudApplication.getApplicationContext()); - } - - public T getFeignClient(final Class type, final String serviceName) { - return getFeignClient(type, serviceName, null); - } - - public T getFeignClient( - final Class type, final Class fallbackFactory, final String serviceName) { - return getFeignClient(type, fallbackFactory, serviceName, null); - } - - public T getFeignClient( - final Class type, - final FeignClientFactoryBean clientFactoryBean, - final String serviceName) { - return getFeignClient(type, clientFactoryBean, serviceName, null); - } - - public T getFeignClient(final Class type, String serviceName, final String serviceUrl) { - String k = serviceName; - if (StringUtils.isNotBlank(serviceUrl)) { - k = serviceUrl; - } - return CACHE_BEAN.computeIfAbsent( - k, - (t) -> { - FeignClientBuilder.Builder builder = - this.feignClientBuilder.forType(type, serviceName); - if (StringUtils.isNotBlank(serviceUrl)) { - builder.url(serviceUrl); - } - return builder.build(); - }); - } - - public T getFeignClient( - final Class type, - final Class fallbackFactory, - final String serviceName, - final String serviceUrl) { - String k = serviceName; - if (StringUtils.isNotBlank(serviceUrl)) { - k = serviceUrl; - } - return CACHE_BEAN.computeIfAbsent( - k, - (t) -> { - FeignClientFactoryBean feignClientFactoryBean = new FeignClientFactoryBean(); - feignClientFactoryBean.setFallbackFactory(fallbackFactory); - FeignClientBuilder.Builder builder = - this.feignClientBuilder.forType(type, feignClientFactoryBean, serviceName); - if (StringUtils.isNotBlank(serviceUrl)) { - builder.url(serviceUrl); - } - return builder.build(); - }); - } - - public T getFeignClient( - final Class type, - final FeignClientFactoryBean clientFactoryBean, - final String serviceName, - final String serviceUrl) { - String k = serviceName; - if (StringUtils.isNotBlank(serviceUrl)) { - k = serviceUrl; - } - return CACHE_BEAN.computeIfAbsent( - k, - (t) -> { - FeignClientBuilder.Builder builder = - this.feignClientBuilder.forType(type, clientFactoryBean, serviceName); - if (StringUtils.isNotBlank(serviceUrl)) { - builder.url(serviceUrl); - } - return builder.build(); - }); - } - - private T getFromCache(final String serviceName, final String serviceUrl) { - if (StringUtils.isNotEmpty(serviceUrl)) { - return CACHE_BEAN.get(serviceUrl); - } else { - return CACHE_BEAN.get(serviceName); - } - } -} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java deleted file mode 100644 index 7394698672..0000000000 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.conf; - -import org.apache.commons.lang3.exception.ExceptionUtils; - -import org.springframework.beans.factory.BeanFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.stereotype.Component; -import org.springframework.util.ReflectionUtils; - -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.time.Duration; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Component -@ConditionalOnClass(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration") -public class EurekaClientCacheManualRefresher implements CacheManualRefresher { - private static final Logger logger = - LoggerFactory.getLogger(EurekaClientCacheManualRefresher.class); - private final AtomicBoolean isRefreshing = new AtomicBoolean(false); - private final ExecutorService refreshExecutor = Executors.newSingleThreadExecutor(); - private final String cacheRefreshTaskField = "cacheRefreshTask"; - private Object cacheRefreshTask; - - private long lastRefreshMillis = 0; - private final Duration refreshIntervalDuration = Duration.ofSeconds(3); - - @Autowired private BeanFactory beanFactory; - - public void refreshOnExceptions(Exception e, List> clazzs) { - if (null == clazzs || clazzs.size() == 0) { - throw new IllegalArgumentException(); - } - - if (clazzs.stream() - .anyMatch( - clazz -> clazz.isInstance(e) || clazz.isInstance(ExceptionUtils.getRootCause(e)))) { - refresh(); - } - } - - public void refresh() { - if (isRefreshing.compareAndSet(false, true)) { - refreshExecutor.execute( - () -> { - try { - if (System.currentTimeMillis() - <= lastRefreshMillis + refreshIntervalDuration.toMillis()) { - logger.warn( - "Not manually refresh eureka client cache as refresh interval was not exceeded:{}", - refreshIntervalDuration.getSeconds()); - return; - } - - String discoveryClientClassName = "com.netflix.discovery.DiscoveryClient"; - if (null == cacheRefreshTask) { - Class discoveryClientClass = Class.forName(discoveryClientClassName); - Field field = - ReflectionUtils.findField(discoveryClientClass, cacheRefreshTaskField); - if (null != field) { - ReflectionUtils.makeAccessible(field); - Object discoveryClient = beanFactory.getBean(discoveryClientClass); - cacheRefreshTask = ReflectionUtils.getField(field, discoveryClient); - } - } - - if (null == cacheRefreshTask) { - logger.error( - "Field ({}) not found in class '{}'", - cacheRefreshTaskField, - discoveryClientClassName); - return; - } - - lastRefreshMillis = System.currentTimeMillis(); - Class timedSupervisorTaskClass = - Class.forName("com.netflix.discovery.TimedSupervisorTask"); - Method method = timedSupervisorTaskClass.getDeclaredMethod("run"); - method.setAccessible(true); - method.invoke(cacheRefreshTask); - logger.info( - "Manually refresh eureka client cache completed(DiscoveryClient.cacheRefreshTask#run())"); - } catch (Exception e) { - logger.error("An exception occurred when manually refresh eureka client cache", e); - } finally { - isRefreshing.set(false); - } - }); - } else { - logger.warn( - "Not manually refresh eureka client cache as another thread is refreshing it already"); - } - } -} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignRequestInterceptor.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignRequestInterceptor.java deleted file mode 100644 index 26db20d769..0000000000 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignRequestInterceptor.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.conf; - -import org.apache.linkis.rpc.constant.RpcConstant; -import org.apache.linkis.server.BDPJettyServerHelper; -import org.apache.linkis.server.Message; -import org.apache.linkis.server.security.SSOUtils$; -import org.apache.linkis.server.security.SecurityFilter$; - -import java.io.UnsupportedEncodingException; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import scala.Tuple2; - -import feign.RequestInterceptor; -import feign.RequestTemplate; - -public class FeignRequestInterceptor implements RequestInterceptor { - - @Override - public void apply(RequestTemplate requestTemplate) { - Map> headers = new HashMap<>(requestTemplate.headers()); - headers.put( - RpcConstant.LINKIS_LOAD_BALANCER_TYPE, - Arrays.asList(RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC)); - Tuple2 userTicketKV = - SSOUtils$.MODULE$.getUserTicketKV(SecurityFilter$.MODULE$.OTHER_SYSTEM_IGNORE_UM_USER()); - headers.put(userTicketKV._1, Arrays.asList(userTicketKV._2)); - try { - String body = - new String( - requestTemplate.body(), - org.apache.linkis.common.conf.Configuration.BDP_ENCODING().getValue()); - Message message = BDPJettyServerHelper.gson().fromJson(body, Message.class); - requestTemplate.headers(headers); - } catch (UnsupportedEncodingException e) { - } - } -} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/NacosClientCacheManualRefresher.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/NacosClientCacheManualRefresher.java deleted file mode 100644 index 34e2357541..0000000000 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/NacosClientCacheManualRefresher.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.conf; - -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.stereotype.Component; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Component -@ConditionalOnClass(name = "com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration") -public class NacosClientCacheManualRefresher implements CacheManualRefresher { - private static final Logger logger = - LoggerFactory.getLogger(NacosClientCacheManualRefresher.class); - - public void refresh() {} -} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java index 3d46661de2..9fd0b81104 100644 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java +++ b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java @@ -19,9 +19,5 @@ public class RpcConstant { - public static final String LINKIS_LOAD_BALANCER_TYPE = "LinkisLoadBalancerType"; - - public static final String LINKIS_LOAD_BALANCER_TYPE_RPC = "RPC"; - public static final String FIXED_INSTANCE = "client-ip"; } diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/LinkisLoadBalancerClientConfiguration.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/LinkisLoadBalancerClientConfiguration.java deleted file mode 100644 index 6701c0acdc..0000000000 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/LinkisLoadBalancerClientConfiguration.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.loadbalancer; - -import org.springframework.cloud.client.ServiceInstance; -import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer; -import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier; -import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory; -import org.springframework.core.env.Environment; - -public class LinkisLoadBalancerClientConfiguration { - public ReactorLoadBalancer customLoadBalancer( - Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) { - String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); - return new ServiceInstancePriorityLoadBalancer( - loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name); - } -} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/ServiceInstancePriorityLoadBalancer.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/ServiceInstancePriorityLoadBalancer.java deleted file mode 100644 index 3623cbccf5..0000000000 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/loadbalancer/ServiceInstancePriorityLoadBalancer.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.loadbalancer; - -import org.apache.linkis.rpc.conf.CacheManualRefresher; -import org.apache.linkis.rpc.conf.RPCConfiguration; -import org.apache.linkis.rpc.constant.RpcConstant; -import org.apache.linkis.rpc.errorcode.LinkisRpcErrorCodeSummary; -import org.apache.linkis.rpc.exception.NoInstanceExistsException; -import org.apache.linkis.rpc.sender.SpringCloudFeignConfigurationCache$; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.client.ServiceInstance; -import org.springframework.cloud.client.loadbalancer.*; -import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier; -import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer; -import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback; -import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier; - -import java.text.MessageFormat; -import java.util.List; -import java.util.Objects; -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; - -import reactor.core.publisher.Mono; - -public class ServiceInstancePriorityLoadBalancer implements ReactorServiceInstanceLoadBalancer { - - private static final Log log = LogFactory.getLog(ServiceInstancePriorityLoadBalancer.class); - - @Autowired private CacheManualRefresher cacheManualRefresher; - - private final String serviceId; - - final AtomicInteger position; - private final ObjectProvider serviceInstanceListSupplierProvider; - - private final Long maxWaitTime = - RPCConfiguration.RPC_SERVICE_REFRESH_MAX_WAIT_TIME().getValue().toLong(); - - public ServiceInstancePriorityLoadBalancer( - ObjectProvider serviceInstanceListSupplierProvider, - String serviceId) { - this(serviceInstanceListSupplierProvider, serviceId, (new Random()).nextInt(1000)); - } - - public ServiceInstancePriorityLoadBalancer( - ObjectProvider serviceInstanceListSupplierProvider, - String serviceId, - int seedPosition) { - this.serviceId = serviceId; - this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider; - this.position = new AtomicInteger(seedPosition); - } - - @Override - public Mono> choose(Request request) { - List clientIpList = - ((RequestDataContext) request.getContext()) - .getClientRequest() - .getHeaders() - .get(RpcConstant.FIXED_INSTANCE); - String clientIp = CollectionUtils.isNotEmpty(clientIpList) ? clientIpList.get(0) : null; - ServiceInstanceListSupplier supplier = - serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new); - return supplier - .get(request) - .next() - .map( - serviceInstances -> - processInstanceResponse(request, supplier, serviceInstances, clientIp)); - } - - private Response processInstanceResponse( - Request request, - ServiceInstanceListSupplier supplier, - List serviceInstances, - String clientIp) { - Response serviceInstanceResponse = - getInstanceResponse(serviceInstances, clientIp); - Long endTime = System.currentTimeMillis() + maxWaitTime; - - List linkisLoadBalancerTypeList = - ((RequestDataContext) request.getContext()) - .getClientRequest() - .getHeaders() - .get(RpcConstant.LINKIS_LOAD_BALANCER_TYPE); - String linkisLoadBalancerType = - CollectionUtils.isNotEmpty(linkisLoadBalancerTypeList) - ? linkisLoadBalancerTypeList.get(0) - : null; - - while (null == serviceInstanceResponse - && StringUtils.isNotBlank(clientIp) - && isRPC(linkisLoadBalancerType) - && System.currentTimeMillis() < endTime) { - cacheManualRefresher.refresh(); - List instances = - SpringCloudFeignConfigurationCache$.MODULE$.discoveryClient().getInstances(serviceId); - serviceInstanceResponse = getInstanceResponse(instances, clientIp); - if (null == serviceInstanceResponse) { - try { - Thread.sleep(5000L); - } catch (InterruptedException e) { - - } - } - } - - if (null == serviceInstanceResponse && StringUtils.isNotBlank(clientIp)) { - throw new NoInstanceExistsException( - LinkisRpcErrorCodeSummary.INSTANCE_NOT_FOUND_ERROR.getErrorCode(), - MessageFormat.format( - LinkisRpcErrorCodeSummary.INSTANCE_NOT_FOUND_ERROR.getErrorDesc(), clientIp)); - } - - if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) { - ((SelectedInstanceCallback) supplier) - .selectedServiceInstance(serviceInstanceResponse.getServer()); - } - return serviceInstanceResponse; - } - - private boolean isRPC(String linkisLoadBalancerType) { - return StringUtils.isNotBlank(linkisLoadBalancerType) - && linkisLoadBalancerType.equalsIgnoreCase(RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC); - } - - private Response getInstanceResponse( - List instances, String clientIp) { - if (instances.isEmpty()) { - log.warn("No servers available for service: " + serviceId); - return null; - } - int pos = this.position.incrementAndGet() & Integer.MAX_VALUE; - - if (StringUtils.isBlank(clientIp)) { - return new DefaultResponse(instances.get(pos % instances.size())); - } - String[] ipAndPort = clientIp.split(":"); - if (ipAndPort.length != 2) { - throw new NoInstanceExistsException( - LinkisRpcErrorCodeSummary.INSTANCE_ERROR.getErrorCode(), - MessageFormat.format(LinkisRpcErrorCodeSummary.INSTANCE_ERROR.getErrorDesc(), clientIp)); - } - ServiceInstance chooseInstance = null; - for (ServiceInstance instance : instances) { - if (Objects.equals(ipAndPort[0], instance.getHost()) - && Objects.equals(ipAndPort[1], String.valueOf(instance.getPort()))) { - chooseInstance = instance; - break; - } - } - if (null == chooseInstance) { - return null; - } else { - return new DefaultResponse(chooseInstance); - } - } -} diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala index 4faeaa180e..d835eef328 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala @@ -42,25 +42,15 @@ import feign.RetryableException class RetryableRPCInterceptor extends RPCInterceptor { override val order: Int = 20 -// private val commonRetryHandler = new RPCRetryHandler -// commonRetryHandler.setRetryInfo(new RetryableProtocol{}) -// -// private def isCommonRetryHandler(retry: RetryableProtocol): Boolean = retry.maxPeriod == commonRetryHandler.getRetryMaxPeriod && -// retry.period == commonRetryHandler.getRetryPeriod && retry.retryNum == commonRetryHandler.getRetryNum && -// (retry.retryExceptions.isEmpty || commonRetryHandler.getRetryExceptions.containsSlice(retry.retryExceptions)) - override def intercept( interceptorExchange: RPCInterceptorExchange, chain: RPCInterceptorChain ): Any = interceptorExchange.getProtocol match { case retry: RetryableProtocol => val retryName = retry.getClass.getSimpleName -// if(isCommonRetryHandler(retry)) commonRetryHandler.retry(chain.handle(interceptorExchange), retryName) -// else { val retryHandler = new RPCRetryHandler retryHandler.setRetryInfo(retry, chain) retryHandler.retry(chain.handle(interceptorExchange), retryName) -// } case _ => chain.handle(interceptorExchange) } diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala index 55068f273c..ae1070865a 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala @@ -40,6 +40,11 @@ private[rpc] class SpringMVCRPCSender private[rpc] ( override protected def createRPCInterceptorChain() = new ServiceInstanceRPCInterceptorChain(0, getRPCInterceptors, serviceInstance) + /** + * If it's a random call, you don't need to set target specify instance,need to specify target and + * do not set client setting + * @param builder + */ override protected def doBuilder(builder: Feign.Builder): Unit = { if (serviceInstance != null && StringUtils.isNotBlank(serviceInstance.getInstance)) { builder.requestInterceptor(new RequestInterceptor() { diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala index df197ddb2c..829a967aab 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala @@ -18,6 +18,7 @@ package org.apache.linkis.governance.common.protocol.job import org.apache.linkis.governance.common.entity.job.JobRequest +import org.apache.linkis.protocol.RetryableProtocol import org.apache.linkis.protocol.message.RequestProtocol import java.util @@ -25,7 +26,7 @@ import java.util.Date import scala.beans.BeanProperty -trait JobReq extends RequestProtocol +trait JobReq extends RequestProtocol with RetryableProtocol case class JobReqInsert(jobReq: JobRequest) extends JobReq diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/RequestTask.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/RequestTask.scala index 4d0b8952ca..17c01fcfc2 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/RequestTask.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/RequestTask.scala @@ -18,6 +18,7 @@ package org.apache.linkis.governance.common.protocol.task import org.apache.linkis.manager.label.entity.Label +import org.apache.linkis.protocol.RetryableProtocol import org.apache.linkis.protocol.message.RequestProtocol import java.util @@ -91,7 +92,7 @@ trait TaskState extends RequestProtocol {} case class RequestTaskPause(execId: String) extends TaskState case class RequestTaskResume(execId: String) extends TaskState -case class RequestTaskKill(execId: String) extends TaskState +case class RequestTaskKill(execId: String) extends TaskState with RetryableProtocol /** * The status of requesting job execution, mainly used for:
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseEngineConnPid.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseEngineConnPid.scala index 971bdf247b..ef1355d580 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseEngineConnPid.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseEngineConnPid.scala @@ -25,5 +25,4 @@ import org.apache.linkis.protocol.message.RequestProtocol * @param pid */ case class ResponseEngineConnPid(serviceInstance: ServiceInstance, pid: String, ticketId: String) - extends RetryableProtocol - with RequestProtocol + extends RequestProtocol diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala index b136c61099..a4a7837da0 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala @@ -30,8 +30,7 @@ case class ResponseTaskProgress( execId: String, progress: Float, progressInfo: Array[JobProgressInfo] -) extends RetryableProtocol - with RequestProtocol +) extends RequestProtocol case class ResponseEngineLock(lock: String) @@ -67,9 +66,7 @@ case class ResponseEngineStatus( engineInfo: ResponseEngineInfo ) -case class ResponseTaskLog(execId: String, log: String) - extends RetryableProtocol - with RequestProtocol +case class ResponseTaskLog(execId: String, log: String) extends RequestProtocol case class ResponseTaskError(execId: String, errorMsg: String) extends RetryableProtocol diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala index 6b4fc64fe6..9dba95ef66 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala @@ -154,8 +154,7 @@ class TaskExecutionServiceImpl sender = Sender.getSender(task.getCallbackServiceInstance()) sender.send(msg) } else { - // todo - logger.debug("SendtoEntrance error, cannot find entrance instance.") + logger.warn("SendtoEntrance error, cannot find entrance instance.") } } { t => val errorMsg = s"SendToEntrance error. $msg" + t.getCause diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/EngineResourceRequest.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/EngineResourceRequest.scala index 3b3005fee6..8bcc79b410 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/EngineResourceRequest.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/resource/EngineResourceRequest.scala @@ -22,7 +22,7 @@ import org.apache.linkis.protocol.message.RequestProtocol import java.util -trait EngineResourceRequest extends RequestProtocol { +trait EngineResourceRequest { val user: String val labels: util.List[Label[_]] val properties: util.Map[String, String] diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/GatewayAuthorizationFilter.java b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/GatewayAuthorizationFilter.java index 79371f8539..c09aed0e8a 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/GatewayAuthorizationFilter.java +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/GatewayAuthorizationFilter.java @@ -27,6 +27,7 @@ import org.apache.linkis.gateway.security.LinkisPreFilter$; import org.apache.linkis.gateway.security.SecurityFilter; import org.apache.linkis.gateway.springcloud.SpringCloudGatewayConfiguration; +import org.apache.linkis.rpc.constant.RpcConstant; import org.apache.linkis.server.Message; import org.apache.commons.lang3.StringUtils; @@ -131,7 +132,10 @@ private Route getRealRoute( } String uri = scheme + serviceInstance.getApplicationName(); if (StringUtils.isNotBlank(serviceInstance.getInstance())) { - exchange.getRequest().mutate().header("FIXED_INSTANCE", serviceInstance.getInstance()); + exchange + .getRequest() + .mutate() + .header(RpcConstant.FIXED_INSTANCE, serviceInstance.getInstance()); } return Route.async() .id(route.getId()) diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/IpPriorityLoadBalancer.java b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/IpPriorityLoadBalancer.java index 6300cbce5b..206b31ccf5 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/IpPriorityLoadBalancer.java +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/IpPriorityLoadBalancer.java @@ -17,6 +17,8 @@ package org.apache.linkis.gateway.springcloud.http; +import org.apache.linkis.rpc.constant.RpcConstant; + import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -56,7 +58,7 @@ public Mono> choose(Request request) { ((RequestDataContext) request.getContext()) .getClientRequest() .getHeaders() - .get("client-ip"); + .get(RpcConstant.FIXED_INSTANCE); String clientIp = CollectionUtils.isNotEmpty(clientIpList) ? clientIpList.get(0) : null; ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);