Skip to content

Commit

Permalink
Merge branch 'dev-1.10.0-webank-merge' into dev-1.10.0-bug-fix
Browse files Browse the repository at this point in the history
# Conflicts:
#	linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala
  • Loading branch information
“v_kkhuang” committed Dec 16, 2024
2 parents 804adfa + 84df21e commit 1041452
Show file tree
Hide file tree
Showing 30 changed files with 378 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ object DataType extends Logging {
case LongType | BigIntType => if (isNumberNull(newValue)) null else newValue.toLong
case FloatType => if (isNumberNull(newValue)) null else newValue.toFloat
case DoubleType => if (isNumberNull(newValue)) null else newValue.toDouble
case DecimalType(dataType, 3) =>
case DecimalType(_, _) =>
if (isNumberNull(newValue)) null else new JavaBigDecimal(newValue)
case DateType => if (isNumberNull(newValue)) null else Date.valueOf(newValue)
case TimestampType =>
Expand Down Expand Up @@ -150,11 +150,13 @@ case object ArrayType extends DataType("array", 2003)
case object MapType extends DataType("map", 2000)
case object ListType extends DataType("list", 2001)
case object StructType extends DataType("struct", 2002)
case object BigDecimalType extends DataType("bigdecimal", 3)

case class DecimalType(override val typeName: String, override val javaSQLType: Int)
extends DataType(typeName, javaSQLType)

case class BigDecimalType(override val typeName: String, override val javaSQLType: Int)
extends DataType(typeName, javaSQLType)

case class Column(columnName: String, dataType: DataType, comment: String) {

def toArray: Array[Any] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ class StorageExcelWriter(
case VarcharType => style.setDataFormat(format.getFormat("@"))
case DateType => style.setDataFormat(format.getFormat("m/d/yy h:mm"))
case TimestampType => style.setDataFormat(format.getFormat("m/d/yy h:mm"))
case DecimalType(dataType, 3) => style.setDataFormat(format.getFormat("#.000000000"))
case BigDecimalType => style.setDataFormat(format.getFormat("#.000000000"))
case DecimalType(_, _) => style.setDataFormat(format.getFormat("#.000000000"))
case BigDecimalType(_, _) => style.setDataFormat(format.getFormat("#.000000000"))
case _ => style.setDataFormat(format.getFormat("@"))
}
}
Expand Down Expand Up @@ -171,10 +171,10 @@ class StorageExcelWriter(
case VarcharType => cell.setCellValue(DataType.valueToString(elem))
case DateType => cell.setCellValue(getDate(elem))
case TimestampType => cell.setCellValue(getDate(elem))
case DecimalType(dataType, 3) =>
case DecimalType(_, _) =>
doubleCheck(DataType.valueToString(elem))
cell.setCellValue(DataType.valueToString(elem).toDouble)
case BigDecimalType =>
case BigDecimalType(_, _) =>
doubleCheck(DataType.valueToString(elem))
cell.setCellValue(DataType.valueToString(elem).toDouble)
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.linkis.ujes.client.exception.UJESClientBuilderException
import org.apache.linkis.ujes.client.request.JobExecuteAction.{EngineType, RunType}
import org.apache.linkis.ujes.client.response.ResultSetResult

import java.math.{BigDecimal => JavaBigDecimal}
import java.util
import java.util.Locale

Expand All @@ -29,6 +30,7 @@ import com.google.gson.{Gson, JsonObject}
object UJESClientUtils {

val gson: Gson = new Gson()
val DECIMAL_REGEX = "^decimal\\(\\s*\\d*\\s*,\\s*\\d*\\s*\\)".r.unanchored

def toEngineType(engineType: String): EngineType = engineType match {
case "spark" => EngineType.SPARK
Expand Down Expand Up @@ -75,7 +77,7 @@ object UJESClientUtils {
case "boolean" => value.toBoolean
case "byte" => value.toByte
case "bigint" => value.toLong
case "decimal" => value.toDouble
case "decimal" | DECIMAL_REGEX() => new JavaBigDecimal(value)
case "array" => gson.fromJson(value, classOf[util.ArrayList[Object]])
case "map" => gson.fromJson(value, classOf[util.HashMap[Object, Object]])
case "struct" => gson.fromJson(value, classOf[JsonObject])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object ComputationExecutorConf {

val PRINT_TASK_PARAMS_SKIP_KEYS = CommonVars(
"linkis.engineconn.print.task.params.skip.keys",
"jobId",
"jobId,wds.linkis.rm.yarnqueue",
"skip to print params key at job logs"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,10 +414,13 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)

def printTaskParamsLog(engineExecutorContext: EngineExecutionContext): Unit = {
val sb = new StringBuilder

EngineConnObject.getEngineCreationContext.getOptions.asScala.foreach({ case (key, value) =>
// skip log jobId because it corresponding jobid when the ec created
if (!ComputationExecutorConf.PRINT_TASK_PARAMS_SKIP_KEYS.getValue.contains(key)) {
if (
!ComputationExecutorConf.PRINT_TASK_PARAMS_SKIP_KEYS.getValue
.split(",")
.exists(_.equals(key))
) {
sb.append(s"${key}=${value}\n")
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public class AMConfiguration {
CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "trino,appconn,io_file,jdbc");
public static final CommonVars<String> MULTI_USER_ENGINE_USER =
CommonVars.apply("wds.linkis.multi.user.engine.user", getDefaultMultiEngineUser());
public static final String UDF_KILL_ENGINE_TYPE = CommonVars.apply("linkis.udf.kill.engine.type", "spark,hive").getValue();

public static final CommonVars<Integer> ENGINE_LOCKER_MAX_TIME =
CommonVars.apply("wds.linkis.manager.am.engine.locker.max.time", 1000 * 60 * 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ public Message executeEngineConnOperation(HttpServletRequest req, @RequestBody J

@ApiOperation(
value = "kill egineconns of a ecm",
notes = "Kill engine after updating configuration",
notes = "Kill engine by cteator or engineType",
response = Message.class)
@ApiImplicitParams({
@ApiImplicitParam(name = "creator", dataType = "String", required = true, example = "IDE"),
Expand All @@ -748,7 +748,7 @@ public Message executeEngineConnOperation(HttpServletRequest req, @RequestBody J
example = "hive-2.3.3"),
})
@ApiOperationSupport(ignoreParameters = {"param"})
@RequestMapping(path = "/rm/killEngineByUpdateConfig", method = RequestMethod.POST)
@RequestMapping(path = "/rm/killEngineByCreatorEngineType", method = RequestMethod.POST)
public Message killEngineByUpdateConfig(HttpServletRequest req, @RequestBody JsonNode jsonNode)
throws AMErrorException {
String userName = ModuleUserUtils.getOperationUser(req);
Expand All @@ -770,10 +770,18 @@ public Message killEngineByUpdateConfig(HttpServletRequest req, @RequestBody Jso
&& AMConfiguration.isUnAllowKilledEngineType(engineType)) {
return Message.error("multi user engine does not support this feature(多用户引擎不支持此功能)");
}
engineStopService.stopUnlockECByUserCreatorAndECType(userName, creatorStr, engineType);
if (engineType.equals(Configuration.GLOBAL_CONF_SYMBOL())) {
Arrays.stream(AMConfiguration.UDF_KILL_ENGINE_TYPE.split(","))
.forEach(
engine ->
engineStopService.stopUnlockECByUserCreatorAndECType(
userName, creatorStr, engine));
} else {
engineStopService.stopUnlockECByUserCreatorAndECType(
userName, creatorStr, engineType);
}
return Message.ok("Kill engineConn succeed");
}

static ServiceInstance getServiceInstance(JsonNode jsonNode) throws AMErrorException {
String applicationName = jsonNode.get("applicationName").asText();
String instance = jsonNode.get("instance").asText();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.linkis.manager.rm.external.yarn;

import org.apache.linkis.engineplugin.server.conf.EngineConnPluginConfiguration;
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.common.entity.resource.ResourceType;
Expand Down Expand Up @@ -57,6 +58,7 @@ public class YarnResourceRequester implements ExternalResourceRequester {
private final String HASTATE_ACTIVE = "active";
private static final ObjectMapper objectMapper = new ObjectMapper();
private final Map<String, String> rmAddressMap = new ConcurrentHashMap<>();
private final String queuePrefix = EngineConnPluginConfiguration.QUEUE_PREFIX().getValue();

private static final HttpClient httpClient = HttpClients.createDefault();

Expand All @@ -74,7 +76,11 @@ public NodeResource requestResourceInfo(
logger.info("rmWebAddress: " + rmWebAddress);

String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
String realQueueName = "root." + queueName;
if (queueName.startsWith(queuePrefix)) {
logger.info("Queue name {} starts with '{}', remove '{}'", queueName, queuePrefix, queuePrefix);
queueName = queueName.substring(queuePrefix.length());
}
String realQueueName = queuePrefix + queueName;

try {
YarnQueueInfo resources = getResources(rmWebAddress, realQueueName, queueName, provider);
Expand Down Expand Up @@ -301,7 +307,7 @@ public List<ExternalAppInfo> requestAppInfo(
String rmWebAddress = getAndUpdateActiveRmWebAddress(provider);

String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
String realQueueName = "root." + queueName;
String realQueueName = queuePrefix + queueName;

JsonNode resp = getResponseByUrl("apps", rmWebAddress, provider).path("apps").path("app");
if (resp.isMissingNode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ object EngineConnPluginConfiguration {
val EC_BML_VERSION_MAY_WITH_PREFIX_V: CommonVars[Boolean] =
CommonVars("linkis.engineconn.bml.version.may.with.prefix", true)

val QUEUE_PREFIX: CommonVars[String] =
CommonVars("wds.linkis.queue.prefix", "root.")

}
Original file line number Diff line number Diff line change
Expand Up @@ -290,24 +290,28 @@ class DefaultEngineStopService extends AbstractEngineService with EngineStopServ
dealEngineByEngineNode(engineNodes.toList, userName)
}
// kill EMnode by user creator
if (StringUtils.isNotBlank(engineType) && !creator.equals(Configuration.GLOBAL_CONF_SYMBOL)) {
val filterEngineNode = engineNodes
.filter(_.getOwner.equals(userName))
.filter(node => {
var filterResult = false
if (!node.getLabels.isEmpty) {
val userCreator = LabelUtil.getUserCreatorLabel(node.getLabels)
val engineTypeLabel = LabelUtil.getEngineTypeLabel(node.getLabels).getStringValue
if (
userCreator.getUser.equals(userName) && userCreator.getCreator
.equals(creator) && engineTypeLabel.equals(engineType)
) {
filterResult = true
}
}
filterResult
})
.toList
if (StringUtils.isNotBlank(engineType)) {
val filterEngineNode = creator match {
case Configuration.GLOBAL_CONF_SYMBOL =>
engineNodes
.filter(_.getOwner.equals(userName))
.filter(!_.getLabels.isEmpty)
.filter(node =>
LabelUtil.getUserCreatorLabel(node.getLabels).getUser.equals(userName)
&& LabelUtil.getEngineTypeLabel(node.getLabels).getEngineType.equals(engineType)
)
.toList
case _ =>
engineNodes
.filter(_.getOwner.equals(userName))
.filter(!_.getLabels.isEmpty)
.filter(node =>
LabelUtil.getUserCreatorLabel(node.getLabels).getUser.equals(userName)
&& LabelUtil.getUserCreatorLabel(node.getLabels).getCreator.equals(creator)
&& LabelUtil.getEngineTypeLabel(node.getLabels).getEngineType.equals(engineType)
)
.toList
}
dealEngineByEngineNode(filterEngineNode, userName)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,10 @@ object HiveEngineConfiguration {

val HIVE_RANGER_ENABLE = CommonVars[Boolean]("linkis.hive.ranger.enabled", false).getValue

val HIVE_QUEUE_NAME: String = "mapreduce.job.queuename"

val BDP_QUEUE_NAME: String = "wds.linkis.rm.yarnqueue"

val HIVE_TEZ_QUEUE_NAME: String = "tez.queue.name"

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ import scala.collection.JavaConverters._

class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory with Logging {

private val HIVE_QUEUE_NAME: String = "mapreduce.job.queuename"
private val BDP_QUEUE_NAME: String = "wds.linkis.rm.yarnqueue"
private val HIVE_TEZ_QUEUE_NAME: String = "tez.queue.name"

override protected def newExecutor(
id: Int,
engineCreationContext: EngineCreationContext,
Expand Down Expand Up @@ -188,10 +184,10 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w
}
.foreach { case (k, v) =>
logger.info(s"key is $k, value is $v")
if (BDP_QUEUE_NAME.equals(k)) {
hiveConf.set(HIVE_QUEUE_NAME, v)
if (HiveEngineConfiguration.BDP_QUEUE_NAME.equals(k)) {
hiveConf.set(HiveEngineConfiguration.HIVE_QUEUE_NAME, v)
if ("tez".equals(HiveEngineConfiguration.HIVE_ENGINE_TYPE)) {
hiveConf.set(HIVE_TEZ_QUEUE_NAME, v)
hiveConf.set(HiveEngineConfiguration.HIVE_TEZ_QUEUE_NAME, v)
}
} else hiveConf.set(k, v)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ class HiveEngineConcurrentConnExecutor(
engineExecutorContext.appendStdout(
s"Your hive taskId: $taskId has $numberOfJobs MR jobs to do"
)
val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME)
engineExecutorContext.appendStdout(
s"Your task will be submitted to the $queueName queue"
)
}

logger.info(s"there are ${numberOfJobs} jobs.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ class HiveEngineConnExecutor(
}
if (numberOfMRJobs > 0) {
engineExecutorContext.appendStdout(s"Your hive sql has $numberOfMRJobs MR jobs to do")
val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME)
engineExecutorContext.appendStdout(s"Your task will be submitted to the $queueName queue")
}
if (thread.isInterrupted) {
logger.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public static String getTypeStr(int type) {
retVal = BinaryType.typeName();
break;
case Types.DECIMAL:
retVal = DecimalType.toDataType(String.valueOf(type)).toString();
retVal = new DecimalType("decimal", 3).typeName();
break;
case Types.ARRAY:
retVal = ArrayType.typeName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object DolphinToSpark {
case wds.BigIntType => LongType
case wds.FloatType => FloatType
case wds.DoubleType => DoubleType
case wds.DecimalType(dataType, 3) => DecimalType(bigDecimalPrecision, bigDecimalScale)
case wds.DecimalType(_, _) => DecimalType(bigDecimalPrecision, bigDecimalScale)
case wds.DateType => DateType
// case wds.TimestampType => TimestampType
case wds.BinaryType => BinaryType
Expand Down
Loading

0 comments on commit 1041452

Please sign in to comment.