Skip to content

Commit

Permalink
4/15 test
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Apr 16, 2024
1 parent 902c6f0 commit 3579faf
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1381,8 +1381,8 @@ private synchronized void doCleanup(String broker) {
}

try {
producer.flush();
} catch (PulsarClientException e) {
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
} catch (Exception e) {
log.error("Failed to flush the in-flight non-system bundle override messages.", e);
}

Expand All @@ -1405,8 +1405,8 @@ private synchronized void doCleanup(String broker) {
}

try {
producer.flush();
} catch (PulsarClientException e) {
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
} catch (Exception e) {
log.error("Failed to flush the in-flight system bundle override messages.", e);
}

Expand Down Expand Up @@ -1584,8 +1584,8 @@ protected void monitorOwnerships(List<String> brokers) {
}

try {
producer.flush();
} catch (PulsarClientException e) {
producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
} catch (Exception e) {
log.error("Failed to flush the in-flight messages.", e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import lombok.Cleanup;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -88,6 +89,7 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex
standalone.getConfig().setBrokerClientAuthenticationPlugin(
MockTokenAuthenticationProvider.MockAuthentication.class.getName());
}
standalone.getConfig().setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
final File bkDir = IOUtils.createTempDir("standalone", "bk");
standalone.setNumOfBk(1);
standalone.setBkDir(bkDir.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.pulsar.broker.admin.v1.Namespaces;
import org.apache.pulsar.broker.admin.v1.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
Expand Down Expand Up @@ -853,6 +854,9 @@ public void testDeleteNamespaces() throws Exception {

@Test
public void testDeleteNamespaceWithBundles() throws Exception {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return;
}
uriField.set(namespaces, uriInfo);
doReturn(URI.create(pulsar.getWebServiceAddress() + "/dummy/uri")).when(uriInfo).getRequestUri();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -1275,6 +1276,11 @@ public Object[][] authFunction () throws Exception {
@Test(dataProvider = "authFunction")
public void testSchemaAndTransactionAuthorization(ThrowingBiConsumer<PulsarAdmin> adminConsumer, OperationAuthType topicOpType)
throws Exception {

if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(getPulsarService())) {
return;
}

final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ protected void setup() throws Exception {
super.internalSetup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("pulsar", tenantInfo);
admin.namespaces().createNamespace("pulsar/system", Set.of("test"));

setupSystemNamespace();
updateTenant("pulsar", tenantInfo);
admin.tenants().createTenant("public", tenantInfo);
admin.namespaces().createNamespace("public/default", Set.of("test"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.client.api.ClientBuilder;
Expand Down Expand Up @@ -62,6 +63,7 @@ protected ServiceConfiguration getDefaultConf() {
ServiceConfiguration conf = super.getDefaultConf();
conf.setTopicLevelPoliciesEnabled(false);
conf.setSystemTopicEnabled(false);
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());

// wait for shutdown of the broker, this prevents flakiness which could be caused by metrics being
// unregistered asynchronously. This impacts the execution of the next test method if this would be happening.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
Expand All @@ -33,8 +32,6 @@
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
Expand Down Expand Up @@ -112,9 +109,7 @@ public void testTransactionTopic() throws Exception {
txnLogBufferedWriterConfig.setBatchEnabled(false);
HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
1, TimeUnit.MILLISECONDS);
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
setupSystemNamespace();
createTransactionCoordinatorAssign();
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
Expand Down Expand Up @@ -86,10 +85,7 @@ public void setup() throws Exception {
new TenantInfoImpl(new HashSet<>(), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace("public/txn", 10);
admin.topics().createNonPartitionedTopic(CONSUME_TOPIC);

admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
setupSystemNamespace();
createTransactionCoordinatorAssign(1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,7 @@ protected void setUpBase(int numBroker,int numPartitionsOfTC, String topic, int
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:"
+ webServicePort).build());

admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
setupSystemNamespace();
createTransactionCoordinatorAssign(numPartitionsOfTC);
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
Expand Down Expand Up @@ -242,4 +240,16 @@ public void checkSnapshotPublisherCount(String namespace, int expectCount) {
});
}

protected void setupSystemNamespace() throws Exception {
if (!admin.tenants().getTenants().contains(NamespaceName.SYSTEM_NAMESPACE.getTenant())) {
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
}

if (!admin.namespaces().getNamespaces(NamespaceName.SYSTEM_NAMESPACE.getTenant())
.contains(NamespaceName.SYSTEM_NAMESPACE.toString())) {
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
Expand Down Expand Up @@ -134,9 +133,7 @@ public void testRecoveryTransactionBufferWhenCommonTopicAndSystemTopicAtDifferen
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE1, 4);
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
setupSystemNamespace();
pulsarServiceList.get(0).getPulsarResources()
.getNamespaceResources()
.getPartitionedTopicResources()
Expand Down

0 comments on commit 3579faf

Please sign in to comment.