Skip to content

Commit

Permalink
merge dev-1.1.16-webank
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexkun committed Oct 27, 2023
2 parents e4780b5 + 05ed141 commit 5cc1cc6
Show file tree
Hide file tree
Showing 48 changed files with 916 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,19 +232,20 @@ private[conf] object BDPConfiguration extends Logging {

private[common] def formatValue[T](defaultValue: T, value: Option[String]): Option[T] = {
if (value.isEmpty || value.exists(StringUtils.isEmpty)) return Option(defaultValue)
val trimValue = value.map(_.trim)
val formattedValue = defaultValue match {
case _: String => value
case _: Byte => value.map(_.toByte)
case _: Short => value.map(_.toShort)
case _: Char => value.map(_.toCharArray.apply(0))
case _: Int => value.map(_.toInt)
case _: Long => value.map(_.toLong)
case _: Float => value.map(_.toFloat)
case _: Double => value.map(_.toDouble)
case _: Boolean => value.map(_.toBoolean)
case _: TimeType => value.map(new TimeType(_))
case _: ByteType => value.map(new ByteType(_))
case null => value
case _: String => trimValue
case _: Byte => trimValue.map(_.toByte)
case _: Short => trimValue.map(_.toShort)
case _: Char => trimValue.map(_.toCharArray.apply(0))
case _: Int => trimValue.map(_.toInt)
case _: Long => trimValue.map(_.toLong)
case _: Float => trimValue.map(_.toFloat)
case _: Double => trimValue.map(_.toDouble)
case _: Boolean => trimValue.map(_.toBoolean)
case _: TimeType => trimValue.map(new TimeType(_))
case _: ByteType => trimValue.map(new ByteType(_))
case null => trimValue
}
formattedValue.asInstanceOf[Option[T]]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object LogUtils {
}

def generateERROR(rawLog: String): String = {
getTimeFormat + " " + "ERROR" + " " + rawLog
getTimeFormat + " " + ERROR_STR + " " + rawLog
}

def generateWarn(rawLog: String): String = {
Expand All @@ -52,4 +52,6 @@ object LogUtils {
getTimeFormat + " " + "SYSTEM-WARN" + " " + rawLog
}

val ERROR_STR = "ERROR"

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@
# limitations under the License.
#

linkis.jobhistory.error.msg.tip=properties支持中文
linkis.jobhistory.error.msg.tip=properties支持中文
linkis.test.error.conf=123
linkis.test.error.conf2= 456
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,11 @@ class ConfigurationTest {
Assertions.assertFalse(Configuration.isAdmin("HaDooop"))
}

@Test private[conf] def testFormatValue(): Unit = {
val confvalue = CommonVars[Int]("linkis.test.error.conf", 456).getValue
val confvalue2 = CommonVars[Int]("linkis.test.error.conf2", 789).getValue
Assertions.assertTrue(123 == confvalue)
Assertions.assertTrue(456 == confvalue2)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ object TaskUtils {
}
} else params.put(key, waitToAdd)

private def clearMap(params: util.Map[String, AnyRef], key: String): Unit =
if (params != null && params.containsKey(key)) {
params.get(key) match {
case map: util.Map[String, AnyRef] => map.clear()
case _ => params.put(key, new util.HashMap[String, AnyRef]())
}
}

private def getConfigurationMap(
params: util.Map[String, AnyRef],
key: String
Expand Down Expand Up @@ -84,13 +92,20 @@ object TaskUtils {
def addStartupMap(params: util.Map[String, AnyRef], startupMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, startupMap, TaskConstant.PARAMS_CONFIGURATION_STARTUP)

def clearStartupMap(params: util.Map[String, AnyRef]): Unit = {
val configurationMap = getMap(params, TaskConstant.PARAMS_CONFIGURATION)
if (!configurationMap.isEmpty) {
clearMap(configurationMap, TaskConstant.PARAMS_CONFIGURATION_STARTUP)
}
}

def addRuntimeMap(params: util.Map[String, AnyRef], runtimeMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, runtimeMap, TaskConstant.PARAMS_CONFIGURATION_RUNTIME)

def addSpecialMap(params: util.Map[String, AnyRef], specialMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, specialMap, TaskConstant.PARAMS_CONFIGURATION_SPECIAL)

// tdoo
// todo
def getLabelsMap(params: util.Map[String, AnyRef]): util.Map[String, AnyRef] =
getMap(params, TaskConstant.LABELS)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ object FileSystemUtils extends Logging {
}(Utils.tryQuietly(fileSystem.close()))
}

@deprecated("please use createNewFileAndSetOwnerWithFileSystem")
def createNewFileWithFileSystem(
fileSystem: FileSystem,
filePath: FsPath,
Expand All @@ -82,6 +83,31 @@ object FileSystemUtils extends Logging {
}
}

/**
* create new file and set file owner by FileSystem
* @param fileSystem
* @param filePath
* @param user
* @param createParentWhenNotExists
*/
def createNewFileAndSetOwnerWithFileSystem(
fileSystem: FileSystem,
filePath: FsPath,
user: String,
createParentWhenNotExists: Boolean
): Unit = {
if (!fileSystem.exists(filePath)) {
if (!fileSystem.exists(filePath.getParent)) {
if (!createParentWhenNotExists) {
throw new IOException("parent dir " + filePath.getParent.getPath + " dose not exists.")
}
mkdirsAndSetOwner(fileSystem, filePath.getParent, user)
}
fileSystem.createNewFile(filePath)
fileSystem.setOwner(filePath, user)
}
}

/**
* Recursively create a directory(递归创建目录)
* @param fileSystem
Expand All @@ -91,6 +117,7 @@ object FileSystemUtils extends Logging {
* @return
*/
@throws[IOException]
@deprecated("please use mkdirsAndSetOwner")
def mkdirs(fileSystem: FileSystem, dest: FsPath, user: String): Boolean = {
var parentPath = dest.getParent
val dirsToMake = new util.Stack[FsPath]()
Expand All @@ -113,4 +140,32 @@ object FileSystemUtils extends Logging {
true
}

/**
* Recursively create a directory(递归创建目录) 默认添加 Owner 信息
* @param fileSystem
* @param dest
* @param user
* @throws
* @return
*/
@throws[IOException]
def mkdirsAndSetOwner(fileSystem: FileSystem, dest: FsPath, user: String): Boolean = {
var parentPath = dest.getParent
val dirsToMake = new util.Stack[FsPath]()
dirsToMake.push(dest)
while (!fileSystem.exists(parentPath)) {
dirsToMake.push(parentPath)
parentPath = parentPath.getParent
}
if (!fileSystem.canExecute(parentPath)) {
throw new IOException("You have not permission to access path " + dest.getPath)
}
while (!dirsToMake.empty()) {
val path = dirsToMake.pop()
fileSystem.mkdir(path)
fileSystem.setOwner(path, user)
}
true
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,17 @@ public JobResult run() {
LinkisOperResultAdapter jobInfoResult =
oper.queryJobInfo(submitResult.getUser(), submitResult.getJobID());
oper.queryJobStatus(
jobInfoResult.getUser(), jobInfoResult.getJobID(), jobInfoResult.getStrongerExecId());
submitResult.getUser(), submitResult.getJobID(), submitResult.getStrongerExecId());
infoBuilder.setLength(0);
infoBuilder
.append("JobId:")
.append(jobInfoResult.getJobID())
.append(submitResult.getJobID())
.append(System.lineSeparator())
.append("TaskId:")
.append(jobInfoResult.getJobID())
.append(submitResult.getJobID())
.append(System.lineSeparator())
.append("ExecId:")
.append(jobInfoResult.getStrongerExecId());
.append(submitResult.getStrongerExecId());
LoggerManager.getPlaintTextLogger().info(infoBuilder.toString());
infoBuilder.setLength(0);

Expand Down Expand Up @@ -137,7 +137,7 @@ public JobResult run() {
logRetriever.retrieveLogAsync();

// wait complete
jobInfoResult = waitJobComplete(submitResult.getUser(), submitResult.getJobID());
jobInfoResult = waitJobComplete(submitResult.getUser(), submitResult.getJobID(), submitResult.getStrongerExecId());
logRetriever.waitIncLogComplete();

// get result-set
Expand Down Expand Up @@ -205,20 +205,20 @@ private JobResult getResult(
return result;
}

private LinkisOperResultAdapter waitJobComplete(String user, String jobId)
private LinkisOperResultAdapter waitJobComplete(String user, String jobId, String execId)
throws LinkisClientRuntimeException {
int retryCnt = 0;
final int MAX_RETRY = 30;

LinkisOperResultAdapter jobInfoResult = oper.queryJobInfo(user, jobId);
oper.queryJobStatus(user, jobId, jobInfoResult.getStrongerExecId());
oper.queryJobStatus(user, jobId, execId);

while (!jobInfoResult.getJobStatus().isJobFinishedState()) {
// query progress
try {
jobInfoResult = oper.queryJobInfo(user, jobId);
oper.queryJobStatus(
jobInfoResult.getUser(), jobInfoResult.getJobID(), jobInfoResult.getStrongerExecId());
jobInfoResult.getUser(), jobInfoResult.getJobID(), execId);
} catch (Exception e) {
logger.warn("", e);
retryCnt++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public LinkisOperResultAdapter submit(InteractiveJobDesc jobDesc)
// jobExecuteResult = client.execute(jobExecuteAction);

jobSubmitResult = client.submit(jobSubmitAction);
logger.info("Response info from Linkis: \n{}", CliUtils.GSON.toJson(jobSubmitAction));
logger.info("Response info from Linkis: \n{}", CliUtils.GSON.toJson(jobSubmitResult));

} catch (Exception e) {
// must throw if exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ public String getStrongerExecId() {
return null;
}
String execId = null;

if (result instanceof JobSubmitResult) {
execId = ((JobSubmitResult) result).getExecID();
}
if (result instanceof JobInfoResult) {
if (result != null
&& ((JobInfoResult) result).getTask() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ public class TemplateConfRequest implements RequestProtocol {

private String templateUuid;

private String templateName;

public TemplateConfRequest(String templateUuid, String templateName) {
this.templateUuid = templateUuid;
this.templateName = templateName;
}

public TemplateConfRequest(String templateUuid) {
this.templateUuid = templateUuid;
}
Expand All @@ -34,4 +41,12 @@ public String getTemplateUuid() {
public void setTemplateUuid(String templateUuid) {
this.templateUuid = templateUuid;
}

public String getTemplateName() {
return templateName;
}

public void setTemplateName(String templateName) {
this.templateName = templateName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ object ECMConfiguration {

val ECM_PROTECTED_MEMORY: Long = CommonVars[Long](
"wds.linkis.ecm.protected.memory",
ByteTimeUtils.byteStringAsBytes("4g")
ByteTimeUtils.byteStringAsBytes("10g")
).getValue

val ECM_PROTECTED_CPU_LOAD: Double =
Expand All @@ -80,7 +80,7 @@ object ECMConfiguration {
GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME.getValue

val ECM_HEALTH_REPORT_PERIOD: Long =
CommonVars("wds.linkis.ecm.health.report.period", 30).getValue
CommonVars("wds.linkis.ecm.health.report.period", 10).getValue

val ECM_HEALTH_REPORT_DELAY: Long =
CommonVars("wds.linkis.ecm.health.report.delay", 10).getValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ object ECMUtils extends Logging {
// if enable estimate actual memory
if (ECM_STIMATE_ACTUAL_MEMORY_ENABLE) {

// 90%
val totalByte = (HardwareUtils.getMaxMemory() * 0.9).asInstanceOf[Long]
val totalByte = HardwareUtils.getMaxMemory()

val resultMemory = math.max(totalByte, ECM_PROTECTED_MEMORY)
// max of PhysicalMemory or ECM_PROTECTED_MEMORY
Expand Down
Loading

0 comments on commit 5cc1cc6

Please sign in to comment.