Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test filter classloader #280

Draft
wants to merge 3 commits into
base: 3.1_fx_merged
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public List<EntryFilter> loadEntryFiltersForPolicy(EntryFilters policy)
return loadEntryFilters(entryFilterList);
}

private List<EntryFilter> loadEntryFilters(Collection<String> entryFilterNames)
public List<EntryFilter> loadEntryFilters(Collection<String> entryFilterNames)
throws IOException {
ImmutableMap.Builder<String, EntryFilter> builder = ImmutableMap.builder();
for (String filterName : entryFilterNames) {
Expand Down Expand Up @@ -149,6 +149,14 @@ private void initialize() throws IOException {
metadata.setDefinition(def);
metadata.setArchivePath(archive);

if (entryFilterDefinitions.containsKey(def.getName())) {
Path oldPath = entryFilterDefinitions.get(def.getName()).getArchivePath();
log.error("Entry filter with name `{}` already loaded from {}, replacing with {}",
def.getName(), oldPath, archive);
cachedClassLoaders.remove(classLoaderKey(oldPath)).close();
entryFilterDefinitions.remove(def.getName());
}

entryFilterDefinitions.put(def.getName(), metadata);
} catch (Throwable t) {
log.warn("Failed to load entry filters from {}."
Expand Down Expand Up @@ -216,6 +224,7 @@ private NarClassLoader loadNarClassLoader(Path archivePath) {
return cachedClassLoaders
.computeIfAbsent(absolutePath, narFilePath -> {
try {
log.info("Loading NAR from {}", archivePath);
final File narFile = archivePath.toAbsolutePath().toFile();
return NarClassLoaderBuilder.builder()
.narFile(narFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,32 @@

import static org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getEventKey;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.PulsarEvent;
Expand All @@ -41,9 +54,11 @@
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -65,6 +80,9 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
@BeforeClass
@Override
protected void setup() throws Exception {
resetConfig();
//this.conf.setEntryFilterNames(List.of("jms"));
this.conf.setEntryFiltersDirectory("/Users/andreyyegorov/src/pulsar-jms/pulsar-jms-filters/target");
super.internalSetup();
prepareData();
}
Expand All @@ -75,6 +93,77 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

public void checkEntryFilter() throws Exception {
final String normalTopic = "persistent://" + NAMESPACE1 + "/normal_topic";

admin.topics().createPartitionedTopic(normalTopic, 3);
BrokerService brokerService = pulsar.getBrokerService();
List<EntryFilter> filters = brokerService.getEntryFilterProvider().getBrokerEntryFilters();

FilterContext filterContext = new FilterContext();
Consumer cons = mock(Consumer.class);
filterContext.setConsumer(cons);
when(cons.getMetadata()).thenReturn(Map.of(
"jms.filtering", "true",
"jms.selector", "param < 1"));

Subscription sub = mock(Subscription.class);
when(sub.getName()).thenReturn("sub");
when(sub.getTopicName()).thenReturn(normalTopic);
when(sub.getSubscriptionProperties()).thenReturn(Map.of(
"jms.filtering", "true",
"jms.selector", "param < 1"));
Topic topic = mock(Topic.class);
when(topic.getName()).thenReturn(normalTopic);
when(topic.getBrokerService()).thenReturn(brokerService);
when(sub.getTopic()).thenReturn(topic);

filterContext.setSubscription(sub);
MessageMetadata meta = mock(org.apache.pulsar.common.api.proto.MessageMetadata.class);
filterContext.setMsgMetadata(meta);
Entry entry = EntryImpl.create(1, 1, "test".getBytes());
List<EntryFilter> filters0 = brokerService.getEntryFilterProvider().loadEntryFilters(List.of("jms"));
try
{
for (int i = 0; i < 10; i++) {
if (!filters.isEmpty()) {
filters.get(0).filterEntry(entry, filterContext);
}

filters0.get(0).filterEntry(entry, filterContext);

{
List<EntryFilter> filters2 = brokerService.getEntryFilterProvider().loadEntryFilters(List.of("jms"));
filters2.get(0).filterEntry(entry, filterContext);
filters2.get(0).close();
}
List<EntryFilter> filters3 = brokerService.getEntryFilterProvider().loadEntryFilters(List.of("jms"));
// don't use it, just load

//brokerService.getEntryFilterProvider().close();
for (int j = 0; j < 10; j++) {
System.gc();
System.runFinalization();
Thread.sleep(1000);
}

List<EntryFilter> filters4 = brokerService.getEntryFilterProvider().loadEntryFilters(List.of("jms"));

filters3.get(0).close();

filters4.get(0).filterEntry(entry, filterContext);
filters4.get(0).close();
}
} finally {
filters0.get(0).close();
if (!filters.isEmpty()) {
filters.get(0).close();
}
entry.release();
}
}


@Test
public void testSchemaCompatibility() throws Exception {
TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
Expand Down
Loading