Skip to content

Commit

Permalink
Merge pull request #54 from robobario/MGDSTRM-9800-authorizer-enhance…
Browse files Browse the repository at this point in the history
…ment

MGDSTRM-9800 authorizer enhancement
  • Loading branch information
robobario authored Sep 29, 2022
2 parents 91430f0 + 28b7ff9 commit c293e85
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 104 deletions.
32 changes: 15 additions & 17 deletions src/main/java/io/bf2/kafka/authorizer/CustomAclAuthorizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.lang.String.format;

/**
* A authorizer for Kafka that defines custom ACLs. The configuration is provided as
* a string a semicolon-delimited key/value pairs to specify
Expand Down Expand Up @@ -101,7 +103,8 @@ public class CustomAclAuthorizer implements Authorizer {
private static final int CREATE_PARTITIONS_APIKEY = 37;
private static final Logger log = LoggerFactory.getLogger(CustomAclAuthorizer.class);

static final String CREATE_ACL_INVALID_PRINCIPAL = "Invalid ACL principal name";
static final String INVALID_ACL_PRINCIPAL_RESTRICTED_TEMPLATE = "ACL rules including principal '%s' are prohibited - this principal is restricted";
static final String INVALID_ACL_PRINCIPAL_NON_USER_PREFIXED_TEMPLATE = "ACL rules including principal '%s' are prohibited - principal is not type User";
static final String CREATE_ACL_INVALID_BINDING = "Invalid ACL resource or operation";

static final String CONFIG_PREFIX = Config.PREFIX + "authorizer.";
Expand Down Expand Up @@ -421,28 +424,23 @@ public List<CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestCont
.map(binding -> {
final CompletionStage<AclCreateResult> result;

if (!binding.entry().principal().startsWith(CustomAclBinding.USER_TYPE_PREFIX)) {
/* Reject ACL operations as invalid where the principal named in the ACL binding is the principal performing the operation */
log.info("Rejected attempt by user {} to create ACL binding with invalid principal name: {}",
String bindingPrincipal = binding.entry().principal();
if (!bindingPrincipal.startsWith(CustomAclBinding.USER_TYPE_PREFIX)) {
log.info("Rejected attempt by user {} to create ACL binding with incorrect prefixing: {}",
requestContext.principal().getName(),
binding.entry().principal());
result = errorResult(AclCreateResult::new, CREATE_ACL_INVALID_PRINCIPAL);
} else if (Objects.equals(toString(requestContext.principal()), binding.entry().principal())) {
/* Reject ACL operations as invalid where the principal named in the ACL binding is the principal performing the operation */
log.info("Rejected attempt by user {} to self-assign ACL binding",
requestContext.principal().getName());
result = errorResult(AclCreateResult::new, CREATE_ACL_INVALID_PRINCIPAL);
} else if (hasPrincipalBindings(binding.entry().principal())) {
/* Reject ACL operations as invalid where the principal named in the ACL binding is a principal with configured custom ACLs */
log.info("Rejected attempt by user {} to create ACL binding for principal {} with existing custom ACL configuration",
bindingPrincipal);
result = errorResult(AclCreateResult::new, format(INVALID_ACL_PRINCIPAL_NON_USER_PREFIXED_TEMPLATE, bindingPrincipal));
} else if (hasPrincipalBindings(bindingPrincipal)) {
/* Reject ACL operations as invalid where the principal named in the ACL binding is a restricted principal with configured custom ACLs */
log.info("Rejected attempt by user {} to create ACL binding for restricted principal {}",
requestContext.principal().getName(),
binding.entry().principal());
result = errorResult(AclCreateResult::new, CREATE_ACL_INVALID_PRINCIPAL);
bindingPrincipal);
result = errorResult(AclCreateResult::new, format(INVALID_ACL_PRINCIPAL_RESTRICTED_TEMPLATE, bindingPrincipal));
} else if (!isAclBindingAllowed(binding)) {
/* Request to create an ACL that is not explicitly allowed */
log.info("Rejected attempt by user {} to create ACL binding for principal {} with existing custom ACL configuration",
requestContext.principal().getName(),
binding.entry().principal());
bindingPrincipal);
result = errorResult(AclCreateResult::new, CREATE_ACL_INVALID_BINDING);
} else {
log.debug("Delegating createAcls to parent");
Expand Down
173 changes: 86 additions & 87 deletions src/test/java/io/bf2/kafka/authorizer/CustomAclAuthorizerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
Expand All @@ -23,7 +22,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mockito;

import java.io.IOException;
import java.net.InetAddress;
Expand All @@ -34,17 +32,22 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class CustomAclAuthorizerTest {
Expand All @@ -59,9 +62,9 @@ static void initialize() throws IOException {

@BeforeEach
void setup() {
this.delegate = Mockito.mock(kafka.security.authorizer.AclAuthorizer.class);
this.delegate = mock(kafka.security.authorizer.AclAuthorizer.class);

Mockito.when(this.delegate.authorize(Mockito.any(AuthorizableRequestContext.class), Mockito.anyList()))
when(this.delegate.authorize(any(AuthorizableRequestContext.class), anyList()))
.thenAnswer(invocation -> {
int count = invocation.getArgument(1, List.class).size();
List<AuthorizationResult> results = new ArrayList<>(count);
Expand All @@ -71,7 +74,7 @@ void setup() {
return results;
});

Mockito.when(this.delegate.acls(Mockito.any(AclBindingFilter.class)))
when(this.delegate.acls(any(AclBindingFilter.class)))
.thenReturn(Collections.emptyList());
}

Expand Down Expand Up @@ -183,17 +186,13 @@ void testAuthorize(String title,
AuthorizationResult expectedResult) throws IOException {

KafkaPrincipal superUser = SecurityUtils.parseKafkaPrincipal("User:admin");
Mockito.when(this.delegate.isSuperUser(superUser)).thenReturn(Boolean.TRUE);
when(this.delegate.isSuperUser(superUser)).thenReturn(Boolean.TRUE);

try (CustomAclAuthorizer auth = new CustomAclAuthorizer(this.delegate)) {
auth.configure(config);

String[] principalComponents = principal.split(":");
AuthorizableRequestContext rc = Mockito.mock(AuthorizableRequestContext.class);
when(rc.clientAddress()).thenReturn(InetAddress.getLoopbackAddress());
when(rc.listenerName()).thenReturn(listener);
when(rc.principal()).thenReturn(new KafkaPrincipal(principalComponents[0], principalComponents[1]));
when(rc.requestType()).thenReturn(requestType);
AuthorizableRequestContext rc = createMockRequestContext(listener, principalComponents[0], principalComponents[1], requestType);

Action action = new Action(operation, new ResourcePattern(resourceType, resourceName, PatternType.LITERAL), 0, true, true);
List<AuthorizationResult> results = auth.authorize(rc, Arrays.asList(action));
Expand All @@ -203,76 +202,47 @@ void testAuthorize(String title,
}
}


@ParameterizedTest
@CsvSource({
"Denied for principal missing 'User:' prefix, user2",
"Denied for principal in static configuration, User:anonymous",
"Denied for principal being the requestor, User:owner1"
"Denied for principal missing 'User:' prefix, user2,ACL rules including principal 'user2' are prohibited - principal is not type User",
"Denied for principal in static configuration, User:anonymous, ACL rules including principal 'User:anonymous' are prohibited - this principal is restricted"
})
void testCreateAclsDeniedForInvalidPrincipal(String title, String principal) throws IOException {
try (CustomAclAuthorizer auth = new CustomAclAuthorizer(this.delegate)) {
auth.configure(config);

AuthorizableRequestContext rc = Mockito.mock(AuthorizableRequestContext.class);
Mockito.when(rc.clientAddress()).thenReturn(InetAddress.getLoopbackAddress());
Mockito.when(rc.listenerName()).thenReturn("security-9095");
Mockito.when(rc.principal()).thenReturn(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "owner1"));
Mockito.when(rc.requestType()).thenReturn((int) ApiKeys.CREATE_ACLS.id);

AclBinding readUser1Topics = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "user1_", PatternType.PREFIXED),
new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));
AclBinding writeUser1Topics = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "user1_", PatternType.PREFIXED),
new AccessControlEntry(principal, "*", AclOperation.WRITE, AclPermissionType.ALLOW));

var bindings = List.of(readUser1Topics, writeUser1Topics);
var results = auth.createAcls(rc, bindings);
void testCreateAclsDeniedForInvalidPrincipal(String title, String principal, String error) throws IOException {
AclBinding binding1 = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "user1_", PatternType.PREFIXED),
new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));
AclBinding binding2 = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "user1_", PatternType.PREFIXED),
new AccessControlEntry(principal, "*", AclOperation.WRITE, AclPermissionType.ALLOW));
List<AclBinding> bindings = List.of(binding1, binding2);
createAclsAndExpect("owner1", bindings, (aclCreateResult) -> isFailedWithError(aclCreateResult, error), 2);
}

