Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rest probe endpoints(#108) #111

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ ksm {

}

server {
port = 8080
port = ${?SERVER_PORT}
}

parser {
csv {
delimiter = ","
Expand Down
11 changes: 9 additions & 2 deletions src/main/scala/io/conduktor/ksm/AclSynchronizer.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.conduktor.ksm

import io.conduktor.ksm.source.SourceAcl
import io.conduktor.ksm.notification.Notification
import io.conduktor.ksm.source.{ParsingContext, SourceAcl}
import io.conduktor.ksm.web.Probe
import kafka.security.auth.{Acl, Authorizer, Resource}
import org.slf4j.{Logger, LoggerFactory}

Expand Down Expand Up @@ -62,12 +62,13 @@ class AclSynchronizer(
notification: Notification,
numFailedRefreshesBeforeNotification: Int,
readOnly: Boolean = false
) extends Runnable {
) extends Runnable with Probe {

import AclSynchronizer._

private var sourceAclsCache: Set[(Resource, Acl)] = _
private var failedRefreshes: Int = 0
private var isRefreshFailing = false

if (readOnly) {
log.warn("""
Expand All @@ -84,6 +85,7 @@ class AclSynchronizer(
Try(sourceAcl.refresh()) match {
case Success(result) =>
failedRefreshes = 0
isRefreshFailing = false
result match {
// the source has not changed
case None =>
Expand Down Expand Up @@ -125,6 +127,7 @@ class AclSynchronizer(
case Failure(e) =>
// errors such as HTTP exceptions when refreshing
failedRefreshes += 1
isRefreshFailing = true
try {
log.error("Exceptions while refreshing ACL source:", e)
if(failedRefreshes >= numFailedRefreshesBeforeNotification){
Expand All @@ -147,4 +150,8 @@ class AclSynchronizer(
sourceAcl.close()
notification.close()
}

override def isSuccessful: Boolean = {
!isRefreshFailing
}
}
5 changes: 5 additions & 0 deletions src/main/scala/io/conduktor/ksm/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ class AppConfig(config: Config) {
val readOnly: Boolean = ksmConfig.getBoolean("readonly")
}

object Server {
private val serverConfig = config.getConfig("server")
val port: Int = serverConfig.getInt("port")
}

object Parser {
private val aclParserConfig = config.getConfig("parser")
val csvDelimiter: Char =
Expand Down
7 changes: 6 additions & 1 deletion src/main/scala/io/conduktor/ksm/KafkaSecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.conduktor.ksm

import com.typesafe.config.ConfigFactory
import io.conduktor.ksm.parser.AclParserRegistry
import io.conduktor.ksm.web.Server
import org.slf4j.LoggerFactory

import java.util.concurrent.atomic.AtomicBoolean
Expand All @@ -18,6 +19,7 @@ object KafkaSecurityManager extends App {
var aclSynchronizer: AclSynchronizer = _
val parserRegistry: AclParserRegistry = new AclParserRegistry(appConfig)
val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
var server: Server = _

// For backward compatibility, see https://github.com/conduktor/kafka-security-manager/issues/103
val oldExtractConfig = sys.env.get("KSM_EXTRACT")
Expand All @@ -37,6 +39,7 @@ object KafkaSecurityManager extends App {
appConfig.KSM.numFailedRefreshesBeforeNotification,
appConfig.KSM.readOnly
)
server = new Server(appConfig.Server.port, List(aclSynchronizer))

Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
Expand All @@ -54,6 +57,7 @@ object KafkaSecurityManager extends App {
log.info(
"Continuous mode: ACL will be synchronized every " + appConfig.KSM.refreshFrequencyMs + " ms."
)
server.start()
val handle = scheduler.scheduleAtFixedRate(
aclSynchronizer,
0,
Expand All @@ -68,13 +72,14 @@ object KafkaSecurityManager extends App {
} finally {
shutdown()
}

}

def shutdown(): Unit = {
log.info("Kafka Security Manager is shutting down...")
isCancelled = new AtomicBoolean(true)
aclSynchronizer.close()
if (server != null)
server.stop()
scheduler.shutdownNow()
}
}
23 changes: 23 additions & 0 deletions src/main/scala/io/conduktor/ksm/web/ProbeHandler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.conduktor.ksm.web

import com.sun.net.httpserver.{HttpExchange, HttpHandler}

trait Probe {
// implementation should be non blocking
def isSuccessful: Boolean
}

class ProbeHandler(probes: List[Probe]) extends HttpHandler {
override def handle(exc: HttpExchange): Unit = {
val checkup = probes.forall(p => p.isSuccessful)
val payload = Server.responseMapper
.createObjectNode()
.put("success", checkup)
val response = Server.responseMapper.writeValueAsString(payload)
val responseCode = if (checkup) 200 else 500
exc.sendResponseHeaders(responseCode, response.length())
val os = exc.getResponseBody
os.write(response.getBytes)
os.close()
}
}
29 changes: 29 additions & 0 deletions src/main/scala/io/conduktor/ksm/web/Server.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.conduktor.ksm.web

import com.fasterxml.jackson.databind.ObjectMapper
import com.sun.net.httpserver.HttpServer
import org.slf4j.LoggerFactory

import java.net.InetSocketAddress
import java.util.concurrent.Executors

object Server {
val responseMapper = new ObjectMapper()
}

class Server(port: Int, livenessProbes: List[Probe]) {
private val log = LoggerFactory.getLogger(Server.getClass)
private val server = HttpServer.create(new InetSocketAddress(port), 0)
server.createContext("/api/probe/ready", new ProbeHandler(List()))
server.createContext("/api/probe/alive", new ProbeHandler(livenessProbes))

def start(): Unit = {
log.info("Staring server on {}", port)
server.setExecutor(Executors.newSingleThreadExecutor())
server.start()
}

def stop(): Unit = {
server.stop(0)
}
}
3 changes: 3 additions & 0 deletions src/test/scala/io/conduktor/ksm/AclSynchronizerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ class AclSynchronizerTest
aclSynchronizer.run()
dummyNotification.addedAcls.size shouldBe 3
dummyNotification.removedAcls.size shouldBe 0
aclSynchronizer.isSuccessful shouldBe true
eventually(timeout(3000 milliseconds), interval(200 milliseconds)) {
simpleAclAuthorizer
.getAcls() shouldBe Map(res1 -> Set(acl1, acl2), res2 -> Set(acl3))
Expand All @@ -251,6 +252,7 @@ class AclSynchronizerTest
dummyNotification.reset()
dummySourceAcl.setErrorNext()
aclSynchronizer.run()
aclSynchronizer.isSuccessful shouldBe false
dummyNotification.addedAcls.size shouldBe 0
dummyNotification.removedAcls.size shouldBe 0
dummyNotification.errorCounter shouldBe 1
Expand All @@ -264,6 +266,7 @@ class AclSynchronizerTest
aclSynchronizer.run()
dummyNotification.addedAcls.size shouldBe 1
dummyNotification.removedAcls.size shouldBe 1
aclSynchronizer.isSuccessful shouldBe true
eventually(timeout(3000 milliseconds), interval(200 milliseconds)) {
simpleAclAuthorizer.getAcls() shouldBe Map(
res1 -> Set(acl1),
Expand Down
60 changes: 60 additions & 0 deletions src/test/scala/io/conduktor/ksm/web/ServerTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.conduktor.ksm.web

import org.scalatest.{BeforeAndAfterAll, FlatSpec}
import skinny.http.{HTTP, Request}

class LivenessTestProbe extends Probe {
var success = false;

override def isSuccessful: Boolean = {
success
}
}

class ServerTest extends FlatSpec with BeforeAndAfterAll {
private val livenessTestProbe1 = new LivenessTestProbe
private val livenessTestProbe2 = new LivenessTestProbe
private val testSubject =
new Server(7777, List(livenessTestProbe1, livenessTestProbe2))
private val testServerUrl = "http://localhost:7777";
private val testReadyEndpoint = testServerUrl + "/api/probe/ready"
private val testAliveEndpoint = testServerUrl + "/api/probe/alive"

override protected def beforeAll(): Unit = {
testSubject.start()
}

override protected def afterAll(): Unit = {
testSubject.stop()
}

"get ready probe endpoint" should "return 200 with success true" in {
val response = HTTP.get(Request(testReadyEndpoint))
assert(response.status == 200)
assert(new String(response.body) == "{\"success\":true}")
}

"get alive probe endpoint" should "return 200 with success true, if all probes are successful" in {
livenessTestProbe1.success = true
livenessTestProbe2.success = true
val response = HTTP.get(Request(testAliveEndpoint))
assert(response.status == 200)
assert(new String(response.body) == "{\"success\":true}")
}

"get alive probe endpoint" should "return 500 with success false, if some probes are un-successful" in {
livenessTestProbe1.success = true
livenessTestProbe2.success = false
val response = HTTP.get(Request(testAliveEndpoint))
assert(response.status == 500)
assert(new String(response.body) == "{\"success\":false}")
}

"get alive probe endpoint" should "return 500 with success false, if all probes are un-successful" in {
livenessTestProbe1.success = false
livenessTestProbe2.success = false
val response = HTTP.get(Request(testAliveEndpoint))
assert(response.status == 500)
assert(new String(response.body) == "{\"success\":false}")
}
}