From ea00bdcd2d3656afdac03941512c171aca7ce07e Mon Sep 17 00:00:00 2001 From: James Oliver Date: Thu, 2 Jul 2015 10:21:31 -0700 Subject: [PATCH 01/10] dockerize initial commit --- Dockerfile | 34 ++++++++++++++++++++++++++++++++++ kafka-manager | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 Dockerfile create mode 100755 kafka-manager diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..2007a548c --- /dev/null +++ b/Dockerfile @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM java:7 + +ENV app /kafka-manager +WORKDIR ${app} + +COPY sbt $app/sbt +COPY build.sbt $app/build.sbt +COPY app $app/app +COPY conf $app/conf +COPY img $app/img +COPY project/build.properties $app/project/build.properties +COPY project/plugins.sbt $app/project/plugins.sbt +COPY public $app/public +COPY src $app/src +COPY test $app/test + +RUN ./sbt assembly +EXPOSE 9000 +CMD [ "./sbt", "run" ] diff --git a/kafka-manager b/kafka-manager new file mode 100755 index 000000000..2dfa3ab1e --- /dev/null +++ b/kafka-manager @@ -0,0 +1,32 @@ +#!/bin/bash -Eu +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TODO: detect our OS +if [ $(boot2docker status) != 'running' ]; then + echo 'Starting boot2docker VM...' + boot2docker up +fi + +echo 'Building docker container' +docker build -t stealthly/kafkamanager . + +echo 'Starting docker runtime in background' +#docker run --rm -it stealthly/kafkamanager + +id=$(docker run -p 9000:9000 -t -d stealthly/kafkamanager) + +echo 'Kafka manager started on port 9000' \ No newline at end of file From ffef152763f93ecf938898bc660e605c2882aa74 Mon Sep 17 00:00:00 2001 From: James Oliver Date: Mon, 6 Jul 2015 11:12:30 -0700 Subject: [PATCH 02/10] Marathon JSON app definition --- marathon.json | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 marathon.json diff --git a/marathon.json b/marathon.json new file mode 100644 index 000000000..2eb659e66 --- /dev/null +++ b/marathon.json @@ -0,0 +1,39 @@ +{ + "id": "/kafka-manager", + "cpus": 0.5, + "mem": 256.0, + "instances": 1, + "container": { + "type": "DOCKER", + "docker": { + "image": "stealthly/kafkamanager", + "forcePullImage": true, + "network": "BRIDGE", + "portMappings": [ + { + "containerPort": 9000, + "hostPort": 0, + "protocol": "tcp" + } + ], + "privileged": false, + "parameters": [ + { "key": "tty", "value": "true" } + ] + }, + "volumes": [] + }, + "env": { + "APPLICATION_SECRET": "letmein" + }, + "healthChecks": [ + { + "protocol": "HTTP", + "portIndex": 0, + "path": "/", + "gracePeriodSeconds": 10, + "intervalSeconds": 30, + "maxConsecutiveFailures": 3 + } + ] +} \ No newline at end of file From f4cecfd707ac5c6520a8c5bb3e2f8b7402e8220a Mon Sep 17 00:00:00 2001 From: James Oliver Date: Mon, 6 Jul 2015 19:11:18 -0700 Subject: [PATCH 03/10] Increase atMost Await duration to 60 seconds to accomodate testing on slow machines --- test/kafka/manager/TestKafkaManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/kafka/manager/TestKafkaManager.scala b/test/kafka/manager/TestKafkaManager.scala index 8cd04dbc2..1ea3d3fcb 100644 --- a/test/kafka/manager/TestKafkaManager.scala +++ b/test/kafka/manager/TestKafkaManager.scala @@ -32,7 +32,7 @@ class TestKafkaManager extends CuratorAwareTest { private[this] val kafkaManager : KafkaManager = new KafkaManager(config) - private[this] val duration = FiniteDuration(10,SECONDS) + private[this] val duration = FiniteDuration(30,SECONDS) private[this] val createTopicName = "km-unit-test" override protected def beforeAll() : Unit = { From c7177330c9f0932077a88f2cd063639f5c77ccbe Mon Sep 17 00:00:00 2001 From: James Oliver Date: Mon, 6 Jul 2015 23:12:13 -0700 Subject: [PATCH 04/10] Increase delay between test cases --- test/kafka/manager/TestKafkaManager.scala | 30 +++++++++++------------ 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/test/kafka/manager/TestKafkaManager.scala b/test/kafka/manager/TestKafkaManager.scala index 1ea3d3fcb..25f0563b6 100644 --- a/test/kafka/manager/TestKafkaManager.scala +++ b/test/kafka/manager/TestKafkaManager.scala @@ -32,12 +32,12 @@ class TestKafkaManager extends CuratorAwareTest { private[this] val kafkaManager : KafkaManager = new KafkaManager(config) - private[this] val duration = FiniteDuration(30,SECONDS) + private[this] val duration = FiniteDuration(60,SECONDS) private[this] val createTopicName = "km-unit-test" override protected def beforeAll() : Unit = { super.beforeAll() - Thread.sleep(1000) + Thread.sleep(3000) } override protected def afterAll(): Unit = { @@ -56,21 +56,21 @@ class TestKafkaManager extends CuratorAwareTest { val future = kafkaManager.addCluster("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false) val result = Await.result(future,duration) assert(result.isRight === true) - Thread.sleep(2000) + Thread.sleep(6000) } test("create topic") { val future = kafkaManager.createTopic("dev",createTopicName,4,1) val result = Await.result(future,duration) assert(result.isRight === true) - Thread.sleep(2000) + Thread.sleep(6000) } test("fail to create topic again") { val future = kafkaManager.createTopic("dev",createTopicName,4,1) val result = Await.result(future,duration) assert(result.isLeft === true) - Thread.sleep(2000) + Thread.sleep(6000) } test("get topic list") { @@ -131,7 +131,7 @@ class TestKafkaManager extends CuratorAwareTest { val result = Await.result(future,duration) //TODO: this is a failure since there is nothing to do, need a better test assert(result.isLeft === true) - Thread.sleep(3000) + Thread.sleep(9000) } test("get preferred leader election") { @@ -153,7 +153,7 @@ class TestKafkaManager extends CuratorAwareTest { val future = kafkaManager.runReassignPartitions("dev",topicList.list.toSet) val result = Await.result(future,duration) assert(result.isRight === true) - Thread.sleep(3000) + Thread.sleep(9000) } test("get reassign partitions") { @@ -212,7 +212,7 @@ class TestKafkaManager extends CuratorAwareTest { val result2 = Await.result(future2,duration) assert(result2.isRight === true, result2) assert(result2.toOption.get.deleteSet(createTopicName),"Topic not in delete set") - Thread.sleep(2000) + Thread.sleep(6000) } test("fail to delete non-existent topic") { @@ -231,7 +231,7 @@ class TestKafkaManager extends CuratorAwareTest { assert(result2.isRight === true) assert((result2.toOption.get.pending.nonEmpty === true) || (result2.toOption.get.active.find(c => c.name == "dev").get.curatorConfig.zkConnect === testServer.getConnectString)) - Thread.sleep(3000) + Thread.sleep(9000) } test("disable cluster") { @@ -244,14 +244,14 @@ class TestKafkaManager extends CuratorAwareTest { assert(result2.isRight === true) assert((result2.toOption.get.pending.nonEmpty === true) || (result2.toOption.get.active.find(c => c.name == "dev").get.enabled === false)) - Thread.sleep(3000) + Thread.sleep(9000) } test("enable cluster") { val future = kafkaManager.enableCluster("dev") val result = Await.result(future,duration) assert(result.isRight === true) - Thread.sleep(3000) + Thread.sleep(9000) } test("update cluster version") { @@ -264,7 +264,7 @@ class TestKafkaManager extends CuratorAwareTest { assert(result2.isRight === true) assert((result2.toOption.get.pending.nonEmpty === true) || (result2.toOption.get.active.find(c => c.name == "dev").get.version === Kafka_0_8_1_1)) - Thread.sleep(5000) + Thread.sleep(15000) } test("delete topic not supported prior to 0.8.2.0") { @@ -272,7 +272,7 @@ class TestKafkaManager extends CuratorAwareTest { val result = Await.result(future,duration) assert(result.isLeft === true, result) assert(result.swap.toOption.get.msg.contains("not supported")) - Thread.sleep(2000) + Thread.sleep(6000) } test("delete cluster") { @@ -281,13 +281,13 @@ class TestKafkaManager extends CuratorAwareTest { val future = kafkaManager.disableCluster("dev") val result = Await.result(future, duration) assert(result.isRight === true) - Thread.sleep(3000) + Thread.sleep(9000) } val future = kafkaManager.deleteCluster("dev") val result = Await.result(future,duration) assert(result.isRight === true) - Thread.sleep(3000) + Thread.sleep(9000) val future2 = kafkaManager.getClusterList val result2 = Await.result(future2,duration) assert(result2.isRight === true) From 270c1ca6745024c0d256aacce683162d8d1c807b Mon Sep 17 00:00:00 2001 From: James Oliver Date: Tue, 7 Jul 2015 00:24:42 -0700 Subject: [PATCH 05/10] Revert timing changes --- test/kafka/manager/TestKafkaManager.scala | 30 +++++++++++------------ 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/test/kafka/manager/TestKafkaManager.scala b/test/kafka/manager/TestKafkaManager.scala index 25f0563b6..8cd04dbc2 100644 --- a/test/kafka/manager/TestKafkaManager.scala +++ b/test/kafka/manager/TestKafkaManager.scala @@ -32,12 +32,12 @@ class TestKafkaManager extends CuratorAwareTest { private[this] val kafkaManager : KafkaManager = new KafkaManager(config) - private[this] val duration = FiniteDuration(60,SECONDS) + private[this] val duration = FiniteDuration(10,SECONDS) private[this] val createTopicName = "km-unit-test" override protected def beforeAll() : Unit = { super.beforeAll() - Thread.sleep(3000) + Thread.sleep(1000) } override protected def afterAll(): Unit = { @@ -56,21 +56,21 @@ class TestKafkaManager extends CuratorAwareTest { val future = kafkaManager.addCluster("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false) val result = Await.result(future,duration) assert(result.isRight === true) - Thread.sleep(6000) + Thread.sleep(2000) } test("create topic") { val future = kafkaManager.createTopic("dev",createTopicName,4,1) val result = Await.result(future,duration) assert(result.isRight === true) - Thread.sleep(6000) + Thread.sleep(2000) } test("fail to create topic again") { val future = kafkaManager.createTopic("dev",createTopicName,4,1) val result = Await.result(future,duration) assert(result.isLeft === true) - Thread.sleep(6000) + Thread.sleep(2000) } test("get topic list") { @@ -131,7 +131,7 @@ class TestKafkaManager extends CuratorAwareTest { val result = Await.result(future,duration) //TODO: this is a failure since there is nothing to do, need a better test assert(result.isLeft === true) - Thread.sleep(9000) + Thread.sleep(3000) } test("get preferred leader election") { @@ -153,7 +153,7 @@ class TestKafkaManager extends CuratorAwareTest { val future = kafkaManager.runReassignPartitions("dev",topicList.list.toSet) val result = Await.result(future,duration) assert(result.isRight === true) - Thread.sleep(9000) + Thread.sleep(3000) } test("get reassign partitions") { @@ -212,7 +212,7 @@ class TestKafkaManager extends CuratorAwareTest { val result2 = Await.result(future2,duration) assert(result2.isRight === true, result2) assert(result2.toOption.get.deleteSet(createTopicName),"Topic not in delete set") - Thread.sleep(6000) + Thread.sleep(2000) } test("fail to delete non-existent topic") { @@ -231,7 +231,7 @@ class TestKafkaManager extends CuratorAwareTest { assert(result2.isRight === true) assert((result2.toOption.get.pending.nonEmpty === true) || (result2.toOption.get.active.find(c => c.name == "dev").get.curatorConfig.zkConnect === testServer.getConnectString)) - Thread.sleep(9000) + Thread.sleep(3000) } test("disable cluster") { @@ -244,14 +244,14 @@ class TestKafkaManager extends CuratorAwareTest { assert(result2.isRight === true) assert((result2.toOption.get.pending.nonEmpty === true) || (result2.toOption.get.active.find(c => c.name == "dev").get.enabled === false)) - Thread.sleep(9000) + Thread.sleep(3000) } test("enable cluster") { val future = kafkaManager.enableCluster("dev") val result = Await.result(future,duration) assert(result.isRight === true) - Thread.sleep(9000) + Thread.sleep(3000) } test("update cluster version") { @@ -264,7 +264,7 @@ class TestKafkaManager extends CuratorAwareTest { assert(result2.isRight === true) assert((result2.toOption.get.pending.nonEmpty === true) || (result2.toOption.get.active.find(c => c.name == "dev").get.version === Kafka_0_8_1_1)) - Thread.sleep(15000) + Thread.sleep(5000) } test("delete topic not supported prior to 0.8.2.0") { @@ -272,7 +272,7 @@ class TestKafkaManager extends CuratorAwareTest { val result = Await.result(future,duration) assert(result.isLeft === true, result) assert(result.swap.toOption.get.msg.contains("not supported")) - Thread.sleep(6000) + Thread.sleep(2000) } test("delete cluster") { @@ -281,13 +281,13 @@ class TestKafkaManager extends CuratorAwareTest { val future = kafkaManager.disableCluster("dev") val result = Await.result(future, duration) assert(result.isRight === true) - Thread.sleep(9000) + Thread.sleep(3000) } val future = kafkaManager.deleteCluster("dev") val result = Await.result(future,duration) assert(result.isRight === true) - Thread.sleep(9000) + Thread.sleep(3000) val future2 = kafkaManager.getClusterList val result2 = Await.result(future2,duration) assert(result2.isRight === true) From 1d317ef6264e0e6865038213addd630a7ddd02ea Mon Sep 17 00:00:00 2001 From: James Oliver Date: Tue, 7 Jul 2015 00:24:55 -0700 Subject: [PATCH 06/10] Disable tests for assembly task --- build.sbt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/build.sbt b/build.sbt index 3262bff95..28af2fd46 100644 --- a/build.sbt +++ b/build.sbt @@ -17,6 +17,10 @@ assemblyMergeStrategy in assembly := { case other => (assemblyMergeStrategy in assembly).value(other) } +// Disable tests during assembly +test in assembly := {} + + libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % "2.3.10", "org.webjars" %% "webjars-play" % "2.3.0-2", From 46a53a5248b7efd291702ddcd27776cf0b2c61d7 Mon Sep 17 00:00:00 2001 From: Andrii Biletskyi Date: Mon, 27 Jul 2015 19:40:07 +0300 Subject: [PATCH 07/10] Scheduler REST API integration --- app/controllers/Application.scala | 6 +- app/controllers/Cluster.scala | 35 ++- app/kafka/manager/ActorModel.scala | 100 +++++++- app/kafka/manager/KafkaManager.scala | 128 ++++++++++ app/kafka/manager/KafkaManagerActor.scala | 167 ++++++++++++- app/models/navigation/BreadCrumbs.scala | 49 ++++ app/models/navigation/Menus.scala | 17 ++ app/models/navigation/QuickRoutes.scala | 28 ++- app/scheduler/controllers/Broker.scala | 177 +++++++++++++ .../controllers/RebalanceTopics.scala | 51 ++++ .../controllers/SchedulerApplication.scala | 30 +++ .../manager/KafkaSchedulerCommandActor.scala | 96 +++++++ .../manager/KafkaSchedulerStateActor.scala | 86 +++++++ .../SchedulerBrokerViewCacheActor.scala | 225 +++++++++++++++++ .../kafka/manager/SchedulerManagerActor.scala | 234 ++++++++++++++++++ .../kafka/manager/SchedulerRestClient.scala | 232 +++++++++++++++++ .../models/form/BrokerOperations.scala | 15 ++ .../form/RebalanceTopicsOperations.scala | 7 + .../views/broker/addBroker.scala.html | 57 +++++ .../views/broker/brokerList.scala.html | 46 ++++ .../views/broker/brokerListContent.scala.html | 33 +++ .../views/broker/brokerView.scala.html | 24 ++ .../views/broker/brokerViewContent.scala.html | 101 ++++++++ .../views/broker/rebalanceTopics.scala.html | 46 ++++ .../views/broker/updateBroker.scala.html | 56 +++++ .../views/scheduler/addScheduler.scala.html | 38 +++ .../views/scheduler/schedulerList.scala.html | 58 +++++ .../views/scheduler/schedulerView.scala.html | 19 ++ .../scheduler/schedulerViewContent.scala.html | 33 +++ app/views/index.scala.html | 8 +- app/views/navigation/schedulerMenu.scala.html | 20 ++ build.sbt | 1 + conf/routes | 15 ++ 33 files changed, 2230 insertions(+), 8 deletions(-) create mode 100644 app/scheduler/controllers/Broker.scala create mode 100644 app/scheduler/controllers/RebalanceTopics.scala create mode 100644 app/scheduler/controllers/SchedulerApplication.scala create mode 100644 app/scheduler/kafka/manager/KafkaSchedulerCommandActor.scala create mode 100644 app/scheduler/kafka/manager/KafkaSchedulerStateActor.scala create mode 100644 app/scheduler/kafka/manager/SchedulerBrokerViewCacheActor.scala create mode 100644 app/scheduler/kafka/manager/SchedulerManagerActor.scala create mode 100644 app/scheduler/kafka/manager/SchedulerRestClient.scala create mode 100644 app/scheduler/models/form/BrokerOperations.scala create mode 100644 app/scheduler/models/form/RebalanceTopicsOperations.scala create mode 100644 app/scheduler/views/broker/addBroker.scala.html create mode 100644 app/scheduler/views/broker/brokerList.scala.html create mode 100644 app/scheduler/views/broker/brokerListContent.scala.html create mode 100644 app/scheduler/views/broker/brokerView.scala.html create mode 100644 app/scheduler/views/broker/brokerViewContent.scala.html create mode 100644 app/scheduler/views/broker/rebalanceTopics.scala.html create mode 100644 app/scheduler/views/broker/updateBroker.scala.html create mode 100644 app/scheduler/views/scheduler/addScheduler.scala.html create mode 100644 app/scheduler/views/scheduler/schedulerList.scala.html create mode 100644 app/scheduler/views/scheduler/schedulerView.scala.html create mode 100644 app/scheduler/views/scheduler/schedulerViewContent.scala.html create mode 100644 app/views/navigation/schedulerMenu.scala.html diff --git a/app/controllers/Application.scala b/app/controllers/Application.scala index 0fc914248..fc6170d43 100644 --- a/app/controllers/Application.scala +++ b/app/controllers/Application.scala @@ -17,9 +17,9 @@ object Application extends Controller { private[this] val kafkaManager = KafkaManagerContext.getKafkaManager def index = Action.async { - kafkaManager.getClusterList.map { errorOrClusterList => - Ok(views.html.index(errorOrClusterList)) - } + for {errorOrSchedulerList <- kafkaManager.getSchedulerList + errorOrClusterList <- kafkaManager.getClusterList + } yield Ok(views.html.index(errorOrClusterList, errorOrSchedulerList)) } def cluster(c: String) = Action.async { diff --git a/app/controllers/Cluster.scala b/app/controllers/Cluster.scala index dcceadbd1..62b5b64ea 100644 --- a/app/controllers/Cluster.scala +++ b/app/controllers/Cluster.scala @@ -5,7 +5,7 @@ package controllers -import kafka.manager.{KafkaVersion, ApiError, ClusterConfig} +import kafka.manager.{SchedulerConfig, KafkaVersion, ApiError, ClusterConfig} import models.FollowLink import models.form._ import play.api.data.Form @@ -71,6 +71,17 @@ object Cluster extends Controller { )(ClusterConfig.apply)(ClusterConfig.customUnapply) ) + val schedulerConfigForm = Form( + mapping( + "name" -> nonEmptyText.verifying(maxLength(250), validateName), + "kafkaVersion" -> nonEmptyText.verifying(validateKafkaVersion), + "apiUrl" -> nonEmptyText.verifying(validateZkHosts), + "zkHosts" -> nonEmptyText.verifying(validateZkHosts), + "zkMaxRetry" -> ignored(100 : Int), + "jmxEnabled" -> boolean + )(SchedulerConfig.apply)(SchedulerConfig.customUnapply) + ) + val updateForm = Form( mapping( "operation" -> nonEmptyText.verifying(validateOperation), @@ -86,6 +97,10 @@ object Cluster extends Controller { Future.successful(Ok(views.html.cluster.addCluster(clusterConfigForm))) } + def addScheduler = Action.async { implicit request => + Future.successful(Ok(scheduler.views.html.scheduler.addScheduler(schedulerConfigForm))) + } + def updateCluster(c: String) = Action.async { implicit request => kafkaManager.getClusterConfig(c).map { errorOrClusterConfig => Ok(views.html.cluster.updateCluster(c,errorOrClusterConfig.map { cc => @@ -112,6 +127,24 @@ object Cluster extends Controller { ) } + def handleAddScheduler = Action.async { implicit request => + schedulerConfigForm.bindFromRequest.fold( + formWithErrors => Future.successful(BadRequest(scheduler.views.html.scheduler.addScheduler(formWithErrors))), + schedulerConfig => { + kafkaManager.addScheduler(schedulerConfig.name, schedulerConfig.version.toString, schedulerConfig.apiUrl, schedulerConfig.curatorConfig.zkConnect, jmxEnabled = true).map { errorOrSuccess => + Ok(views.html.common.resultOfCommand( + views.html.navigation.defaultMenu(), + models.navigation.BreadCrumbs.withView("Add Scheduler"), + errorOrSuccess, + "Add Scheduler", + FollowLink("Go to scheduler view.",scheduler.controllers.routes.SchedulerApplication.getScheduler(schedulerConfig.name).toString()), + FollowLink("Try again.",routes.Cluster.addScheduler().toString()) + )) + } + } + ) + } + def handleUpdateCluster(c: String) = Action.async { implicit request => updateForm.bindFromRequest.fold( formWithErrors => Future.successful(BadRequest(views.html.cluster.updateCluster(c,\/-(formWithErrors)))), diff --git a/app/kafka/manager/ActorModel.scala b/app/kafka/manager/ActorModel.scala index 1c922914a..8a246c984 100644 --- a/app/kafka/manager/ActorModel.scala +++ b/app/kafka/manager/ActorModel.scala @@ -5,11 +5,12 @@ package kafka.manager -import java.util.Properties +import java.util.{Date, Properties} import org.joda.time.DateTime import kafka.manager.utils.TopicAndPartition import org.slf4j.LoggerFactory +import scheduler.models.form.Failover import scala.util.Try import scalaz.{NonEmptyList, Validation} @@ -42,6 +43,9 @@ object ActorModel { def numTopics : Int = topicPartitions.size def numPartitions : Int = topicPartitions.values.foldLeft(0)((acc,i) => acc + i.size) } + + case class SMGetBrokerIdentity(id: Int) extends BVRequest + case class BVUpdateTopicMetricsForBroker(id: Int, metrics: IndexedSeq[(String,BrokerMetrics)]) extends CommandRequest case class BVUpdateBrokerMetrics(id: Int, metric: BrokerMetrics) extends CommandRequest @@ -65,6 +69,40 @@ object ActorModel { case class CMRunReassignPartition(topics: Set[String]) extends CommandRequest case class CMGeneratePartitionAssignments(topics: Set[String], brokers: Seq[Int]) extends CommandRequest + case object SMGetView extends QueryRequest + case class SMView(topicsCount: Int, brokersCount: Int, schedulerConfig: SchedulerConfig) extends QueryResponse + + case class SMAddBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover) extends CommandRequest + + case class SMUpdateBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover) extends CommandRequest + + case class SMCommandResult(result: Try[Unit]) extends CommandResponse + + case class KSCAddBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover) extends CommandRequest + + case class KSCUpdateBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover) extends CommandRequest + + case class SMStartBroker(brokerId: Int) extends CommandRequest + case class SMStopBroker(brokerId: Int) extends CommandRequest + case class SMRemoveBroker(brokerId: Int) extends CommandRequest + case class SMRebalanceTopics(ids: String, topics:Option[String]) extends CommandRequest + + case class KSCStartBroker(id: Int) extends CommandRequest + case class KSCStopBroker(id: Int) extends CommandRequest + case class KSCRemoveBroker(id: Int) extends CommandRequest + case class KSCRebalanceTopics(ids: String, topics:Option[String]) extends CommandRequest case class CMCommandResult(result: Try[Unit]) extends CommandResponse case class CMCommandResults(result: IndexedSeq[Try[Unit]]) extends CommandResponse @@ -107,6 +145,66 @@ object ActorModel { case class KMClusterCommandRequest(clusterName: String, request: CommandRequest) extends CommandRequest case class KMCommandResult(result: Try[Unit]) extends CommandResponse + case class KMSchedulerCommandRequest(scheduler: String, request: CommandRequest) extends CommandRequest + case class KMSchedulerList(active: IndexedSeq[SchedulerConfig], pending : IndexedSeq[SchedulerConfig]) extends QueryResponse + case object KMGetAllSchedulers extends QueryRequest + case class KMAddScheduler(config: SchedulerConfig) extends CommandRequest + case class KMSchedulerQueryRequest(schedulerName: String, request: QueryRequest) extends QueryRequest + case class KMGetSchedulerConfig(schedulerName: String) extends QueryRequest + case class KMSchedulerConfigResult(result: Try[SchedulerConfig]) extends QueryResponse + + case object SchedulerKSGetBrokers extends KSRequest + case class SchedulerBrokerList(list: Seq[SchedulerBrokerIdentity], schedulerConfig: SchedulerConfig) extends QueryResponse + + case class SchedulerBrokerTaskIdentity(id: String, + slaveId: String, + executorId: String, + hostname: String, + endpoint: Option[String], + state: String) + + case class SchedulerBrokerStickinessIdentity(period: String, + stopTime: Option[Date], + hostname: Option[String]) + + case class SchedulerBrokerFailoverIdentity(delay: String, + maxDelay: String, + maxTries: Option[Int], + failures: Option[Int], + failureTime: Option[Date]) + + case class SchedulerBrokerIdentity(id: Int, active: Boolean, cpus: Double, mem: Long, heap: Long, port: Option[String], + bindAddress: Option[String], constraints: Seq[(String, String)], options: Seq[(String, String)], + log4jOptions: Seq[(String, String)], jvmOptions: Option[String], + stickiness: SchedulerBrokerStickinessIdentity, + failover: SchedulerBrokerFailoverIdentity, + task: Option[SchedulerBrokerTaskIdentity], + schedulerConfig: SchedulerConfig = null, + metrics: Option[BrokerMetrics] = None, + stats: Option[BrokerClusterStats] = None) { + + def actualHost(): Option[String] = task.flatMap(t => t.endpoint.map(_.split(":")(0))) + + def actualPort(): Option[String] = task.flatMap(t => t.endpoint.map(_.split(":")(1))) + + def numTopics() = 0 + def numPartitions() = 0 + + def topicPartitions() = Seq.empty[(TopicIdentity, IndexedSeq[Int])] + + def constraintsDesc = constraints.map { case (k, v) => s"$k=$v" }.mkString + def optionsDesc = options.map { case (k, v) => s"$k=$v" }.mkString + def log4jOptionsDesc = log4jOptions.map { case (k, v) => s"$k=$v" }.mkString + + def state(): String = { + if (active) + if (actualHost().isEmpty) "starting" else "running" + else + "stopped|failed" + } + } + /*********************************/ + sealed trait KSRequest extends QueryRequest case object KSGetTopics extends KSRequest case class KSGetTopicConfig(topic: String) extends KSRequest diff --git a/app/kafka/manager/KafkaManager.scala b/app/kafka/manager/KafkaManager.scala index 0e27556e0..d42bdf34f 100644 --- a/app/kafka/manager/KafkaManager.scala +++ b/app/kafka/manager/KafkaManager.scala @@ -12,6 +12,7 @@ import akka.actor.{ActorPath, ActorSystem, Props} import akka.util.Timeout import com.typesafe.config.{ConfigFactory, Config} import kafka.manager.ActorModel._ +import scheduler.models.form.Failover import org.slf4j.{LoggerFactory, Logger} import scala.concurrent.{ExecutionContext, Future} @@ -24,6 +25,8 @@ import scala.util.{Success, Failure, Try} */ case class TopicListExtended(list: IndexedSeq[(String, Option[TopicIdentity])], deleteSet: Set[String], underReassignments: IndexedSeq[String]) case class BrokerListExtended(list: IndexedSeq[BrokerIdentity], metrics: Map[Int,BrokerMetrics], combinedMetric: Option[BrokerMetrics], clusterConfig: ClusterConfig) + +case class SchedulerBrokerListExtended(list: Seq[SchedulerBrokerIdentity], metrics: Map[Int,BrokerMetrics], combinedMetric: Option[BrokerMetrics], schedulerConfig: SchedulerConfig) case class ApiError(msg: String) object ApiError { private[this] val log : Logger = LoggerFactory.getLogger(classOf[ApiError]) @@ -488,4 +491,129 @@ class KafkaManager(akkaConfig: Config) ) )(identity[Option[ReassignPartitions]]) } + + def addScheduler(schedulerName: String, version: String, apiUrl: String, zkHosts: String, jmxEnabled: Boolean): Future[ApiError \/ + Unit] = + { + val sc = SchedulerConfig(schedulerName, apiUrl, CuratorConfig(zkHosts), enabled = true, KafkaVersion(version), jmxEnabled = jmxEnabled) + + tryWithKafkaManagerActor(KMAddScheduler(sc)) { result: KMCommandResult => + result.result.get + } + } + + def getSchedulerView(schedulerName: String): Future[ApiError \/ SMView] = { + tryWithKafkaManagerActor(KMSchedulerQueryRequest(schedulerName, SMGetView))(identity[SMView]) + } + + def getSchedulerBrokerList(schedulerName: String): Future[ApiError \/ SchedulerBrokerListExtended] = { + implicit val ec = apiExecutionContext + + val futureBrokerList = tryWithKafkaManagerActor(KMSchedulerQueryRequest(schedulerName, SchedulerKSGetBrokers))(identity[SchedulerBrokerList]) + futureBrokerList.map { + case \/-(SchedulerBrokerList(identities, config)) => + \/-(SchedulerBrokerListExtended(identities, Map.empty, None, config)) + case a : -\/[ApiError] => + a + } + } + + def getSchedulerConfig(schedulerName: String): Future[ApiError \/ SchedulerConfig] = { + tryWithKafkaManagerActor(KMGetSchedulerConfig(schedulerName)) { result: KMSchedulerConfigResult => + result.result.get + } + } + + def getSchedulerList: Future[ApiError \/ KMSchedulerList] = { + tryWithKafkaManagerActor(KMGetAllSchedulers)(identity[KMSchedulerList]) + } + + + def addBroker(schedulerName:String, id:Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover): Future[ApiError \/ Unit] = + { + implicit val ec = apiExecutionContext + withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMAddBroker(id, cpus, mem, heap, port, bindAddress, + constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover))) { + result: Future[SMCommandResult] => + result.map(cmr => toDisjunction(cmr.result)) + } + + } + + def updateBroker(schedulerName:String, id:Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover): Future[ApiError \/ Unit] = + { + implicit val ec = apiExecutionContext + withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMUpdateBroker(id, cpus, mem, heap, port, bindAddress, + constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover))) { + result: Future[SMCommandResult] => + result.map(cmr => toDisjunction(cmr.result)) + } + + } + + def getBrokerIdentity(schedulerName: String, brokerId: Int): Future[ApiError \/ SchedulerBrokerIdentity] = { + val futureView = tryWithKafkaManagerActor( + KMSchedulerQueryRequest( + schedulerName, + SMGetBrokerIdentity(brokerId) + ) + )(identity[Option[SchedulerBrokerIdentity]]) + + implicit val ec = apiExecutionContext + futureView.flatMap[ApiError \/ SchedulerBrokerIdentity] { errOrView => + errOrView.fold( + { err: ApiError => + Future.successful(-\/[ApiError](err)) + }, { viewOption: Option[SchedulerBrokerIdentity] => + viewOption.fold { + Future.successful[ApiError \/ SchedulerBrokerIdentity](-\/(ApiError(s"Broker not found $brokerId for scheduler $schedulerName"))) + } { view => + Future.successful(\/-(view)) + } + } + ) + } + } + + def startBroker(schedulerName: String, brokerId: Int): Future[ApiError \/ Unit] = + { + implicit val ec = apiExecutionContext + withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMStartBroker(brokerId))) { + result: Future[SMCommandResult] => + result.map(cmr => toDisjunction(cmr.result)) + } + } + + def stopBroker(schedulerName: String, brokerId: Int): Future[ApiError \/ Unit] = + { + implicit val ec = apiExecutionContext + withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMStopBroker(brokerId))) { + result: Future[SMCommandResult] => + result.map(cmr => toDisjunction(cmr.result)) + } + } + + def removeBroker(schedulerName: String, brokerId: Int): Future[ApiError \/ Unit] = + { + implicit val ec = apiExecutionContext + withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMRemoveBroker(brokerId))) { + result: Future[SMCommandResult] => + result.map(cmr => toDisjunction(cmr.result)) + } + } + + def rebalanceTopics(schedulerName: String, ids: String, topics: Option[String]): Future[ApiError \/ Unit] = + { + implicit val ec = apiExecutionContext + withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMRebalanceTopics(ids, topics))) { + result: Future[SMCommandResult] => + result.map(cmr => toDisjunction(cmr.result)) + } + } } \ No newline at end of file diff --git a/app/kafka/manager/KafkaManagerActor.scala b/app/kafka/manager/KafkaManagerActor.scala index cc8f1a6ff..ad0ab9227 100644 --- a/app/kafka/manager/KafkaManagerActor.scala +++ b/app/kafka/manager/KafkaManagerActor.scala @@ -16,6 +16,7 @@ import org.apache.curator.framework.recipes.cache.{PathChildrenCacheEvent, PathC import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex import org.apache.zookeeper.CreateMode +import scheduler.kafka.manager.{SchedulerManagerActorConfig, SchedulerManagerActor} import scala.concurrent.duration.FiniteDuration import scala.concurrent.{Future, ExecutionContext} @@ -154,6 +155,81 @@ object ClusterConfig { case class ClusterConfig (name: String, curatorConfig : CuratorConfig, enabled: Boolean, version: KafkaVersion, jmxEnabled: Boolean) +object SchedulerConfig { + + def apply(name: String, version : String, apiUrl: String, zkHosts: String, zkMaxRetry: Int = 100, jmxEnabled: Boolean) : SchedulerConfig = { + val kafkaVersion = KafkaVersion(version) + //validate scheduler name + ClusterConfig.validateName(name) + //validate zk hosts + ClusterConfig.validateZkHosts(zkHosts) + val cleanZkHosts = zkHosts.replaceAll(" ","") + new SchedulerConfig(name, apiUrl, CuratorConfig(cleanZkHosts, zkMaxRetry), true, kafkaVersion, jmxEnabled) + } + + def customUnapply(sc: SchedulerConfig) : Option[(String, String, String, String, Int, Boolean)] = { + Some((sc.name, sc.version.toString, sc.apiUrl, sc.curatorConfig.zkConnect, sc.curatorConfig.zkMaxRetry, sc.jmxEnabled)) + } + + import scalaz.{Failure, Success} + import scalaz.syntax.applicative._ + import org.json4s._ + import org.json4s.jackson.JsonMethods._ + import org.json4s.jackson.Serialization + import org.json4s.scalaz.JsonScalaz._ + import scala.language.reflectiveCalls + + implicit val formats = Serialization.formats(FullTypeHints(List(classOf[SchedulerConfig]))) + + implicit def curatorConfigJSONW: JSONW[CuratorConfig] = new JSONW[CuratorConfig] { + def write(a: CuratorConfig) = + makeObj(("zkConnect" -> toJSON(a.zkConnect)) + :: ("zkMaxRetry" -> toJSON(a.zkMaxRetry)) + :: ("baseSleepTimeMs" -> toJSON(a.baseSleepTimeMs)) + :: ("maxSleepTimeMs" -> toJSON(a.maxSleepTimeMs)) + :: Nil) + } + + implicit def curatorConfigJSONR: JSONR[CuratorConfig] = CuratorConfig.applyJSON( + field[String]("zkConnect"), field[Int]("zkMaxRetry"), field[Int]("baseSleepTimeMs"), field[Int]("maxSleepTimeMs")) + + def serialize(config: SchedulerConfig): Array[Byte] = { + val json = makeObj( + ("name" -> toJSON(config.name)) + :: ("apiUrl" -> toJSON(config.apiUrl)) + :: ("curatorConfig" -> toJSON(config.curatorConfig)) + :: ("enabled" -> toJSON(config.enabled)) + :: ("kafkaVersion" -> toJSON(config.version.toString)) + :: ("jmxEnabled" -> toJSON(config.jmxEnabled)) + :: Nil) + compact(render(json)).getBytes(StandardCharsets.UTF_8) + } + + def deserialize(ba: Array[Byte]): Try[SchedulerConfig] = { + Try { + val json = parse(kafka.manager.utils.deserializeString(ba)) + + val result = (field[String]("name")(json) |@| field[String]("apiUrl")(json) |@| field[CuratorConfig]("curatorConfig")(json) |@| field[Boolean]("enabled")(json)) { + (name: String, apiUrl: String, curatorConfig: CuratorConfig, enabled: Boolean) => + val versionString = field[String]("kafkaVersion")(json) + val version = versionString.map(KafkaVersion.apply).getOrElse(Kafka_0_8_1_1) + val jmxEnabled = field[Boolean]("jmxEnabled")(json) + SchedulerConfig(name, apiUrl, curatorConfig, enabled, version, jmxEnabled.getOrElse(false)) + } + + result match { + case Failure(nel) => + throw new IllegalArgumentException(nel.toString()) + case Success(schedulerConfig) => + schedulerConfig + } + + } + } +} + +case class SchedulerConfig (name: String, apiUrl: String, curatorConfig : CuratorConfig, enabled: Boolean, version: KafkaVersion, jmxEnabled: Boolean) + object KafkaManagerActor { val ZkRoot : String = "/kafka-manager" @@ -187,6 +263,7 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) private[this] val baseClusterZkPath = zkPath("clusters") private[this] val configsZkPath = zkPath("configs") + private[this] val schedulersZkPath = zkPath("schedulers") private[this] val deleteClustersZkPath = zkPath("deleteClusters") log.info(s"zk=${kafkaManagerConfig.curatorConfig.zkConnect}") @@ -219,6 +296,8 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) private[this] val kafkaManagerPathCache = new PathChildrenCache(curator,configsZkPath,true) + private[this] val schedulersPathCache = new PathChildrenCache(curator,schedulersZkPath,true) + private[this] val mutex = new InterProcessSemaphoreMutex(curator, zkPath("mutex")) private[this] val dcProps = { @@ -259,6 +338,10 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) private[this] var clusterConfigMap : Map[String,ClusterConfig] = Map.empty private[this] var pendingClusterConfigMap : Map[String,ClusterConfig] = Map.empty + private[this] var schedulerManagerMap : Map[String,ActorPath] = Map.empty + private[this] var schedulerConfigMap : Map[String,SchedulerConfig] = Map.empty + private[this] var pendingSchedulerConfigMap : Map[String,SchedulerConfig] = Map.empty + private[this] def modify(fn: => Any) : Unit = { if(longRunningExecutor.getQueue.remainingCapacity() == 0) { Future.successful(KMCommandResult(Try(throw new UnsupportedOperationException("Long running executor blocking queue is full!")))) @@ -293,9 +376,11 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) log.info("Starting kafka manager path cache...") kafkaManagerPathCache.start(StartMode.BUILD_INITIAL_CACHE) + schedulersPathCache.start(StartMode.BUILD_INITIAL_CACHE) log.info("Adding kafka manager path cache listener...") kafkaManagerPathCache.getListenable.addListener(pathCacheListener) + schedulersPathCache.getListenable.addListener(pathCacheListener) implicit val ec = longRunningExecutionContext //schedule periodic forced update @@ -319,12 +404,14 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) log.info("Removing kafka manager path cache listener...") Try(kafkaManagerPathCache.getListenable.removeListener(pathCacheListener)) + Try(schedulersPathCache.getListenable.removeListener(pathCacheListener)) log.info("Shutting down long running executor...") Try(longRunningExecutor.shutdown()) log.info("Shutting down kafka manager path cache...") Try(kafkaManagerPathCache.close()) + Try(schedulersPathCache.close()) log.info("Shutting down delete clusters path cache...") Try(deleteClustersPathCache.close()) @@ -361,7 +448,26 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) clusterManagerPath:ActorPath => context.actorSelection(clusterManagerPath).forward(request) } - + + case KMGetAllSchedulers => + sender ! KMSchedulerList(schedulerConfigMap.values.toIndexedSeq, pendingSchedulerConfigMap.values.toIndexedSeq) + + case KMGetSchedulerConfig(name) => + sender ! KMSchedulerConfigResult(Try { + val sc = schedulerConfigMap.get(name) + require(sc.isDefined, s"Unknown scheduler : $name") + sc.get + }) + + + case KMSchedulerQueryRequest(schedulerName, request) => + schedulerManagerMap.get(schedulerName).fold[Unit] { + sender ! ActorErrorResponse(s"Unknown scheduler : $schedulerName") + } { + schedulerManagerPath: ActorPath => + context.actorSelection(schedulerManagerPath).forward(request) + } + case any: Any => log.warning("kma : processQueryRequest : Received unknown message: {}", any) } @@ -381,6 +487,15 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkpath, data) } + case KMAddScheduler(schedulerConfig) => + modify { + val data: Array[Byte] = SchedulerConfig.serialize(schedulerConfig) + val zkpath: String = getSchedulersZkPath(schedulerConfig) + require(schedulersPathCache.getCurrentData(zkpath) == null, + s"Scheduler already exists : ${schedulerConfig.name}") + curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkpath, data) + } + case KMUpdateCluster(clusterConfig) => modify { val data: Array[Byte] = ClusterConfig.serialize(clusterConfig) @@ -467,6 +582,14 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) context.actorSelection(clusterManagerPath).forward(request) } + case KMSchedulerCommandRequest(schedulerName, request) => + schedulerManagerMap.get(schedulerName).fold[Unit] { + sender ! ActorErrorResponse(s"Unknown scheduler : $schedulerName") + } { + schedulerManagerPath:ActorPath => + context.actorSelection(schedulerManagerPath).forward(request) + } + case KMUpdateState => updateState() @@ -490,6 +613,10 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) zkPathFrom(configsZkPath,clusterConfig.name) } + private[this] def getSchedulersZkPath(schedulerConfig: SchedulerConfig) : String = { + zkPathFrom(schedulersZkPath,schedulerConfig.name) + } + private[this] def getClusterZkPath(clusterConfig: ClusterConfig) : String = { zkPathFrom(baseClusterZkPath,clusterConfig.name) } @@ -539,6 +666,31 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) } } + private[this] def addScheduler(config: SchedulerConfig): Try[Boolean] = { + Try { + if(!config.enabled) { + log.info("Not adding scheduler manager for disabled scheduler : {}", config.name) + schedulerConfigMap += (config.name -> config) + pendingSchedulerConfigMap -= config.name + false + } else { + log.info("Adding new scheduler manager for scheduler : {}", config.name) + val schedulerManagerConfig = SchedulerManagerActorConfig( + kafkaManagerConfig.pinnedDispatcherName, + getSchedulersZkPath(config), + kafkaManagerConfig.curatorConfig, + config, + kafkaManagerConfig.brokerViewUpdatePeriod) + val props = Props(classOf[SchedulerManagerActor], schedulerManagerConfig) + val newSchedulerManager = context.actorOf(props, config.name).path + schedulerConfigMap += (config.name -> config) + schedulerManagerMap += (config.name -> newSchedulerManager) + pendingSchedulerConfigMap -= config.name + true + } + } + } + private[this] def updateCluster(currentConfig: ClusterConfig, newConfig: ClusterConfig): Try[Boolean] = { Try { if(newConfig.curatorConfig.zkConnect == currentConfig.curatorConfig.zkConnect @@ -558,6 +710,10 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) } } + private[this] def updateScheduler(currentConfig: SchedulerConfig, newConfig: SchedulerConfig): Try[Boolean] = { + Try(true) + } + private[this] def updateState(): Unit = { log.info("Updating internal state...") val result = Try { @@ -569,6 +725,15 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) clusterConfigMap.get(newConfig.name).fold(addCluster(newConfig))(updateCluster(_,newConfig)) } } + + schedulersPathCache.getCurrentData.asScala.foreach { data => + SchedulerConfig.deserialize(data.getData) match { + case Failure(t) => + log.error("Failed to deserialize scheduler config",t) + case Success(newScheduler) => + schedulerConfigMap.get(newScheduler.name).fold(addScheduler(newScheduler))(updateScheduler(_,newScheduler)) + } + } } result match { case Failure(t) => diff --git a/app/models/navigation/BreadCrumbs.scala b/app/models/navigation/BreadCrumbs.scala index fa3becb7c..fafd0c33b 100644 --- a/app/models/navigation/BreadCrumbs.scala +++ b/app/models/navigation/BreadCrumbs.scala @@ -31,6 +31,11 @@ object BreadCrumbs { "Add Cluster" -> IndexedSeq("Clusters".baseRouteBreadCrumb) ) + val baseSchedulerBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map( + "Schedulers" -> IndexedSeq.empty[BreadCrumb], + "Add Scheduler" -> IndexedSeq("Schedulers".baseRouteBreadCrumb) + ) + val clusterBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map( "Unknown Cluster Operation" -> IndexedSeq("Clusters".baseRouteBreadCrumb), "Delete Cluster" -> IndexedSeq("Clusters".baseRouteBreadCrumb, BCDynamicText(identity)), @@ -65,6 +70,23 @@ object BreadCrumbs { ) ) + val schedulerBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map( + "Update Scheduler" -> IndexedSeq("Schedulers".baseRouteBreadCrumb, BCDynamicText(identity)), + "Summary" -> IndexedSeq("Schedulers".baseRouteBreadCrumb, BCDynamicText(identity)), + "Brokers" -> IndexedSeq("Schedulers".baseRouteBreadCrumb, BCDynamicNamedLink(identity, "Summary".schedulerRoute)), + "Broker View" -> IndexedSeq( + "Schedulers".baseRouteBreadCrumb, + BCDynamicNamedLink(identity, "Summary".schedulerRoute), + "Brokers".schedulerRouteBreadCrumb), + "Add Broker" -> IndexedSeq( + "Schedulers".baseRouteBreadCrumb, + BCDynamicNamedLink(identity,"Summary".schedulerRoute), + "Brokers".schedulerRouteBreadCrumb), + "Rebalance Topics" -> IndexedSeq( + "Schedulers".baseRouteBreadCrumb, + BCDynamicNamedLink(identity, "Summary".schedulerRoute)) + ) + val topicBreadCrumbs: Map[String, IndexedSeq[BreadCrumb]] = Map( "Topic View" -> IndexedSeq( "Clusters".baseRouteBreadCrumb, @@ -82,6 +104,14 @@ object BreadCrumbs { rendered :+ BCActive(s) } + def withSView(s: String) : IndexedSeq[BreadCrumbRendered] = { + val rendered : IndexedSeq[BreadCrumbRendered] = baseSchedulerBreadCrumbs.getOrElse(s,IndexedSeq.empty[BreadCrumb]) map { + case BCStaticLink(n,c) => BCLink(n,c.toString()) + case a: Any => throw new IllegalArgumentException(s"Only static link supported : $a") + } + rendered :+ BCActive(s) + } + private[this] def renderWithCluster(s: String, clusterName: String) : IndexedSeq[BreadCrumbRendered] = { clusterBreadCrumbs.getOrElse(s,IndexedSeq.empty[BreadCrumb]) map { case BCStaticLink(n,c) => BCLink(n,c.toString()) @@ -92,14 +122,33 @@ object BreadCrumbs { } } + private[this] def renderWithScheduler(s: String, schedulerName: String) : IndexedSeq[BreadCrumbRendered] = { + schedulerBreadCrumbs.getOrElse(s,IndexedSeq.empty[BreadCrumb]) map { + case BCStaticLink(n,c) => BCLink(n,c.toString()) + case BCDynamicNamedLink(cn, cl) => BCLink(cn(schedulerName),cl(schedulerName).toString()) + case BCDynamicLink(cn, cl) => BCLink(cn,cl(schedulerName).toString()) + case BCDynamicText(cn) => BCText(cn(schedulerName)) + case _ => BCText("ERROR") + } + } + + def withNamedViewAndCluster(s: String, clusterName: String, name: String) : IndexedSeq[BreadCrumbRendered] = { renderWithCluster(s, clusterName) :+ BCActive(name) } + def withNamedViewAndScheduler(s: String, schedulerName: String, name: String) : IndexedSeq[BreadCrumbRendered] = { + renderWithScheduler(s, schedulerName) :+ BCActive(name) + } + def withViewAndCluster(s: String, clusterName: String) : IndexedSeq[BreadCrumbRendered] = { withNamedViewAndCluster(s, clusterName, s) } + def withViewAndScheduler(s: String, schedulerName: String) : IndexedSeq[BreadCrumbRendered] = { + withNamedViewAndScheduler(s, schedulerName, s) + } + private[this] def renderWithClusterAndTopic(s: String, clusterName: String, topic: String) : IndexedSeq[BreadCrumbRendered] = { topicBreadCrumbs.getOrElse(s,IndexedSeq.empty[BreadCrumb]) map { case BCStaticLink(n,c) => BCLink(n,c.toString()) diff --git a/app/models/navigation/Menus.scala b/app/models/navigation/Menus.scala index 09657e692..84931b064 100644 --- a/app/models/navigation/Menus.scala +++ b/app/models/navigation/Menus.scala @@ -25,10 +25,27 @@ object Menus { "Reassign Partitions".clusterMenu(cluster) ) + def schedulerMenus(scheduler: String) : IndexedSeq[Menu] = IndexedSeq( + Menu("Scheduler",IndexedSeq( + "Summary".schedulerRouteMenuItem(scheduler), + "List".baseRouteMenuItem, + "Add Scheduler".baseRouteMenuItem), + None), + Menu("Brokers",IndexedSeq( + "Add Broker".schedulerRouteMenuItem(scheduler)), + None), + "Rebalance Topics".schedulerMenu(scheduler) + ) + def indexMenu : IndexedSeq[Menu] = IndexedSeq( Menu("Cluster",IndexedSeq( "List".baseRouteMenuItem, "Add Cluster".baseRouteMenuItem), + None), + + Menu("Scheduler",IndexedSeq( + "List".baseRouteMenuItem, + "Add Scheduler".baseRouteMenuItem), None) ) } diff --git a/app/models/navigation/QuickRoutes.scala b/app/models/navigation/QuickRoutes.scala index 08803ecfc..0772b9ca5 100644 --- a/app/models/navigation/QuickRoutes.scala +++ b/app/models/navigation/QuickRoutes.scala @@ -15,8 +15,10 @@ object QuickRoutes { val baseRoutes : Map[String, Call] = Map( "Clusters" -> controllers.routes.Application.index(), + "Schedulers" -> controllers.routes.Application.index(), "List" -> controllers.routes.Application.index(), - "Add Cluster" -> controllers.routes.Cluster.addCluster() + "Add Cluster" -> controllers.routes.Cluster.addCluster(), + "Add Scheduler" -> controllers.routes.Cluster.addScheduler() ) val clusterRoutes : Map[String, String => Call] = Map( "Update Cluster" -> controllers.routes.Cluster.updateCluster, @@ -28,6 +30,15 @@ object QuickRoutes { "Preferred Replica Election" -> controllers.routes.PreferredReplicaElection.preferredReplicaElection, "Reassign Partitions" -> controllers.routes.ReassignPartitions.reassignPartitions ) + + val schedulerRoutes : Map[String, String => Call] = Map( + "Update Scheduler" -> controllers.routes.Cluster.updateCluster, + "Summary" -> scheduler.controllers.routes.SchedulerApplication.getScheduler, + "Brokers" -> scheduler.controllers.routes.SchedulerApplication.brokers, + "Add Broker" -> scheduler.controllers.routes.Broker.addBroker, + "Rebalance Topics" -> scheduler.controllers.routes.RebalanceTopics.rebalanceTopics + ) + val topicRoutes : Map[String, (String, String) => Call] = Map( "Topic View" -> controllers.routes.Topic.topic, "Add Partitions" -> controllers.routes.Topic.addPartitions, @@ -64,6 +75,21 @@ object QuickRoutes { } } + implicit class SchedulerRoute(s: String) { + def schedulerRouteMenuItem(c: String): (String, Call) = { + s -> schedulerRoutes(s)(c) + } + def schedulerRoute(c: String): Call = { + schedulerRoutes(s)(c) + } + def schedulerMenu(c: String): Menu = { + Menu(s,IndexedSeq.empty,Some(schedulerRoute(c))) + } + def schedulerRouteBreadCrumb : BCDynamicLink = { + BCDynamicLink( s, schedulerRoutes(s)) + } + } + implicit class TopicRoute(s: String) { def topicRouteMenuItem(c: String, t: String): (String, Call) = { s -> topicRoutes(s)(c,t) diff --git a/app/scheduler/controllers/Broker.scala b/app/scheduler/controllers/Broker.scala new file mode 100644 index 000000000..ebad792b9 --- /dev/null +++ b/app/scheduler/controllers/Broker.scala @@ -0,0 +1,177 @@ +package scheduler.controllers + +import controllers.KafkaManagerContext +import kafka.manager.ActorModel.SchedulerBrokerIdentity +import models.FollowLink +import scheduler.models.form._ +import models.navigation.Menus +import play.api.data.Form +import play.api.data.Forms._ +import play.api.data.validation.{Constraint, Constraints} +import play.api.mvc.{Action, Controller} +import play.api.data.format.Formats._ + +import scala.concurrent.Future +import scalaz.{-\/, \/-} + +object Broker extends Controller{ + import play.api.libs.concurrent.Execution.Implicits.defaultContext + + private[this] val kafkaManager = KafkaManagerContext.getKafkaManager + + val validateCpus: Constraint[Double] = Constraints.min(minValue = 0.0) + + val defaultAddForm = Form( + mapping( + "id" -> number(min = 0), + "cpus" -> optional(of[Double].verifying(validateCpus)), + "mem" -> optional(longNumber), + "heap" -> optional(longNumber), + "port" -> optional(text), + "bindAddress" -> optional(text), + "constraints" -> optional(text), + "options" -> optional(text), + "log4jOptions" -> optional(text), + "jvmOptions" -> optional(text), + "stickinessPeriod" -> optional(text), + "failover" -> mapping( + "failoverDelay" -> optional(text), + "failoverMaxDelay" -> optional(text), + "failoverMaxTries" -> optional(number(0)) + )(Failover.apply)(Failover.unapply) + )(AddBroker.apply)(AddBroker.unapply) + ) + + private val defaultF = AddBroker(1, None, None, None, None, None, None, None, None, None, None, Failover(None, None, None)) + + private val addBrokerForm = defaultAddForm.fill(defaultF) + + + val defaultUpdateForm = Form( + mapping( + "id" -> number(min = 0), + "cpus" -> optional(of[Double].verifying(validateCpus)), + "mem" -> optional(longNumber), + "heap" -> optional(longNumber), + "port" -> optional(text), + "bindAddress" -> optional(text), + "constraints" -> optional(text), + "options" -> optional(text), + "log4jOptions" -> optional(text), + "jvmOptions" -> optional(text), + "stickinessPeriod" -> optional(text), + "failover" -> mapping( + "failoverDelay" -> optional(text), + "failoverMaxDelay" -> optional(text), + "failoverMaxTries" -> optional(number(0)) + )(Failover.apply)(Failover.unapply) + )(UpdateBroker.apply)(UpdateBroker.unapply) + ) + + private def defaultUpdateF(brokerId: Int) = UpdateBroker(brokerId, None, None, None, None, None, None, None, None, None, None, Failover(None, None, None)) + + private def stringPairs(pairs: Seq[(String, String)]) = + if (pairs.isEmpty) None + else + Some(pairs.map { + case (key, v) => s"$key=$v" + }.mkString(",")) + + private def updateBrokerForm(schedulerName: String, bi: SchedulerBrokerIdentity) = { + defaultUpdateForm.fill(UpdateBroker(bi.id, Option(bi.cpus), Option(bi.mem), Option(bi.heap), bi.port, bi.bindAddress, stringPairs(bi.constraints), + stringPairs(bi.options), stringPairs(bi.log4jOptions), bi.jvmOptions, Option(bi.stickiness.period), + Failover(Option(bi.failover.delay), Option(bi.failover.maxDelay), bi.failover.maxTries))) + } + + def addBroker(schedulerName: String) = Action.async { implicit request => + Future.successful(Ok(scheduler.views.html.broker.addBroker(schedulerName, \/-(addBrokerForm)))) + } + + def handleAddBroker(schedulerName: String) = Action.async { implicit request => + defaultAddForm.bindFromRequest.fold( + formWithErrors => Future.successful(BadRequest(scheduler.views.html.broker.addBroker(schedulerName,\/-(formWithErrors)))), + ab => { + kafkaManager.addBroker(schedulerName,ab.id, ab.cpus, ab.mem, ab.heap, ab.port, ab.bindAddress, ab.constraints, ab.options, + ab.log4jOptions, ab.jvmOptions, ab.stickinessPeriod, ab.failover).map { errorOrSuccess => + Ok(views.html.common.resultOfCommand( + views.html.navigation.schedulerMenu(schedulerName,"Broker","Add Broker",Menus.schedulerMenus(schedulerName)), + models.navigation.BreadCrumbs.withNamedViewAndScheduler("Brokers",schedulerName,"Add Broker"), + errorOrSuccess, + "Add Broker", + FollowLink("Go to broker view.",scheduler.controllers.routes.SchedulerApplication.broker(schedulerName, ab.id).toString()), + FollowLink("Try again.",scheduler.controllers.routes.Broker.addBroker(schedulerName).toString()) + )) + } + } + ) + } + + def handleStartBroker(schedulerName: String, id: Int) = Action.async { implicit request => + kafkaManager.startBroker(schedulerName, id).map { errorOrSuccess => + Ok(views.html.common.resultOfCommand( + views.html.navigation.schedulerMenu(schedulerName, "Broker", "Broker View", Menus.schedulerMenus(schedulerName)), + models.navigation.BreadCrumbs.withNamedViewAndScheduler("Brokers",schedulerName,"Start Broker"), + errorOrSuccess, + "Start Broker", + FollowLink("Go to broker view.", scheduler.controllers.routes.SchedulerApplication.broker(schedulerName, id).toString()), + FollowLink("Try again.", scheduler.controllers.routes.Broker.handleStartBroker(schedulerName, id).toString()) + )) + } + } + + def handleStopBroker(schedulerName: String, id: Int) = Action.async { implicit request => + kafkaManager.stopBroker(schedulerName, id).map { errorOrSuccess => + Ok(views.html.common.resultOfCommand( + views.html.navigation.schedulerMenu(schedulerName, "Broker", "Broker View", Menus.schedulerMenus(schedulerName)), + models.navigation.BreadCrumbs.withNamedViewAndScheduler("Brokers",schedulerName,"Stop Broker"), + errorOrSuccess, + "Stop Broker", + FollowLink("Go to broker view.", scheduler.controllers.routes.SchedulerApplication.broker(schedulerName, id).toString()), + FollowLink("Try again.", scheduler.controllers.routes.Broker.handleStopBroker(schedulerName, id).toString()) + )) + } + } + + def handleRemoveBroker(schedulerName: String, id: Int) = Action.async { implicit request => + kafkaManager.removeBroker(schedulerName, id).map { errorOrSuccess => + Ok(views.html.common.resultOfCommand( + views.html.navigation.schedulerMenu(schedulerName, "Broker", "Broker View", Menus.schedulerMenus(schedulerName)), + models.navigation.BreadCrumbs.withNamedViewAndScheduler("Brokers",schedulerName,"Remove Broker"), + errorOrSuccess, + "Remove Broker", + FollowLink("Go to brokers list.", scheduler.controllers.routes.SchedulerApplication.brokers(schedulerName).toString()), + FollowLink("Try again.", scheduler.controllers.routes.Broker.handleRemoveBroker(schedulerName, id).toString()) + )) + } + } + + def updateBroker(schedulerName: String, id: Int) = Action.async { implicit request => + val errorOrFormFuture = kafkaManager.getBrokerIdentity(schedulerName, id).map { errorOrBrokerIdentity => + errorOrBrokerIdentity.fold(e => -\/(e), { brokerIdentity: SchedulerBrokerIdentity => + \/-(updateBrokerForm(schedulerName, brokerIdentity)) + }) + } + errorOrFormFuture.map { errorOrForm => + Ok(scheduler.views.html.broker.updateBroker(schedulerName, id, errorOrForm)) + } + } + + def handleUpdateBroker(schedulerName: String, brokerId: Int) = Action.async { implicit request => + defaultUpdateForm.bindFromRequest.fold( + formWithErrors => Future.successful(BadRequest(scheduler.views.html.broker.updateBroker(schedulerName, brokerId, \/-(formWithErrors)))), + ub => { + kafkaManager.updateBroker(schedulerName, ub.id, ub.cpus, ub.mem, ub.heap, ub.port, ub.bindAddress, ub.constraints, ub.options, + ub.log4jOptions, ub.jvmOptions, ub.stickinessPeriod, ub.failover).map { errorOrSuccess => + Ok(views.html.common.resultOfCommand( + views.html.navigation.schedulerMenu(schedulerName, "Broker", "Broker View", Menus.schedulerMenus(schedulerName)), + models.navigation.BreadCrumbs.withNamedViewAndScheduler("Brokers",schedulerName,"Update Broker"), + errorOrSuccess, + "Update Broker", + FollowLink("Go to broker view.",scheduler.controllers.routes.SchedulerApplication.broker(schedulerName, ub.id).toString()), + FollowLink("Try again.",scheduler.controllers.routes.Broker.updateBroker(schedulerName, ub.id).toString()) + )) + } + } + ) + } +} diff --git a/app/scheduler/controllers/RebalanceTopics.scala b/app/scheduler/controllers/RebalanceTopics.scala new file mode 100644 index 000000000..46a0c7697 --- /dev/null +++ b/app/scheduler/controllers/RebalanceTopics.scala @@ -0,0 +1,51 @@ +package scheduler.controllers + +import controllers.KafkaManagerContext +import models.FollowLink +import scheduler.models.form.{RebalanceTopics => RebalanceTopicsOp} +import models.navigation.Menus +import play.api.data.Form +import play.api.data.Forms._ +import play.api.mvc.{Action, Controller} + +import scala.concurrent.Future +import scalaz.\/- + +object RebalanceTopics extends Controller{ + import play.api.libs.concurrent.Execution.Implicits.defaultContext + + private[this] val kafkaManager = KafkaManagerContext.getKafkaManager + + val defaultRebalanceForm = Form( + mapping( + "ids" -> text, + "topics" -> optional(text) + )(RebalanceTopicsOp.apply)(RebalanceTopicsOp.unapply) + ) + + private val defaultF = RebalanceTopicsOp("*", Some("")) + + private val rebalanceForm = defaultRebalanceForm.fill(defaultF) + + def rebalanceTopics(schedulerName: String) = Action.async { implicit request => + Future.successful(Ok(scheduler.views.html.broker.rebalanceTopics(schedulerName, \/-(rebalanceForm)))) + } + + def handleRebalance(schedulerName: String) = Action.async { implicit request => + defaultRebalanceForm.bindFromRequest.fold( + formWithErrors => Future.successful(BadRequest(scheduler.views.html.broker.rebalanceTopics(schedulerName,\/-(formWithErrors)))), + rt => { + kafkaManager.rebalanceTopics(schedulerName, rt.ids, rt.topics).map { errorOrSuccess => + Ok(views.html.common.resultOfCommand( + views.html.navigation.schedulerMenu(schedulerName,"Rebalance Topics","",Menus.schedulerMenus(schedulerName)), + models.navigation.BreadCrumbs.withNamedViewAndScheduler("Rebalance Topics",schedulerName,"Rebalance Topics"), + errorOrSuccess, + "Rebalance Topics", + FollowLink("Go to the brokers list.", scheduler.controllers.routes.SchedulerApplication.brokers(schedulerName).toString()), + FollowLink("Try again.",scheduler.controllers.routes.RebalanceTopics.rebalanceTopics(schedulerName).toString()) + )) + } + } + ) + } +} diff --git a/app/scheduler/controllers/SchedulerApplication.scala b/app/scheduler/controllers/SchedulerApplication.scala new file mode 100644 index 000000000..b724941c5 --- /dev/null +++ b/app/scheduler/controllers/SchedulerApplication.scala @@ -0,0 +1,30 @@ +package scheduler.controllers + +import controllers.KafkaManagerContext +import play.api.mvc.{Action, Controller} + +object SchedulerApplication extends Controller { + + private[this] val kafkaManager = KafkaManagerContext.getKafkaManager + + import play.api.libs.concurrent.Execution.Implicits.defaultContext + + def getScheduler(s: String) = Action.async { + kafkaManager.getSchedulerView(s).map { errorOrSchedulerView => + Ok(scheduler.views.html.scheduler.schedulerView(s,errorOrSchedulerView)) + } + } + + def brokers(s: String) = Action.async { + kafkaManager.getSchedulerBrokerList(s).map { errorOrBrokerList => + Ok(scheduler.views.html.broker.brokerList(s,errorOrBrokerList)) + } + } + + def broker(s: String, b: Int) = Action.async { + kafkaManager.getBrokerIdentity(s,b).map { errorOrBrokerView => + Ok(scheduler.views.html.broker.brokerView(s,b,errorOrBrokerView)) + } + } +} + diff --git a/app/scheduler/kafka/manager/KafkaSchedulerCommandActor.scala b/app/scheduler/kafka/manager/KafkaSchedulerCommandActor.scala new file mode 100644 index 000000000..13a62348d --- /dev/null +++ b/app/scheduler/kafka/manager/KafkaSchedulerCommandActor.scala @@ -0,0 +1,96 @@ +package scheduler.kafka.manager + +import kafka.manager.ActorModel._ +import kafka.manager._ +import org.apache.curator.framework.CuratorFramework + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success, Try} + +case class KafkaSchedulerCommandActorConfig(schedulerConfig: SchedulerConfig, + curator: CuratorFramework, + longRunningPoolConfig: LongRunningPoolConfig, + askTimeoutMillis: Long = 400, + version: KafkaVersion) + +class KafkaSchedulerCommandActor(kafkaCommandActorConfig: KafkaSchedulerCommandActorConfig) extends BaseCommandActor with LongRunningPoolActor { + + val schedulerRestClient = new SchedulerRestClient(kafkaCommandActorConfig.schedulerConfig.apiUrl)(play.api.libs.concurrent.Execution.Implicits.defaultContext) + + @scala.throws[Exception](classOf[Exception]) + override def preStart() = { + log.info("Started actor %s".format(self.path)) + } + + @scala.throws[Exception](classOf[Exception]) + override def preRestart(reason: Throwable, message: Option[Any]) { + log.error(reason, "Restarting due to [{}] when processing [{}]", + reason.getMessage, message.getOrElse("")) + super.preRestart(reason, message) + } + + @scala.throws[Exception](classOf[Exception]) + override def postStop(): Unit = { + super.postStop() + } + + override protected def longRunningPoolConfig: LongRunningPoolConfig = kafkaCommandActorConfig.longRunningPoolConfig + + override protected def longRunningQueueFull(): Unit = { + sender ! KCCommandResult(Try(throw new UnsupportedOperationException("Long running executor blocking queue is full!"))) + } + + override def processActorResponse(response: ActorResponse): Unit = { + response match { + case any: Any => log.warning("ksca : processActorResponse : Received unknown message: {}", any) + } + } + + def futureToKCCommandResult[T](future: Future[T])(implicit ec: ExecutionContext): Future[KCCommandResult] = { + future.map { + _ => KCCommandResult(Success(())) + }.recover { + case e: Throwable => KCCommandResult(Failure(e)) + } + } + + override def processCommandRequest(request: CommandRequest): Unit = { + implicit val ec = longRunningExecutionContext + request match { + + case KSCAddBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover) => + longRunning { + futureToKCCommandResult(schedulerRestClient.addBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover)) + } + + case KSCUpdateBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover) => + longRunning { + futureToKCCommandResult(schedulerRestClient.updateBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover)) + } + + case KSCStartBroker(id) => + longRunning { + futureToKCCommandResult(schedulerRestClient.startBroker(id)) + } + + case KSCStopBroker(id) => + longRunning { + futureToKCCommandResult(schedulerRestClient.stopBroker(id)) + } + + case KSCRemoveBroker(id) => + longRunning { + futureToKCCommandResult(schedulerRestClient.removeBroker(id)) + } + + case KSCRebalanceTopics(ids, topics) => + longRunning { + futureToKCCommandResult(schedulerRestClient.rebalanceTopics(ids, topics)) + } + + case any: Any => log.warning("ksca : processCommandRequest : Received unknown message: {}", any) + } + } +} + + diff --git a/app/scheduler/kafka/manager/KafkaSchedulerStateActor.scala b/app/scheduler/kafka/manager/KafkaSchedulerStateActor.scala new file mode 100644 index 000000000..936d8baa8 --- /dev/null +++ b/app/scheduler/kafka/manager/KafkaSchedulerStateActor.scala @@ -0,0 +1,86 @@ +/** + * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 + * See accompanying LICENSE file. + */ + +package scheduler.kafka.manager + +import akka.pattern.pipe +import kafka.manager.ActorModel._ +import kafka.manager.{BaseQueryCommandActor, SchedulerConfig} +import org.apache.curator.framework.CuratorFramework + +class KafkaSchedulerStateActor(curator: CuratorFramework, + deleteSupported: Boolean, + schedulerConfig: SchedulerConfig) extends BaseQueryCommandActor { + + val schedulerRestClient = new SchedulerRestClient(schedulerConfig.apiUrl)(play.api.libs.concurrent.Execution.Implicits.defaultContext) + + @scala.throws[Exception](classOf[Exception]) + override def preStart() = { + log.info("Started actor %s".format(self.path)) + } + + @scala.throws[Exception](classOf[Exception]) + override def preRestart(reason: Throwable, message: Option[Any]) { + log.error(reason, "Restarting due to [{}] when processing [{}]", + reason.getMessage, message.getOrElse("")) + super.preRestart(reason, message) + } + + + @scala.throws[Exception](classOf[Exception]) + override def postStop(): Unit = { + log.info("Stopped actor %s".format(self.path)) + + super.postStop() + } + + override def processActorResponse(response: ActorResponse): Unit = { + response match { + case any: Any => log.warning("kssa : processActorResponse : Received unknown message: {}", any.toString) + } + } + + override def processQueryRequest(request: QueryRequest): Unit = { + request match { + case KSGetTopics => + sender ! TopicList(IndexedSeq(), Set()) + + case KSGetAllTopicDescriptions(lastUpdateMillisOption) => + sender ! TopicDescriptions(IndexedSeq.empty, 0L) + case SchedulerKSGetBrokers => + implicit val ec = context.dispatcher + + def deserMap(map: String): Seq[(String, String)] = + map.split(",").map { + kv => + val pair = kv.split("=") + (pair(0), pair(1)) + } + + schedulerRestClient.getStatus map { + status => + val brokerIdentities = + status.brokers.getOrElse(Seq.empty).map{ + b => + SchedulerBrokerIdentity(b.id.toInt, b.active, b.cpus, b.mem, b.heap, b.port, + b.bindAddress, b.constraints.map(deserMap).getOrElse(Seq.empty), + b.options.map(deserMap).getOrElse(Seq.empty), b.log4jOptions.map(deserMap).getOrElse(Seq.empty), b.jvmOptions, + SchedulerBrokerStickinessIdentity(b.stickiness.period, b.stickiness.stopTime, b.stickiness.hostname), + SchedulerBrokerFailoverIdentity(b.failover.delay, b.failover.maxDelay, b.failover.maxTries, b.failover.failures, b.failover.failureTime), + b.task.map(task => SchedulerBrokerTaskIdentity(task.id, task.slaveId, task.executorId, task.hostname, task.endpoint, task.state)), schedulerConfig) + } + SchedulerBrokerList(brokerIdentities, schedulerConfig) + } pipeTo sender() + + case any: Any => log.warning("kssa : processQueryRequest : Received unknown message: {}", any.toString) + } + } + + override def processCommandRequest(request: CommandRequest): Unit = { + request match { + case any: Any => log.warning("kssa : processCommandRequest : Received unknown message: {}", any.toString) + } + } +} \ No newline at end of file diff --git a/app/scheduler/kafka/manager/SchedulerBrokerViewCacheActor.scala b/app/scheduler/kafka/manager/SchedulerBrokerViewCacheActor.scala new file mode 100644 index 000000000..3b972667c --- /dev/null +++ b/app/scheduler/kafka/manager/SchedulerBrokerViewCacheActor.scala @@ -0,0 +1,225 @@ +/** + * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 + * See accompanying LICENSE file. + */ + +package scheduler.kafka.manager + +import akka.actor.{ActorRef, Cancellable, ActorPath} +import kafka.manager.ActorModel._ +import kafka.manager._ +import scala.collection.mutable +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.Try + +case class SchedulerBrokerViewCacheActorConfig(schedulerKafkaStateActorPath: ActorPath, + schedulerConfig: SchedulerConfig, + longRunningPoolConfig: LongRunningPoolConfig, + updatePeriod: FiniteDuration = 10 seconds) +class SchedulerBrokerViewCacheActor(config: SchedulerBrokerViewCacheActorConfig) extends LongRunningPoolActor { + + private[this] val ZERO = BigDecimal(0) + + private[this] var cancellable : Option[Cancellable] = None + + private[this] var topicIdentities : Map[String, TopicIdentity] = Map.empty + + private[this] var topicDescriptionsOption : Option[TopicDescriptions] = None + + private[this] var brokerListOption : Option[SchedulerBrokerList] = None + + private[this] var brokerMetrics : Map[Int, BrokerMetrics] = Map.empty + + private[this] val brokerTopicPartitions : mutable.Map[Int, SchedulerBrokerIdentity] = new mutable.HashMap[Int, SchedulerBrokerIdentity] + + private[this] val topicMetrics: mutable.Map[String, mutable.Map[Int, BrokerMetrics]] = + new mutable.HashMap[String, mutable.Map[Int, BrokerMetrics]]() + + private[this] var combinedBrokerMetric : Option[BrokerMetrics] = None + + override def preStart() = { + log.info("Started actor %s".format(self.path)) + log.info("Scheduling updater for %s".format(config.updatePeriod)) + cancellable = Some( + context.system.scheduler.schedule(0 seconds, + config.updatePeriod, + self, + BVForceUpdate)(context.system.dispatcher,self) + ) + } + + @scala.throws[Exception](classOf[Exception]) + override def postStop(): Unit = { + log.info("Stopped actor %s".format(self.path)) + log.info("Cancelling updater...") + Try(cancellable.map(_.cancel())) + super.postStop() + } + + override protected def longRunningPoolConfig: LongRunningPoolConfig = config.longRunningPoolConfig + + override protected def longRunningQueueFull(): Unit = { + log.error("Long running pool queue full, skipping!") + } + + override def processActorRequest(request: ActorRequest): Unit = { + request match { + case BVForceUpdate => + log.info("Updating scheduler broker view...") + //ask for topic descriptions + val lastUpdateMillisOption: Option[Long] = topicDescriptionsOption.map(_.lastUpdateMillis) + context.actorSelection(config.schedulerKafkaStateActorPath).tell(KSGetAllTopicDescriptions(lastUpdateMillisOption), self) + context.actorSelection(config.schedulerKafkaStateActorPath).tell(SchedulerKSGetBrokers, self) + + case SMGetBrokerIdentity(id) => + sender ! brokerTopicPartitions.get(id).map { bv => + val bcs = for { + metrics <- bv.metrics + cbm <- combinedBrokerMetric + } yield { + val perMessages = if(cbm.messagesInPerSec.oneMinuteRate > 0) { + BigDecimal(metrics.messagesInPerSec.oneMinuteRate / cbm.messagesInPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP) + } else ZERO + val perIncoming = if(cbm.bytesInPerSec.oneMinuteRate > 0) { + BigDecimal(metrics.bytesInPerSec.oneMinuteRate / cbm.bytesInPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP) + } else ZERO + val perOutgoing = if(cbm.bytesOutPerSec.oneMinuteRate > 0) { + BigDecimal(metrics.bytesOutPerSec.oneMinuteRate / cbm.bytesOutPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP) + } else ZERO + BrokerClusterStats(perMessages, perIncoming, perOutgoing) + } + if(bcs.isDefined) { + bv.copy(stats=bcs) + } else { + bv + } + } + + case BVGetBrokerMetrics => + sender ! brokerMetrics + + case BVGetTopicMetrics(topic) => + sender ! topicMetrics.get(topic).map(m => m.values.foldLeft(BrokerMetrics.DEFAULT)((acc,bm) => acc + bm)) + + case BVGetTopicIdentities => + sender ! topicIdentities + + case BVUpdateTopicMetricsForBroker(id, metrics) => + metrics.foreach { + case (topic, bm) => + val tm = topicMetrics.getOrElse(topic, new mutable.HashMap[Int, BrokerMetrics]) + tm.put(id, bm) + topicMetrics.put(topic, tm) + } + + case BVUpdateBrokerMetrics(id, metrics) => + brokerMetrics += (id -> metrics) + combinedBrokerMetric = Option(brokerMetrics.values.foldLeft(BrokerMetrics.DEFAULT)((acc, m) => acc + m)) + for { + bv <- brokerTopicPartitions.get(id) + } { + brokerTopicPartitions.put(id, bv.copy(metrics = Option(metrics))) + } + + case any: Any => log.warning("sbvca : processActorRequest : Received unknown message: {}", any) + } + } + + override def processActorResponse(response: ActorResponse): Unit = { + response match { + case td: TopicDescriptions => + topicDescriptionsOption = Some(td) + updateView() + + case bl: SchedulerBrokerList => + brokerListOption = Some(bl) + updateView() + + case any: Any => log.warning("sbvca : processActorResponse : Received unknown message: {}", any) + } + } + + private[this] def updateView(): Unit = { + for { + brokerList <- brokerListOption + topicDescriptions <- topicDescriptionsOption + } { + val topicIdentity : IndexedSeq[TopicIdentity] = IndexedSeq.empty + + topicIdentities = topicIdentity.map(ti => (ti.topic, ti)).toMap + val topicPartitionByBroker = topicIdentity.flatMap( + ti => ti.partitionsByBroker.map(btp => (ti,btp.id,btp.partitions))).groupBy(_._2) + + //check for 2*broker list size since we schedule 2 jmx calls for each broker + if (config.schedulerConfig.jmxEnabled && hasCapacityFor(2 * brokerListOption.size)) { + implicit val ec = longRunningExecutionContext + val brokerLookup = brokerList.list.map(bi => bi.id -> bi).toMap + topicPartitionByBroker.foreach { + case (brokerId, topicPartitions) => + val brokerInfoOpt = brokerLookup.get(brokerId) + for (broker <- brokerInfoOpt; + host <- broker.actualHost(); + port <- broker.actualPort()) { + longRunning { + Future { + // TODO JMX Port is temporary hardcoded to 9999 + val tryResult = KafkaJMX.doWithConnection(host, 9999) { + mbsc => + topicPartitions.map { + case (topic, id, partitions) => + (topic.topic, + KafkaMetrics.getBrokerMetrics(config.schedulerConfig.version, mbsc, Option(topic.topic))) + } + } + val result = tryResult match { + case scala.util.Failure(t) => + log.error(t, s"Failed to get topic metrics for broker $broker") + topicPartitions.map { + case (topic, id, partitions) => + (topic.topic, BrokerMetrics.DEFAULT) + } + case scala.util.Success(bm) => bm + } + self.tell(BVUpdateTopicMetricsForBroker(broker.id, result), ActorRef.noSender) + } + } + } + } + + brokerList.list.foreach { + broker => + for (host <- broker.actualHost(); + port <- broker.actualPort()) { + longRunning { + Future { + // TODO JMX Port is temporary hardcoded to 9999 + val tryResult = KafkaJMX.doWithConnection(host, 9999) { + mbsc => + KafkaMetrics.getBrokerMetrics(config.schedulerConfig.version, mbsc) + } + + val result = tryResult match { + case scala.util.Failure(t) => + log.error(t, s"Failed to get broker metrics for $broker") + BrokerMetrics.DEFAULT + case scala.util.Success(bm) => bm + } + self.tell(BVUpdateBrokerMetrics(broker.id, result), ActorRef.noSender) + } + } + } + } + } else if (config.schedulerConfig.jmxEnabled) { + log.warning("Not scheduling update of JMX for all brokers, not enough capacity!") + } + + brokerTopicPartitions.clear() + brokerList.list.foreach { + bi => + brokerTopicPartitions.put(bi.id, bi.copy(metrics = brokerMetrics.get(bi.id))) + } + } + } + +} diff --git a/app/scheduler/kafka/manager/SchedulerManagerActor.scala b/app/scheduler/kafka/manager/SchedulerManagerActor.scala new file mode 100644 index 000000000..bdac40316 --- /dev/null +++ b/app/scheduler/kafka/manager/SchedulerManagerActor.scala @@ -0,0 +1,234 @@ +/** + * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 + * See accompanying LICENSE file. + */ + +package scheduler.kafka.manager + +import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} + +import akka.actor.{ActorPath, Props} +import akka.pattern._ +import akka.util.Timeout +import kafka.manager._ +import kafka.manager.utils.AdminUtils +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.recipes.cache.PathChildrenCache +import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode +import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex +import org.apache.zookeeper.CreateMode + +import scala.concurrent.duration.{FiniteDuration, _} +import scala.concurrent.{ExecutionContext, Future} +import scala.reflect.ClassTag +import scala.util.Try + +import kafka.manager.ActorModel._ + +case class SchedulerManagerActorConfig(pinnedDispatcherName: String, + baseZkPath : String, + curatorConfig: CuratorConfig, + schedulerConfig: SchedulerConfig, + updatePeriod: FiniteDuration, + threadPoolSize: Int = 2, + maxQueueSize: Int = 100, + // enlarging from 2s to 10s because 2s for Scheduler REST API is not enough + askTimeoutMillis: Long = 10000, + mutexTimeoutMillis: Int = 4000) + +class SchedulerManagerActor(smConfig: SchedulerManagerActorConfig) + extends BaseQueryCommandActor with CuratorAwareActor with BaseZkPath { + + //this is from base zk path trait + override def baseZkPath : String = smConfig.baseZkPath + + //this is for curator aware actor + override def curatorConfig: CuratorConfig = smConfig.curatorConfig + + val longRunningExecutor = new ThreadPoolExecutor( + smConfig.threadPoolSize, smConfig.threadPoolSize,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue[Runnable](smConfig.maxQueueSize)) + val longRunningExecutionContext = ExecutionContext.fromExecutor(longRunningExecutor) + + protected[this] val sharedClusterCurator : CuratorFramework = getCurator(smConfig.schedulerConfig.curatorConfig) + log.info("Starting shared curator...") + sharedClusterCurator.start() + + //create cluster path + Try(curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(baseZkPath)) + require(curator.checkExists().forPath(baseZkPath) != null,s"Cluster path not found : $baseZkPath") + + private[this] val baseTopicsZkPath = zkPath("topics") + + //create cluster path for storing topics state + Try(curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(baseTopicsZkPath)) + require(curator.checkExists().forPath(baseTopicsZkPath) != null,s"Cluster path for topics not found : $baseTopicsZkPath") + + private[this] val mutex = new InterProcessSemaphoreMutex(curator, zkPath("mutex")) + + private[this] val adminUtils = new AdminUtils(smConfig.schedulerConfig.version) + + private[this] val kssProps = Props(classOf[KafkaSchedulerStateActor],sharedClusterCurator, adminUtils.isDeleteSupported, smConfig.schedulerConfig) + private[this] val kafkaSchedulerStateActor : ActorPath = context.actorOf(kssProps.withDispatcher(smConfig.pinnedDispatcherName),"kafka-scheduler-state").path + + private[this] val bvConfig = SchedulerBrokerViewCacheActorConfig( + kafkaSchedulerStateActor, + smConfig.schedulerConfig, + LongRunningPoolConfig(Runtime.getRuntime.availableProcessors(), 1000), + smConfig.updatePeriod) + private[this] val bvcProps = Props(classOf[SchedulerBrokerViewCacheActor],bvConfig) + private[this] val brokerViewCacheActor : ActorPath = context.actorOf(bvcProps,"scheduler-broker-view").path + + private[this] val kscProps = { + val kscaConfig = KafkaSchedulerCommandActorConfig( + smConfig.schedulerConfig, + sharedClusterCurator, + LongRunningPoolConfig(smConfig.threadPoolSize, smConfig.maxQueueSize), + smConfig.askTimeoutMillis, + smConfig.schedulerConfig.version) + Props(classOf[KafkaSchedulerCommandActor],kscaConfig) + } + private[this] val kafkaSchedulerCommandActor : ActorPath = context.actorOf(kscProps,"kafka-scheduler-command").path + + private[this] implicit val timeout: Timeout = FiniteDuration(smConfig.askTimeoutMillis,MILLISECONDS) + + private[this] val clusterManagerTopicsPathCache = new PathChildrenCache(curator,baseTopicsZkPath,true) + + @scala.throws[Exception](classOf[Exception]) + override def preStart() = { + super.preStart() + log.info("Started actor %s".format(self.path)) + log.info("Starting cluster manager topics path cache...") + clusterManagerTopicsPathCache.start(StartMode.BUILD_INITIAL_CACHE) + } + + @scala.throws[Exception](classOf[Exception]) + override def preRestart(reason: Throwable, message: Option[Any]) { + log.error(reason, "Restarting due to [{}] when processing [{}]", + reason.getMessage, message.getOrElse("")) + super.preRestart(reason, message) + } + + @scala.throws[Exception](classOf[Exception]) + override def postStop(): Unit = { + log.info("Stopped actor %s".format(self.path)) + log.info("Shutting down shared curator...") + Try(sharedClusterCurator.close()) + + log.info("Shutting down scheduler manager topics path cache...") + Try(clusterManagerTopicsPathCache.close()) + super.postStop() + } + + override def processActorResponse(response: ActorResponse): Unit = { + response match { + case any: Any => log.warning("cma : processActorResponse : Received unknown message: {}", any) + } + } + + override def processQueryRequest(request: QueryRequest): Unit = { + request match { + case ksRequest: KSRequest => + context.actorSelection(kafkaSchedulerStateActor).forward(ksRequest) + + case bvRequest: BVRequest => + context.actorSelection(brokerViewCacheActor).forward(bvRequest) + + case SMGetView => + implicit val ec = context.dispatcher + val eventualBrokerList = withKafkaSchedulerStateActor(SchedulerKSGetBrokers)(identity[SchedulerBrokerList]) + val eventualTopicList = withKafkaSchedulerStateActor(KSGetTopics)(identity[TopicList]) + val result = for { + bl <- eventualBrokerList + tl <- eventualTopicList + } yield SMView(tl.list.size, bl.list.size, smConfig.schedulerConfig) + result pipeTo sender + + case any: Any => log.warning("sma : processQueryResponse : Received unknown message: {}", any) + } + } + + override def processCommandRequest(request: CommandRequest): Unit = { + request match { + case SMAddBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover) => + implicit val ec = longRunningExecutionContext + + withKafkaCommandActor(KSCAddBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover)){ + kcResponse: KCCommandResult => + Future.successful(SMCommandResult(kcResponse.result)) + } pipeTo sender() + + case SMUpdateBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover) => + implicit val ec = longRunningExecutionContext + + withKafkaCommandActor(KSCUpdateBroker(id, cpus, mem, heap, port, bindAddress, constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover)){ + kcResponse: KCCommandResult => + Future.successful(SMCommandResult(kcResponse.result)) + } pipeTo sender() + + case SMStartBroker(id) => + implicit val ec = longRunningExecutionContext + + withKafkaCommandActor(KSCStartBroker(id)){ + kcResponse: KCCommandResult => + Future.successful(SMCommandResult(kcResponse.result)) + } pipeTo sender() + + case SMStopBroker(id) => + implicit val ec = longRunningExecutionContext + + withKafkaCommandActor(KSCStopBroker(id)){ + kcResponse: KCCommandResult => + Future.successful(SMCommandResult(kcResponse.result)) + } pipeTo sender() + + case SMRemoveBroker(id) => + implicit val ec = longRunningExecutionContext + + withKafkaCommandActor(KSCRemoveBroker(id)){ + kcResponse: KCCommandResult => + Future.successful(SMCommandResult(kcResponse.result)) + } pipeTo sender() + + case SMRebalanceTopics(ids, topics) => + implicit val ec = longRunningExecutionContext + + withKafkaCommandActor(KSCRebalanceTopics(ids, topics)){ + kcResponse: KCCommandResult => + Future.successful(SMCommandResult(kcResponse.result)) + } pipeTo sender() + + case any: Any => log.warning("cma : processCommandRequest : Received unknown message: {}", any) + } + } + + private[this] def withKafkaSchedulerStateActor[Input,Output,FOutput] + (msg: Input)(fn: Output => FOutput)(implicit tag: ClassTag[Output], ec: ExecutionContext) : Future[FOutput] = { + context.actorSelection(kafkaSchedulerStateActor).ask(msg).mapTo[Output].map(fn) + } + + private[this] def withKafkaCommandActor[Input,Output,FOutput] + (msg: Input)(fn: Output => FOutput)(implicit tag: ClassTag[Output], ec: ExecutionContext) : Future[FOutput] = { + context.actorSelection(kafkaSchedulerCommandActor).ask(msg).mapTo[Output].map(fn) + } + + private[this] def modify[T](fn: => T): T = { + try { + mutex.acquire(smConfig.mutexTimeoutMillis,TimeUnit.MILLISECONDS) + fn + } finally { + if(mutex.isAcquiredInThisProcess) { + mutex.release() + } + } + } + + def getNonExistentBrokers(availableBrokers: BrokerList, selectedBrokers: Seq[Int]): Seq[Int] = { + val availableBrokerIds: Set[Int] = availableBrokers.list.map(_.id.toInt).toSet + selectedBrokers filter { b: Int => !availableBrokerIds.contains(b) } + } + + def getNonExistentBrokers(availableBrokers: BrokerList, assignments: Map[Int, Seq[Int]]): Seq[Int] = { + val brokersAssigned = assignments.flatMap({ case (pt, bl) => bl }).toSet.toSeq + getNonExistentBrokers(availableBrokers, brokersAssigned) + } +} diff --git a/app/scheduler/kafka/manager/SchedulerRestClient.scala b/app/scheduler/kafka/manager/SchedulerRestClient.scala new file mode 100644 index 000000000..9c3185fe4 --- /dev/null +++ b/app/scheduler/kafka/manager/SchedulerRestClient.scala @@ -0,0 +1,232 @@ +package scheduler.kafka.manager + +import java.util.Date + +import org.slf4j.LoggerFactory +import play.api.Play.current +import play.api.libs.json.{JsError, JsSuccess} +import play.api.libs.ws._ +import scheduler.kafka.manager.SchedulerRestClient.{AddBrokerResponse, Broker, StatusResponse} +import scheduler.models.form.Failover + +import scala.concurrent.{ExecutionContext, Future} + +object SchedulerRestClient { + + import play.api.libs.functional.syntax._ + import play.api.libs.json._ + + + case class Task(id: String, + slaveId: String, + executorId: String, + hostname: String, + endpoint: Option[String], + state: String) + + implicit val taskReads: Reads[Task] = ( + (__ \ 'id).read[String] and + (__ \ 'slaveId).read[String] and + (__ \ 'executorId).read[String] and + (__ \ 'hostname).read[String] and + (__ \ 'endpoint).readNullable[String] and + (__ \ 'state).read[String] + )(Task) + + case class Failover(delay: String, + maxDelay: String, + maxTries: Option[Int], + failures: Option[Int], + failureTime: Option[Date]) + + implicit val failoverReads: Reads[Failover] = ( + (__ \ 'delay).read[String] and + (__ \ 'maxDelay).read[String] and + (__ \ 'maxTries).readNullable[Int] and + (__ \ 'failures).readNullable[Int] and + (__ \ 'failureTime).readNullable[Date] + )(Failover) + + case class Stickiness(period: String, stopTime: Option[Date], hostname: Option[String]) + + implicit val stickinessReads: Reads[Stickiness] = ( + (__ \ 'period).read[String] and + (__ \ 'stopTime).readNullable[Date] and + (__ \ 'hostname).readNullable[String] + )(Stickiness) + + + case class Broker(id: String, + active: Boolean, + cpus: Double, + mem: Long, + heap: Long, + port: Option[String], + bindAddress: Option[String], + constraints: Option[String], + options: Option[String], + log4jOptions: Option[String], + jvmOptions: Option[String], + stickiness: Stickiness, + failover: Failover, + task: Option[Task]) + + implicit val brokerReads: Reads[Broker] = ( + (__ \ 'id).read[String] and + (__ \ 'active).read[Boolean] and + (__ \ 'cpus).read[Double] and + (__ \ 'mem).read[Long] and + (__ \ 'heap).read[Long] and + (__ \ 'port).readNullable[String] and + (__ \ 'bindAddress).readNullable[String] and + (__ \ 'constraints).readNullable[String] and + (__ \ 'options).readNullable[String] and + (__ \ 'log4jOptions).readNullable[String] and + (__ \ 'jvmOptions).readNullable[String] and + (__ \ 'stickiness).read[Stickiness] and + (__ \ 'failover).read[Failover] and + (__ \ 'task).readNullable[Task] + )(Broker) + + case class StatusResponse(brokers: Option[Seq[Broker]], frameworkId: Option[String]) + + implicit val statusResponseReads: Reads[StatusResponse] = ( + (__ \ 'brokers).readNullable[Seq[Broker]] and + (__ \ 'frameworkId).readNullable[String] + )(StatusResponse) + + + case class AddBrokerResponse(brokers: Seq[Broker]) + + implicit val addBrokerResponseReads: Reads[AddBrokerResponse] = + (__ \ 'brokers).read[Seq[Broker]].map(AddBrokerResponse) + +} + +class SchedulerRestClient(val apiUrl: String)(implicit val executionContext: ExecutionContext) { + private[this] lazy val logger = LoggerFactory.getLogger(this.getClass) + + private val BrokersApiPrefix = s"$apiUrl/api/brokers" + private val StatusUrl = s"$BrokersApiPrefix/status" + private val AddBrokerUrl = s"$BrokersApiPrefix/add" + private val UpdateBrokerUrl = s"$BrokersApiPrefix/update" + private val StartBrokerUrl = s"$BrokersApiPrefix/start" + private val StopBrokerUrl = s"$BrokersApiPrefix/stop" + private val RemoveBrokerUrl = s"$BrokersApiPrefix/remove" + private val RebalanceTopicsUrl = s"$BrokersApiPrefix/rebalance" + + private val Timeout = 10000 + + def getStatus: Future[StatusResponse] = { + val holder: Future[WSResponse] = WS + .url(StatusUrl) + .withRequestTimeout(Timeout) + .get() + + holder.map { + response => response.json.validate[StatusResponse] + }.flatMap { + case JsError(e) => + logger.error(s"Failed to parse status response $e") + Future.failed(new Exception("Failed to parse status response json")) + case JsSuccess(status, _) => + Future.successful(status) + } + } + + def addBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover): Future[Seq[Broker]] = { + + val queryParamsSeq = Seq( + "id" -> Some(id.toString), "cpus" -> cpus.map(_.toString), "mem" -> mem.map(_.toString), "heap" -> heap.map(_.toString), + "port" -> port, "bindAddress" -> bindAddress, "constraints" -> constraints, + "options" -> options, "log4jOptions" -> log4jOptions, "jvmOptions" -> jvmOptions, + "stickinessPeriod" -> stickinessPeriod.map(_.toString), "failoverDelay" -> failover.failoverDelay.map(_.toString), + "failoverMaxDelay" -> failover.failoverMaxDelay.map(_.toString), "failoverMaxTries" -> failover.failoverMaxTries.map(_.toString)).collect { + case (key, Some(v)) => (key, v) + } + + val holder: Future[WSResponse] = WS + .url(AddBrokerUrl) + .withQueryString(queryParamsSeq: _*) + .withRequestTimeout(Timeout) + .get() + + holder.map { + response => response.json.validate[AddBrokerResponse] + }.flatMap { + case JsError(e) => + logger.error(s"Failed to parse add broker response $e") + Future.failed(new Exception("Failed to parse add broker response json")) + case JsSuccess(brokers, _) => + Future.successful(brokers.brokers) + } + } + + def updateBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover): Future[Unit] = { + + val queryParamsSeq = Seq( + "id" -> Some(id.toString), "cpus" -> cpus.map(_.toString), "mem" -> mem.map(_.toString), "heap" -> heap.map(_.toString), + "port" -> port, "bindAddress" -> bindAddress, "constraints" -> constraints, + "options" -> options, "log4jOptions" -> log4jOptions, "jvmOptions" -> jvmOptions, + "stickinessPeriod" -> stickinessPeriod.map(_.toString), "failoverDelay" -> failover.failoverDelay.map(_.toString), + "failoverMaxDelay" -> failover.failoverMaxDelay.map(_.toString), "failoverMaxTries" -> failover.failoverMaxTries.map(_.toString)).collect { + case (key, Some(v)) => (key, v) + } + + val holder: Future[WSResponse] = WS + .url(UpdateBrokerUrl) + .withQueryString(queryParamsSeq: _*) + .withRequestTimeout(Timeout) + .get() + + holder.map { _ => () } + } + + def startBroker(id: Int): Future[Unit] = { + val holder: Future[WSResponse] = WS + .url(StartBrokerUrl) + .withQueryString(Seq("id" -> id.toString, "timeout" -> "0"): _*) + .withRequestTimeout(Timeout) + .get() + + holder.map { _ => () } + } + + def stopBroker(id: Int): Future[Unit] = { + val holder: Future[WSResponse] = WS + .url(StopBrokerUrl) + .withQueryString(Seq("id" -> id.toString, "timeout" -> "0"): _*) + .withRequestTimeout(Timeout) + .get() + + holder.map { _ => () } + } + + def removeBroker(id: Int): Future[Unit] = { + val holder: Future[WSResponse] = WS + .url(RemoveBrokerUrl) + .withQueryString(Seq("id" -> id.toString): _*) + .withRequestTimeout(Timeout) + .get() + + holder.map { _ => () } + } + + def rebalanceTopics(ids: String, topics: Option[String]): Future[Unit] = { + val holder: Future[WSResponse] = WS + .url(RebalanceTopicsUrl) + .withQueryString(Seq("ids" -> Some(ids), "topics" -> topics).collect { + case (key, Some(v)) => (key, v) + }: _*) + .withRequestTimeout(Timeout) + .get() + + holder.map { _ => () } + } +} diff --git a/app/scheduler/models/form/BrokerOperations.scala b/app/scheduler/models/form/BrokerOperations.scala new file mode 100644 index 000000000..975fe4785 --- /dev/null +++ b/app/scheduler/models/form/BrokerOperations.scala @@ -0,0 +1,15 @@ +package scheduler.models.form + +sealed trait BrokerOperations + +case class Failover(failoverDelay: Option[String], failoverMaxDelay: Option[String], failoverMaxTries:Option[Int]) + +case class AddBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover) + +case class UpdateBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover) diff --git a/app/scheduler/models/form/RebalanceTopicsOperations.scala b/app/scheduler/models/form/RebalanceTopicsOperations.scala new file mode 100644 index 000000000..72bb6c14a --- /dev/null +++ b/app/scheduler/models/form/RebalanceTopicsOperations.scala @@ -0,0 +1,7 @@ +package scheduler.models.form + +sealed trait RebalanceTopicsOperations + +case class RebalanceTopics(ids: String, topics: Option[String]) extends RebalanceTopicsOperations + + diff --git a/app/scheduler/views/broker/addBroker.scala.html b/app/scheduler/views/broker/addBroker.scala.html new file mode 100644 index 000000000..6b413ce3f --- /dev/null +++ b/app/scheduler/views/broker/addBroker.scala.html @@ -0,0 +1,57 @@ +@* +* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 +* See accompanying LICENSE file. +*@ +@import helper._ +@import b3.vertical.fieldConstructor +@import scheduler.controllers.routes +@import scalaz.{\/} +@(schedulerName: String, errorOrForm: kafka.manager.ApiError \/ Form[scheduler.models.form.AddBroker])(implicit request: RequestHeader) + +@theMenu = { +@views.html.navigation.schedulerMenu(schedulerName,"Broker","Add Broker",models.navigation.Menus.schedulerMenus(schedulerName)) +} + +@renderForm(addBrokerForm: Form[scheduler.models.form.AddBroker]) = { +
+ @b3.form(routes.Broker.handleAddBroker(schedulerName)) { + + + + + + +
+ @b3.text(addBrokerForm("id"), '_label -> "ID", 'placeholder -> "", 'autofocus -> true ) + @b3.text(addBrokerForm("cpus"), '_label -> "CPU", 'placeholder -> "") + @b3.text(addBrokerForm("mem"), '_label -> "Memory", 'placeholder -> "") + @b3.text(addBrokerForm("heap"), '_label -> "Heap", 'placeholder -> "") + @b3.text(addBrokerForm("port"), '_label -> "Port", 'placeholder -> "") + @b3.text(addBrokerForm("bindAddress"), '_label -> "Bind Address", 'placeholder -> "") + @b3.text(addBrokerForm("stickinessPeriod"), '_label -> "Stickiness Period", 'placeholder -> "") + @b3.text(addBrokerForm("constraints"), '_label -> "Constraints", 'placeholder -> "") + @b3.text(addBrokerForm("jvmOptions"), '_label -> "Jvm Options", 'placeholder -> "") + @b3.text(addBrokerForm("options"), '_label -> "Options", 'placeholder -> "") + @b3.text(addBrokerForm("log4jOptions"), '_label -> "Log4j Options", 'placeholder -> "") + @b3.text(addBrokerForm("failoverDelay"), '_label -> "Failover Delay", 'placeholder -> "") + @b3.text(addBrokerForm("failoverMaxDelay"), '_label -> "Failover Max Delay", 'placeholder -> "") + @b3.text(addBrokerForm("failoverMaxTries"), '_label -> "Failover Max Tries", 'placeholder -> "") + @b3.submit('class -> "submit-button btn btn-primary"){ Add } + Cancel +
+ } +
+} + +@main( +"Add Broker", +menu = theMenu, +breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withViewAndScheduler("Add Broker",schedulerName))) { +
+
+

Add Broker

+ @errorOrForm.fold( views.html.errors.onApiError(_), renderForm(_)) +
+
+} + diff --git a/app/scheduler/views/broker/brokerList.scala.html b/app/scheduler/views/broker/brokerList.scala.html new file mode 100644 index 000000000..45012d03b --- /dev/null +++ b/app/scheduler/views/broker/brokerList.scala.html @@ -0,0 +1,46 @@ +@* +* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 +* See accompanying LICENSE file. +*@ +@import kafka.manager.ActorModel.BrokerIdentity +@import scalaz.{\/} +@(schedulerName:String, errorOrBrokers: kafka.manager.ApiError \/ kafka.manager.SchedulerBrokerListExtended) + +@theMenu = { +@views.html.navigation.schedulerMenu(schedulerName,"Brokers","",models.navigation.Menus.schedulerMenus(schedulerName)) +} + +@renderBrokerMetrics(bl: kafka.manager.SchedulerBrokerListExtended) = { +@if(bl.schedulerConfig.jmxEnabled) { +@views.html.common.brokerMetrics(bl.combinedMetric) +} else { + +} +} + +@main( +"Broker List", +menu = theMenu, +breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withViewAndScheduler("Brokers",schedulerName))) { +
+
+
+

+ Brokers +

+
+ @errorOrBrokers.fold( views.html.errors.onApiError(_), scheduler.views.html.broker.brokerListContent(schedulerName,_) ) +
+
+
+
+

Combined Metrics

+ @errorOrBrokers.fold( views.html.errors.onApiError(_), bl => renderBrokerMetrics(bl)) +
+
+ +} diff --git a/app/scheduler/views/broker/brokerListContent.scala.html b/app/scheduler/views/broker/brokerListContent.scala.html new file mode 100644 index 000000000..a69ba0012 --- /dev/null +++ b/app/scheduler/views/broker/brokerListContent.scala.html @@ -0,0 +1,33 @@ +@* +* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 +* See accompanying LICENSE file. +*@ +@import kafka.manager.ActorModel.SchedulerBrokerIdentity +@(schedulerName:String, brokerListExtended: kafka.manager.SchedulerBrokerListExtended) + + + + + + + @for(broker <- brokerListExtended.list.sortBy(_.id)) { + + + + + + + + + } + +
IdHostPortStateBytes InBytes Out
@broker.id@broker.actualHost().getOrElse("-")@broker.actualPort().getOrElse("-")@broker.state() + + @brokerListExtended.metrics.get(broker.id).map(_.bytesInPerSec.formatOneMinuteRate) + + + + @brokerListExtended.metrics.get(broker.id).map(_.bytesOutPerSec.formatOneMinuteRate) + +
+ diff --git a/app/scheduler/views/broker/brokerView.scala.html b/app/scheduler/views/broker/brokerView.scala.html new file mode 100644 index 000000000..843e6573c --- /dev/null +++ b/app/scheduler/views/broker/brokerView.scala.html @@ -0,0 +1,24 @@ +@* +* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 +* See accompanying LICENSE file. +*@ +@import scalaz.{\/} +@(schedulerName:String, brokerId: Int, errorOrBrokerView: kafka.manager.ApiError \/ kafka.manager.ActorModel.SchedulerBrokerIdentity) + +@theMenu = { + @views.html.navigation.schedulerMenu(schedulerName,"Brokers","",models.navigation.Menus.schedulerMenus(schedulerName)) +} + +@main( + "Broker View", + menu = theMenu, + breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withNamedViewAndScheduler("Broker View",schedulerName,brokerId.toString))) { +
+
+
+

Broker Id @brokerId

+
+ @errorOrBrokerView.fold[Html](views.html.errors.onApiError(_), scheduler.views.html.broker.brokerViewContent(schedulerName, brokerId, _)) +
+
+} diff --git a/app/scheduler/views/broker/brokerViewContent.scala.html b/app/scheduler/views/broker/brokerViewContent.scala.html new file mode 100644 index 000000000..8766a375d --- /dev/null +++ b/app/scheduler/views/broker/brokerViewContent.scala.html @@ -0,0 +1,101 @@ +@* +* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 +* See accompanying LICENSE file. +*@ +@import b3.vertical.fieldConstructor +@(schedulerName: String, brokerId: Int, brokerView :kafka.manager.ActorModel.SchedulerBrokerIdentity) + +@renderBrokerMetrics = { + @if(brokerView.schedulerConfig.jmxEnabled) { + @views.html.common.brokerMetrics(brokerView.metrics) + } else { + + } +} + +
+
+
+

