Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-3328: SimpleAclAuthorizer can lose ACLs with frequent add/remov… #1006

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 133 additions & 67 deletions core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ package kafka.security.auth
import java.util
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
import org.apache.zookeeper.Watcher.Event.KeeperState


import kafka.network.RequestChannel.Session
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import org.I0Itec.zkclient.IZkStateListener
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.zookeeper.KeeperException
import scala.collection.JavaConverters._
import org.apache.log4j.Logger

Expand Down Expand Up @@ -71,7 +70,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
private var zkUtils: ZkUtils = null
private var aclChangeListener: ZkNodeChangeNotificationListener = null

private val aclCache = new scala.collection.mutable.HashMap[Resource, Set[Acl]]
/**
 * A set of ACLs paired with a version number.
 *
 * In this file `version` is filled from the ZooKeeper `Stat` returned when the resource's path is read
 * (`stat.getVersion`), and is later passed back as the expected version for conditional updates and
 * versioned deletes of that path.
 *
 * @param acls the ACLs held for a resource
 * @param version the version associated with `acls`, used as the expected version on the next write
 */
private case class VersionedAcls(acls: Set[Acl], version: Int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a comment on what version is?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can change the variable to zkVersion to make it more clear.

private val aclCache = new scala.collection.mutable.HashMap[Resource, VersionedAcls]
private val lock = new ReentrantReadWriteLock()

/**
Expand Down Expand Up @@ -163,68 +163,52 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
}

override def addAcls(acls: Set[Acl], resource: Resource) {
if (acls != null && acls.nonEmpty) {
val updatedAcls = getAcls(resource) ++ acls
val path = toResourcePath(resource)

if (zkUtils.pathExists(path))
zkUtils.updatePersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
else
zkUtils.createPersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))

updateAclChangedFlag(resource)
inWriteLock(lock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to make more evident what the lock is covering, I think you can move it inside the if block. This observation also holds for other lock blocks in the patch.

if (acls != null && acls.nonEmpty) {
updateResourceAcls(resource) { currentAcls =>
currentAcls ++ acls
}
}
}
}

override def removeAcls(aclsTobeRemoved: Set[Acl], resource: Resource): Boolean = {
if (zkUtils.pathExists(toResourcePath(resource))) {
val existingAcls = getAcls(resource)
val filteredAcls = existingAcls.filter((acl: Acl) => !aclsTobeRemoved.contains(acl))

val aclNeedsRemoval = (existingAcls != filteredAcls)
if (aclNeedsRemoval) {
val path: String = toResourcePath(resource)
if (filteredAcls.nonEmpty)
zkUtils.updatePersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(filteredAcls)))
else
zkUtils.deletePath(toResourcePath(resource))

updateAclChangedFlag(resource)
inWriteLock(lock) {
updateResourceAcls(resource) { currentAcls =>
currentAcls.diff(aclsTobeRemoved)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, is there a reason you used diff instead of the more common --?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Often I prefer the named method over the symbolic one. I find it can be more clear, especially to those less familiar with Scala. diff just calls --. Happy to change it if you think I should

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think -- is clearer in this case (it's widely used as the opposite of ++). diff is rarely used in comparison, in my experience.

}

aclNeedsRemoval
} else false
}
}

override def removeAcls(resource: Resource): Boolean = {
if (zkUtils.pathExists(toResourcePath(resource))) {
zkUtils.deletePath(toResourcePath(resource))
inWriteLock(lock) {
val result = zkUtils.deletePath(toResourcePath(resource))
updateCache(resource, VersionedAcls(Set(), 0))
updateAclChangedFlag(resource)
true
} else false
result
}
}

override def getAcls(resource: Resource): Set[Acl] = {
inReadLock(lock) {
aclCache.get(resource).getOrElse(Set.empty[Acl])
aclCache.get(resource).map(_.acls).getOrElse(Set.empty[Acl])
}
}

private def getAclsFromZk(resource: Resource): Set[Acl] = {
val aclJson = zkUtils.readDataMaybeNull(toResourcePath(resource))._1
aclJson.map(Acl.fromJson).getOrElse(Set.empty)
}

override def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] = {
aclCache.mapValues { acls =>
acls.filter(_.principal == principal)
}.filter { case (_, acls) =>
acls.nonEmpty
}.toMap
inReadLock(lock) {
aclCache.mapValues { versionedAcls =>
versionedAcls.acls.filter(_.principal == principal)
}.filter { case (_, acls) =>
acls.nonEmpty
}.toMap
}
}

override def getAcls(): Map[Resource, Set[Acl]] = {
aclCache.toMap
inReadLock(lock) {
aclCache.mapValues(_.acls).toMap
}
}

def close() {
Expand All @@ -233,25 +217,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
}

private def loadCache() {
var acls = Set.empty[Acl]
val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
for (rType <- resourceTypes) {
val resourceType = ResourceType.fromString(rType)
val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
val resourceNames = zkUtils.getChildren(resourceTypePath)
for (resourceName <- resourceNames) {
acls = getAclsFromZk(Resource(resourceType, resourceName.toString))
updateCache(new Resource(resourceType, resourceName), acls)
}
}
}

private def updateCache(resource: Resource, acls: Set[Acl]) {
inWriteLock(lock) {
if (acls.nonEmpty)
aclCache.put(resource, acls)
else
aclCache.remove(resource)
val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
for (rType <- resourceTypes) {
val resourceType = ResourceType.fromString(rType)
val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
val resourceNames = zkUtils.getChildren(resourceTypePath)
for (resourceName <- resourceNames) {
val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName.toString))
updateCache(new Resource(resourceType, resourceName), versionedAcls)
}
}
}
}

Expand All @@ -264,16 +240,106 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
authorizerLogger.debug(s"Principal = $principal is $permissionType Operation = $operation from host = $host on resource = $resource")
}

/**
* Safely updates the resource's acls by ensuring reads and writes respect the expected zookeeper version.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: ensuring.

* Continues to retry until it successfully updates.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused by this statement. We return a boolean indicating if an update was made so it seems a bit contradictory?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The boolean indicates if the ACLs content changed. Regardless we need to update zookeeper to check/increment the version at a minimum. I can clarify this doc.

*
* @param resource the resource to change acls for
* @param getNewAcls function to transform existing acls to new acls
* @return boolean indicating if a change was made
*/
private def updateResourceAcls(resource: Resource)(getNewAcls: Set[Acl] => Set[Acl]): Boolean = {
val path = toResourcePath(resource)

var currentVersionedAcls =
if(aclCache.contains(resource))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: space after if. It would be good to check the rest of the code for this.

getAclsFromCache(resource)
else
getAclsFromZk(resource)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation

var newVersionedAcls = currentVersionedAcls
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we never use the value initially assigned to this, right? I'd make that explicit and assign null.

var writeComplete = false
while (!writeComplete) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why you want to spin it until the write is complete. If there is an error and the operation does not complete, then perhaps it should be up to the caller to call it again. Could you elaborate on the case you have in mind and that makes this while loop necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal of this while loop is to retry the update to zookeeper if we fail based on the zookeeper version. The zookeeper version could be incorrect if the cached version in this instance is not up to date. That is most likely to occur if there are concurrent updates on a separate instance of the authorizer.

If the failure is caused by anything other than a version mismatch or one of the handled exceptions, then an exception will be thrown and propagated back to the user.

Ideally I would use the sync call to prevent any "spinning" but the ZkClient we use does not have that method (sgroschupf/zkclient#44).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with sync is that it is an asynchronous call and I believe ZkClient only supports the synchronous API. I don't think it is worth using the sync call here because it won't stop concurrent accesses anyway. When we move to the async API for the controller, then perhaps we can make this change here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that makes sense. Thanks for clarifying.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking that we should bound the number of iterations of the loop to be extra safe. It is unlikely to happen, but if for some reason it has to iterate over this loop many times, then it might be better to fail the operation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fpj Do you have a suggested number of tries we should limit this to?

Under what scenarios do you think it would loop indefinitely without some other exception from zookeeper breaking the loop?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is a right answer here for the number of retries, and it should really never happen that you're looping indefinitely other than getting a bad version on every attempt. But, it does feel safer to have a safeguard for the case that something goes wrong. If you want a guess, I'd say 5 times but I'm happy if you want to go higher.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fpj I will set this to 15 to be "safe" but ensure it's not triggered too soon. I started with 10 but my "high concurrency" test actually triggered it and failed.

val newAcls = getNewAcls(currentVersionedAcls.acls)
val data = Json.encode(Acl.toJsonCompatibleMap(newAcls))
try {
val (updateSucceeded, updateVersion) =
if(!newAcls.isEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add space after if?

zkUtils.conditionalUpdatePersistentPathIfExists(path, data, currentVersionedAcls.version)
} else {
trace(s"Deleting path for $resource because it had no acls remaining")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: I think we should write it as ACLs when logging.

(deletePath(path, currentVersionedAcls.version), 0)
}
if(!updateSucceeded) {
trace(s"Failed to update acls for $resource. Used version ${currentVersionedAcls.version}. Reading data and retrying update.")
currentVersionedAcls = getAclsFromZk(resource);
}
newVersionedAcls = VersionedAcls(newAcls, updateVersion)
writeComplete = updateSucceeded
} catch {
case e: ZkNoNodeException =>
try {
debug(s"Node for $resource does not exist, attempting to create it.")
zkUtils.createPersistentPath(path, data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a corner case here I think. If data is empty, then you don't want to create the znode. I don't think you're expecting such a call, which makes sense, but there isn't anything that prevents a caller from making that mistake.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch I will fix that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thing to watch out for here is that the delete can also throw a no node exception, and in this case you also don't want to create the znode. I suppose that there will be no data to write like in my previous comment, so it should be fine, but I wanted to raise the point.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the updated patch, I handle empty data and No node exceptions in the delete call.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to be a style comment, so it is arguable whether it is better or not. I'd rather have this try/catch wrapping the call to conditionalUpdatePersistentPathIfExists because we are specifically interested in the exception that it can throw. I think it makes the intent more clear if we move it there, but again, up to you.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fpj I moved the update logic to its own private method. Pushing an update soon.

newVersionedAcls = VersionedAcls(newAcls, 0)
writeComplete = true
} catch {
case e: ZkNodeExistsException =>
debug(s"Failed to create node for $resource because it already exists. Reading data and retrying update.")
currentVersionedAcls = getAclsFromZk(resource);
}
}
}

if(newVersionedAcls.acls != currentVersionedAcls.acls) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? If we successfully updated ZK, it's probably simpler to always propagate the change even if the content is identical.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not required, just an optimization. I am happy to remove it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao Upon further review this is required to support the delete contract. It returns a boolean that indicates if any acls were actually deleted. I can move the propagation outside of the check, but I will still need the check to return true or false.

Should I move the propagation out of the acls change check?

debug(s"Updated acls for $resource to ${newVersionedAcls.acls} with version ${newVersionedAcls.version}")
updateCache(resource, newVersionedAcls)
updateAclChangedFlag(resource)
true
} else {
debug(s"Updated acls for $resource, no change was made")
updateCache(resource, newVersionedAcls) // Even if no change, update the version
false
}
}

private def deletePath(path: String, expectedVersion: Int): Boolean = {
try {
zkUtils.zkConnection.getZookeeper.delete(path, expectedVersion) // Workaround until zkClient supports versioned deletes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, if we use the raw ZK api here directly, we will need to handle the ConnectionLossException which zkclient hides from us. Perhaps we should just patch zkclient to expose versioned deletes and have zkclient release a new version.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao This workaround was suggested by @fpj as an intermediate solution until that change occurs. I filed an issue in the ZkClient project. (sgroschupf/zkclient#45)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao @granthenke agreed that fixing zkclient is the ideal option to make it all uniform. I was thinking that even with zkclient you can get a connection timeout exception or whatever it is called because the client might not be able to reconnect in a long time. Consequently, the caller of updateResourceAcls needs to be ready to either recover or retry in the case the update doesn't succeed. If we get a connection loss event, then we could just retry. If we get a session expired, then we need to propagate the exception up perhaps wrapping it with some other kafka exception so that we don't expose session expiration. The zkclient instance will session as Jun says, so if the caller retries and zkclient is able to create the new session, then we will be able to perform the delete.

We can try to have zkclient fixed, we did it for 0.9 at the last minute and it worked, so it might be doable. Otherwise, I think the workaround isn't bad, but if we go with this workaround, then we should have a jira for Kafka as well so that we remember that it is there. Does it make sense?

true
} catch {
case e: KeeperException.NoNodeException => true // This path was already deleted
case e: KeeperException.BadVersionException => false
}
}

private def getAclsFromCache(resource: Resource): VersionedAcls = {
aclCache.get(resource).getOrElse(throw new IllegalArgumentException(s"Acls do not exist in the cache for resource $resource"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map also has a getOrElse so you can replace get followed by getOrElse with a single getOrElse.

}

/**
 * Reads the ACLs stored for `resource` from ZooKeeper, together with the version reported for its path.
 *
 * @param resource the resource whose ACLs should be read
 * @return the parsed ACLs (an empty set when the read returns no data) paired with the version
 *         taken from the stat returned by the read
 */
private def getAclsFromZk(resource: Resource): VersionedAcls = {
  val resourcePath = toResourcePath(resource)
  val (maybeJson, znodeStat) = zkUtils.readDataMaybeNull(resourcePath)
  // No data at the path is treated as "no ACLs" rather than as an error.
  val parsedAcls: Set[Acl] = maybeJson match {
    case Some(json) => Acl.fromJson(json)
    case None => Set.empty[Acl]
  }
  VersionedAcls(parsedAcls, znodeStat.getVersion)
}

/**
 * Stores or evicts the cached entry for `resource`.
 *
 * A resource with no ACLs is removed from `aclCache` instead of being stored with an empty set.
 * This method does not acquire `lock` itself.
 *
 * @param resource the resource whose cache entry is being refreshed
 * @param versionedAcls the ACLs and version to cache; an empty ACL set evicts the entry
 */
private def updateCache(resource: Resource, versionedAcls: VersionedAcls) {
  val hasNoAcls = versionedAcls.acls.isEmpty
  if (hasNoAcls)
    aclCache.remove(resource)
  else
    aclCache.put(resource, versionedAcls)
}

/**
 * Creates a sequential persistent znode under the ACL-changed path whose data is `resource.toString`.
 *
 * @param resource the resource whose ACLs were modified
 */
private def updateAclChangedFlag(resource: Resource) {
  val notificationPathPrefix = SimpleAclAuthorizer.AclChangedZkPath + "/" + SimpleAclAuthorizer.AclChangedPrefix
  val notificationData = resource.toString
  zkUtils.createSequentialPersistentPath(notificationPathPrefix, notificationData)
}

object AclChangedNotificationHandler extends NotificationHandler {

override def processNotification(notificationMessage: String) {
val resource: Resource = Resource.fromString(notificationMessage)
val acls = getAclsFromZk(resource)
updateCache(resource, acls)
inWriteLock(lock) {
val versionedAcls = getAclsFromZk(resource)
updateCache(resource, versionedAcls)
}
}
}
}
Loading