diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java index f93e561542eeb..dfe1d2066dd63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java @@ -100,7 +100,7 @@ public List loadEntryFiltersForPolicy(EntryFilters policy) return loadEntryFilters(entryFilterList); } - private List loadEntryFilters(Collection entryFilterNames) + public List loadEntryFilters(Collection entryFilterNames) throws IOException { ImmutableMap.Builder builder = ImmutableMap.builder(); for (String filterName : entryFilterNames) { @@ -149,6 +149,14 @@ private void initialize() throws IOException { metadata.setDefinition(def); metadata.setArchivePath(archive); + if (entryFilterDefinitions.containsKey(def.getName())) { + Path oldPath = entryFilterDefinitions.get(def.getName()).getArchivePath(); + log.error("Entry filter with name `{}` already loaded from {}, replacing with {}", + def.getName(), oldPath, archive); + cachedClassLoaders.remove(classLoaderKey(oldPath)).close(); + entryFilterDefinitions.remove(def.getName()); + } + entryFilterDefinitions.put(def.getName(), metadata); } catch (Throwable t) { log.warn("Failed to load entry filters from {}." @@ -216,6 +224,7 @@ private NarClassLoader loadNarClassLoader(Path archivePath) { return cachedClassLoaders .computeIfAbsent(absolutePath, narFilePath -> { try { + log.info("Loading NAR from {}", archivePath); final File narFile = archivePath.toAbsolutePath().toFile(); return NarClassLoaderBuilder.builder() .narFile(narFile) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java index 44a4de5e8a923..909ef18e98811 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java @@ -20,19 +20,32 @@ import static org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getEventKey; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import com.google.common.collect.Sets; import java.util.HashSet; +import java.util.List; +import java.util.Map; + import lombok.Cleanup; +import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SystemTopic; +import org.apache.pulsar.broker.service.plugin.EntryFilter; +import org.apache.pulsar.broker.service.plugin.FilterContext; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.events.ActionType; import org.apache.pulsar.common.events.EventType; import org.apache.pulsar.common.events.PulsarEvent; @@ -41,9 +54,11 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -65,6 +80,9 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa @BeforeClass @Override protected void setup() throws Exception { + resetConfig(); + //this.conf.setEntryFilterNames(List.of("jms")); + this.conf.setEntryFiltersDirectory("/Users/andreyyegorov/src/pulsar-jms/pulsar-jms-filters/target"); super.internalSetup(); prepareData(); } @@ -75,6 +93,77 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + public void checkEntryFilter() throws Exception { + final String normalTopic = "persistent://" + NAMESPACE1 + "/normal_topic"; + + admin.topics().createPartitionedTopic(normalTopic, 3); + BrokerService brokerService = pulsar.getBrokerService(); + List filters = brokerService.getEntryFilterProvider().getBrokerEntryFilters(); + + FilterContext filterContext = new FilterContext(); + Consumer cons = mock(Consumer.class); + filterContext.setConsumer(cons); + when(cons.getMetadata()).thenReturn(Map.of( + "jms.filtering", "true", + "jms.selector", "param < 1")); + + Subscription sub = mock(Subscription.class); + when(sub.getName()).thenReturn("sub"); + when(sub.getTopicName()).thenReturn(normalTopic); + when(sub.getSubscriptionProperties()).thenReturn(Map.of( + "jms.filtering", "true", + "jms.selector", "param < 1")); + Topic topic = mock(Topic.class); + when(topic.getName()).thenReturn(normalTopic); + when(topic.getBrokerService()).thenReturn(brokerService); + when(sub.getTopic()).thenReturn(topic); + + filterContext.setSubscription(sub); + MessageMetadata meta = mock(org.apache.pulsar.common.api.proto.MessageMetadata.class); + filterContext.setMsgMetadata(meta); + Entry entry = EntryImpl.create(1, 1, "test".getBytes()); + List filters0 = brokerService.getEntryFilterProvider().loadEntryFilters(List.of("jms")); + try + { + for (int i = 0; i < 10; i++) { + if (!filters.isEmpty()) { + filters.get(0).filterEntry(entry, filterContext); + } + + filters0.get(0).filterEntry(entry, filterContext); + + { + List filters2 = brokerService.getEntryFilterProvider().loadEntryFilters(List.of("jms")); + filters2.get(0).filterEntry(entry, filterContext); + filters2.get(0).close(); + } + List filters3 = brokerService.getEntryFilterProvider().loadEntryFilters(List.of("jms")); + // don't use it, just load + + //brokerService.getEntryFilterProvider().close(); + for (int j = 0; j < 10; j++) { + System.gc(); + System.runFinalization(); + Thread.sleep(1000); + } + + List filters4 = brokerService.getEntryFilterProvider().loadEntryFilters(List.of("jms")); + + filters3.get(0).close(); + + filters4.get(0).filterEntry(entry, filterContext); + filters4.get(0).close(); + } + } finally { + filters0.get(0).close(); + if (!filters.isEmpty()) { + filters.get(0).close(); + } + entry.release(); + } + } + + @Test public void testSchemaCompatibility() throws Exception { TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory