Skip to content

Commit

Permalink
Relieve memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Mar 19, 2024
1 parent f1d88fc commit b3beb34
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ object SchedulerConfiguration {
CommonVars("wds.linkis.fifo.consumer.auto.clear.enabled", true)

val FIFO_CONSUMER_MAX_IDLE_TIME =
CommonVars("wds.linkis.fifo.consumer.max.idle.time", new TimeType("1h")).getValue.toLong
CommonVars("wds.linkis.fifo.consumer.max.idle.time", new TimeType("10m")).getValue.toLong

val FIFO_CONSUMER_IDLE_SCAN_INTERVAL =
CommonVars("wds.linkis.fifo.consumer.idle.scan.interval", new TimeType("2h"))
CommonVars("wds.linkis.fifo.consumer.idle.scan.interval", new TimeType("30m"))

val FIFO_CONSUMER_IDLE_SCAN_INIT_TIME =
CommonVars("wds.linkis.fifo.consumer.idle.scan.init.time", new TimeType("1s"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,6 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
case _ =>
jobDaemon.foreach(_.kill())
jobListener.foreach(_.onJobCompleted(this))
// if(getJobInfo != null) logListener.foreach(_.onLogUpdate(this, getJobInfo.getMetric))
logListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo("job is completed.")))
// TODO job end event
}

protected def transitionCompleted(executeCompleted: CompletedExecuteResponse): Unit = {
Expand Down Expand Up @@ -351,6 +348,16 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
}

override def toString: String = if (StringUtils.isNotBlank(getName)) getName else getId

/**
* clear job memory
*/
def clear(): Unit = {
logger.info(s" clear job base info $getId")
this.executor = null
this.jobDaemon = null
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ class FIFOUserConsumer(
case _ =>
}
}
// clear cache
queue.clearAll()

this.runningJobs.foreach { job =>
if (job != null && !job.isCompleted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)

override def destroyConsumer(groupName: String): Unit =
consumerGroupMap.get(groupName).foreach { tmpConsumer =>
tmpConsumer.shutdown()
consumerGroupMap.remove(groupName)
Utils.tryAndWarn(tmpConsumer.shutdown())
Utils.tryAndWarn(consumerGroupMap.remove(groupName))
consumerListener.foreach(_.onConsumerDestroyed(tmpConsumer))
logger.warn(s"Consumer of group ($groupName) in $schedulerName is destroyed.")
}
Expand Down

0 comments on commit b3beb34

Please sign in to comment.