Skip to content

Commit

Permalink
Merge pull request #630 from akki/election-scheduler
Browse files Browse the repository at this point in the history
Feature for scheduling preferred replica leader election
  • Loading branch information
patelh authored Mar 3, 2020
2 parents c7640d3 + 7f0a1f5 commit 03f2868
Show file tree
Hide file tree
Showing 15 changed files with 318 additions and 4 deletions.
92 changes: 91 additions & 1 deletion app/controllers/PreferredReplicaElection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package controllers

import features.{ApplicationFeatures, KMPreferredReplicaElectionFeature}
import features.{ApplicationFeatures, KMPreferredReplicaElectionFeature, KMScheduleLeaderElectionFeature}
import kafka.manager.ApiError
import kafka.manager.features.ClusterFeatures
import models.FollowLink
Expand All @@ -15,6 +15,7 @@ import play.api.data.Form
import play.api.data.Forms._
import play.api.data.validation.{Constraint, Invalid, Valid}
import play.api.i18n.I18nSupport
import play.api.libs.json.{JsObject, Json}
import play.api.mvc._

import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -84,4 +85,93 @@ class PreferredReplicaElection (val cc: ControllerComponents, val kafkaManagerCo
)
}
}

def handleScheduledIntervalAPI(cluster: String): Action[AnyContent] = Action.async { implicit request =>
featureGate(KMScheduleLeaderElectionFeature) {
val interval = kafkaManager.pleCancellable.get(cluster).map(_._2).getOrElse(0)
Future(Ok(Json.obj("scheduledInterval" -> interval))
.withHeaders("X-Frame-Options" -> "SAMEORIGIN"))
}
}

def scheduleRunElection(c: String) = Action.async { implicit request =>
def getOrZero : (Int, String) = if(kafkaManager.pleCancellable.contains(c)){
(kafkaManager.pleCancellable(c)._2, "Scheduler is running")
}
else {
(0, "Scheduler is not running")
}
val (timePeriod, status_string) = getOrZero
kafkaManager.getTopicList(c).map { errorOrStatus =>
Ok(views.html.scheduleLeaderElection(c,errorOrStatus, status_string, timePeriod)).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
}
}

def handleScheduleRunElection(c: String) = Action.async { implicit request =>
def setOrExtract : (Int, String) = if(!kafkaManager.pleCancellable.contains(c)){
kafkaManager.getTopicList(c).flatMap { errorOrTopicList =>
errorOrTopicList.fold({ e =>
Future.successful(-\/(e))
}, { topicList =>
kafkaManager.schedulePreferredLeaderElection(c, topicList.list.toSet, request.body.asFormUrlEncoded.get("timePeriod")(0).toInt)
})
}
(request.body.asFormUrlEncoded.get("timePeriod")(0).toInt, "Scheduler started")
}
else{
(kafkaManager.pleCancellable(c)._2, "Scheduler already scheduled")
}
val (timeIntervalMinutes, status_string) = setOrExtract
kafkaManager.getTopicList(c).map { errorOrStatus =>
Ok(views.html.scheduleLeaderElection(c, errorOrStatus, status_string, timeIntervalMinutes)).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
}
}

def cancelScheduleRunElection(c: String) = Action.async { implicit request =>
val status_string: String = if(kafkaManager.pleCancellable.contains(c)){
kafkaManager.cancelPreferredLeaderElection(c)
"Scheduler stopped"
}
else "Scheduler already not running"
kafkaManager.getTopicList(c).map { errorOrStatus =>
Ok(views.html.scheduleLeaderElection(c,errorOrStatus,status_string, 0)).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
}
}

def handleScheduleRunElectionAPI(c: String) = Action.async { implicit request =>
// ToDo: Refactor out common part from handleScheduleRunElection
featureGate(KMScheduleLeaderElectionFeature) {
def setOrExtract : (Int, String) = if(!kafkaManager.pleCancellable.contains(c)){
kafkaManager.getTopicList(c).flatMap { errorOrTopicList =>
errorOrTopicList.fold({ e =>
Future.successful(-\/(e))
}, { topicList =>
kafkaManager.schedulePreferredLeaderElection(c, topicList.list.toSet, request.body.asInstanceOf[AnyContentAsJson].json.asInstanceOf[JsObject].values.toList(0).toString().toInt)
})
}
(request.body.asInstanceOf[AnyContentAsJson].json.asInstanceOf[JsObject].values.toList(0).toString().toInt, "Scheduler started")
}
else{
(kafkaManager.pleCancellable(c)._2, "Scheduler already scheduled")
}
val (timePeriod, status_string) = setOrExtract
Future(
Ok(Json.obj(
"scheduledInterval" -> timePeriod, "message" -> status_string
)).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
)
}
}

