Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat]PIP 167: Make it Configurable to Require Subscription Permission for Consumer #246

Merged
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,12 @@ authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorization
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
authorizationAllowWildcardsMatching=false

# If a namespace has no roles configured in the subscription permission for a given subscription name,
# allow all roles that have permission to consume a the topic to consume from the subscription.
# See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
# permission.
grantImplicitPermissionOnSubscription=true

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
superUserRoles=
Expand Down
6 changes: 6 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,12 @@ authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorization
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
authorizationAllowWildcardsMatching=false

# If a namespace has no roles configured in the subscription permission for a given subscription name,
# allow all roles that have permission to consume a the topic to consume from the subscription.
# See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
# permission.
grantImplicitPermissionOnSubscription=true

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
superUserRoles=
Expand Down
6 changes: 6 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorization
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
authorizationAllowWildcardsMatching=false

# If a namespace has no roles configured in the subscription permission for a given subscription name,
# allow all roles that have permission to consume a the topic to consume from the subscription.
# See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
# permission.
grantImplicitPermissionOnSubscription=true

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
superUserRoles=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,16 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
+ " or last position eg: *.pulsar.service, pulsar.service.*)")
private boolean authorizationAllowWildcardsMatching = false;

@FieldContext(
category = CATEGORY_AUTHORIZATION,
doc = """
If a namespace has no roles configured in the subscription permission for a given subscription name,
allow all roles that have permission to consume a the topic to consume from the subscription.
See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
permission.
""")
private boolean grantImplicitPermissionOnSubscription = true;

@FieldContext(
category = CATEGORY_AUTHORIZATION,
doc = "When this parameter is not empty, unauthenticated users perform as anonymousUserRole"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,18 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
}
} else {
if (isNotBlank(subscription)) {
// validate if role is authorized to access subscription. (skip validation if authorization
// list is empty)
// Reject request if role is unauthorized to access subscription.
// If isGrantImplicitPermissionOnSubscription is true, role must be in the set of roles.
// Otherwise, set of roles must be null or empty, or role must be in set of roles.
Set<String> roles = policies.get().auth_policies
.getSubscriptionAuthentication().get(subscription);
if (roles != null && !roles.isEmpty() && !roles.contains(role)) {
boolean isUnauthorized;
if (roles == null || roles.isEmpty()) {
isUnauthorized = !conf.isGrantImplicitPermissionOnSubscription();
} else {
isUnauthorized = !roles.contains(role);
}
if (isUnauthorized) {
log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription);
return CompletableFuture.completedFuture(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,83 @@ public void testSubscriberPermission() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testSubscriberPermissionRequired() throws Exception {
log.info("-- Starting {} test --", methodName);

// Simplify test by skipping configuration of topic level policies
conf.setTopicLevelPoliciesEnabled(false);
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
conf.setGrantImplicitPermissionOnSubscription(false);
setup();

final String tenantRole = "tenant-role";
final String subscriptionRole = "sub-role";
final String subscriptionName = "sub";
final String namespace = "my-property/ns-sub-auth-req";
final String topicName = "persistent://" + namespace + "/my-topic";
Authentication adminAuthentication = new ClientAuthentication("superUser");

clientAuthProviderSupportedRoles.add(subscriptionRole);

@Cleanup
PulsarAdmin superAdmin = spy(
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build());

Authentication tenantAdminAuthentication = new ClientAuthentication(tenantRole);
@Cleanup
PulsarAdmin tenantAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(tenantAdminAuthentication).build());

Authentication subAdminAuthentication = new ClientAuthentication(subscriptionRole);
@Cleanup
PulsarAdmin sub1Admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(subAdminAuthentication).build());

Authentication authentication = new ClientAuthentication(subscriptionRole);

superAdmin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());

// Initialize cluster and create namespace and topic
superAdmin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
tenantAdmin.topics().createNonPartitionedTopic(topicName);
tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
Collections.singleton(AuthAction.consume));
assertNull(superAdmin.namespaces().getPublishRate(namespace));
replacePulsarClient(PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(authentication));

// Cluster is initialized; the subscriptionRole has permission consume on the topic, but doesn't have
// explicit subscription permission. Verify that several operations which rely on subscription permission fail.
try {
sub1Admin.topics().resetCursor(topicName, subscriptionName, 0);
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith(
"Unauthorized to validateTopicOperation for operation [RESET_CURSOR]"));
}
try {
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().contains("Client is not authorized to subscribe"), e.getMessage());
}

