Skip to content

Commit

Permalink
[feat][proxy] PIP-250: Add proxyVersion to CommandConnect (#19618)
Browse files Browse the repository at this point in the history
PIP #19623 
Relates to #19540

### Motivation

In order to get more information about connections, it is helpful for the proxy to supply its version to the broker.

### Modifications

* Add `proxy_version` field to the `CommandConnect` protobuf message 
* Update proxy and broker to handle this new field

### Verifying this change

New tests are added with this PR.

### Does this pull request potentially affect one of the following parts:

- [x] The binary protocol

This will be submitted as part of a PIP.

### Documentation

- [x] `doc-not-needed`
  • Loading branch information
michaeljmarshall authored Apr 11, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent b136cab commit 1545396
Showing 9 changed files with 105 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -464,7 +464,7 @@ public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
}
}

private boolean isProxyRole(String role) {
public boolean isProxyRole(String role) {
return role != null && conf.getProxyRoles().contains(role);
}

Original file line number Diff line number Diff line change
@@ -210,6 +210,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private int pendingSendRequest = 0;
private final String replicatorPrefix;
private String clientVersion = null;
private String proxyVersion = null;
private int nonPersistentPendingMessages = 0;
private final int maxNonPersistentPendingMessages;
private String originalPrincipal = null;
@@ -320,7 +321,10 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
return;
}
log.info("New connection from {}", remoteAddress);
if (log.isDebugEnabled()) {
// Connection information is logged after a successful Connect command is processed.
log.debug("New connection from {}", remoteAddress);
}
this.ctx = ctx;
this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor, this);
this.service.getPulsarStats().recordConnectionCreate();
@@ -690,6 +694,15 @@ private void completeConnect(int clientProtoVersion, String clientVersion) {
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
return;
}
if (proxyVersion != null && !service.getAuthorizationService().isProxyRole(authRole)) {
// Only allow proxyVersion to be set when connecting with a proxy
state = State.Failed;
service.getPulsarStats().recordConnectionCreateFail();
final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError,
"Must not set proxyVersion without connecting as a ProxyRole.");
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
return;
}
}
maybeScheduleAuthenticationCredentialsRefresh();
}
@@ -703,6 +716,18 @@ private void completeConnect(int clientProtoVersion, String clientVersion) {
if (isNotBlank(clientVersion)) {
this.clientVersion = clientVersion.intern();
}
if (!service.isAuthenticationEnabled()) {
log.info("[{}] connected with clientVersion={}, clientProtocolVersion={}, proxyVersion={}", remoteAddress,
clientVersion, clientProtoVersion, proxyVersion);
} else if (originalPrincipal != null) {
log.info("[{}] connected role={} and originalAuthRole={} using authMethod={}, clientVersion={}, "
+ "clientProtocolVersion={}, proxyVersion={}", remoteAddress, authRole, originalPrincipal,
authMethod, clientVersion, clientProtoVersion, proxyVersion);
} else {
log.info("[{}] connected with role={} using authMethod={}, clientVersion={}, clientProtocolVersion={}, "
+ "proxyVersion={}", remoteAddress, authRole, authMethod, clientVersion, clientProtoVersion,
proxyVersion);
}
if (brokerInterceptor != null) {
brokerInterceptor.onConnectionCreated(this);
}
@@ -761,10 +786,6 @@ public void authChallengeSuccessCallback(AuthData authChallenge,
authenticateOriginalData(clientProtocolVersion, clientVersion);
} else {
completeConnect(clientProtocolVersion, clientVersion);
if (log.isDebugEnabled()) {
log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
remoteAddress, authMethod, this.authRole, originalPrincipal);
}
}
} else {
// Refresh the auth data
@@ -948,6 +969,10 @@ protected void handleConnect(CommandConnect connect) {
features.copyFrom(connect.getFeatureFlags());
}

if (connect.hasProxyVersion()) {
proxyVersion = connect.getProxyVersion();
}

if (!service.isAuthenticationEnabled()) {
completeConnect(clientProtocolVersion, clientVersion);
return;
@@ -3266,6 +3291,11 @@ public String getClientVersion() {
return clientVersion;
}

@Override
public String getProxyVersion() {
return proxyVersion;
}

@VisibleForTesting
void setAutoReadDisabledRateLimiting(boolean isLimiting) {
this.autoReadDisabledRateLimiting = isLimiting;
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
public interface TransportCnx {

String getClientVersion();
String getProxyVersion();

SocketAddress clientAddress();

Original file line number Diff line number Diff line change
@@ -297,6 +297,21 @@ public void testConnectCommandWithProtocolVersion() throws Exception {
channel.finish();
}

@Test(timeOut = 30000)
public void testConnectCommandWithProxyVersion() throws Exception {
resetChannel();
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);

ByteBuf clientCommand = Commands.newConnect("none", null, 1, null, null, null, null, null,
"my-pulsar-proxy");
channel.writeInbound(clientCommand);

assertEquals(serverCnx.getState(), State.Connected);
assertEquals(serverCnx.getProxyVersion(), "my-pulsar-proxy");
channel.finish();
}

@DataProvider(name = "clientVersions")
public Object[][] clientVersions() {
return new Object[][]{
@@ -512,6 +527,32 @@ public void testConnectCommandWithPassingOriginalPrincipal() throws Exception {
channel.finish();
}

@Test
public void testConnectWithNonProxyRoleAndProxyVersion() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockAuthenticationProvider();
String authMethodName = authenticationProvider.getAuthMethodName();

when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
svcConfig.setAuthenticationEnabled(true);
svcConfig.setAuthorizationEnabled(true);

resetChannel();
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);

ByteBuf clientCommand = Commands.newConnect(authMethodName, AuthData.of("pass.pass".getBytes()),
1, null, null, null, null, null, "my-pulsar-proxy");
channel.writeInbound(clientCommand);
Object response = getResponse();
assertTrue(response instanceof CommandError);
assertEquals(((CommandError) response).getError(), ServerError.AuthorizationError);
assertEquals(serverCnx.getState(), State.Failed);
channel.finish();
}

@Test
public void testAuthChallengePrincipalChangeFails() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockAlwaysExpiredAuthenticationProvider();
Original file line number Diff line number Diff line change
@@ -234,11 +234,22 @@ public static ByteBuf newConnect(String authMethodName, String authData, int pro
public static ByteBuf newConnect(String authMethodName, AuthData authData, int protocolVersion, String libVersion,
String targetBroker, String originalPrincipal, AuthData originalAuthData,
String originalAuthMethod) {
return newConnect(authMethodName, authData, protocolVersion, libVersion, targetBroker, originalPrincipal,
originalAuthData, originalAuthMethod, null);
}

public static ByteBuf newConnect(String authMethodName, AuthData authData, int protocolVersion, String libVersion,
String targetBroker, String originalPrincipal, AuthData originalAuthData,
String originalAuthMethod, String proxyVersion) {
BaseCommand cmd = localCmd(Type.CONNECT);
CommandConnect connect = cmd.setConnect()
.setClientVersion(libVersion != null ? libVersion : "Pulsar Client")
.setAuthMethodName(authMethodName);

if (proxyVersion != null) {
connect.setProxyVersion(proxyVersion);
}

if (targetBroker != null) {
// When connecting through a proxy, we need to specify which broker do we want to be proxied through
connect.setProxyToBrokerUrl(targetBroker);
6 changes: 4 additions & 2 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
@@ -268,7 +268,7 @@ enum ProtocolVersion {
}

message CommandConnect {
required string client_version = 1;
required string client_version = 1; // The version of the client. Proxy should forward client's client_version.
optional AuthMethod auth_method = 2; // Deprecated. Use "auth_method_name" instead.
optional string auth_method_name = 5;
optional bytes auth_data = 3;
@@ -291,6 +291,8 @@ message CommandConnect {

// Feature flags
optional FeatureFlags feature_flags = 10;

optional string proxy_version = 11; // Version of the proxy. Should only be forwarded by a proxy.
}

message FeatureFlags {
@@ -308,7 +310,7 @@ message CommandConnected {
}

message CommandAuthResponse {
optional string client_version = 1;
optional string client_version = 1; // The version of the client. Proxy should forward client's client_version.
optional AuthData response = 2;
optional int32 protocol_version = 3 [default = 0];
}
Original file line number Diff line number Diff line change
@@ -327,7 +327,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf command = Commands.newConnect(
authentication.getAuthMethodName(), authData, protocolVersion,
proxyConnection.clientVersion, null /* target broker */,
originalPrincipal, clientAuthData, clientAuthMethod);
originalPrincipal, clientAuthData, clientAuthMethod, PulsarVersion.getVersion());
writeAndFlush(command);
isTlsOutboundChannel = ProxyConnection.isTlsChannel(inboundChannel);
}
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import io.netty.channel.EventLoopGroup;
import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
@@ -66,7 +67,7 @@ protected ByteBuf newConnectCommand() throws Exception {
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
proxyConnection.clientVersion, proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
clientAuthMethod);
clientAuthMethod, PulsarVersion.getVersion());
}

@Override
Original file line number Diff line number Diff line change
@@ -497,6 +497,15 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
return;
}

if (connect.hasProxyVersion()) {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client illegally provided proxyVersion.", remoteAddress);
}
state = State.Closing;
writeAndFlushAndClose(Commands.newError(-1, ServerError.NotAllowedError, "Must not provide proxyVersion"));
return;
}

try {
// init authn
this.clientConf = createClientConfiguration();

0 comments on commit 1545396

Please sign in to comment.