Skip to content

Commit

Permalink
[fix][broker] Invoke custom BrokerInterceptor's onFilter method if …
Browse files Browse the repository at this point in the history
…it's defined (apache#23676)
  • Loading branch information
jiangpengcheng authored Dec 4, 2024
1 parent 1c1a5cc commit 7f7e12b
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Map;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
Expand Down Expand Up @@ -272,6 +273,18 @@ public void initialize(PulsarService pulsarService) throws Exception {
}
}

@Override
public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws ServletException, IOException {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.onFilter(request, response, chain);
} finally {
Thread.currentThread().setContextClassLoader(previousContext);
}
}

@Override
public void close() {
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.BrokerInterceptors;
import org.apache.pulsar.common.util.PulsarSslConfiguration;
import org.apache.pulsar.common.util.PulsarSslFactory;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
Expand Down Expand Up @@ -258,7 +260,17 @@ private static class FilterInitializer {
// Enable PreInterceptFilter only when interceptors are enabled
filterHolders.add(
new FilterHolder(new PreInterceptFilter(pulsarService.getBrokerInterceptor(), handler)));
filterHolders.add(new FilterHolder(new ProcessHandlerFilter(pulsarService.getBrokerInterceptor())));
// The `ProcessHandlerFilter` is used to overwrite `doFilter` method, which cannot be called multiple
// times inside one `Filter`, so we cannot use one `ProcessHandlerFilter` with a `BrokerInterceptors` to
// hold all interceptors, instead we need to create a `ProcessHandlerFilter` for each `interceptor`.
if (pulsarService.getBrokerInterceptor() instanceof BrokerInterceptors) {
for (BrokerInterceptor interceptor: ((BrokerInterceptors) pulsarService.getBrokerInterceptor())
.getInterceptors().values()) {
filterHolders.add(new FilterHolder(new ProcessHandlerFilter(interceptor)));
}
} else {
filterHolders.add(new FilterHolder(new ProcessHandlerFilter(pulsarService.getBrokerInterceptor())));
}
}

if (config.isAuthenticationEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.pulsar.tests.integration.plugins;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Map;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import org.apache.bookkeeper.mledger.Entry;
Expand Down Expand Up @@ -122,7 +124,9 @@ public void txnEnded(String txnID, long txnAction) {
}

@Override
public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain) {
public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws ServletException, IOException {
log.info("onFilter");
chain.doFilter(request, response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public void test(Supplier<String> serviceUrlSupplier) throws Exception {
"consumerCreated",
"messageProduced",
"beforeSendMessage: OK",
"onFilter",
}) {
assertTrue(log.contains("LoggingBrokerInterceptor - " + line), "Log did not contain line '" + line + "'");
}
Expand Down

0 comments on commit 7f7e12b

Please sign in to comment.