Summary

+ + + + + + + + + + + + + @if(brokerView.schedulerConfig.jmxEnabled) { + + + + } + +
Status@brokerView.state
CPUs@brokerView.cpus
Memory@brokerView.mem
Heap@brokerView.heap
Constraints@brokerView.constraintsDesc
Options@brokerView.optionsDesc
Log4j Options@brokerView.log4jOptionsDesc
Jvm Options@brokerView.jvmOptions
# of Topics@brokerView.numTopics
# of Partitions@brokerView.numPartitions
% of Messages@brokerView.stats.map(_.perMessages)
% of Incoming@brokerView.stats.map(_.perIncoming)
% of Outgoing@brokerView.stats.map(_.perOutgoing)
+
+
+

Metrics

+ @renderBrokerMetrics +
+
+
+
+

Operations

+ + + + @if(brokerView.active) { + @b3.form(scheduler.controllers.routes.Broker.handleStopBroker(schedulerName, brokerId)) { +
+ @b3.submit('class -> "btn btn-primary btn-block"){ Stop Broker } +
+ } + } else { + @b3.form(scheduler.controllers.routes.Broker.handleStartBroker(schedulerName, brokerId)) { +
+ @b3.submit('class -> "btn btn-primary btn-block"){ Start Broker } +
+ } + @b3.form(scheduler.controllers.routes.Broker.handleRemoveBroker(schedulerName, brokerId)) { +
+ @b3.submit('class -> "btn btn-primary btn-block"){ Remove Broker } +
+ } + Update Broker + } + +
+
+
+
+
+
+
+

Per Topic Detail

+ + + + + + @for((ti,bp) <- brokerView.topicPartitions) { + + + + + + + + + } + +
TopicReplicationTotal PartitionsPartitions on BrokerPartitionsSkewed?
@ti.topic@ti.replicationFactor@ti.partitions@bp.size@bp.mkString("(",",",")")@ti.partitionsByBroker.find(_.id == brokerId).map(_.isSkewed).getOrElse("Unknown")
+
+
+
+ diff --git a/app/scheduler/views/broker/rebalanceTopics.scala.html b/app/scheduler/views/broker/rebalanceTopics.scala.html new file mode 100644 index 000000000..8c95540f9 --- /dev/null +++ b/app/scheduler/views/broker/rebalanceTopics.scala.html @@ -0,0 +1,46 @@ +@* +* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 +* See accompanying LICENSE file. +*@ + +@import helper._ +@import b3.vertical.fieldConstructor +@import scheduler.controllers.routes +@import scalaz.{\/} +@(schedulerName: String, errorOrForm: kafka.manager.ApiError \/ Form[scheduler.models.form.RebalanceTopics])(implicit request: RequestHeader) + +@theMenu = { +@views.html.navigation.schedulerMenu(schedulerName,"Rebalance Topics","",models.navigation.Menus.schedulerMenus(schedulerName)) +} + +@renderForm(rebalanceForm: Form[scheduler.models.form.RebalanceTopics]) = { +
+ @b3.form(routes.RebalanceTopics.handleRebalance(schedulerName)) { + + + + + + +
+ @b3.text(rebalanceForm("ids"), '_label -> "IDs", 'placeholder -> "", 'autofocus -> true ) + @b3.text(rebalanceForm("topics"), '_label -> "Topics", 'placeholder -> "") + @b3.submit('class -> "submit-button btn btn-primary"){ Submit } + Cancel +
+ } +
+} + +@main( +"Rebalance Topics", +menu = theMenu, +breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withViewAndCluster("Rebalance Topics",schedulerName))) { +
+
+

Rebalance

+ @errorOrForm.fold( views.html.errors.onApiError(_), renderForm(_)) +
+
+} + diff --git a/app/scheduler/views/broker/updateBroker.scala.html b/app/scheduler/views/broker/updateBroker.scala.html new file mode 100644 index 000000000..33874e1b7 --- /dev/null +++ b/app/scheduler/views/broker/updateBroker.scala.html @@ -0,0 +1,56 @@ +@* +* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 +* See accompanying LICENSE file. +*@ +@import helper._ +@import b3.vertical.fieldConstructor +@import scheduler.controllers.routes +@import scalaz.{\/} +@(schedulerName: String, brokerId:Int, errorOrForm: kafka.manager.ApiError \/ Form[scheduler.models.form.UpdateBroker])(implicit request: RequestHeader) + +@theMenu = { +@views.html.navigation.schedulerMenu(schedulerName,"Broker","Update Broker",models.navigation.Menus.schedulerMenus(schedulerName)) +} + +@renderForm(updateBrokerForm: Form[scheduler.models.form.UpdateBroker]) = { +
+ @b3.form(routes.Broker.handleUpdateBroker(schedulerName, brokerId)) { + + + + + + +
+ @b3.text(updateBrokerForm("id"), '_label -> "ID", 'placeholder -> "", 'readonly -> true) + @b3.text(updateBrokerForm("cpus"), '_label -> "CPU", 'placeholder -> "") + @b3.text(updateBrokerForm("mem"), '_label -> "Memory", 'placeholder -> "") + @b3.text(updateBrokerForm("heap"), '_label -> "Heap", 'placeholder -> "") + @b3.text(updateBrokerForm("port"), '_label -> "Port", 'placeholder -> "") + @b3.text(updateBrokerForm("bindAddress"), '_label -> "Bind Address", 'placeholder -> "") + @b3.text(updateBrokerForm("jvmOptions"), '_label -> "Jvm Options", 'placeholder -> "") + @b3.text(updateBrokerForm("stickinessPeriod"), '_label -> "Stickiness Period", 'placeholder -> "") + @b3.text(updateBrokerForm("failoverDelay"), '_label -> "Failover Delay", 'placeholder -> "") + @b3.text(updateBrokerForm("failoverMaxDelay"), '_label -> "Failover Max Delay", 'placeholder -> "") + @b3.text(updateBrokerForm("failoverMaxTries"), '_label -> "Failover Max Tries", 'placeholder -> "") + @b3.submit('class -> "submit-button btn btn-primary"){ Update } + Cancel +
+ } +
+} + +@main( + "Update Broker", + menu = theMenu, + breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withNamedViewAndScheduler("Broker View",schedulerName,brokerId.toString))) { +
+
+

Update Broker

+
+ @errorOrForm.fold( views.html.errors.onApiError(_), renderForm(_)) +
+
+
+} + diff --git a/app/scheduler/views/scheduler/addScheduler.scala.html b/app/scheduler/views/scheduler/addScheduler.scala.html new file mode 100644 index 000000000..64fbfe309 --- /dev/null +++ b/app/scheduler/views/scheduler/addScheduler.scala.html @@ -0,0 +1,38 @@ +@* +* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 +* See accompanying LICENSE file. +*@ +@(addSchedulerForm: Form[kafka.manager.SchedulerConfig])(implicit request: RequestHeader) + +@import b3.vertical.fieldConstructor +@import controllers.routes + +@theMenu = { +@views.html.navigation.defaultMenu(views.html.navigation.menuNav("Scheduler","Add Scheduler",models.navigation.Menus.indexMenu)) +} + +@drawForm(form : Form[kafka.manager.SchedulerConfig]) = { +@b3.form(routes.Cluster.handleAddScheduler) { +
+ @b3.text(form("name"), '_label -> "Scheduler Name", 'placeholder -> "", 'autofocus -> true ) + @b3.text(form("apiUrl"), '_label -> "Scheduler REST API URL", 'placeholder -> "http://master:7000") + @b3.text(form("zkHosts"), '_label -> "Scheduler Zookeeper Hosts", 'placeholder -> "zk1:2181,zk2:2181,zk3:2181") + @b3.select( form("kafkaVersion"), options = kafka.manager.KafkaVersion.formSelectList, '_label -> "Kafka Version" ) + @b3.checkbox(form("jmxEnabled"), '_text -> "Enable JMX Polling") + @b3.submit('class -> "submit-button btn btn-primary"){ Save } + Cancel +
+} +} + +@main("Add Scheduler", menu = theMenu, breadcrumbs = views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withSView("Add Scheduler"))) { +
+
+

Add Scheduler

+
+ @drawForm(addSchedulerForm) +
+
+
+} + diff --git a/app/scheduler/views/scheduler/schedulerList.scala.html b/app/scheduler/views/scheduler/schedulerList.scala.html new file mode 100644 index 000000000..8449ebb80 --- /dev/null +++ b/app/scheduler/views/scheduler/schedulerList.scala.html @@ -0,0 +1,58 @@ +@(schedulers: IndexedSeq[kafka.manager.SchedulerConfig]) + +@import b3.vertical.fieldConstructor +@import scheduler.controllers.routes + + + + + + + @for(scheduler <- schedulers) { + + + + + + } + +
ActiveOperationsVersion
+ @if(scheduler.enabled) { + @scheduler.name + } else { + @scheduler.name + } + + @* + *
+ * @if(scheduler.enabled) { + * Modify + * @b3.form(controllers.routes.Cluster.handleUpdateCluster(scheduler.name)) { + * + * + * + * + * @b3.submit('class -> "btn btn-warning ops-button"){ Disable } + * } + * } else { + * @b3.form(controllers.routes.Cluster.handleUpdateCluster(scheduler.name)) { + * + * + * + * + * @b3.submit('class -> "btn btn-success ops-button"){ Enable } + * } + * @b3.form(controllers.routes.Cluster.handleUpdateCluster(scheduler.name)) { + * + * + * + * + * @b3.submit('class -> "btn btn-danger ops-button"){ Delete } + * } + * } + *
+ *@ +
+ @scheduler.version.toString +
+ diff --git a/app/scheduler/views/scheduler/schedulerView.scala.html b/app/scheduler/views/scheduler/schedulerView.scala.html new file mode 100644 index 000000000..7410e0617 --- /dev/null +++ b/app/scheduler/views/scheduler/schedulerView.scala.html @@ -0,0 +1,19 @@ +@* +* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 +* See accompanying LICENSE file. +*@ +@import scalaz.{\/} +@(schedulerName: String, errorOrSchedulerView: kafka.manager.ApiError \/ kafka.manager.ActorModel.SMView) + +@theMenu = { + @views.html.navigation.schedulerMenu(schedulerName,"Scheduler","Summary",models.navigation.Menus.schedulerMenus(schedulerName)) +} + +@main( + "Kafka Manager", + menu = theMenu, + breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withViewAndScheduler("Summary",schedulerName))) { +
+ @errorOrSchedulerView.fold(views.html.errors.onApiError(_),scheduler.views.html.scheduler.schedulerViewContent(schedulerName,_)) +
+} diff --git a/app/scheduler/views/scheduler/schedulerViewContent.scala.html b/app/scheduler/views/scheduler/schedulerViewContent.scala.html new file mode 100644 index 000000000..13199e2bb --- /dev/null +++ b/app/scheduler/views/scheduler/schedulerViewContent.scala.html @@ -0,0 +1,33 @@ +@* +* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 +* See accompanying LICENSE file. +*@ +@(schedulerName: String, schedulerView: kafka.manager.ActorModel.SMView) + +
+

Scheduler Information

+ + + + + + + + + + + + +
Zookeepers@schedulerView.schedulerConfig.curatorConfig.zkConnect.replace(","," ")
Api url@schedulerView.schedulerConfig.apiUrl.toString
Version@schedulerView.schedulerConfig.version.toString
+
+
+

Scheduler Summary

+ + + + + + + +
Topics@schedulerView.topicsCountBrokers@schedulerView.brokersCount
+
\ No newline at end of file diff --git a/app/views/index.scala.html b/app/views/index.scala.html index 617804a16..ba2ef3dab 100644 --- a/app/views/index.scala.html +++ b/app/views/index.scala.html @@ -3,7 +3,8 @@ * See accompanying LICENSE file. *@ @import scalaz.{\/} -@(errorOrClusters: kafka.manager.ApiError \/ kafka.manager.ActorModel.KMClusterList) +@(errorOrClusters: kafka.manager.ApiError \/ kafka.manager.ActorModel.KMClusterList, +errorOrSchedulers: kafka.manager.ApiError \/ kafka.manager.ActorModel.KMSchedulerList) @main( "Kafka Manager", @@ -18,6 +19,11 @@ @errorOrClusters.fold( _ => Html(""), cl => { views.html.cluster.pendingClusterList(cl.pending) }) + +

Schedulers

+ @errorOrSchedulers.fold(views.html.errors.onApiError(_), sc => { + scheduler.views.html.scheduler.schedulerList(sc.active) + }) } diff --git a/app/views/navigation/schedulerMenu.scala.html b/app/views/navigation/schedulerMenu.scala.html new file mode 100644 index 000000000..6b732462b --- /dev/null +++ b/app/views/navigation/schedulerMenu.scala.html @@ -0,0 +1,20 @@ +@* +* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0 +* See accompanying LICENSE file. +*@ +@(scheduler: String, menuTitle: String, menuItem: String, menuList: IndexedSeq[models.navigation.Menu]) + + diff --git a/build.sbt b/build.sbt index 28af2fd46..e485ba8f4 100644 --- a/build.sbt +++ b/build.sbt @@ -24,6 +24,7 @@ test in assembly := {} libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % "2.3.10", "org.webjars" %% "webjars-play" % "2.3.0-2", + ws, "org.webjars" % "bootstrap" % "3.3.4", "org.webjars" % "jquery" % "2.1.4", "org.webjars" % "backbonejs" % "1.1.2-4", diff --git a/conf/routes b/conf/routes index 9b666b2c0..9d3bfe9a1 100644 --- a/conf/routes +++ b/conf/routes @@ -37,6 +37,21 @@ GET /api/status/:c/availableBrokers controllers.api.KafkaH GET /api/status/:c/:t/underReplicatedPartitions controllers.api.KafkaHealthCheck.underReplicatedPartitions(c:String, t: String) GET /api/status/:c/:t/unavailablePartitions controllers.api.KafkaHealthCheck.unavailablePartitions(c:String, t:String) +GET /addScheduler controllers.Cluster.addScheduler +GET /schedulers/:s scheduler.controllers.SchedulerApplication.getScheduler(s:String) +POST /schedulers controllers.Cluster.handleAddScheduler +GET /schedulers/:s/addBroker scheduler.controllers.Broker.addBroker(s:String) +POST /schedulers/:s/brokers/add scheduler.controllers.Broker.handleAddBroker(s:String) +GET /schedulers/:s/brokers scheduler.controllers.SchedulerApplication.brokers(s:String) +GET /schedulers/:s/brokers/:b scheduler.controllers.SchedulerApplication.broker(s: String, b:Int) +POST /schedulers/:s/broker/:b/start scheduler.controllers.Broker.handleStartBroker(s:String, b: Int) +POST /schedulers/:s/broker/:b/stop scheduler.controllers.Broker.handleStopBroker(s:String, b: Int) +POST /schedulers/:s/broker/:b/remove scheduler.controllers.Broker.handleRemoveBroker(s:String, b: Int) +GET /schedulers/:s/brokers/:b/updateBroker scheduler.controllers.Broker.updateBroker(s:String, b: Int) +POST /schedulers/:s/brokers/:b/update scheduler.controllers.Broker.handleUpdateBroker(s:String, b: Int) +GET /schedulers/:s/rebalanceTopics scheduler.controllers.RebalanceTopics.rebalanceTopics(s:String) +POST /schedulers/:s/rebalanceTopics/run scheduler.controllers.RebalanceTopics.handleRebalance(s:String) + # Versioned Assets GET /vassets/*file controllers.Assets.versioned(path="/public", file: Asset) From daee33f9d323050f5aff03cb3cf2f474f9e84514 Mon Sep 17 00:00:00 2001 From: Andrii Biletskyi Date: Wed, 16 Sep 2015 12:29:04 +0300 Subject: [PATCH 08/10] Post merge fixes to resolve compilation errors --- app/controllers/Cluster.scala | 35 ++++- app/kafka/manager/ActorModel.scala | 99 +++++++++++++- app/kafka/manager/KafkaManager.scala | 127 ++++++++++++++++++ .../manager/KafkaSchedulerStateActor.scala | 4 +- .../kafka/manager/SchedulerManagerActor.scala | 2 +- app/views/index.scala.html | 4 +- 6 files changed, 264 insertions(+), 7 deletions(-) diff --git a/app/controllers/Cluster.scala b/app/controllers/Cluster.scala index a92593200..bbb44e86f 100644 --- a/app/controllers/Cluster.scala +++ b/app/controllers/Cluster.scala @@ -6,7 +6,7 @@ package controllers import features.{KMClusterManagerFeature, ApplicationFeatures} -import kafka.manager.{KafkaVersion, ApiError, ClusterConfig} +import kafka.manager.{SchedulerConfig, KafkaVersion, ApiError, ClusterConfig} import models.FollowLink import models.form._ import play.api.data.Form @@ -74,6 +74,17 @@ object Cluster extends Controller { )(ClusterConfig.apply)(ClusterConfig.customUnapply) ) + val schedulerConfigForm = Form( + mapping( + "name" -> nonEmptyText.verifying(maxLength(250), validateName), + "kafkaVersion" -> nonEmptyText.verifying(validateKafkaVersion), + "apiUrl" -> nonEmptyText.verifying(validateZkHosts), + "zkHosts" -> nonEmptyText.verifying(validateZkHosts), + "zkMaxRetry" -> ignored(100 : Int), + "jmxEnabled" -> boolean + )(SchedulerConfig.apply)(SchedulerConfig.customUnapply) + ) + val updateForm = Form( mapping( "operation" -> nonEmptyText.verifying(validateOperation), @@ -110,6 +121,28 @@ object Cluster extends Controller { } } + def addScheduler = Action.async { implicit request => + Future.successful(Ok(scheduler.views.html.scheduler.addScheduler(schedulerConfigForm))) + } + + def handleAddScheduler = Action.async { implicit request => + schedulerConfigForm.bindFromRequest.fold( + formWithErrors => Future.successful(BadRequest(scheduler.views.html.scheduler.addScheduler(formWithErrors))), + schedulerConfig => { + kafkaManager.addScheduler(schedulerConfig.name, schedulerConfig.version.toString, schedulerConfig.apiUrl, schedulerConfig.curatorConfig.zkConnect, jmxEnabled = true).map { errorOrSuccess => + Ok(views.html.common.resultOfCommand( + views.html.navigation.defaultMenu(), + models.navigation.BreadCrumbs.withView("Add Scheduler"), + errorOrSuccess, + "Add Scheduler", + FollowLink("Go to scheduler view.",scheduler.controllers.routes.SchedulerApplication.getScheduler(schedulerConfig.name).toString()), + FollowLink("Try again.",routes.Cluster.addScheduler().toString()) + )) + } + } + ) + } + def updateCluster(c: String) = Action.async { implicit request => featureGate(KMClusterManagerFeature) { kafkaManager.getClusterConfig(c).map { errorOrClusterConfig => diff --git a/app/kafka/manager/ActorModel.scala b/app/kafka/manager/ActorModel.scala index dac47a661..731b39ef8 100644 --- a/app/kafka/manager/ActorModel.scala +++ b/app/kafka/manager/ActorModel.scala @@ -5,11 +5,12 @@ package kafka.manager -import java.util.Properties +import java.util.{Date, Properties} import org.joda.time.DateTime import kafka.manager.utils.TopicAndPartition import org.slf4j.LoggerFactory +import scheduler.models.form.Failover import scala.collection.immutable.Queue import scala.util.Try @@ -441,4 +442,100 @@ object ActorModel { LogkafkaIdentity(hostname, lct.isDefined, identitySet.toMap) } } + + case class SMGetBrokerIdentity(id: Int) extends BVRequest + + case object SMGetView extends QueryRequest + case class SMView(topicsCount: Int, brokersCount: Int, schedulerConfig: SchedulerConfig) extends QueryResponse + + case class SMAddBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover) extends CommandRequest + + case class SMUpdateBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover) extends CommandRequest + + case class SMCommandResult(result: Try[Unit]) extends CommandResponse + + case class KSCAddBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover) extends CommandRequest + + case class KSCUpdateBroker(id: Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover) extends CommandRequest + + case class SMStartBroker(brokerId: Int) extends CommandRequest + case class SMStopBroker(brokerId: Int) extends CommandRequest + case class SMRemoveBroker(brokerId: Int) extends CommandRequest + case class SMRebalanceTopics(ids: String, topics:Option[String]) extends CommandRequest + + case class KSCStartBroker(id: Int) extends CommandRequest + case class KSCStopBroker(id: Int) extends CommandRequest + case class KSCRemoveBroker(id: Int) extends CommandRequest + case class KSCRebalanceTopics(ids: String, topics:Option[String]) extends CommandRequest + + case class KMSchedulerCommandRequest(scheduler: String, request: CommandRequest) extends CommandRequest + case class KMSchedulerList(active: IndexedSeq[SchedulerConfig], pending : IndexedSeq[SchedulerConfig]) extends QueryResponse + case object KMGetAllSchedulers extends QueryRequest + case class KMAddScheduler(config: SchedulerConfig) extends CommandRequest + case class KMSchedulerQueryRequest(schedulerName: String, request: QueryRequest) extends QueryRequest + case class KMGetSchedulerConfig(schedulerName: String) extends QueryRequest + case class KMSchedulerConfigResult(result: Try[SchedulerConfig]) extends QueryResponse + + case object SchedulerKSGetBrokers extends KSRequest + case class SchedulerBrokerList(list: Seq[SchedulerBrokerIdentity], schedulerConfig: SchedulerConfig) extends QueryResponse + + case class SchedulerBrokerTaskIdentity(id: String, + slaveId: String, + executorId: String, + hostname: String, + endpoint: Option[String], + state: String) + + case class SchedulerBrokerStickinessIdentity(period: String, + stopTime: Option[Date], + hostname: Option[String]) + + case class SchedulerBrokerFailoverIdentity(delay: String, + maxDelay: String, + maxTries: Option[Int], + failures: Option[Int], + failureTime: Option[Date]) + + case class SchedulerBrokerIdentity(id: Int, active: Boolean, cpus: Double, mem: Long, heap: Long, port: Option[String], + bindAddress: Option[String], constraints: Seq[(String, String)], options: Seq[(String, String)], + log4jOptions: Seq[(String, String)], jvmOptions: Option[String], + stickiness: SchedulerBrokerStickinessIdentity, + failover: SchedulerBrokerFailoverIdentity, + task: Option[SchedulerBrokerTaskIdentity], + schedulerConfig: SchedulerConfig = null, + metrics: Option[BrokerMetrics] = None, + stats: Option[BrokerClusterStats] = None) { + + def actualHost(): Option[String] = task.flatMap(t => t.endpoint.map(_.split(":")(0))) + + def actualPort(): Option[String] = task.flatMap(t => t.endpoint.map(_.split(":")(1))) + + def numTopics() = 0 + def numPartitions() = 0 + + def topicPartitions() = Seq.empty[(TopicIdentity, IndexedSeq[Int])] + + def constraintsDesc = constraints.map { case (k, v) => s"$k=$v" }.mkString + def optionsDesc = options.map { case (k, v) => s"$k=$v" }.mkString + def log4jOptionsDesc = log4jOptions.map { case (k, v) => s"$k=$v" }.mkString + + def state(): String = { + if (active) + if (actualHost().isEmpty) "starting" else "running" + else + "stopped|failed" + } + } } diff --git a/app/kafka/manager/KafkaManager.scala b/app/kafka/manager/KafkaManager.scala index 8a95b5150..f695f8d2f 100644 --- a/app/kafka/manager/KafkaManager.scala +++ b/app/kafka/manager/KafkaManager.scala @@ -13,6 +13,7 @@ import akka.util.Timeout import com.typesafe.config.{ConfigFactory, Config} import kafka.manager.ActorModel._ import org.slf4j.{LoggerFactory, Logger} +import scheduler.models.form.Failover import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ @@ -25,6 +26,7 @@ import scala.util.{Success, Failure, Try} case class TopicListExtended(list: IndexedSeq[(String, Option[TopicIdentity])], deleteSet: Set[String], underReassignments: IndexedSeq[String], clusterContext: ClusterContext) case class BrokerListExtended(list: IndexedSeq[BrokerIdentity], metrics: Map[Int,BrokerMetrics], combinedMetric: Option[BrokerMetrics], clusterContext: ClusterContext) case class LogkafkaListExtended(list: IndexedSeq[(String, Option[LogkafkaIdentity])], deleteSet: Set[String]) +case class SchedulerBrokerListExtended(list: Seq[SchedulerBrokerIdentity], metrics: Map[Int,BrokerMetrics], combinedMetric: Option[BrokerMetrics], schedulerConfig: SchedulerConfig) case class ApiError(msg: String) object ApiError { private[this] val log : Logger = LoggerFactory.getLogger(classOf[ApiError]) @@ -661,4 +663,129 @@ class KafkaManager(akkaConfig: Config) ) } } + + def addScheduler(schedulerName: String, version: String, apiUrl: String, zkHosts: String, jmxEnabled: Boolean): Future[ApiError \/ + Unit] = + { + val sc = SchedulerConfig(schedulerName, apiUrl, CuratorConfig(zkHosts), enabled = true, KafkaVersion(version), jmxEnabled = jmxEnabled) + + tryWithKafkaManagerActor(KMAddScheduler(sc)) { result: KMCommandResult => + result.result.get + } + } + + def getSchedulerView(schedulerName: String): Future[ApiError \/ SMView] = { + tryWithKafkaManagerActor(KMSchedulerQueryRequest(schedulerName, SMGetView))(identity[SMView]) + } + + def getSchedulerBrokerList(schedulerName: String): Future[ApiError \/ SchedulerBrokerListExtended] = { + implicit val ec = apiExecutionContext + + val futureBrokerList = tryWithKafkaManagerActor(KMSchedulerQueryRequest(schedulerName, SchedulerKSGetBrokers))(identity[SchedulerBrokerList]) + futureBrokerList.map { + case \/-(SchedulerBrokerList(identities, config)) => + \/-(SchedulerBrokerListExtended(identities, Map.empty, None, config)) + case a : -\/[ApiError] => + a + } + } + + def getSchedulerConfig(schedulerName: String): Future[ApiError \/ SchedulerConfig] = { + tryWithKafkaManagerActor(KMGetSchedulerConfig(schedulerName)) { result: KMSchedulerConfigResult => + result.result.get + } + } + + def getSchedulerList: Future[ApiError \/ KMSchedulerList] = { + tryWithKafkaManagerActor(KMGetAllSchedulers)(identity[KMSchedulerList]) + } + + + def addBroker(schedulerName:String, id:Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover): Future[ApiError \/ Unit] = + { + implicit val ec = apiExecutionContext + withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMAddBroker(id, cpus, mem, heap, port, bindAddress, + constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover))) { + result: Future[SMCommandResult] => + result.map(cmr => toDisjunction(cmr.result)) + } + + } + + def updateBroker(schedulerName:String, id:Int, cpus: Option[Double], mem: Option[Long], heap: Option[Long], port: Option[String], + bindAddress: Option[String], constraints: Option[String], options: Option[String], + log4jOptions: Option[String], jvmOptions: Option[String], stickinessPeriod: Option[String], + failover: Failover): Future[ApiError \/ Unit] = + { + implicit val ec = apiExecutionContext + withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMUpdateBroker(id, cpus, mem, heap, port, bindAddress, + constraints, options, log4jOptions, jvmOptions, stickinessPeriod, failover))) { + result: Future[SMCommandResult] => + result.map(cmr => toDisjunction(cmr.result)) + } + + } + + def getBrokerIdentity(schedulerName: String, brokerId: Int): Future[ApiError \/ SchedulerBrokerIdentity] = { + val futureView = tryWithKafkaManagerActor( + KMSchedulerQueryRequest( + schedulerName, + SMGetBrokerIdentity(brokerId) + ) + )(identity[Option[SchedulerBrokerIdentity]]) + + implicit val ec = apiExecutionContext + futureView.flatMap[ApiError \/ SchedulerBrokerIdentity] { errOrView => + errOrView.fold( + { err: ApiError => + Future.successful(-\/[ApiError](err)) + }, { viewOption: Option[SchedulerBrokerIdentity] => + viewOption.fold { + Future.successful[ApiError \/ SchedulerBrokerIdentity](-\/(ApiError(s"Broker not found $brokerId for scheduler $schedulerName"))) + } { view => + Future.successful(\/-(view)) + } + } + ) + } + } + + def startBroker(schedulerName: String, brokerId: Int): Future[ApiError \/ Unit] = + { + implicit val ec = apiExecutionContext + withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMStartBroker(brokerId))) { + result: Future[SMCommandResult] => + result.map(cmr => toDisjunction(cmr.result)) + } + } + + def stopBroker(schedulerName: String, brokerId: Int): Future[ApiError \/ Unit] = + { + implicit val ec = apiExecutionContext + withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMStopBroker(brokerId))) { + result: Future[SMCommandResult] => + result.map(cmr => toDisjunction(cmr.result)) + } + } + + def removeBroker(schedulerName: String, brokerId: Int): Future[ApiError \/ Unit] = + { + implicit val ec = apiExecutionContext + withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMRemoveBroker(brokerId))) { + result: Future[SMCommandResult] => + result.map(cmr => toDisjunction(cmr.result)) + } + } + + def rebalanceTopics(schedulerName: String, ids: String, topics: Option[String]): Future[ApiError \/ Unit] = + { + implicit val ec = apiExecutionContext + withKafkaManagerActor(KMSchedulerCommandRequest(schedulerName, SMRebalanceTopics(ids, topics))) { + result: Future[SMCommandResult] => + result.map(cmr => toDisjunction(cmr.result)) + } + } } diff --git a/app/scheduler/kafka/manager/KafkaSchedulerStateActor.scala b/app/scheduler/kafka/manager/KafkaSchedulerStateActor.scala index 936d8baa8..3d6a4b0df 100644 --- a/app/scheduler/kafka/manager/KafkaSchedulerStateActor.scala +++ b/app/scheduler/kafka/manager/KafkaSchedulerStateActor.scala @@ -11,7 +11,6 @@ import kafka.manager.{BaseQueryCommandActor, SchedulerConfig} import org.apache.curator.framework.CuratorFramework class KafkaSchedulerStateActor(curator: CuratorFramework, - deleteSupported: Boolean, schedulerConfig: SchedulerConfig) extends BaseQueryCommandActor { val schedulerRestClient = new SchedulerRestClient(schedulerConfig.apiUrl)(play.api.libs.concurrent.Execution.Implicits.defaultContext) @@ -45,10 +44,11 @@ class KafkaSchedulerStateActor(curator: CuratorFramework, override def processQueryRequest(request: QueryRequest): Unit = { request match { case KSGetTopics => - sender ! TopicList(IndexedSeq(), Set()) + sender ! TopicList(IndexedSeq(), Set(), null) case KSGetAllTopicDescriptions(lastUpdateMillisOption) => sender ! TopicDescriptions(IndexedSeq.empty, 0L) + case SchedulerKSGetBrokers => implicit val ec = context.dispatcher diff --git a/app/scheduler/kafka/manager/SchedulerManagerActor.scala b/app/scheduler/kafka/manager/SchedulerManagerActor.scala index bdac40316..9fd939dab 100644 --- a/app/scheduler/kafka/manager/SchedulerManagerActor.scala +++ b/app/scheduler/kafka/manager/SchedulerManagerActor.scala @@ -67,7 +67,7 @@ class SchedulerManagerActor(smConfig: SchedulerManagerActorConfig) private[this] val adminUtils = new AdminUtils(smConfig.schedulerConfig.version) - private[this] val kssProps = Props(classOf[KafkaSchedulerStateActor],sharedClusterCurator, adminUtils.isDeleteSupported, smConfig.schedulerConfig) + private[this] val kssProps = Props(classOf[KafkaSchedulerStateActor],sharedClusterCurator, smConfig.schedulerConfig) private[this] val kafkaSchedulerStateActor : ActorPath = context.actorOf(kssProps.withDispatcher(smConfig.pinnedDispatcherName),"kafka-scheduler-state").path private[this] val bvConfig = SchedulerBrokerViewCacheActorConfig( diff --git a/app/views/index.scala.html b/app/views/index.scala.html index f3ff470a3..89d3aeb48 100644 --- a/app/views/index.scala.html +++ b/app/views/index.scala.html @@ -4,8 +4,8 @@ *@ @import scalaz.{\/} @(errorOrClusters: kafka.manager.ApiError \/ kafka.manager.ActorModel.KMClusterList, -errorOrSchedulers: kafka.manager.ApiError \/ kafka.manager.ActorModel.KMSchedulerList) -(implicit af: features.ApplicationFeatures) +errorOrSchedulers: kafka.manager.ApiError \/ kafka.manager.ActorModel.KMSchedulerList +)(implicit af: features.ApplicationFeatures) @main( "Kafka Manager", From 9db1d24f2ad9e41a27ae28aa1f98f23e9ac91edd Mon Sep 17 00:00:00 2001 From: Andrii Biletskyi Date: Wed, 16 Sep 2015 16:02:56 +0300 Subject: [PATCH 09/10] Fixed uris according to latest changes in kafka-mesos --- .../kafka/manager/SchedulerRestClient.scala | 33 ++++++++++--------- .../views/broker/rebalanceTopics.scala.html | 4 +-- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/app/scheduler/kafka/manager/SchedulerRestClient.scala b/app/scheduler/kafka/manager/SchedulerRestClient.scala index 9c3185fe4..51a1b5dbd 100644 --- a/app/scheduler/kafka/manager/SchedulerRestClient.scala +++ b/app/scheduler/kafka/manager/SchedulerRestClient.scala @@ -106,14 +106,16 @@ object SchedulerRestClient { class SchedulerRestClient(val apiUrl: String)(implicit val executionContext: ExecutionContext) { private[this] lazy val logger = LoggerFactory.getLogger(this.getClass) - private val BrokersApiPrefix = s"$apiUrl/api/brokers" - private val StatusUrl = s"$BrokersApiPrefix/status" - private val AddBrokerUrl = s"$BrokersApiPrefix/add" - private val UpdateBrokerUrl = s"$BrokersApiPrefix/update" - private val StartBrokerUrl = s"$BrokersApiPrefix/start" - private val StopBrokerUrl = s"$BrokersApiPrefix/stop" - private val RemoveBrokerUrl = s"$BrokersApiPrefix/remove" - private val RebalanceTopicsUrl = s"$BrokersApiPrefix/rebalance" + private val BrokerApiPrefix = s"$apiUrl/api/broker" + private val StatusUrl = s"$BrokerApiPrefix/list" + private val AddBrokerUrl = s"$BrokerApiPrefix/add" + private val UpdateBrokerUrl = s"$BrokerApiPrefix/update" + private val StartBrokerUrl = s"$BrokerApiPrefix/start" + private val StopBrokerUrl = s"$BrokerApiPrefix/stop" + private val RemoveBrokerUrl = s"$BrokerApiPrefix/remove" + + private val TopicApiPrefix = s"$apiUrl/api/topic" + private val RebalanceTopicsUrl = s"$TopicApiPrefix/rebalance" private val Timeout = 10000 @@ -140,7 +142,7 @@ class SchedulerRestClient(val apiUrl: String)(implicit val executionContext: Exe failover: Failover): Future[Seq[Broker]] = { val queryParamsSeq = Seq( - "id" -> Some(id.toString), "cpus" -> cpus.map(_.toString), "mem" -> mem.map(_.toString), "heap" -> heap.map(_.toString), + "broker" -> Some(id.toString), "cpus" -> cpus.map(_.toString), "mem" -> mem.map(_.toString), "heap" -> heap.map(_.toString), "port" -> port, "bindAddress" -> bindAddress, "constraints" -> constraints, "options" -> options, "log4jOptions" -> log4jOptions, "jvmOptions" -> jvmOptions, "stickinessPeriod" -> stickinessPeriod.map(_.toString), "failoverDelay" -> failover.failoverDelay.map(_.toString), @@ -155,7 +157,8 @@ class SchedulerRestClient(val apiUrl: String)(implicit val executionContext: Exe .get() holder.map { - response => response.json.validate[AddBrokerResponse] + response => + response.json.validate[AddBrokerResponse] }.flatMap { case JsError(e) => logger.error(s"Failed to parse add broker response $e") @@ -171,7 +174,7 @@ class SchedulerRestClient(val apiUrl: String)(implicit val executionContext: Exe failover: Failover): Future[Unit] = { val queryParamsSeq = Seq( - "id" -> Some(id.toString), "cpus" -> cpus.map(_.toString), "mem" -> mem.map(_.toString), "heap" -> heap.map(_.toString), + "broker" -> Some(id.toString), "cpus" -> cpus.map(_.toString), "mem" -> mem.map(_.toString), "heap" -> heap.map(_.toString), "port" -> port, "bindAddress" -> bindAddress, "constraints" -> constraints, "options" -> options, "log4jOptions" -> log4jOptions, "jvmOptions" -> jvmOptions, "stickinessPeriod" -> stickinessPeriod.map(_.toString), "failoverDelay" -> failover.failoverDelay.map(_.toString), @@ -191,7 +194,7 @@ class SchedulerRestClient(val apiUrl: String)(implicit val executionContext: Exe def startBroker(id: Int): Future[Unit] = { val holder: Future[WSResponse] = WS .url(StartBrokerUrl) - .withQueryString(Seq("id" -> id.toString, "timeout" -> "0"): _*) + .withQueryString(Seq("broker" -> id.toString, "timeout" -> "0"): _*) .withRequestTimeout(Timeout) .get() @@ -201,7 +204,7 @@ class SchedulerRestClient(val apiUrl: String)(implicit val executionContext: Exe def stopBroker(id: Int): Future[Unit] = { val holder: Future[WSResponse] = WS .url(StopBrokerUrl) - .withQueryString(Seq("id" -> id.toString, "timeout" -> "0"): _*) + .withQueryString(Seq("broker" -> id.toString, "timeout" -> "0"): _*) .withRequestTimeout(Timeout) .get() @@ -211,7 +214,7 @@ class SchedulerRestClient(val apiUrl: String)(implicit val executionContext: Exe def removeBroker(id: Int): Future[Unit] = { val holder: Future[WSResponse] = WS .url(RemoveBrokerUrl) - .withQueryString(Seq("id" -> id.toString): _*) + .withQueryString(Seq("broker" -> id.toString): _*) .withRequestTimeout(Timeout) .get() @@ -221,7 +224,7 @@ class SchedulerRestClient(val apiUrl: String)(implicit val executionContext: Exe def rebalanceTopics(ids: String, topics: Option[String]): Future[Unit] = { val holder: Future[WSResponse] = WS .url(RebalanceTopicsUrl) - .withQueryString(Seq("ids" -> Some(ids), "topics" -> topics).collect { + .withQueryString(Seq("broker" -> Some(ids), "topic" -> topics).collect { case (key, Some(v)) => (key, v) }: _*) .withRequestTimeout(Timeout) diff --git a/app/scheduler/views/broker/rebalanceTopics.scala.html b/app/scheduler/views/broker/rebalanceTopics.scala.html index 8c95540f9..9cbb542c0 100644 --- a/app/scheduler/views/broker/rebalanceTopics.scala.html +++ b/app/scheduler/views/broker/rebalanceTopics.scala.html @@ -20,8 +20,8 @@ - @b3.text(rebalanceForm("ids"), '_label -> "IDs", 'placeholder -> "", 'autofocus -> true ) - @b3.text(rebalanceForm("topics"), '_label -> "Topics", 'placeholder -> "") + @b3.text(rebalanceForm("ids"), '_label -> "Broker Ids Expression", 'placeholder -> "", 'autofocus -> true ) + @b3.text(rebalanceForm("topics"), '_label -> "Topic Expression", 'placeholder -> "") @b3.submit('class -> "submit-button btn btn-primary"){ Submit } Cancel From 39e2d0d296e131b8f91c313e9933dd5e68a648cf Mon Sep 17 00:00:00 2001 From: Sjoerd Mulder Date: Wed, 18 Nov 2015 14:07:17 +0100 Subject: [PATCH 10/10] Fixing build by excluding oauth (that contains logging) --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index baede3a2e..c899915d1 100644 --- a/build.sbt +++ b/build.sbt @@ -27,7 +27,7 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-slf4j" % "2.3.14", "com.google.code.findbugs" % "jsr305" % "2.0.1", "org.webjars" %% "webjars-play" % "2.3.0-2", - ws, + ws exclude("oauth.signpost","signpost-commonshttp4") force(), "org.webjars" % "bootstrap" % "3.3.4", "org.webjars" % "jquery" % "2.1.4", "org.webjars" % "backbonejs" % "1.1.2-4",