def cancelScheduleRunElectionAPI(c: String) = Action.async { implicit request =>
// ToDo: Refactor out common part from cancelScheduleRunElection
featureGate(KMScheduleLeaderElectionFeature) {
val status_string: String = if(kafkaManager.pleCancellable.contains(c)){
kafkaManager.cancelPreferredLeaderElection(c)
"Scheduler stopped"
}
else "Scheduler already not running"
Future(Ok(Json.obj("scheduledInterval" -> 0, "message" -> status_string)).withHeaders("X-Frame-Options" -> "SAMEORIGIN"))
}
}
}
1 change: 1 addition & 0 deletions app/features/ApplicationFeature.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ sealed trait ApplicationFeature extends KMFeature
case object KMClusterManagerFeature extends ApplicationFeature
case object KMTopicManagerFeature extends ApplicationFeature
case object KMPreferredReplicaElectionFeature extends ApplicationFeature
case object KMScheduleLeaderElectionFeature extends ApplicationFeature
case object KMReassignPartitionsFeature extends ApplicationFeature
case object KMBootstrapClusterConfigFeature extends ApplicationFeature

Expand Down
74 changes: 73 additions & 1 deletion app/kafka/manager/KafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ package kafka.manager

import java.util.Properties
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import org.json4s.jackson.JsonMethods.parse

import akka.actor.{ActorPath, ActorSystem, Props}
import akka.actor.{ActorPath, ActorSystem, Cancellable, Props}
import akka.util.Timeout
import com.typesafe.config.{Config, ConfigFactory}
import grizzled.slf4j.Logging
Expand Down Expand Up @@ -359,6 +360,54 @@ class KafkaManager(akkaConfig: Config) extends Logging {
}
}

private def runPreferredLeaderElectionWithAllTopics(clusterName: String) = {
implicit val ec = apiExecutionContext

getTopicList(clusterName).flatMap { errorOrTopicList =>
errorOrTopicList.fold({ e =>
Future.successful(-\/(e))
}, { topicList =>
runPreferredLeaderElection(clusterName, topicList.list.toSet)
})
}
}

private def updateSchedulePreferredLeaderElection(clusterName: String): Unit = {
system.actorSelection(kafkaManagerActor).ask(KMClusterCommandRequest(
clusterName,
CMSchedulePreferredLeaderElection(
pleCancellable map { case (key, value) => (key, value._2) }
)
))
}

def schedulePreferredLeaderElection(clusterName: String, topics: Set[String], timeIntervalMinutes: Int): Future[String] = {
implicit val ec = apiExecutionContext

pleCancellable += (clusterName ->
(
Some(
system.scheduler.schedule(0 seconds, Duration(timeIntervalMinutes, TimeUnit.MINUTES)) {
runPreferredLeaderElectionWithAllTopics(clusterName)
}
),
timeIntervalMinutes
)
)
updateSchedulePreferredLeaderElection(clusterName)

Future("Scheduler started")
}

def cancelPreferredLeaderElection(clusterName: String): Future[String] = {
implicit val ec = apiExecutionContext

pleCancellable(clusterName)._1.map(_.cancel())
pleCancellable -= clusterName
updateSchedulePreferredLeaderElection(clusterName)
Future("Scheduler stopped")
}

def manualPartitionAssignments(clusterName: String,
assignments: List[(String, List[(Int, List[Int])])]) = {
implicit val ec = apiExecutionContext
Expand Down Expand Up @@ -910,4 +959,27 @@ class KafkaManager(akkaConfig: Config) extends Logging {
)
}
}

def initialiseSchedulePreferredLeaderElection(): Unit = {
implicit val ec = apiExecutionContext
implicit val formats = org.json4s.DefaultFormats

var temp: Map[String, Int] = Map.empty
val x = system.actorSelection(kafkaManagerActor).ask(KSGetScheduleLeaderElection)
x.foreach { schedule =>
temp = parse(schedule.toString).extract[Map[String, Int]]
for ((cluster, timeInterval) <- temp) {
schedulePreferredLeaderElection(cluster, Set(), timeInterval)
}
}
}

