Skip to content

Commit

Permalink
fix: Pulsar OAuth2 Authentication Factory (apache#12195)
Browse files Browse the repository at this point in the history
* do appropriate string check on config params.

* test authentication factory creation with valid config.

* Fixed line spacing, added testing visibility annotation.
  • Loading branch information
JeffBolle authored Jan 1, 2024
1 parent 50912eb commit 5b4e19a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.stream.pulsar;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.net.MalformedURLException;
Expand Down Expand Up @@ -105,7 +106,8 @@ public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
_audience = getConfigValue(streamConfigMap, OAUTH_AUDIENCE);
}

protected void validateOAuthCredFile() {
@VisibleForTesting
private void validateOAuthCredFile() {
try {
URL credFilePathUrl = new URL(_credentialsFilePath);
if (!"file".equals(credFilePathUrl.getProtocol())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ public PulsarPartitionLevelConnectionHandler(String clientId, StreamConfig strea
}
}

private Optional<Authentication> getAuthenticationFactory(PulsarConfig pulsarConfig) {
protected Optional<Authentication> getAuthenticationFactory(PulsarConfig pulsarConfig) {
if (StringUtils.isNotBlank(pulsarConfig.getIssuerUrl())
&& StringUtils.isBlank(pulsarConfig.getAudience())
&& StringUtils.isBlank(pulsarConfig.getCredentialsFilePath())) {
&& StringUtils.isNotBlank(pulsarConfig.getAudience())
&& StringUtils.isNotBlank(pulsarConfig.getCredentialsFilePath())) {
try {
return Optional.of(AuthenticationFactoryOAuth2.clientCredentials(
new URL(pulsarConfig.getIssuerUrl()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.junit.Assert.assertTrue;

public class PulsarConfigTest {
public static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME";
Expand Down Expand Up @@ -137,6 +138,9 @@ public void testParsingConfigForOAuth() throws Exception {
Assert.assertEquals(pulsarConfig.getCredentialsFilePath(),
"file://" + testFile.toFile().getAbsolutePath());
Assert.assertEquals(pulsarConfig.getAudience(), "urn:test:test");
PulsarPartitionLevelConnectionHandler pulsarPartitionLevelConnectionHandler =
new PulsarPartitionLevelConnectionHandler("testId", streamConfig);
assertTrue(pulsarPartitionLevelConnectionHandler.getAuthenticationFactory(pulsarConfig).isPresent());
} catch (Exception e) {
Assert.fail("Should not throw exception", e);
} finally {
Expand Down

0 comments on commit 5b4e19a

Please sign in to comment.