// Grant the role permission.
tenantAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName, Set.of(subscriptionRole));

// Verify the role now has permission to consume (reset cursor second to avoid 404 on subscription)
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
consumer.close();
sub1Admin.topics().resetCursor(topicName, subscriptionName, 0);

log.info("-- Exiting {} test --", methodName);
}

@Test
public void testClearBacklogPermission() throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,18 @@ public void test() throws Exception {
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
assertFalse(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, null));
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null,null));
// Include a subscription name. The subscription doesn't need to exist for the purpose of this test, but this
// tests the case when service.getConfig().isGrantImplicitPermissionOnSubscription() is true because we
// have not granted permission for this role on the subscription named "sub".
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, "sub"));
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null,"sub"));

// Grant permission to a different role for sub and expect failure
admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "sub", Set.of("no-ones-role"));
// Even though other-role has permission to consume from the topic, the "sub" subscription is locked down and
// only roles with permission granted via grantPermissionOnSubscription have permission to consume from that
// subscription.
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, "sub"));

assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "no-access-role", null));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket.proxy;

import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
* Class that initializes the WebSocketService disabling {@link WebSocketProxyConfiguration#setGrantImplicitPermissionOnSubscription(boolean)}.
* We must have this class on its own because the WebSocketProxyConfiguration is converted to the ServiceConfiguration
* on start up, so it is not a dynamic property that we can change after the service has started.
*/

@Test(groups = "websocket")
public class ProxyAuthorizationWithoutImplicitPermissionOnSubscriptionTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class);
private WebSocketService service;
private final String configClusterName = "c1";

@BeforeClass
@Override
protected void setup() throws Exception {
conf.setClusterName(configClusterName);
internalSetup();

WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
Set<String> superUser = Sets.newHashSet("");
config.setAuthorizationEnabled(true);
config.setSuperUserRoles(superUser);
config.setClusterName("c1");
config.setWebServicePort(Optional.of(0));
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
config.setGrantImplicitPermissionOnSubscription(false);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
service.start();
}

@AfterMethod(alwaysRun = true)
protected void cleanup() throws Exception {
super.internalCleanup();
if (service != null) {
service.close();
}
log.info("Finished Cleaning Up Test setup");
}


@Test
public void testAuthorizationServiceDirectly() throws Exception {
AuthorizationService auth = service.getAuthorizationService();

assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));

admin.clusters().createCluster(configClusterName, ClusterData.builder().build());
admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1")));
waitForChange();
admin.namespaces().createNamespace("p1/c1/ns1");
waitForChange();

assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));

admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.of(AuthAction.produce));
waitForChange();

assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));

admin.topics().grantPermission("persistent://p1/c1/ns1/ds2", "other-role",
EnumSet.of(AuthAction.consume));
waitForChange();

assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
assertFalse(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));

// Expect false because we disabled the implicit permission on subscription
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, "sub"));
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null,"sub"));

// Grant permission
admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "sub", Set.of("other-role"));

// Expect only true for "other-role" because we granted permission for only that one
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, "sub"));
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null,"sub"));


assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "no-access-role", null));

admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.allOf(AuthAction.class));
waitForChange();

assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null));

admin.namespaces().deleteNamespace("p1/c1/ns1");
admin.tenants().deleteTenant("p1");
admin.clusters().deleteCluster("c1");
}

private static void waitForChange() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
+ "presents at first or last position. For example: *.pulsar.service,pulsar.service.*)")
private boolean authorizationAllowWildcardsMatching = false;

@FieldContext(
doc = """
If a namespace has no roles configured in the subscription permission for a given subscription name,
allow all roles that have permission to consume a the topic to consume from the subscription.
See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
permission.
""")
private boolean grantImplicitPermissionOnSubscription = true;

@FieldContext(doc = "Proxy authentication settings used to connect to brokers")
private String brokerClientAuthenticationPlugin;

Expand Down
Loading