/* Contains a key for each cluster (by its name) which has preferred leader election scheduled
* Value of each key is a 2-tuple where
* * first element is the scheduler's cancellable object - required for cancelling the schedule
* * second element is the time interval for scheduling (in minutes) - required for storing in ZK
*/
var pleCancellable : Map[String, (Option[Cancellable], Int)] = Map.empty
initialiseSchedulePreferredLeaderElection()

}
4 changes: 4 additions & 0 deletions app/kafka/manager/actor/KafkaManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import kafka.manager.actor.cluster.{ClusterManagerActor, ClusterManagerActorConf
import kafka.manager.base.{LongRunningPoolConfig, BaseZkPath, CuratorAwareActor, BaseQueryCommandActor}
import kafka.manager.model.{ClusterTuning, ClusterConfig, CuratorConfig}
import kafka.manager.model.ActorModel.CMShutdown
import kafka.manager.utils.ZkUtils
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode
import org.apache.curator.framework.recipes.cache.{PathChildrenCache, PathChildrenCacheEvent, PathChildrenCacheListener}
Expand Down Expand Up @@ -229,6 +230,9 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
case KMGetAllClusters =>
sender ! KMClusterList(clusterConfigMap.values.toIndexedSeq, pendingClusterConfigMap.values.toIndexedSeq)

case KSGetScheduleLeaderElection =>
sender ! ZkUtils.readDataMaybeNull(curator, ZkUtils.SchedulePreferredLeaderElectionPath)._1.getOrElse("{}")

case KMGetClusterConfig(name) =>
sender ! KMClusterConfigResult(Try {
val cc = clusterConfigMap.get(name)
Expand Down
12 changes: 12 additions & 0 deletions app/kafka/manager/actor/cluster/ClusterManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import kafka.manager.features.{ClusterFeatures, KMJMXMetricsFeature, KMLogKafkaF
import kafka.manager.logkafka._
import kafka.manager.model.{ClusterConfig, ClusterContext, CuratorConfig}
import kafka.manager.utils.AdminUtils
import kafka.manager.utils.zero81.SchedulePreferredLeaderElectionCommand
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.PathChildrenCache
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode
Expand Down Expand Up @@ -352,6 +353,13 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
}
}
}

private def writeScheduleLeaderElectionToZk(schedule: Map[String, Int]) = {
implicit val ec = longRunningExecutionContext

log.info("Updating schedule for preferred leader election")
SchedulePreferredLeaderElectionCommand.writeScheduleLeaderElectionData(curator, schedule)
}

implicit private def toTryClusterContext(t: Try[Unit]) : Try[ClusterContext] = {
t.map(_ => clusterContext)
Expand Down Expand Up @@ -521,6 +529,10 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
}
} pipeTo sender()

case CMSchedulePreferredLeaderElection(schedule) =>
implicit val ec = longRunningExecutionContext
writeScheduleLeaderElectionToZk(schedule)

