Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-3328: SimpleAclAuthorizer can lose ACLs with frequent add/remov… #1006

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 146 additions & 67 deletions core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ package kafka.security.auth
import java.util
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
import org.apache.zookeeper.Watcher.Event.KeeperState


import kafka.network.RequestChannel.Session
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import org.I0Itec.zkclient.IZkStateListener
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.zookeeper.KeeperException
import scala.collection.JavaConverters._
import org.apache.log4j.Logger

Expand Down Expand Up @@ -62,6 +62,8 @@ object SimpleAclAuthorizer {

//prefix of all the change notification sequence node.
val AclChangedPrefix = "acl_changes_"

private case class VersionedAcls(acls: Set[Acl], zkVersion: Int)
}

class SimpleAclAuthorizer extends Authorizer with Logging {
Expand All @@ -71,7 +73,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
private var zkUtils: ZkUtils = null
private var aclChangeListener: ZkNodeChangeNotificationListener = null

private val aclCache = new scala.collection.mutable.HashMap[Resource, Set[Acl]]
private val aclCache = new scala.collection.mutable.HashMap[Resource, VersionedAcls]
private val lock = new ReentrantReadWriteLock()

/**
Expand Down Expand Up @@ -163,68 +165,52 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
}

override def addAcls(acls: Set[Acl], resource: Resource) {
if (acls != null && acls.nonEmpty) {
val updatedAcls = getAcls(resource) ++ acls
val path = toResourcePath(resource)

if (zkUtils.pathExists(path))
zkUtils.updatePersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
else
zkUtils.createPersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))

updateAclChangedFlag(resource)
inWriteLock(lock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to make more evident what the lock is covering, I think you can move it inside the if block. This observation also holds for other lock blocks in the patch.

if (acls != null && acls.nonEmpty) {
updateResourceAcls(resource) { currentAcls =>
currentAcls ++ acls
}
}
}
}

override def removeAcls(aclsTobeRemoved: Set[Acl], resource: Resource): Boolean = {
if (zkUtils.pathExists(toResourcePath(resource))) {
val existingAcls = getAcls(resource)
val filteredAcls = existingAcls.filter((acl: Acl) => !aclsTobeRemoved.contains(acl))

val aclNeedsRemoval = (existingAcls != filteredAcls)
if (aclNeedsRemoval) {
val path: String = toResourcePath(resource)
if (filteredAcls.nonEmpty)
zkUtils.updatePersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(filteredAcls)))
else
zkUtils.deletePath(toResourcePath(resource))

updateAclChangedFlag(resource)
inWriteLock(lock) {
updateResourceAcls(resource) { currentAcls =>
currentAcls -- aclsTobeRemoved
}

aclNeedsRemoval
} else false
}
}

override def removeAcls(resource: Resource): Boolean = {
if (zkUtils.pathExists(toResourcePath(resource))) {
zkUtils.deletePath(toResourcePath(resource))
inWriteLock(lock) {
val result = zkUtils.deletePath(toResourcePath(resource))
updateCache(resource, VersionedAcls(Set(), 0))
updateAclChangedFlag(resource)
true
} else false
result
}
}

override def getAcls(resource: Resource): Set[Acl] = {
inReadLock(lock) {
aclCache.get(resource).getOrElse(Set.empty[Acl])
aclCache.get(resource).map(_.acls).getOrElse(Set.empty[Acl])
}
}

private def getAclsFromZk(resource: Resource): Set[Acl] = {
val aclJson = zkUtils.readDataMaybeNull(toResourcePath(resource))._1
aclJson.map(Acl.fromJson).getOrElse(Set.empty)
}

override def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] = {
aclCache.mapValues { acls =>
acls.filter(_.principal == principal)
}.filter { case (_, acls) =>
acls.nonEmpty
}.toMap
inReadLock(lock) {
aclCache.mapValues { versionedAcls =>
versionedAcls.acls.filter(_.principal == principal)
}.filter { case (_, acls) =>
acls.nonEmpty
}.toMap
}
}

override def getAcls(): Map[Resource, Set[Acl]] = {
aclCache.toMap
inReadLock(lock) {
aclCache.mapValues(_.acls).toMap
}
}

def close() {
Expand All @@ -233,25 +219,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
}

private def loadCache() {
var acls = Set.empty[Acl]
val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
for (rType <- resourceTypes) {
val resourceType = ResourceType.fromString(rType)
val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
val resourceNames = zkUtils.getChildren(resourceTypePath)
for (resourceName <- resourceNames) {
acls = getAclsFromZk(Resource(resourceType, resourceName.toString))
updateCache(new Resource(resourceType, resourceName), acls)
}
}
}

private def updateCache(resource: Resource, acls: Set[Acl]) {
inWriteLock(lock) {
if (acls.nonEmpty)
aclCache.put(resource, acls)
else
aclCache.remove(resource)
val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
for (rType <- resourceTypes) {
val resourceType = ResourceType.fromString(rType)
val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
val resourceNames = zkUtils.getChildren(resourceTypePath)
for (resourceName <- resourceNames) {
val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName.toString))
updateCache(new Resource(resourceType, resourceName), versionedAcls)
}
}
}
}

Expand All @@ -264,16 +242,117 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
authorizerLogger.debug(s"Principal = $principal is $permissionType Operation = $operation from host = $host on resource = $resource")
}

/**
* Safely updates the resources ACLs by ensuring reads and writes respect the expected zookeeper version.
* Continues to retry until it succesfully updates zookeeper.
*
* Returns a boolean indicating if the content of the ACLs was actually changed.
*
* @param resource the resource to change ACLs for
* @param getNewAcls function to transform existing acls to new ACLs
* @return boolean indicating if a change was made
*/
private def updateResourceAcls(resource: Resource)(getNewAcls: Set[Acl] => Set[Acl]): Boolean = {
val path = toResourcePath(resource)

var currentVersionedAcls =
if (aclCache.contains(resource))
getAclsFromCache(resource)
else
getAclsFromZk(resource)
var newVersionedAcls: VersionedAcls = null
var writeComplete = false
while (!writeComplete) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why you want to spin it until the write is complete. If there is an error and the operation does not complete, then perhaps it should be up to the caller to call it again. Could you elaborate on the case you have in mind and that makes this while loop necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This goal of this while loop is to retry the update to zookeeper if we fail based on the zookeeper version. The zookeeper version could be incorrect if the cached version in this instance is not up to date. That is most likely to occur if there are concurrent updates on a separate instance of the authorizer.

If the failure is based on anything other than version, or the handled exceptions. Then an exception will be thrown and propagated back to the user.

Ideally I would use the sync call to prevent any "spinning" but the ZkClient we use does not have that method (sgroschupf/zkclient#44).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with sync is that it is an asynchronous call and I believe ZkClient only supports the synchronous API. I don't think it is worth using the sync call here because it won't stop concurrent accesses anyway. When we move to the async API for the controller, then perhaps we can make this change here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that makes sense. Thanks for clarifying.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking that we should bound the number of iterations of the loop to be extra safe. It is unlikely that it happens, but if some reason it has to iterate over this loop many times, then it might be better to fail the operation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fpj Do you have a suggest number of tries we should limit this too?

Under what scenarios do you think it would loop indefinitely without some other exception from zookeeper breaking the loop?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is a right answer here for the number of retries, and it should really never happen that you're looping indefinitely other than getting a bad version on every attempt. But, it does feel safer to have a safeguard for the case that something goes wrong. If you want a guess, I'd say 5 times but I'm happy if you want to go higher.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fpj I will set this to 15 to be "safe" but ensure its not triggered too soon. I started with 10 but my "high concurrency" test actually triggered it and failed.

val newAcls = getNewAcls(currentVersionedAcls.acls)
val data = Json.encode(Acl.toJsonCompatibleMap(newAcls))
val (updateSucceeded, updateVersion) =
if (!newAcls.isEmpty) {
updatePath(path, data, currentVersionedAcls.zkVersion)
} else {
trace(s"Deleting path for $resource because it had no ACLs remaining")
(deletePath(path, currentVersionedAcls.zkVersion), 0)
}

if (!updateSucceeded) {
trace(s"Failed to update ACLs for $resource. Used version ${currentVersionedAcls.zkVersion}. Reading data and retrying update.")
currentVersionedAcls = getAclsFromZk(resource);
} else {
newVersionedAcls = VersionedAcls(newAcls, updateVersion)
writeComplete = updateSucceeded
}
}

if (newVersionedAcls.acls != currentVersionedAcls.acls) {
debug(s"Updated ACLs for $resource to ${newVersionedAcls.acls} with version ${newVersionedAcls.zkVersion}")
updateCache(resource, newVersionedAcls)
updateAclChangedFlag(resource)
true
} else {
debug(s"Updated ACLs for $resource, no change was made")
updateCache(resource, newVersionedAcls) // Even if no change, update the version
false
}
}

/**
* Updates a zookeeper path with an expected version. If the topic does not exist, it will create it.
* Returns if the update was successful and the new version.
*/
private def updatePath(path: String, data: String, expectedVersion: Int): (Boolean, Int) = {
try {
zkUtils.conditionalUpdatePersistentPathIfExists(path, data, expectedVersion)
} catch {
case e: ZkNoNodeException =>
try {
debug(s"Node $path does not exist, attempting to create it.")
zkUtils.createPersistentPath(path, data)
(true, 0)
} catch {
case e: ZkNodeExistsException =>
debug(s"Failed to create node for $path because it already exists.")
(false, 0)
}
}
}

private def deletePath(path: String, expectedVersion: Int): Boolean = {
try {
zkUtils.zkClient.delete(path, expectedVersion)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor point, but I'm wondering if it is best to have a call in ZkUtils so that we consistently access zkclient through ZkUtils.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fpj I chose not to because it's not used anywhere else yet and I didn't want to think through and test "generic logic" for all uses. This code is contained and tested with respect to the authorizer. I thought was was an okay choice because ZkClient is used in many other places throughout the code base as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change isn't strictly necessary, and in fact it makes it less efficient because we have an additional call. I was thinking about the case that we want to replace zkclient with direct calls to the raw zookeeper client (KAFKA-3210) and as I was thinking about doing it, we'd just rewire ZkUtils. It's not a big deal if you don't want to change, though. Just let me know so that we can wrap this one up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fpj I can move it.

true
} catch {
case e: KeeperException.NoNodeException => true // This path was already deleted
case e: KeeperException.BadVersionException => false
}
}

private def getAclsFromCache(resource: Resource): VersionedAcls = {
aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource"))
}

private def getAclsFromZk(resource: Resource): VersionedAcls = {
val (aclJson, stat) = zkUtils.readDataMaybeNull(toResourcePath(resource))
VersionedAcls(aclJson.map(Acl.fromJson).getOrElse(Set()), stat.getVersion)
}

private def updateCache(resource: Resource, versionedAcls: VersionedAcls) {
if (versionedAcls.acls.nonEmpty) {
aclCache.put(resource, versionedAcls)
} else {
aclCache.remove(resource)
}
}

private def updateAclChangedFlag(resource: Resource) {
zkUtils.createSequentialPersistentPath(SimpleAclAuthorizer.AclChangedZkPath + "/" + SimpleAclAuthorizer.AclChangedPrefix, resource.toString)
}

object AclChangedNotificationHandler extends NotificationHandler {

override def processNotification(notificationMessage: String) {
val resource: Resource = Resource.fromString(notificationMessage)
val acls = getAclsFromZk(resource)
updateCache(resource, acls)
inWriteLock(lock) {
val versionedAcls = getAclsFromZk(resource)
updateCache(resource, versionedAcls)
}
}
}
}
Loading