assertEquals(2, results.size());
assertTrue(results.stream()
.map(CompletionStage::toCompletableFuture)
.map(CompletableFuture::join)
.map(AclCreateResult::exception)
.map(Optional::get)
.allMatch(e -> e instanceof ApiException
&& CustomAclAuthorizer.CREATE_ACL_INVALID_PRINCIPAL.equals(e.getMessage())));
}
@Test
void testCreateAclsAllowedWhenPrincipalGivesThemselvesACLRules() throws IOException {
CompletableFuture<AclCreateResult> successFuture = CompletableFuture.completedFuture(new AclCreateResult(null));
doReturn(List.of(successFuture)).when(delegate).createAcls(any(), anyList());

AclBinding binding = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "user1_", PatternType.PREFIXED),
new AccessControlEntry("User:" + "tom", "*", AclOperation.READ, AclPermissionType.ALLOW));
createAclsAndExpect("tom", List.of(binding), (aclCreateResult) -> aclCreateResult.exception().isEmpty(), 1);
}

@Test
void testCreateAclsDeniedForInvalidBinding() throws IOException {
try (CustomAclAuthorizer auth = new CustomAclAuthorizer(this.delegate)) {
auth.configure(config);

AuthorizableRequestContext rc = Mockito.mock(AuthorizableRequestContext.class);
Mockito.when(rc.clientAddress()).thenReturn(InetAddress.getLoopbackAddress());
Mockito.when(rc.listenerName()).thenReturn("security-9095");
Mockito.when(rc.principal()).thenReturn(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "owner1"));
Mockito.when(rc.requestType()).thenReturn((int) ApiKeys.CREATE_ACLS.id);

AclBinding readUser1Topics = new AclBinding(
new ResourcePattern(ResourceType.CLUSTER, "my-cluster", PatternType.LITERAL),
new AccessControlEntry("User:user1", "*", AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW));
AclBinding writeUser1Topics = new AclBinding(
new ResourcePattern(ResourceType.CLUSTER, "my-cluster", PatternType.LITERAL),
new AccessControlEntry("User:user1", "*", AclOperation.IDEMPOTENT_WRITE, AclPermissionType.ALLOW));
AclBinding describleAllDelegationTokens = new AclBinding(
new ResourcePattern(ResourceType.DELEGATION_TOKEN, "*", PatternType.LITERAL),
new AccessControlEntry("User:user1", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));

var bindings = List.of(readUser1Topics, writeUser1Topics, describleAllDelegationTokens);
var results = auth.createAcls(rc, bindings);

assertEquals(3, results.size());
assertTrue(results.stream()
.map(CompletionStage::toCompletableFuture)
.map(CompletableFuture::join)
.map(AclCreateResult::exception)
.map(Optional::get)
.allMatch(e -> e instanceof ApiException
&& CustomAclAuthorizer.CREATE_ACL_INVALID_BINDING.equals(e.getMessage())));
}
AclBinding readUser1Topics = new AclBinding(
new ResourcePattern(ResourceType.CLUSTER, "my-cluster", PatternType.LITERAL),
new AccessControlEntry("User:user1", "*", AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW));
AclBinding writeUser1Topics = new AclBinding(
new ResourcePattern(ResourceType.CLUSTER, "my-cluster", PatternType.LITERAL),
new AccessControlEntry("User:user1", "*", AclOperation.IDEMPOTENT_WRITE, AclPermissionType.ALLOW));
AclBinding describleAllDelegationTokens = new AclBinding(
new ResourcePattern(ResourceType.DELEGATION_TOKEN, "*", PatternType.LITERAL),
new AccessControlEntry("User:user1", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
var bindings = List.of(readUser1Topics, writeUser1Topics, describleAllDelegationTokens);
createAclsAndExpect("owner1", bindings, (aclCreateResult) -> isFailedWithError(aclCreateResult, CustomAclAuthorizer.CREATE_ACL_INVALID_BINDING), 3);
}

@ParameterizedTest
Expand All @@ -287,7 +257,7 @@ void testCreateAclsDeniedForInvalidBinding() throws IOException {
void testPartitionLimitEnforcementFeatureFlag(String featureFlag, AuthorizationResult result, boolean isSuperUser)
throws Exception {
KafkaPrincipal superUser = SecurityUtils.parseKafkaPrincipal("User:admin");
Mockito.when(this.delegate.isSuperUser(superUser)).thenReturn(isSuperUser);
when(this.delegate.isSuperUser(superUser)).thenReturn(isSuperUser);
PartitionCounter partitionCounter = generateMockPartitionCounter(1001, false, Boolean.parseBoolean(featureFlag));
try (CustomAclAuthorizer auth = new CustomAclAuthorizer(this.delegate, partitionCounter)) {
Map<String, Object> customConfig = new HashMap<>(config);
Expand All @@ -298,28 +268,57 @@ void testPartitionLimitEnforcementFeatureFlag(String featureFlag, AuthorizationR

auth.configure(customConfig);

AuthorizableRequestContext rc = Mockito.mock(AuthorizableRequestContext.class);
Mockito.when(rc.clientAddress()).thenReturn(InetAddress.getLoopbackAddress());
Mockito.when(rc.listenerName()).thenReturn("security-9095");
Mockito.when(rc.principal()).thenReturn(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin"));
Mockito.when(rc.requestType()).thenReturn(37);
AuthorizableRequestContext rc = createMockRequestContext("security-9095", KafkaPrincipal.USER_TYPE, "admin", ApiKeys.CREATE_PARTITIONS.id);

Action action = new Action(AclOperation.ALTER, new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL), 0, true, true);
List<AuthorizationResult> results = auth.authorize(rc, Arrays.asList(action));
List<AuthorizationResult> results = auth.authorize(rc, List.of(action));

assertEquals(1, results.size());
assertEquals(result, results.get(0));
}
}


private void createAclsAndExpect(String user, List<AclBinding> bindings, Predicate<AclCreateResult> allMatch, int expectedResults) throws IOException {
try (CustomAclAuthorizer auth = new CustomAclAuthorizer(this.delegate)) {
auth.configure(config);

AuthorizableRequestContext rc = createMockRequestContext("security-9095", KafkaPrincipal.USER_TYPE, user, ApiKeys.CREATE_ACLS.id);

var results = auth.createAcls(rc, bindings);

List<AclCreateResult> createResults = results.stream()
.map(CompletionStage::toCompletableFuture)
.map(CompletableFuture::join)
.collect(Collectors.toList());

assertEquals(expectedResults, createResults.size());
assertTrue(createResults.stream().allMatch(allMatch));
}
}

private static AuthorizableRequestContext createMockRequestContext(String listener, String principalType, String principalName, int requestType) {
AuthorizableRequestContext rc = mock(AuthorizableRequestContext.class);
when(rc.clientAddress()).thenReturn(InetAddress.getLoopbackAddress());
when(rc.listenerName()).thenReturn(listener);
when(rc.principal()).thenReturn(new KafkaPrincipal(principalType, principalName));
when(rc.requestType()).thenReturn(requestType);
return rc;
}

private static boolean isFailedWithError(AclCreateResult aclCreateResult, String error) {
return aclCreateResult.exception().isPresent() && error.equals(aclCreateResult.exception().get().getMessage());
}


private PartitionCounter generateMockPartitionCounter(int numPartitions, boolean response, boolean limitEnforced)
throws InterruptedException, ExecutionException, TimeoutException {
PartitionCounter partitionCounter = Mockito.mock(PartitionCounter.class);
Mockito.when(partitionCounter.getMaxPartitions()).thenReturn(1000);
Mockito.when(partitionCounter.getExistingPartitionCount()).thenReturn(numPartitions);
Mockito.when(partitionCounter.countExistingPartitions()).thenReturn(numPartitions);
Mockito.when(partitionCounter.reservePartitions(Mockito.anyInt())).thenReturn(response);
Mockito.when(partitionCounter.isLimitEnforced()).thenReturn(limitEnforced);
PartitionCounter partitionCounter = mock(PartitionCounter.class);
when(partitionCounter.getMaxPartitions()).thenReturn(1000);
when(partitionCounter.getExistingPartitionCount()).thenReturn(numPartitions);
when(partitionCounter.countExistingPartitions()).thenReturn(numPartitions);
when(partitionCounter.reservePartitions(anyInt())).thenReturn(response);
when(partitionCounter.isLimitEnforced()).thenReturn(limitEnforced);

return partitionCounter;
}
Expand Down

0 comments on commit c293e85

Please sign in to comment.