From 5b4e19aa4e3073ce9030d2283fb80bbaf177df21 Mon Sep 17 00:00:00 2001 From: Jeffrey Bolle Date: Mon, 1 Jan 2024 14:11:15 -0500 Subject: [PATCH] fix: Pulsar OAuth2 Authentication Factory (#12195) * do appropriate string check on config params. * test authentication factory creation with valid config. * Fixed line spacing, added testing visibility annotation. --- .../org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java | 4 +++- .../pulsar/PulsarPartitionLevelConnectionHandler.java | 6 +++--- .../apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java | 4 ++++ 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java index 8cdc8f864784..637899d649a6 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.plugin.stream.pulsar; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.File; import java.net.MalformedURLException; @@ -105,7 +106,8 @@ public PulsarConfig(StreamConfig streamConfig, String subscriberId) { _audience = getConfigValue(streamConfigMap, OAUTH_AUDIENCE); } - protected void validateOAuthCredFile() { + @VisibleForTesting + private void validateOAuthCredFile() { try { URL credFilePathUrl = new URL(_credentialsFilePath); if (!"file".equals(credFilePathUrl.getProtocol())) { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java index 163400bc3b60..53b27eb963f0 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java @@ -73,10 +73,10 @@ public PulsarPartitionLevelConnectionHandler(String clientId, StreamConfig strea } } - private Optional getAuthenticationFactory(PulsarConfig pulsarConfig) { + protected Optional getAuthenticationFactory(PulsarConfig pulsarConfig) { if (StringUtils.isNotBlank(pulsarConfig.getIssuerUrl()) - && StringUtils.isBlank(pulsarConfig.getAudience()) - && StringUtils.isBlank(pulsarConfig.getCredentialsFilePath())) { + && StringUtils.isNotBlank(pulsarConfig.getAudience()) + && StringUtils.isNotBlank(pulsarConfig.getCredentialsFilePath())) { try { return Optional.of(AuthenticationFactoryOAuth2.clientCredentials( new URL(pulsarConfig.getIssuerUrl()), diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java index 3c7eeffe8079..486944b70384 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java @@ -31,6 +31,7 @@ import org.testng.Assert; import org.testng.annotations.Test; +import static org.junit.Assert.assertTrue; public class PulsarConfigTest { public static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME"; @@ -137,6 +138,9 @@ public void testParsingConfigForOAuth() throws Exception { Assert.assertEquals(pulsarConfig.getCredentialsFilePath(), "file://" + testFile.toFile().getAbsolutePath()); Assert.assertEquals(pulsarConfig.getAudience(), "urn:test:test"); + PulsarPartitionLevelConnectionHandler pulsarPartitionLevelConnectionHandler = + new PulsarPartitionLevelConnectionHandler("testId", streamConfig); + assertTrue(pulsarPartitionLevelConnectionHandler.getAuthenticationFactory(pulsarConfig).isPresent()); } catch (Exception e) { Assert.fail("Should not throw exception", e); } finally {