case CMRunReassignPartition(topics, forceSet) =>
implicit val ec = longRunningExecutionContext
val eventualBrokerList = withKafkaStateActor(KSGetBrokers)(identity[BrokerList])
Expand Down
2 changes: 2 additions & 0 deletions app/kafka/manager/model/ActorModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ object ActorModel {
case class CMUpdateTopicConfig(topic: String, config: Properties, readVersion: Int) extends CommandRequest
case class CMDeleteTopic(topic: String) extends CommandRequest
case class CMRunPreferredLeaderElection(topics: Set[String]) extends CommandRequest
case class CMSchedulePreferredLeaderElection(schedule: Map[String, Int]) extends CommandRequest
case class CMRunReassignPartition(topics: Set[String], forceSet: Set[ForceReassignmentCommand]) extends CommandRequest
case class CMGeneratePartitionAssignments(topics: Set[String], brokers: Set[Int], replicationFactor: Option[Int] = None) extends CommandRequest
case class CMManualPartitionAssignments(assignments: List[(String, List[(Int, List[Int])])]) extends CommandRequest
Expand Down Expand Up @@ -168,6 +169,7 @@ object ActorModel {
case object KSGetTopicsLastUpdateMillis extends KSRequest
case object KSGetPreferredLeaderElection extends KSRequest
case object KSGetReassignPartition extends KSRequest
case object KSGetScheduleLeaderElection extends KSRequest
case class KSEndPreferredLeaderElection(millis: Long) extends CommandRequest
case class KSUpdatePreferredLeaderElection(millis: Long, json: String) extends CommandRequest
case class KSEndReassignPartition(millis: Long) extends CommandRequest
Expand Down
1 change: 1 addition & 0 deletions app/kafka/manager/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ object ZkUtils {
val DeleteTopicsPath = "/admin/delete_topics"
val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
val AdminPath = "/admin"
val SchedulePreferredLeaderElectionPath = AdminPath + "/schedule_leader_election"

def getTopicPath(topic: String): String = {
BrokerTopicsPath + "/" + topic
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.manager.utils.zero81

import grizzled.slf4j.Logging
import kafka.manager.utils._
import org.apache.curator.framework.CuratorFramework

object SchedulePreferredLeaderElectionCommand extends Logging {

def writeScheduleLeaderElectionData(curator: CuratorFramework, schedule: Map[String, Int]): Unit = {
val jsonData = toJson(schedule)

ZkUtils.updatePersistentPath(curator, ZkUtils.SchedulePreferredLeaderElectionPath, jsonData)
logger.info("Updating ZK scheduler with %s".format(jsonData))
}
}
3 changes: 3 additions & 0 deletions app/models/navigation/BreadCrumbs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ object BreadCrumbs {
"Preferred Replica Election" -> IndexedSeq(
"Clusters".baseRouteBreadCrumb,
BCDynamicNamedLink(identity,"Summary".clusterRoute)),
"Schedule Leader Election" -> IndexedSeq(
"Clusters".baseRouteBreadCrumb,
BCDynamicNamedLink(identity,"Summary".clusterRoute)),
"Reassign Partitions" -> IndexedSeq(
"Clusters".baseRouteBreadCrumb,
BCDynamicNamedLink(identity,"Summary".clusterRoute)),
Expand Down
9 changes: 8 additions & 1 deletion app/models/navigation/Menus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package models.navigation

import features.{KMTopicManagerFeature, KMClusterManagerFeature, KMPreferredReplicaElectionFeature, KMReassignPartitionsFeature, ApplicationFeatures}
import features.{KMTopicManagerFeature, KMClusterManagerFeature, KMPreferredReplicaElectionFeature, KMScheduleLeaderElectionFeature, KMReassignPartitionsFeature, ApplicationFeatures}
import kafka.manager.features.{KMLogKafkaFeature, ClusterFeatures}

/**
Expand Down Expand Up @@ -50,6 +50,12 @@ class Menus(implicit applicationFeatures: ApplicationFeatures) {
} else None
}

private[this] def scheduleLeaderElectionMenu(cluster: String) : Option[Menu] = {
if (applicationFeatures.features(KMScheduleLeaderElectionFeature)) {
Option("Schedule Leader Election".clusterMenu(cluster))
} else None
}

private[this] def reassignPartitionsMenu(cluster: String) : Option[Menu] = {
if (applicationFeatures.features(KMReassignPartitionsFeature)) {
Option("Reassign Partitions".clusterMenu(cluster))
Expand Down Expand Up @@ -77,6 +83,7 @@ class Menus(implicit applicationFeatures: ApplicationFeatures) {
brokersMenu(cluster),
topicMenu(cluster),
preferredReplicaElectionMenu(cluster),
scheduleLeaderElectionMenu(cluster),
reassignPartitionsMenu(cluster),
consumersMenu(cluster),
logKafkaMenu(cluster, clusterFeatures)
Expand Down
1 change: 1 addition & 0 deletions app/models/navigation/QuickRoutes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ object QuickRoutes {
"List" -> controllers.routes.Topic.topics,
"Create" -> controllers.routes.Topic.createTopic,
"Preferred Replica Election" -> controllers.routes.PreferredReplicaElection.preferredReplicaElection,
"Schedule Leader Election" -> controllers.routes.PreferredReplicaElection.scheduleRunElection,
"Reassign Partitions" -> controllers.routes.ReassignPartitions.reassignPartitions,
"Logkafkas" -> controllers.routes.Logkafka.logkafkas,
"List Logkafka" -> controllers.routes.Logkafka.logkafkas,
Expand Down
Loading

0 comments on commit 03f2868

Please sign in to comment.