Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Azure BlobServiceClient to use workloadIdentity #12469

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion plugins/repository-azure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ dependencies {
api "io.netty:netty-resolver-dns:${versions.netty}"
api "io.netty:netty-transport-native-unix-common:${versions.netty}"
implementation project(':modules:transport-netty4')
api 'com.azure:azure-storage-blob:12.23.0'
api 'com.azure:azure-storage-blob:12.25.1'
api 'com.azure:azure-identity:1.11.2'
api 'com.microsoft.azure:msal4j:1.14.2'
api 'com.nimbusds:oauth2-oidc-sdk:11.10'
api 'net.minidev:json-smart:1.0.6.3'
api "io.projectreactor.netty:reactor-netty-core:${versions.reactor_netty}"
api "io.projectreactor.netty:reactor-netty-http:${versions.reactor_netty}"
api "org.slf4j:slf4j-api:${versions.slf4j}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public List<Setting<?>> getSettings() {
AzureStorageSettings.ACCOUNT_SETTING,
AzureStorageSettings.KEY_SETTING,
AzureStorageSettings.SAS_TOKEN_SETTING,
AzureStorageSettings.FEDERATED_TOKEN_FILE_SETTING,
AzureStorageSettings.ENDPOINT_SUFFIX_SETTING,
AzureStorageSettings.TIMEOUT_SETTING,
AzureStorageSettings.MAX_RETRIES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.repositories.azure;

import com.azure.core.credential.TokenCredential;
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpPipelinePosition;
Expand All @@ -40,11 +41,15 @@
import com.azure.core.http.ProxyOptions;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.Configuration;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.identity.WorkloadIdentityCredential;
import com.azure.identity.WorkloadIdentityCredentialBuilder;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.common.implementation.connectionstring.StorageConnectionString;
Expand All @@ -61,12 +66,17 @@

import java.net.Authenticator;
import java.net.PasswordAuthentication;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.InvalidKeyException;
import java.security.PrivilegedExceptionAction;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -216,8 +226,12 @@ protected PasswordAuthentication getPasswordAuthentication() {
* <a href="https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/storage/azure-storage-blob/migrationGuides/V8_V12.md#miscellaneous">migration guide</a> for mode details:
*/
private BlobServiceClientBuilder applyLocationMode(final BlobServiceClientBuilder builder, final AzureStorageSettings settings) {
final StorageConnectionString storageConnectionString = StorageConnectionString.create(settings.getConnectString(), logger);
final StorageEndpoint endpoint = storageConnectionString.getBlobEndpoint();
StorageEndpoint endpoint;
if (settings.useWorkloadIdentity()) {
endpoint = new StorageEndpoint(URI.create(settings.getEndpoint()));
} else {
endpoint = StorageConnectionString.create(settings.getConnectString(), logger).getBlobEndpoint();
}

if (endpoint == null || endpoint.getPrimaryUri() == null) {
throw new IllegalArgumentException("connectionString missing required settings to derive blob service primary endpoint.");
Expand Down Expand Up @@ -247,9 +261,31 @@ private BlobServiceClientBuilder applyLocationMode(final BlobServiceClientBuilde
return builder;
}

private static BlobServiceClientBuilder createClientBuilder(AzureStorageSettings settings) throws InvalidKeyException,
URISyntaxException {
return SocketAccess.doPrivilegedException(() -> new BlobServiceClientBuilder().connectionString(settings.getConnectString()));
private static BlobServiceClientBuilder createClientBuilder(AzureStorageSettings settings)
throws InvalidKeyException,
URISyntaxException {
return SocketAccess.doPrivilegedException(() -> {
PrivilegedExceptionAction<ExecutorService> privilegedAction = Executors::newSingleThreadExecutor;
ExecutorService executorService = AccessController.doPrivileged(privilegedAction);
BlobServiceClientBuilder b =
new BlobServiceClientBuilder()
.credential(new DefaultAzureCredentialBuilder().executorService(executorService).build());

if (settings.useWorkloadIdentity()) {
TokenCredential workloadIdentityCredential =
new WorkloadIdentityCredentialBuilder()
.tokenFilePath(settings.getFederatedTokenFile())
.executorService(executorService)
.build();
b.endpoint(settings.getEndpoint())
.credential(workloadIdentityCredential);
} else if (!settings.getConnectString().isEmpty()) {
b.connectionString(settings.getConnectString());
}

return b;
}
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,27 @@ final class AzureStorageSettings {
key -> SecureSetting.secureString(key, null)
);

/** Azure web identity token */
public static final AffixSetting<String> FEDERATED_TOKEN_FILE_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"federated_token_file",
key -> Setting.simpleString(key, Property.NodeScope),
() -> ACCOUNT_SETTING
);

/** Azure SAS token */
public static final AffixSetting<SecureString> SAS_TOKEN_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"sas_token",
key -> SecureSetting.secureString(key, null)
AZURE_CLIENT_PREFIX_KEY,
"sas_token",
key -> SecureSetting.secureString(key, null)
);

/** max_retries: Number of retries in case of Azure errors. Defaults to 3 (RetryPolicy.DEFAULT_CLIENT_RETRY_COUNT). */
public static final AffixSetting<Integer> MAX_RETRIES_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"max_retries",
(key) -> Setting.intSetting(key, 3, Setting.Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);
/**
* Azure endpoint suffix. Default to core.windows.net (CloudStorageAccount.DEFAULT_DNS).
Expand All @@ -100,61 +107,54 @@ final class AzureStorageSettings {
AZURE_CLIENT_PREFIX_KEY,
"timeout",
(key) -> Setting.timeSetting(key, TimeValue.timeValueMinutes(-1), Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);

// See please NettyAsyncHttpClientBuilder#DEFAULT_CONNECT_TIMEOUT
public static final AffixSetting<TimeValue> CONNECT_TIMEOUT_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"connect.timeout",
(key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(10), Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);

// See please NettyAsyncHttpClientBuilder#DEFAULT_WRITE_TIMEOUT
public static final AffixSetting<TimeValue> WRITE_TIMEOUT_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"write.timeout",
(key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(60), Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);

// See please NettyAsyncHttpClientBuilder#DEFAULT_READ_TIMEOUT
public static final AffixSetting<TimeValue> READ_TIMEOUT_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"read.timeout",
(key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(60), Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);

// See please NettyAsyncHttpClientBuilder#DEFAULT_RESPONSE_TIMEOUT
public static final AffixSetting<TimeValue> RESPONSE_TIMEOUT_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"response.timeout",
(key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(60), Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);

/** The type of the proxy to connect to azure through. Can be direct (no proxy, default), http or socks */
public static final AffixSetting<ProxySettings.ProxyType> PROXY_TYPE_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"proxy.type",
(key) -> new Setting<>(key, "direct", s -> ProxySettings.ProxyType.valueOf(s.toUpperCase(Locale.ROOT)), Property.NodeScope),
() -> ACCOUNT_SETTING,
() -> KEY_SETTING
() -> ACCOUNT_SETTING
);

/** The host name of a proxy to connect to azure through. */
public static final AffixSetting<String> PROXY_HOST_SETTING = Setting.affixKeySetting(
AZURE_CLIENT_PREFIX_KEY,
"proxy.host",
(key) -> Setting.simpleString(key, Property.NodeScope),
() -> KEY_SETTING,
() -> ACCOUNT_SETTING,
() -> PROXY_TYPE_SETTING
);
Expand All @@ -164,7 +164,6 @@ final class AzureStorageSettings {
AZURE_CLIENT_PREFIX_KEY,
"proxy.port",
(key) -> Setting.intSetting(key, 0, 0, 65535, Setting.Property.NodeScope),
() -> KEY_SETTING,
() -> ACCOUNT_SETTING,
() -> PROXY_TYPE_SETTING,
() -> PROXY_HOST_SETTING
Expand All @@ -175,7 +174,6 @@ final class AzureStorageSettings {
AZURE_CLIENT_PREFIX_KEY,
"proxy.username",
key -> SecureSetting.secureString(key, null),
() -> KEY_SETTING,
() -> ACCOUNT_SETTING,
() -> PROXY_TYPE_SETTING,
() -> PROXY_HOST_SETTING
Expand All @@ -186,7 +184,6 @@ final class AzureStorageSettings {
AZURE_CLIENT_PREFIX_KEY,
"proxy.password",
key -> SecureSetting.secureString(key, null),
() -> KEY_SETTING,
() -> ACCOUNT_SETTING,
() -> PROXY_TYPE_SETTING,
() -> PROXY_HOST_SETTING,
Expand All @@ -195,6 +192,7 @@ final class AzureStorageSettings {

private final String account;
private final String connectString;
private final String federatedTokenFile;
private final String endpointSuffix;
private final TimeValue timeout;
private final int maxRetries;
Expand All @@ -207,20 +205,22 @@ final class AzureStorageSettings {

// copy-constructor
private AzureStorageSettings(
String account,
String connectString,
String endpointSuffix,
TimeValue timeout,
int maxRetries,
LocationMode locationMode,
TimeValue connectTimeout,
TimeValue writeTimeout,
TimeValue readTimeout,
TimeValue responseTimeout,
ProxySettings proxySettings
String account,
String connectString,
String federatedTokenFile,
String endpointSuffix,
TimeValue timeout,
int maxRetries,
LocationMode locationMode,
TimeValue connectTimeout,
TimeValue writeTimeout,
TimeValue readTimeout,
TimeValue responseTimeout,
ProxySettings proxySettings
) {
this.account = account;
this.connectString = connectString;
this.federatedTokenFile = federatedTokenFile;
this.endpointSuffix = endpointSuffix;
this.timeout = timeout;
this.maxRetries = maxRetries;
Expand All @@ -235,6 +235,7 @@ private AzureStorageSettings(
private AzureStorageSettings(
String account,
String key,
String federatedTokenFile,
String sasToken,
String endpointSuffix,
TimeValue timeout,
Expand All @@ -246,7 +247,8 @@ private AzureStorageSettings(
ProxySettings proxySettings
) {
this.account = account;
this.connectString = buildConnectString(account, key, sasToken, endpointSuffix);
this.connectString = buildConnectString(account, key, sasToken, federatedTokenFile, endpointSuffix);
this.federatedTokenFile = federatedTokenFile;
this.endpointSuffix = endpointSuffix;
this.timeout = timeout;
this.maxRetries = maxRetries;
Expand Down Expand Up @@ -278,20 +280,32 @@ public String getConnectString() {
return connectString;
}

private static String buildConnectString(String account, @Nullable String key, @Nullable String sasToken, String endpointSuffix) {
public String getFederatedTokenFile() {
return federatedTokenFile;
}

public Boolean useWorkloadIdentity() {
return !federatedTokenFile.isEmpty();
}

private static String buildConnectString(String account, @Nullable String key, @Nullable String sasToken, @Nullable String federatedTokenFile, String endpointSuffix) {
final boolean hasSasToken = Strings.hasText(sasToken);
final boolean hasKey = Strings.hasText(key);
if (hasSasToken == false && hasKey == false) {
throw new SettingsException("Neither a secret key nor a shared access token was set.");
}
final boolean hasFederatedTokenFile = Strings.hasText(federatedTokenFile);
if (hasSasToken && hasKey) {
throw new SettingsException("Both a secret as well as a shared access token were set.");
} else if (hasSasToken && hasFederatedTokenFile) {
throw new SettingsException("Both a shared access token as well as an azure federated token file were set.");
} else if (hasKey && hasFederatedTokenFile) {
throw new SettingsException("Both a secret as well as an azure federated token file were set.");
}
final StringBuilder connectionStringBuilder = new StringBuilder();
connectionStringBuilder.append("DefaultEndpointsProtocol=https").append(";AccountName=").append(account);
if (hasSasToken || hasKey) {
connectionStringBuilder.append("DefaultEndpointsProtocol=https").append(";AccountName=").append(account);
}
if (hasKey) {
connectionStringBuilder.append(";AccountKey=").append(key);
} else {
} else if (hasSasToken) {
connectionStringBuilder.append(";SharedAccessSignature=").append(sasToken);
}
if (Strings.hasText(endpointSuffix)) {
Expand All @@ -300,6 +314,10 @@ private static String buildConnectString(String account, @Nullable String key, @
return connectionStringBuilder.toString();
}

public String getEndpoint() {
return "https://" + account + ".blob." + endpointSuffix;
}

public LocationMode getLocationMode() {
return locationMode;
}
Expand Down Expand Up @@ -369,6 +387,7 @@ private static AzureStorageSettings getClientSettings(Settings settings, String
return new AzureStorageSettings(
account.toString(),
key.toString(),
getValue(settings, clientName, FEDERATED_TOKEN_FILE_SETTING),
sasToken.toString(),
getValue(settings, clientName, ENDPOINT_SUFFIX_SETTING),
getValue(settings, clientName, TIMEOUT_SETTING),
Expand All @@ -377,8 +396,7 @@ private static AzureStorageSettings getClientSettings(Settings settings, String
getValue(settings, clientName, WRITE_TIMEOUT_SETTING),
getValue(settings, clientName, READ_TIMEOUT_SETTING),
getValue(settings, clientName, RESPONSE_TIMEOUT_SETTING),
validateAndCreateProxySettings(settings, clientName)
);
validateAndCreateProxySettings(settings, clientName));
}
}

Expand Down Expand Up @@ -431,6 +449,7 @@ static Map<String, AzureStorageSettings> overrideLocationMode(
new AzureStorageSettings(
entry.getValue().account,
entry.getValue().connectString,
entry.getValue().federatedTokenFile,
entry.getValue().endpointSuffix,
entry.getValue().timeout,
entry.getValue().maxRetries,
Expand All @@ -439,8 +458,7 @@ static Map<String, AzureStorageSettings> overrideLocationMode(
entry.getValue().writeTimeout,
entry.getValue().readTimeout,
entry.getValue().responseTimeout,
entry.getValue().getProxySettings()
)
entry.getValue().getProxySettings())
);
}
return mapBuilder.immutableMap();
Expand Down
Loading