aclSpec,
try (AbfsPerfInfo perfInfoSet = startTracking("setAcl", "setAcl")) {
final AbfsRestOperation setAclOp =
- client.setAcl(relativePath,
+ getClient().setAcl(relativePath,
AbfsAclHelper.serializeAclSpec(aclEntries), eTag, tracingContext);
perfInfoSet.registerResult(setAclOp.getResult())
.registerSuccess(true)
@@ -1666,10 +1666,10 @@ public AclStatus getAclStatus(final Path path, TracingContext tracingContext)
LOG.debug(
"getAclStatus filesystem: {} path: {}",
- client.getFileSystem(),
+ getClient().getFileSystem(),
path);
- AbfsRestOperation op = client
+ AbfsRestOperation op = getClient()
.getAclStatus(getRelativePath(path), tracingContext);
AbfsHttpOperation result = op.getResult();
perfInfo.registerResult(result);
@@ -1706,7 +1706,7 @@ public AclStatus getAclStatus(final Path path, TracingContext tracingContext)
public void access(final Path path, final FsAction mode,
TracingContext tracingContext) throws AzureBlobFileSystemException {
LOG.debug("access for filesystem: {}, path: {}, mode: {}",
- this.client.getFileSystem(), path, mode);
+ this.getClient().getFileSystem(), path, mode);
if (!this.abfsConfiguration.isCheckAccessEnabled()
|| !getIsNamespaceEnabled(tracingContext)) {
LOG.debug("Returning; either check access is not enabled or the account"
@@ -1714,7 +1714,7 @@ public void access(final Path path, final FsAction mode,
return;
}
try (AbfsPerfInfo perfInfo = startTracking("access", "checkAccess")) {
- final AbfsRestOperation op = this.client
+ final AbfsRestOperation op = this.getClient()
.checkAccess(getRelativePath(path), mode.SYMBOL, tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
@@ -1745,7 +1745,7 @@ public boolean isInfiniteLeaseKey(String key) {
private void initializeClient(URI uri, String fileSystemName,
String accountName, boolean isSecure)
throws IOException {
- if (this.client != null) {
+ if (this.getClient() != null) {
return;
}
@@ -1819,7 +1819,7 @@ private void initializeClient(URI uri, String fileSystemName,
populateAbfsClientContext());
}
- this.client = getClientHandler().getClient();
+ this.setClient(getClientHandler().getClient());
LOG.trace("AbfsClient init complete");
}
@@ -2189,7 +2189,7 @@ private AbfsLease maybeCreateLease(String relativePath, TracingContext tracingCo
if (!enableInfiniteLease) {
return null;
}
- AbfsLease lease = new AbfsLease(client, relativePath, tracingContext);
+ AbfsLease lease = new AbfsLease(getClient(), relativePath, tracingContext);
leaseRefs.put(lease, null);
return lease;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 26106a717c94f..fb5cb58937220 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -50,7 +50,44 @@ public final class AbfsHttpConstants {
public static final String DEFAULT_LEASE_BREAK_PERIOD = "0";
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
- public static final String TOKEN_VERSION = "2";
+
+ // Abfs Http Client Constants for Blob Endpoint APIs.
+
+ /**
+ * HTTP Header Value to denote resource type as container.
+ * {@value}.
+ */
+ public static final String CONTAINER = "container";
+
+ /**
+ * HTTP Header Value to denote component as metadata.
+ * {@value}.
+ */
+ public static final String METADATA = "metadata";
+
+ /**
+ * HTTP Header Value to denote component as block.
+ * {@value}.
+ */
+ public static final String BLOCK = "block";
+
+ /**
+ * HTTP Header Value to denote component as blocklist.
+ * {@value}.
+ */
+ public static final String BLOCKLIST = "blocklist";
+
+ /**
+ * HTTP Header Value to denote component as lease.
+ * {@value}.
+ */
+ public static final String LEASE = "lease";
+
+ /**
+ * HTTP Header Value to denote block list type as committed.
+ * {@value}.
+ */
+ public static final String BLOCK_TYPE_COMMITTED = "committed";
public static final String JAVA_VENDOR = "java.vendor";
public static final String JAVA_VERSION = "java.version";
@@ -60,6 +97,10 @@ public final class AbfsHttpConstants {
public static final String APN_VERSION = "APN/1.0";
public static final String CLIENT_VERSION = "Azure Blob FS/" + VersionInfo.getVersion();
+ /**
+ * {@value}.
+ */
+ public static final String TOKEN_VERSION = "2";
// Abfs Http Verb
public static final String HTTP_METHOD_DELETE = "DELETE";
@@ -92,6 +133,7 @@ public final class AbfsHttpConstants {
public static final String HTTP_HEADER_PREFIX = "x-ms-";
public static final String HASH = "#";
public static final String TRUE = "true";
+ public static final String ZERO = "0";
public static final String PLUS_ENCODE = "%20";
public static final String FORWARD_SLASH_ENCODE = "%2F";
@@ -101,6 +143,7 @@ public final class AbfsHttpConstants {
public static final String GMT_TIMEZONE = "GMT";
public static final String APPLICATION_JSON = "application/json";
public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
+ public static final String APPLICATION_XML = "application/xml";
public static final String XMS_PROPERTIES_ENCODING_ASCII = "ISO-8859-1";
public static final String XMS_PROPERTIES_ENCODING_UNICODE = "UTF-8";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java
index 8c9c8af75b53d..6b6e98c9c7082 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java
@@ -45,8 +45,7 @@ public enum FSOperationType {
SET_OWNER("SO"),
SET_ACL("SA"),
TEST_OP("TS"),
- WRITE("WR"),
- INIT("IN");
+ WRITE("WR");
private final String opCode;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
index b3c2b21d3c277..53020750ab310 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
@@ -59,7 +59,6 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_ACL = "x-ms-acl";
public static final String X_MS_PERMISSIONS = "x-ms-permissions";
public static final String X_MS_UMASK = "x-ms-umask";
- public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
public static final String X_MS_ENCRYPTION_KEY = "x-ms-encryption-key";
public static final String X_MS_ENCRYPTION_KEY_SHA256 = "x-ms-encryption-key-sha256";
@@ -70,10 +69,40 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
+
+ /**
+ * Http Request Header for denoting the lease id of source in copy operation.
+ * {@value}
+ */
+ public static final String X_MS_SOURCE_LEASE_ID = "x-ms-source-lease-id";
public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
public static final String EXPECT = "Expect";
public static final String X_MS_RANGE_GET_CONTENT_MD5 = "x-ms-range-get-content-md5";
+ /**
+ * Http Response Header for denoting directory.
+ * {@value}
+ */
+ public static final String X_MS_META_HDI_ISFOLDER = "x-ms-meta-hdi_isfolder";
+
+ /**
+ * Http Response Header prefix for user-defined properties.
+ * {@value}
+ */
+ public static final String X_MS_METADATA_PREFIX = "x-ms-meta-";
+
+ /**
+ * Http Request Header for denoting the source of copy operation.
+ * {@value}
+ */
+ public static final String X_MS_COPY_SOURCE = "x-ms-copy-source";
+
+ /**
+ * Http Request Header for denoting MD5 hash of the blob content.
+ * {@value}
+ */
+ public static final String X_MS_BLOB_CONTENT_MD5 = "x-ms-blob-content-md5";
+
private HttpHeaderConfigurations() {}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
index f7e37dcb6d50d..f4dd38585f5ee 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
@@ -42,6 +42,32 @@ public final class HttpQueryParams {
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
public static final String QUERY_PARAM_PAGINATED = "paginated";
+ // query parameters for Blob Endpoint Rest APIs
+
+ /**
+ * Http Query parameter for specifying resource type.
+ * {@value}
+ */
+ public static final String QUERY_PARAM_RESTYPE = "restype";
+
+ /**
+ * Http Query parameter for specifying component.
+ * {@value}
+ */
+ public static final String QUERY_PARAM_COMP = "comp";
+
+ /**
+ * Http Query parameter for specifying blockId.
+ * {@value}
+ */
+ public static final String QUERY_PARAM_BLOCKID = "blockid";
+
+ /**
+ * Http Query parameter for specifying block list type.
+ * {@value}
+ */
+ public static final String QUERY_PARAM_BLOCKLISTTYPE = "blocklisttype";
+
//query params for SAS
public static final String QUERY_PARAM_SAOID = "saoid";
public static final String QUERY_PARAM_SKOID = "skoid";
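Taken together, restype, comp, blockid, and blocklisttype select which Blob endpoint API a request invokes. A minimal standalone sketch of the URL shape for a Put Block request; the account, container, and blob names are hypothetical:

// Illustrative only: how the new query-parameter constants compose into a
// Blob endpoint request URL. Names below are placeholders.
public class BlockUrlSketch {
  public static void main(String[] args) {
    String base = "https://myaccount.blob.core.windows.net/mycontainer/dir/file.txt";
    String blockId = java.util.Base64.getEncoder()
        .encodeToString("block-0000".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    // comp=block selects the Put Block API; blockid carries the Base64 block id.
    String url = base + "?comp=block&blockid="
        + java.net.URLEncoder.encode(blockId, java.nio.charset.StandardCharsets.UTF_8);
    System.out.println(url);
  }
}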
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
index 9da6427d65c2c..f0510d7ac441a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
@@ -36,7 +36,19 @@ public enum Mode {
private final String leaseId;
private boolean isExpectHeaderEnabled;
private boolean isRetryDueToExpect;
+ private final BlobAppendRequestParameters blobParams;
+
+ /**
+ * Constructor to be used for interacting with AbfsDfsClient.
+ * @param position position in remote blob at which append should happen
+ * @param offset position in the buffer to be appended
+ * @param length length of the data to be appended
+ * @param mode mode of the append operation
+ * @param isAppendBlob true if the blob is append-blob
+ * @param leaseId leaseId of the blob to be appended
+ * @param isExpectHeaderEnabled true if the expect header is enabled
+ */
public AppendRequestParameters(final long position,
final int offset,
final int length,
@@ -52,6 +64,37 @@ public AppendRequestParameters(final long position,
this.leaseId = leaseId;
this.isExpectHeaderEnabled = isExpectHeaderEnabled;
this.isRetryDueToExpect = false;
+ this.blobParams = null;
+ }
+
+ /**
+ * Constructor to be used for interacting with AbfsBlobClient.
+ * @param position position in remote blob at which append should happen
+ * @param offset position in the buffer to be appended
+ * @param length length of the data to be appended
+ * @param mode mode of the append operation
+ * @param isAppendBlob true if the blob is append-blob
+ * @param leaseId leaseId of the blob to be appended
+ * @param isExpectHeaderEnabled true if the expect header is enabled
+ * @param blobParams parameters specific to append operation on Blob Endpoint.
+ */
+ public AppendRequestParameters(final long position,
+ final int offset,
+ final int length,
+ final Mode mode,
+ final boolean isAppendBlob,
+ final String leaseId,
+ final boolean isExpectHeaderEnabled,
+ final BlobAppendRequestParameters blobParams) {
+ this.position = position;
+ this.offset = offset;
+ this.length = length;
+ this.mode = mode;
+ this.isAppendBlob = isAppendBlob;
+ this.leaseId = leaseId;
+ this.isExpectHeaderEnabled = isExpectHeaderEnabled;
+ this.isRetryDueToExpect = false;
+ this.blobParams = blobParams;
}
public long getPosition() {
@@ -86,6 +129,22 @@ public boolean isRetryDueToExpect() {
return isRetryDueToExpect;
}
+ /**
+ * Returns BlockId of the block blob to be appended.
+ * @return blockId
+ */
+ public String getBlockId() {
+ return blobParams.getBlockId();
+ }
+
+ /**
+ * Returns ETag of the block blob.
+ * @return eTag
+ */
+ public String getETag() {
+ return blobParams.getETag();
+ }
+
public void setRetryDueToExpect(boolean retryDueToExpect) {
isRetryDueToExpect = retryDueToExpect;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
index 439caabe2327f..db1560d541430 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
@@ -34,10 +34,12 @@
public enum AzureServiceErrorCode {
FILE_SYSTEM_ALREADY_EXISTS("FilesystemAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null),
PATH_ALREADY_EXISTS("PathAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null),
+ BLOB_ALREADY_EXISTS("BlobAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null),
INTERNAL_OPERATION_ABORT("InternalOperationAbortError", HttpURLConnection.HTTP_CONFLICT, null),
PATH_CONFLICT("PathConflict", HttpURLConnection.HTTP_CONFLICT, null),
FILE_SYSTEM_NOT_FOUND("FilesystemNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
PATH_NOT_FOUND("PathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
+ BLOB_PATH_NOT_FOUND("BlobNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
PRE_CONDITION_FAILED("PreconditionFailed", HttpURLConnection.HTTP_PRECON_FAILED, null),
SOURCE_PATH_NOT_FOUND("SourcePathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE("InvalidSourceOrDestinationResourceType", HttpURLConnection.HTTP_CONFLICT, null),
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobAppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobAppendRequestParameters.java
new file mode 100644
index 0000000000000..25e3118265d6d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobAppendRequestParameters.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.services;
+
+/**
+ * The following parameters are used by AbfsBlobClient only.
+ * Blob Endpoint Append API requires blockId and eTag to be passed in the request.
+ */
+public class BlobAppendRequestParameters {
+ private String blockId;
+ private String eTag;
+
+ /**
+ * Constructor to be used for interacting with AbfsBlobClient.
+ * @param blockId blockId of the block to be appended
+ * @param eTag eTag of the blob being appended
+ */
+ public BlobAppendRequestParameters(String blockId, String eTag) {
+ this.blockId = blockId;
+ this.eTag = eTag;
+ }
+
+ public String getBlockId() {
+ return blockId;
+ }
+
+ public String getETag() {
+ return eTag;
+ }
+}
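How the two constructors fit together: callers on the Blob path wrap blockId and eTag in BlobAppendRequestParameters and pass it through the extended AppendRequestParameters constructor, from which the getters above delegate. A minimal sketch, assuming the enum constant Mode.APPEND_MODE and using made-up id/eTag values:

import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters;

public class BlobAppendParamsSketch {
  public static void main(String[] args) {
    // blockId and eTag are made-up values; real callers take the eTag from
    // a prior GetPathStatus/GetBlockList response.
    BlobAppendRequestParameters blobParams =
        new BlobAppendRequestParameters("AAAAAAAAAAA=", "0x8DCFAKEETAG");
    AppendRequestParameters params = new AppendRequestParameters(
        0L,    // position in the remote blob
        0,     // offset within the local buffer
        4096,  // number of bytes to append
        AppendRequestParameters.Mode.APPEND_MODE, // assumed enum constant
        false, // not an append blob
        null,  // no lease held
        true,  // expect header enabled
        blobParams);
    System.out.println(params.getBlockId()); // delegates to blobParams
  }
}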
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
new file mode 100644
index 0000000000000..07c25b32483d3
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -0,0 +1,1087 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
+import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_JSON;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_OCTET_STREAM;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_XML;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCK;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCKLIST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCK_TYPE_COMMITTED;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BREAK_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CONTAINER;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_LEASE_BREAK_PERIOD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LEASE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.METADATA;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RELEASE_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RENEW_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_UNICODE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_MD5;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_NONE_MATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.RANGE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_CONTENT_MD5;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_BREAK_PERIOD;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_DURATION;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_METADATA_PREFIX;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_META_HDI_ISFOLDER;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RANGE_GET_CONTENT_MD5;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_SOURCE_LEASE_ID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOCKID;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOCKLISTTYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESTYPE;
+
+/**
+ * AbfsClient interacting with Blob endpoint.
+ */
+public class AbfsBlobClient extends AbfsClient {
+
+ public AbfsBlobClient(final URL baseUrl,
+ final SharedKeyCredentials sharedKeyCredentials,
+ final AbfsConfiguration abfsConfiguration,
+ final AccessTokenProvider tokenProvider,
+ final EncryptionContextProvider encryptionContextProvider,
+ final AbfsClientContext abfsClientContext) throws IOException {
+ super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
+ encryptionContextProvider, abfsClientContext);
+ }
+
+ public AbfsBlobClient(final URL baseUrl,
+ final SharedKeyCredentials sharedKeyCredentials,
+ final AbfsConfiguration abfsConfiguration,
+ final SASTokenProvider sasTokenProvider,
+ final EncryptionContextProvider encryptionContextProvider,
+ final AbfsClientContext abfsClientContext) throws IOException {
+ super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
+ encryptionContextProvider, abfsClientContext);
+ }
+
+ /**
+ * Create request headers for Rest Operation using the default API version.
+ * @return default request headers.
+ */
+ @Override
+ public List<AbfsHttpHeader> createDefaultHeaders() {
+ return this.createDefaultHeaders(getxMsVersion());
+ }
+
+ /**
+ * Create request headers for Rest Operation using the specified API version.
+ * Blob Endpoint API responses are in JSON/XML format.
+ * @param xMsVersion API version to be used.
+ * @return default request headers
+ */
+ @Override
+ public List<AbfsHttpHeader> createDefaultHeaders(ApiVersion xMsVersion) {
+ List<AbfsHttpHeader> requestHeaders = super.createCommonHeaders(xMsVersion);
+ requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON
+ + COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM
+ + COMMA + SINGLE_WHITE_SPACE + APPLICATION_XML));
+ return requestHeaders;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Create Container.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation createFilesystem(TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.CreateContainer,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
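For reference, the request this method assembles matches the public Create Container REST contract: an authenticated PUT on the container URL with restype=container, returning 201 Created. A hedged wire-level illustration with java.net.http; the account name, service version, and auth token are placeholders, and the real client signs requests via SharedKey/OAuth/SAS:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateContainerSketch {
  public static void main(String[] args) throws Exception {
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("https://myaccount.blob.core.windows.net/mycontainer?restype=container"))
        .header("x-ms-version", "2021-06-08") // any supported service version
        .header("Authorization", "Bearer <token>") // placeholder
        .PUT(HttpRequest.BodyPublishers.noBody())
        .build();
    HttpResponse<Void> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.discarding());
    System.out.println(response.statusCode()); // 201 Created on success
  }
}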
+
+ /**
+ * Get Rest Operation for API
+ * Set Container Metadata.
+ * @param properties comma separated list of metadata key-value pairs.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation setFilesystemProperties(final Hashtable<String, String> properties,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ /*
+ * Blob Endpoint supports Unicode characters but DFS Endpoint only allows ASCII.
+ * To match the behavior across endpoints, the driver throws an exception if non-ASCII characters are found.
+ */
+ try {
+ List<AbfsHttpHeader> metadataRequestHeaders = getMetadataHeadersList(properties);
+ requestHeaders.addAll(metadataRequestHeaders);
+ } catch (CharacterCodingException ex) {
+ throw new InvalidAbfsRestOperationException(ex);
+ }
+
+ AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, METADATA);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.SetContainerMetadata,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Get Container Metadata.
+ * Gets all the properties of the filesystem.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation getFilesystemProperties(TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.GetContainerProperties,
+ HTTP_METHOD_HEAD, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Delete Container.
+ * Deletes the Container acting as current filesystem.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.DeleteContainer,
+ HTTP_METHOD_DELETE, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * List Blobs.
+ * @param relativePath to return only blobs with names that begin with the specified prefix.
+ * @param recursive to return all blobs in the path, including those in subdirectories.
+ * @param listMaxResults maximum number of blobs to return.
+ * @param continuation marker to specify the continuation token.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation or response parsing fails.
+ */
+ @Override
+ public AbfsRestOperation listPath(final String relativePath,
+ final boolean recursive,
+ final int listMaxResults,
+ final String continuation,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ // TODO: [FnsOverBlob][HADOOP-19207] To be implemented as part of response handling of blob endpoint APIs.
+ throw new NotImplementedException("Blob Endpoint Support is not yet implemented");
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Put Blob.
+ * Creates a file or directory(marker file) at specified path.
+ * @param path of the directory to be created.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation createPath(final String path,
+ final boolean isFile,
+ final boolean overwrite,
+ final AzureBlobFileSystemStore.Permissions permissions,
+ final boolean isAppendBlob,
+ final String eTag,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ // TODO: [FnsOverBlob][HADOOP-19207] To be implemented as part of response handling of blob endpoint APIs.
+ throw new NotImplementedException("Create Path operation on Blob endpoint yet to be implemented.");
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Lease Blob.
+ * @param path on which lease has to be acquired.
+ * @param duration for which lease has to be acquired.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation acquireLease(final String path, final int duration,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration)));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID, UUID.randomUUID().toString()));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeaseBlob,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Lease Blob.
+ * @param path on which lease has to be renewed.
+ * @param leaseId of the lease to be renewed.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation renewLease(final String path, final String leaseId,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeaseBlob,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Lease Blob.
+ * @param path on which lease has to be released.
+ * @param leaseId of the lease to be released.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation releaseLease(final String path, final String leaseId,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeaseBlob,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Lease Blob.
+ * @param path on which lease has to be broken.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation breakLease(final String path,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, BREAK_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD, DEFAULT_LEASE_BREAK_PERIOD));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.LeaseBlob,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
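The four lease methods above differ only in their x-ms-lease-* headers; all four PUT to the blob URL with comp=lease and map to AbfsRestOperationType.LeaseBlob. A compact summary of that header matrix, using a hypothetical enum for documentation only:

// Hypothetical enum, for illustration: the header matrix of the lease calls above.
enum LeaseVerb {
  ACQUIRE, // x-ms-lease-action: acquire, plus x-ms-lease-duration and x-ms-proposed-lease-id
  RENEW,   // x-ms-lease-action: renew, plus x-ms-lease-id
  RELEASE, // x-ms-lease-action: release, plus x-ms-lease-id
  BREAK    // x-ms-lease-action: break, plus x-ms-lease-break-period (0 = break immediately)
}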
+
+ /**
+ * Get results for the rename operation.
+ * @param source path to source file
+ * @param destination destination of rename.
+ * @param continuation continuation.
+ * @param tracingContext trace context
+ * @param sourceEtag etag of source file. may be null or empty
+ * @param isMetadataIncompleteState was there a rename failure due to
+ * incomplete metadata state?
+ * @param isNamespaceEnabled whether namespace enabled account or not
+ * @return result of rename operation
+ * @throws IOException if rename operation fails.
+ */
+ @Override
+ public AbfsClientRenameResult renamePath(final String source,
+ final String destination,
+ final String continuation,
+ final TracingContext tracingContext,
+ final String sourceEtag,
+ final boolean isMetadataIncompleteState,
+ final boolean isNamespaceEnabled) throws IOException {
+ /*
+ * TODO: [FnsOverBlob][HADOOP-19233] To be implemented as part of rename-delete over blob endpoint work.
+ */
+ throw new NotImplementedException("Rename operation on Blob endpoint is yet to be implemented.");
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Put Block.
+ * Uploads data to be appended to a file.
+ * @param path to which data has to be appended.
+ * @param buffer containing data to be appended.
+ * @param reqParams containing parameters for append operation like offset, length etc.
+ * @param cachedSasToken to be used for the authenticating operation.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation append(final String path,
+ final byte[] buffer,
+ final AppendRequestParameters reqParams,
+ final String cachedSasToken,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ addEncryptionKeyRequestHeaders(path, requestHeaders, false,
+ contextEncryptionAdapter, tracingContext);
+ requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, String.valueOf(buffer.length)));
+ requestHeaders.add(new AbfsHttpHeader(IF_MATCH, reqParams.getETag()));
+ if (reqParams.getLeaseId() != null) {
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId()));
+ }
+ if (reqParams.isExpectHeaderEnabled()) {
+ requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
+ }
+ if (isChecksumValidationEnabled()) {
+ addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer);
+ }
+ if (reqParams.isRetryDueToExpect()) {
+ String userAgentRetry = getUserAgent();
+ userAgentRetry = userAgentRetry.replace(HUNDRED_CONTINUE_USER_AGENT, EMPTY_STRING);
+ requestHeaders.removeIf(header -> header.getName().equalsIgnoreCase(USER_AGENT));
+ requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgentRetry));
+ }
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCK);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOCKID, reqParams.getBlockId());
+
+ String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
+ abfsUriQueryBuilder, cachedSasToken);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.PutBlock,
+ HTTP_METHOD_PUT, url, requestHeaders,
+ buffer, reqParams.getoffset(), reqParams.getLength(),
+ sasTokenForReuse);
+
+ try {
+ op.execute(tracingContext);
+ } catch (AbfsRestOperationException e) {
+ /*
+ If the http response code indicates a user error we retry
+ the same append request with expect header being disabled.
+ When "100-continue" header is enabled but a non Http 100 response comes,
+ the response message might not get set correctly by the server.
+ So, this handling is to avoid breaking of backward compatibility
+ if someone has taken dependency on the exception message,
+ which is created using the error string present in the response header.
+ */
+ int responseStatusCode = e.getStatusCode();
+ if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) {
+ LOG.debug("User error, retrying without 100 continue enabled for the given path {}", path);
+ reqParams.setExpectHeaderEnabled(false);
+ reqParams.setRetryDueToExpect(true);
+ return this.append(path, buffer, reqParams, cachedSasToken,
+ contextEncryptionAdapter, tracingContext);
+ }
+ // If we have no HTTP response, throw the original exception.
+ if (!op.hasResult()) {
+ throw e;
+ }
+
+ if (isMd5ChecksumError(e)) {
+ throw new AbfsInvalidChecksumException(e);
+ }
+
+ throw e;
+ } catch (AzureBlobFileSystemException e) {
+ // Any server side issue will be returned as AbfsRestOperationException and will be handled above.
+ LOG.debug("Append request failed with non server issues for path: {}, offset: {}, position: {}",
+ path, reqParams.getoffset(), reqParams.getPosition());
+ throw e;
+ }
+ return op;
+ }
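The blockid this method sends must be a Base64 string, and the service requires every block id within a blob to have the same encoded length. A standalone sketch of one way to generate such ids; the zero-padded long scheme is illustrative, not the driver's actual id format:

import java.nio.ByteBuffer;
import java.util.Base64;

public class BlockIdSketch {
  static String blockId(long blockIndex) {
    byte[] raw = ByteBuffer.allocate(Long.BYTES).putLong(blockIndex).array();
    // Same raw length for every block => same encoded length, as the
    // service requires of all block ids within one blob.
    return Base64.getEncoder().encodeToString(raw);
  }
  public static void main(String[] args) {
    System.out.println(blockId(0)); // AAAAAAAAAAA=
    System.out.println(blockId(1)); // AAAAAAAAAAE=
  }
}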
+
+ /**
+ * Blob Endpoint needs blockIds to flush the data.
+ * This method is not supported on Blob Endpoint.
+ * @param path on which data has to be flushed.
+ * @param position to which data has to be flushed.
+ * @param retainUncommittedData whether to retain uncommitted data after flush.
+ * @param isClose specify if this is the last flush to the file.
+ * @param cachedSasToken to be used for the authenticating operation.
+ * @param leaseId if there is an active lease on the path.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @param tracingContext for tracing the server calls.
+ * @return exception as this operation is not supported on Blob Endpoint.
+ * @throws UnsupportedOperationException always.
+ */
+ @Override
+ public AbfsRestOperation flush(final String path,
+ final long position,
+ final boolean retainUncommittedData,
+ final boolean isClose,
+ final String cachedSasToken,
+ final String leaseId,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ throw new UnsupportedOperationException(
+ "Flush without blockIds not supported on Blob Endpoint");
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Put Block List.
+ * The flush operation to commit the blocks.
+ * @param buffer This has the xml in byte format with the blockIds to be flushed.
+ * @param path The path to flush the data to.
+ * @param isClose True when the stream is closed.
+ * @param cachedSasToken The cachedSasToken if available.
+ * @param leaseId The leaseId of the blob if available.
+ * @param eTag The etag of the blob.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation flush(byte[] buffer,
+ final String path,
+ boolean isClose,
+ final String cachedSasToken,
+ final String leaseId,
+ final String eTag,
+ ContextEncryptionAdapter contextEncryptionAdapter,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ addEncryptionKeyRequestHeaders(path, requestHeaders, false,
+ contextEncryptionAdapter, tracingContext);
+ requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, String.valueOf(buffer.length)));
+ requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, APPLICATION_XML));
+ requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
+ if (leaseId != null) {
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+ }
+ String md5Hash = computeMD5Hash(buffer, 0, buffer.length);
+ requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, md5Hash));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
+ String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
+ abfsUriQueryBuilder, cachedSasToken);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.PutBlockList,
+ HTTP_METHOD_PUT, url, requestHeaders,
+ buffer, 0, buffer.length,
+ sasTokenForReuse);
+ try {
+ op.execute(tracingContext);
+ } catch (AbfsRestOperationException ex) {
+ // If 412 Condition Not Met error is seen on retry it means it's either a
+ // parallel write case or the previous request has failed due to network
+ // issue and flush has actually succeeded in the backend. If MD5 hash of
+ // blockIds matches with what was set by previous request, it means the
+ // previous request itself was successful, else request will fail with 412 itself.
+ if (op.getRetryCount() >= 1 && ex.getStatusCode() == HTTP_PRECON_FAILED) {
+ AbfsRestOperation op1 = getPathStatus(path, true, tracingContext,
+ contextEncryptionAdapter);
+ String metadataMd5 = op1.getResult().getResponseHeader(CONTENT_MD5);
+ if (!md5Hash.equals(metadataMd5)) {
+ throw ex;
+ }
+ return op;
+ }
+ throw ex;
+ }
+ return op;
+ }
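The buffer this method uploads is an XML block list; per the public Put Block List contract, committing ids under the Latest element picks the most recent uncommitted or committed version of each block. A hedged sketch of building that body; the helper is illustrative, not the driver's actual serializer:

import java.nio.charset.StandardCharsets;
import java.util.List;

public class BlockListBodySketch {
  static byte[] blockListXml(List<String> blockIds) {
    StringBuilder sb = new StringBuilder(
        "<?xml version=\"1.0\" encoding=\"utf-8\"?><BlockList>");
    for (String id : blockIds) {
      sb.append("<Latest>").append(id).append("</Latest>");
    }
    sb.append("</BlockList>");
    return sb.toString().getBytes(StandardCharsets.UTF_8);
  }
  public static void main(String[] args) {
    byte[] body = blockListXml(List.of("AAAAAAAAAAA=", "AAAAAAAAAAE="));
    System.out.println(new String(body, StandardCharsets.UTF_8));
  }
}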
+
+ /**
+ * Get Rest Operation for API
+ * Set Blob Metadata.
+ * Set the properties of a file or directory.
+ * @param path on which properties have to be set.
+ * @param properties comma separated list of metadata key-value pairs.
+ * @param tracingContext for tracing the service call.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation setPathProperties(final String path,
+ final Hashtable<String, String> properties,
+ final TracingContext tracingContext,
+ final ContextEncryptionAdapter contextEncryptionAdapter)
+ throws AzureBlobFileSystemException {
+ List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ /*
+ * Blob Endpoint supports Unicode characters but DFS Endpoint only allows ASCII.
+ * To match the behavior across endpoints, the driver throws an exception if non-ASCII characters are found.
+ */
+ try {
+ List<AbfsHttpHeader> metadataRequestHeaders = getMetadataHeadersList(properties);
+ requestHeaders.addAll(metadataRequestHeaders);
+ } catch (CharacterCodingException ex) {
+ throw new InvalidAbfsRestOperationException(ex);
+ }
+
+ AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, METADATA);
+ appendSASTokenToQuery(path, SASTokenProvider.SET_PROPERTIES_OPERATION, abfsUriQueryBuilder);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.SetPathProperties,
+ HTTP_METHOD_PUT, url, requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Get Blob Properties.
+ * Get the properties of a file or directory.
+ * @param path of which properties have to be fetched.
+ * @param includeProperties to include user defined properties.
+ * @param tracingContext for tracing the service call.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation getPathStatus(final String path,
+ final boolean includeProperties,
+ final TracingContext tracingContext,
+ final ContextEncryptionAdapter contextEncryptionAdapter)
+ throws AzureBlobFileSystemException {
+ return this.getPathStatus(path, tracingContext,
+ contextEncryptionAdapter, true);
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Get Blob Properties.
+ * Get the properties of a file or directory.
+ * @param path of which properties have to be fetched.
+ * @param tracingContext for tracing the service call.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @param isImplicitCheckRequired specify if implicit check is required.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ public AbfsRestOperation getPathStatus(final String path,
+ final TracingContext tracingContext,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+ final boolean isImplicitCheckRequired)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN,
+ String.valueOf(getAbfsConfiguration().isUpnUsed()));
+ appendSASTokenToQuery(path, SASTokenProvider.GET_PROPERTIES_OPERATION,
+ abfsUriQueryBuilder);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.GetPathStatus,
+ HTTP_METHOD_HEAD, url, requestHeaders);
+ try {
+ op.execute(tracingContext);
+ } catch (AzureBlobFileSystemException ex) {
+ // If we have no HTTP response, throw the original exception.
+ if (!op.hasResult()) {
+ throw ex;
+ }
+ if (op.getResult().getStatusCode() == HTTP_NOT_FOUND && isImplicitCheckRequired) {
+ // This path could be present as an implicit directory in FNS.
+ // TODO: [FnsOverBlob][HADOOP-19207] To be implemented as part of implicit directory handling over blob endpoint.
+ }
+ if (op.getResult().getStatusCode() == HTTP_NOT_FOUND) {
+ /*
+ * Exception handling at AzureBlobFileSystem happens as per the error-code.
+ * In case of HEAD call that gets 4XX status, error code is not parsed from the response.
+ * Hence, we are throwing a new exception with error code and message.
+ */
+ throw new AbfsRestOperationException(HTTP_NOT_FOUND,
+ AzureServiceErrorCode.BLOB_PATH_NOT_FOUND.getErrorCode(),
+ ex.getMessage(), ex);
+ }
+ throw ex;
+ }
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Get Blob.
+ * Read the contents of the file at specified path
+ * @param path of the file to be read.
+ * @param position in the file from where data has to be read.
+ * @param buffer to store the data read.
+ * @param bufferOffset offset in the buffer to start storing the data.
+ * @param bufferLength length of data to be read.
+ * @param eTag to specify conditional headers.
+ * @param cachedSasToken to be used for the authenticating operation.
+ * @param contextEncryptionAdapter to provide encryption context.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation read(final String path,
+ final long position,
+ final byte[] buffer,
+ final int bufferOffset,
+ final int bufferLength,
+ final String eTag,
+ final String cachedSasToken,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE, String.format(
+ "bytes=%d-%d", position, position + bufferLength - 1));
+ requestHeaders.add(rangeHeader);
+ requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
+
+ // Add request header to fetch MD5 Hash of data returned by server.
+ if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) {
+ requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE));
+ }
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION,
+ abfsUriQueryBuilder, cachedSasToken);
+
+ URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.GetBlob,
+ HTTP_METHOD_GET, url, requestHeaders,
+ buffer, bufferOffset, bufferLength,
+ sasTokenForReuse);
+ op.execute(tracingContext);
+
+ // Verify the MD5 hash returned by server holds valid on the data received.
+ if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) {
+ verifyCheckSumForRead(buffer, op.getResult(), bufferOffset);
+ }
+
+ return op;
+ }
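The Range header built above is inclusive on both ends, so a request at position p for bufferLength bytes asks for bytes p through p + bufferLength - 1. A tiny illustration:

public class RangeSketch {
  public static void main(String[] args) {
    long position = 4096;
    int bufferLength = 1024;
    // Inclusive range => exactly bufferLength bytes: "bytes=4096-5119".
    System.out.println(String.format("bytes=%d-%d",
        position, position + bufferLength - 1));
  }
}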
+
+ /**
+ * Orchestration for delete operation to be implemented.
+ * @param path to be deleted.
+ * @param recursive if the path is a directory, delete recursively.
+ * @param continuation to specify continuation token.
+ * @param tracingContext for tracing the server calls.
+ * @param isNamespaceEnabled specify if the namespace is enabled.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ @Override
+ public AbfsRestOperation deletePath(final String path,
+ final boolean recursive,
+ final String continuation,
+ TracingContext tracingContext,
+ final boolean isNamespaceEnabled) throws AzureBlobFileSystemException {
+ // TODO: [FnsOverBlob][HADOOP-19233] To be implemented as part of rename-delete over blob endpoint work.
+ throw new NotImplementedException("Delete operation on Blob endpoint will be implemented in future.");
+ }
+
+ /**
+ * Set the owner of the file or directory.
+ * Not supported for HNS-Disabled Accounts.
+ * @param path on which owner has to be set.
+ * @param owner to be set.
+ * @param group to be set.
+ * @param tracingContext for tracing the server calls.
+ * @return exception as this operation is not supported on Blob Endpoint.
+ * @throws UnsupportedOperationException always.
+ */
+ @Override
+ public AbfsRestOperation setOwner(final String path,
+ final String owner,
+ final String group,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ throw new UnsupportedOperationException(
+ "SetOwner operation is only supported on HNS enabled Accounts.");
+ }
+
+ /**
+ * Set the permission of the file or directory.
+ * Not supported for HNS-Disabled Accounts.
+ * @param path on which permission has to be set.
+ * @param permission to be set.
+ * @param tracingContext for tracing the server calls.
+ * @return exception as this operation is not supported on Blob Endpoint.
+ * @throws UnsupportedOperationException always.
+ */
+ @Override
+ public AbfsRestOperation setPermission(final String path,
+ final String permission,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ throw new UnsupportedOperationException(
+ "SetPermission operation is only supported on HNS enabled Accounts.");
+ }
+
+ /**
+ * Set the ACL of the file or directory.
+ * Not supported for HNS-Disabled Accounts.
+ * @param path on which ACL has to be set.
+ * @param aclSpecString to be set.
+ * @param eTag to specify conditional headers. Set only if etag matches.
+ * @param tracingContext for tracing the server calls.
+ * @return exception as this operation is not supported on Blob Endpoint.
+ * @throws UnsupportedOperationException always.
+ */
+ @Override
+ public AbfsRestOperation setAcl(final String path,
+ final String aclSpecString,
+ final String eTag,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ throw new UnsupportedOperationException(
+ "SetAcl operation is only supported on HNS enabled Accounts.");
+ }
+
+ /**
+ * Get the ACL of the file or directory.
+ * Not supported for HNS-Disabled Accounts.
+ * @param path of which properties have to be fetched.
+ * @param useUPN whether to use UPN with rest operation.
+ * @param tracingContext for tracing the server calls.
+ * @return exception as this operation is not supported on Blob Endpoint.
+ * @throws UnsupportedOperationException always.
+ */
+ @Override
+ public AbfsRestOperation getAclStatus(final String path,
+ final boolean useUPN,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ throw new UnsupportedOperationException(
+ "GetAclStatus operation is only supported on HNS enabled Accounts.");
+ }
+
+ /**
+ * Check the access of the file or directory.
+ * Not supported for HNS-Disabled Accounts.
+   * @param path path for which the access check needs to be performed.
+   * @param rwx the permission to be checked on the path.
+   * @param tracingContext tracks identifiers for the request header.
+   * @return never returns normally; the operation is not supported on the Blob Endpoint.
+   * @throws UnsupportedOperationException always.
+ */
+ @Override
+ public AbfsRestOperation checkAccess(String path,
+ String rwx,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ throw new UnsupportedOperationException(
+ "CheckAccess operation is only supported on HNS enabled Accounts.");
+ }
+
+ /**
+ * Checks if the rest operation results indicate if the path is a directory.
+ * @param result executed rest operation containing response from server.
+ * @return True if the path is a directory, False otherwise.
+ */
+ @Override
+ public boolean checkIsDir(AbfsHttpOperation result) {
+ String resourceType = result.getResponseHeader(X_MS_META_HDI_ISFOLDER);
+ return resourceType != null && resourceType.equals(TRUE);
+ }
+
+ /**
+   * Returns true if the status code lies in the range of user errors.
+   * In the case of HTTP_CONFLICT for PutBlockList, we fall back to DFS and
+   * hence this retry handling is not needed for that status code.
+   * @param responseStatusCode http response status code.
+   * @return true if the status code indicates a user error, false otherwise.
+ */
+ @Override
+ public boolean checkUserError(int responseStatusCode) {
+ return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
+ && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR
+ && responseStatusCode != HttpURLConnection.HTTP_CONFLICT);
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Get Block List.
+ * Get the list of committed block ids of the blob.
+ * @param path The path to get the list of blockId's.
+ * @param tracingContext for tracing the service call.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ public AbfsRestOperation getBlockList(final String path,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ String operation = SASTokenProvider.READ_OPERATION;
+ appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
+
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOCKLISTTYPE, BLOCK_TYPE_COMMITTED);
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.GetBlockList, HTTP_METHOD_GET, url,
+ requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Copy Blob.
+   * This is an asynchronous API: it returns a copyId and expects the client
+   * to poll the destination and check the copy progress.
+   * This method owns the logic of triggering the copyBlob API; the caller
+   * of this method has to own the logic of polling the destination with the
+   * copyId returned in the response.
+   * @param sourceBlobPath path of source to be copied.
+   * @param destinationBlobPath path of the destination.
+   * @param srcLeaseId if source path has an active lease.
+   * @param tracingContext for tracing the service call.
+   * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ public AbfsRestOperation copyBlob(Path sourceBlobPath,
+ Path destinationBlobPath,
+ final String srcLeaseId,
+ TracingContext tracingContext) throws AzureBlobFileSystemException {
+ AbfsUriQueryBuilder abfsUriQueryBuilderDst = createDefaultUriQueryBuilder();
+ AbfsUriQueryBuilder abfsUriQueryBuilderSrc = new AbfsUriQueryBuilder();
+ String dstBlobRelativePath = destinationBlobPath.toUri().getPath();
+ String srcBlobRelativePath = sourceBlobPath.toUri().getPath();
+ appendSASTokenToQuery(dstBlobRelativePath,
+ SASTokenProvider.WRITE_OPERATION, abfsUriQueryBuilderDst);
+ appendSASTokenToQuery(srcBlobRelativePath,
+ SASTokenProvider.READ_OPERATION, abfsUriQueryBuilderSrc);
+ final URL url = createRequestUrl(dstBlobRelativePath,
+ abfsUriQueryBuilderDst.toString());
+ final String sourcePathUrl = createRequestUrl(srcBlobRelativePath,
+ abfsUriQueryBuilderSrc.toString()).toString();
+    List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ if (srcLeaseId != null) {
+ requestHeaders.add(new AbfsHttpHeader(X_MS_SOURCE_LEASE_ID, srcLeaseId));
+ }
+ requestHeaders.add(new AbfsHttpHeader(X_MS_COPY_SOURCE, sourcePathUrl));
+ requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
+
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.CopyBlob, HTTP_METHOD_PUT, url, requestHeaders);
+    op.execute(tracingContext);
+    return op;
+ }
+
+ /**
+ * Get Rest Operation for API
+ * Delete Blob.
+ * Deletes the blob at the given path.
+ * @param blobPath path of the blob to be deleted.
+ * @param leaseId if path has an active lease.
+ * @param tracingContext for tracing the server calls.
+ * @return executed rest operation containing response from server.
+ * @throws AzureBlobFileSystemException if rest operation fails.
+ */
+ public AbfsRestOperation deleteBlobPath(final Path blobPath,
+ final String leaseId,
+ final TracingContext tracingContext) throws AzureBlobFileSystemException {
+ AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ String blobRelativePath = blobPath.toUri().getPath();
+ appendSASTokenToQuery(blobRelativePath,
+ SASTokenProvider.DELETE_OPERATION, abfsUriQueryBuilder);
+ final URL url = createRequestUrl(blobRelativePath, abfsUriQueryBuilder.toString());
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ if (leaseId != null) {
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+ }
+ final AbfsRestOperation op = getAbfsRestOperation(
+ AbfsRestOperationType.DeleteBlob, HTTP_METHOD_DELETE, url,
+ requestHeaders);
+ op.execute(tracingContext);
+ return op;
+ }
+
+ private static String encodeMetadataAttribute(String value)
+ throws UnsupportedEncodingException {
+ return value == null ? null
+ : URLEncoder.encode(value, XMS_PROPERTIES_ENCODING_UNICODE);
+ }
+
+ private static String decodeMetadataAttribute(String encoded)
+ throws UnsupportedEncodingException {
+ return encoded == null ? null
+ : URLDecoder.decode(encoded, XMS_PROPERTIES_ENCODING_UNICODE);
+ }
+
+ /**
+   * Checks whether the value contains only pure ASCII characters.
+   * @param value to be checked.
+   * @return true if the value is pure ASCII.
+   * @throws CharacterCodingException if the value is not pure ASCII.
+ */
+ private boolean isPureASCII(String value) throws CharacterCodingException {
+ final CharsetEncoder encoder = Charset.forName(
+ XMS_PROPERTIES_ENCODING_ASCII).newEncoder();
+ boolean canEncodeValue = encoder.canEncode(value);
+ if (!canEncodeValue) {
+ LOG.debug("Value {} for ne of the metadata is not pure ASCII.", value);
+ throw new CharacterCodingException();
+ }
+ return true;
+ }
+
+  private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, String> properties)
+      throws AbfsRestOperationException, CharacterCodingException {
+    List<AbfsHttpHeader> metadataRequestHeaders = new ArrayList<>();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+ String key = X_MS_METADATA_PREFIX + entry.getKey();
+ String value = entry.getValue();
+ // AzureBlobFileSystem supports only ASCII Characters in property values.
+ if (isPureASCII(value)) {
+ try {
+ value = encodeMetadataAttribute(value);
+ } catch (UnsupportedEncodingException e) {
+ throw new InvalidAbfsRestOperationException(e);
+ }
+ metadataRequestHeaders.add(new AbfsHttpHeader(key, value));
+ }
+ }
+ return metadataRequestHeaders;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 59b78cbf6195f..b51131b959854 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -794,6 +794,7 @@ public abstract AbfsRestOperation flush(String path, long position,
* @param cachedSasToken to be used for the authenticating operation.
* @param leaseId if there is an active lease on the path.
* @param eTag to specify conditional headers.
+ * @param contextEncryptionAdapter to provide encryption context.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
@@ -804,6 +805,7 @@ public abstract AbfsRestOperation flush(byte[] buffer,
String cachedSasToken,
String leaseId,
String eTag,
+ ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext) throws AzureBlobFileSystemException;
/**
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
index 12d800939ae95..e0be9cbc8a82d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromBlobToDfs;
+import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromDfsToBlob;
/**
* AbfsClientHandler is a class that provides a way to get the AbfsClient
@@ -41,6 +42,7 @@ public class AbfsClientHandler {
private AbfsServiceType defaultServiceType;
private final AbfsDfsClient dfsAbfsClient;
+ private final AbfsBlobClient blobAbfsClient;
public AbfsClientHandler(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
@@ -51,6 +53,9 @@ public AbfsClientHandler(final URL baseUrl,
this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, tokenProvider, null, encryptionContextProvider,
abfsClientContext);
+ this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
+ abfsConfiguration, tokenProvider, null, encryptionContextProvider,
+ abfsClientContext);
initServiceType(abfsConfiguration);
}
@@ -63,6 +68,9 @@ public AbfsClientHandler(final URL baseUrl,
this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
+ this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
+ abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
+ abfsClientContext);
initServiceType(abfsConfiguration);
}
@@ -84,24 +92,24 @@ public AbfsClient getClient() {
/**
* Get the AbfsClient based on the service type.
- * @param serviceType AbfsServiceType
+ * @param serviceType AbfsServiceType.
* @return AbfsClient
*/
public AbfsClient getClient(AbfsServiceType serviceType) {
- return serviceType == AbfsServiceType.DFS ? dfsAbfsClient : null;
+ return serviceType == AbfsServiceType.DFS ? dfsAbfsClient : blobAbfsClient;
}
/**
* Create the AbfsDfsClient using the url used to configure file system.
* If URL is for Blob endpoint, it will be converted to DFS endpoint.
- * @param baseUrl URL
- * @param creds SharedKeyCredentials
- * @param abfsConfiguration AbfsConfiguration
- * @param tokenProvider AccessTokenProvider
- * @param sasTokenProvider SASTokenProvider
- * @param encryptionContextProvider EncryptionContextProvider
- * @param abfsClientContext AbfsClientContext
- * @return AbfsDfsClient with DFS endpoint URL
+ * @param baseUrl URL.
+ * @param creds SharedKeyCredentials.
+ * @param abfsConfiguration AbfsConfiguration.
+ * @param tokenProvider AccessTokenProvider.
+ * @param sasTokenProvider SASTokenProvider.
+ * @param encryptionContextProvider EncryptionContextProvider.
+ * @param abfsClientContext AbfsClientContext.
+ * @return AbfsDfsClient with DFS endpoint URL.
* @throws IOException if URL conversion fails.
*/
private AbfsDfsClient createDfsClient(final URL baseUrl,
@@ -124,4 +132,38 @@ private AbfsDfsClient createDfsClient(final URL baseUrl,
abfsClientContext);
}
}
+
+ /**
+ * Create the AbfsBlobClient using the url used to configure file system.
+ * If URL is for DFS endpoint, it will be converted to Blob endpoint.
+ * @param baseUrl URL.
+ * @param creds SharedKeyCredentials.
+ * @param abfsConfiguration AbfsConfiguration.
+ * @param tokenProvider AccessTokenProvider.
+ * @param sasTokenProvider SASTokenProvider.
+ * @param encryptionContextProvider EncryptionContextProvider.
+ * @param abfsClientContext AbfsClientContext.
+ * @return AbfsBlobClient with Blob endpoint URL.
+ * @throws IOException if URL conversion fails.
+ */
+ private AbfsBlobClient createBlobClient(final URL baseUrl,
+ final SharedKeyCredentials creds,
+ final AbfsConfiguration abfsConfiguration,
+ final AccessTokenProvider tokenProvider,
+ final SASTokenProvider sasTokenProvider,
+ final EncryptionContextProvider encryptionContextProvider,
+ final AbfsClientContext abfsClientContext) throws IOException {
+ URL blobUrl = changeUrlFromDfsToBlob(baseUrl);
+ if (tokenProvider != null) {
+ LOG.debug("Creating AbfsBlobClient with access token provider using the URL: {}", blobUrl);
+ return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
+ tokenProvider, encryptionContextProvider,
+ abfsClientContext);
+ } else {
+ LOG.debug("Creating AbfsBlobClient with SAS token provider using the URL: {}", blobUrl);
+ return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
+ sasTokenProvider, encryptionContextProvider,
+ abfsClientContext);
+ }
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index f2eebd8800f15..7d50260f7bad3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -837,6 +837,7 @@ public AbfsRestOperation flush(byte[] buffer,
final String cachedSasToken,
final String leaseId,
final String eTag,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
throw new UnsupportedOperationException(
"Flush with blockIds not supported on DFS Endpoint");
@@ -1282,6 +1283,7 @@ private String convertXmsPropertiesToCommaSeparatedString(final Map
+
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/blobEndpoint.md b/hadoop-tools/hadoop-azure/src/site/markdown/blobEndpoint.md
new file mode 100644
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/blobEndpoint.md
+# Azure Blob Storage REST API (Blob Endpoint)
+
+## Introduction
+The REST API for Blob Storage defines HTTP operations against the storage account, containers (filesystems), and blobs (files).
+The API includes the operations listed in the following table.
+
+| Operation | Resource Type | Description |
+|-------------------------------------------------------|---------------|---------------------------------------------------------------------------------------------|
+| [Create Container](#create-container)                 | Filesystem    | Creates a new Azure storage container to be used as a Hadoop filesystem.                     |
+| [Delete Container](#delete-container)                 | Filesystem    | Deletes the specified container acting as a Hadoop filesystem.                               |
+| [Set Container Metadata](#set-container-metadata)     | Filesystem    | Sets the metadata of the specified container acting as a Hadoop filesystem.                  |
+| [Get Container Properties](#get-container-properties) | Filesystem    | Gets the metadata of the specified container acting as a Hadoop filesystem.                  |
+| [List Blobs](#list-blobs)                             | Filesystem    | Lists the paths under the specified directory inside the container acting as a Hadoop filesystem. |
+| [Put Blob](#put-blob) | Path | Creates a new path or updates an existing path under the specified filesystem (container). |
+| [Lease Blob](#lease-blob) | Path | Establishes and manages a lease on the specified path. |
+| [Put Block](#put-block)                               | Path          | Appends data to an already created blob at the specified path.                                |
+| [Put Block List](#put-block-list)                     | Path          | Flushes the appended data to the blob at the specified path.                                  |
+| [Set Blob Metadata](#set-blob-metadata)               | Path          | Sets the user-defined attributes of the blob at the specified path.                           |
+| [Get Blob Properties](#get-blob-properties)           | Path          | Gets the user-defined attributes of the blob at the specified path.                           |
+| [Get Blob](#get-blob)                                 | Path          | Reads data from the blob at the specified path.                                               |
+| [Delete Blob](#delete-blob)                           | Path          | Deletes the blob at the specified path.                                                       |
+| [Get Block List](#get-block-list) | Path | Retrieves the list of blocks that have been uploaded as part of a block blob. |
+| [Copy Blob](#copy-blob) | Path | Copies a blob to a destination within the storage account. |
+
+## Create Container
+The Create Container operation creates a new container under the specified account. If a container with the same
+name already exists, the operation fails.
+Rest API Documentation: [Create Container](https://docs.microsoft.com/en-us/rest/api/storageservices/create-container)
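+
+A minimal sketch of the raw HTTP call, assuming a hypothetical pre-generated SAS token and
+made-up account and container names (the `x-ms-version` value is likewise only illustrative):
+
+```java
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public class CreateContainerExample {
+  public static void main(String[] args) throws Exception {
+    String sas = "sv=2021-06-08&sig=...";  // hypothetical SAS token
+    URL url = new URL(
+        "https://myaccount.blob.core.windows.net/mycontainer?restype=container&" + sas);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod("PUT");
+    conn.setRequestProperty("x-ms-version", "2021-06-08");
+    conn.setDoOutput(true);
+    conn.setFixedLengthStreamingMode(0);
+    conn.getOutputStream().close();  // empty request body
+    // 201 Created on success; 409 Conflict if the container already exists.
+    System.out.println(conn.getResponseCode());
+  }
+}
+```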
+
+## Delete Container
+The Delete Container operation marks the specified container for deletion. The container and any blobs contained within it are later deleted during garbage collection.
+Rest API Documentation: [Delete Container](https://docs.microsoft.com/en-us/rest/api/storageservices/delete-container)
+
+## Set Container Metadata
+The Set Container Metadata operation sets user-defined metadata for the specified container as one or more name-value pairs.
+Rest API Documentation: [Set Container Metadata](https://docs.microsoft.com/en-us/rest/api/storageservices/set-container-metadata)
+
+## Get Container Properties
+The Get Container Properties operation returns all user-defined metadata and system properties for the specified container. The returned data doesn't include the container's list of blobs.
+Rest API Documentation: [Get Container Properties](https://docs.microsoft.com/en-us/rest/api/storageservices/get-container-properties)
+
+## List Blobs
+The List Blobs operation returns a list of the blobs under the specified container.
+Rest API Documentation: [List Blobs](https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs)
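+
+A minimal sketch, assuming a hypothetical SAS token and names; the `prefix` and `delimiter`
+parameters are what let a flat blob namespace be listed like a directory tree:
+
+```java
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+public class ListBlobsExample {
+  public static void main(String[] args) throws Exception {
+    String sas = "sv=2021-06-08&sig=...";  // hypothetical SAS token
+    URL url = new URL("https://myaccount.blob.core.windows.net/mycontainer"
+        + "?restype=container&comp=list&prefix=dir1/&delimiter=/&" + sas);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestProperty("x-ms-version", "2021-06-08");
+    try (InputStream in = conn.getInputStream()) {
+      // XML response: <Blob> entries for files and <BlobPrefix> entries for
+      // each "sub-directory" under the delimiter.
+      System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
+    }
+  }
+}
+```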
+
+## Put Blob
+The Put Blob operation creates a new block blob, or updates the content of an existing block blob.
+The Put Blob operation will overwrite all contents of an existing blob with the same name:
+when you update an existing block blob, you overwrite any existing metadata on the blob,
+and the content of the existing blob is replaced with the content of the new blob.
+Partial updates are not supported with Put Blob.
+Rest API Documentation: [Put Blob](https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob)
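+
+A minimal sketch of a whole-blob write, assuming a hypothetical SAS token and names; the
+`x-ms-blob-type` header is required and marks the blob as a block blob:
+
+```java
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+public class PutBlobExample {
+  public static void main(String[] args) throws Exception {
+    String sas = "sv=2021-06-08&sig=...";  // hypothetical SAS token
+    byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
+    URL url = new URL(
+        "https://myaccount.blob.core.windows.net/mycontainer/dir1/file1?" + sas);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod("PUT");
+    conn.setRequestProperty("x-ms-version", "2021-06-08");
+    conn.setRequestProperty("x-ms-blob-type", "BlockBlob");
+    conn.setDoOutput(true);
+    conn.setFixedLengthStreamingMode(body.length);
+    try (OutputStream out = conn.getOutputStream()) {
+      out.write(body);  // replaces all existing content and metadata
+    }
+    System.out.println(conn.getResponseCode());  // 201 Created
+  }
+}
+```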
+
+## Lease Blob
+The Lease Blob operation creates and manages a lock on a blob for write and delete operations. The lock duration can be 15 to 60 seconds, or can be infinite.
+Rest API Documentation: [Lease Blob](https://docs.microsoft.com/en-us/rest/api/storageservices/lease-blob)
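+
+A minimal sketch of acquiring a lease, assuming a hypothetical SAS token and names; the
+`x-ms-lease-*` headers drive the lease state machine:
+
+```java
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public class LeaseBlobExample {
+  public static void main(String[] args) throws Exception {
+    String sas = "sv=2021-06-08&sig=...";  // hypothetical SAS token
+    URL url = new URL("https://myaccount.blob.core.windows.net/mycontainer"
+        + "/dir1/file1?comp=lease&" + sas);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod("PUT");
+    conn.setRequestProperty("x-ms-version", "2021-06-08");
+    conn.setRequestProperty("x-ms-lease-action", "acquire");
+    conn.setRequestProperty("x-ms-lease-duration", "-1");  // -1 = infinite, else 15-60s
+    conn.setDoOutput(true);
+    conn.setFixedLengthStreamingMode(0);
+    conn.getOutputStream().close();  // empty request body
+    // Keep the lease id: writes and deletes on a leased blob must present it.
+    System.out.println(conn.getResponseCode() + " "
+        + conn.getHeaderField("x-ms-lease-id"));  // 201 Created
+  }
+}
+```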
+
+## Put Block
+The Put Block operation creates a new block to be committed as part of a blob.
+Rest API Documentation: [Put Block](https://docs.microsoft.com/en-us/rest/api/storageservices/put-block)
+
+## Put Block List
+The Put Block List operation writes a blob by specifying the list of block IDs that make up the blob. To be written as part of a blob, a block must have been successfully written to the server in an earlier Put Block operation. You can call Put Block List to update a blob by uploading only those blocks that have changed and then committing the new and existing blocks together.
+Rest API Documentation: [Put Block List](https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list)
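+
+A minimal sketch of the append-then-flush sequence built from Put Block and Put Block List,
+assuming a hypothetical SAS token and names (in general a Base64 block id may also need
+URL-encoding before being placed in the query string):
+
+```java
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+public class PutBlockAndCommitExample {
+
+  private static int put(String spec, byte[] body) throws Exception {
+    HttpURLConnection conn = (HttpURLConnection) new URL(spec).openConnection();
+    conn.setRequestMethod("PUT");
+    conn.setRequestProperty("x-ms-version", "2021-06-08");
+    conn.setDoOutput(true);
+    conn.setFixedLengthStreamingMode(body.length);
+    try (OutputStream out = conn.getOutputStream()) {
+      out.write(body);
+    }
+    return conn.getResponseCode();
+  }
+
+  public static void main(String[] args) throws Exception {
+    String sas = "sv=2021-06-08&sig=...";  // hypothetical SAS token
+    String base = "https://myaccount.blob.core.windows.net/mycontainer/dir1/file1";
+    // Block ids must be Base64 encoded; all ids within a blob must have equal length.
+    String blockId = Base64.getEncoder()
+        .encodeToString("block-000000".getBytes(StandardCharsets.UTF_8));
+    // 1. Stage a block: it stays uncommitted and invisible to readers.
+    put(base + "?comp=block&blockid=" + blockId + "&" + sas,
+        "data to append".getBytes(StandardCharsets.UTF_8));
+    // 2. Commit the staged block: this acts as the flush.
+    String blockList = "<?xml version=\"1.0\" encoding=\"utf-8\"?>"
+        + "<BlockList><Latest>" + blockId + "</Latest></BlockList>";
+    System.out.println(put(base + "?comp=blocklist&" + sas,
+        blockList.getBytes(StandardCharsets.UTF_8)));  // 201 Created
+  }
+}
+```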
+
+## Set Blob Metadata
+The Set Blob Metadata operation sets user-defined metadata for the specified blob as one or more name-value pairs.
+Rest API Documentation: [Set Blob Metadata](https://docs.microsoft.com/en-us/rest/api/storageservices/set-blob-metadata)
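+
+A minimal sketch, assuming a hypothetical SAS token and names. Metadata travels as
+`x-ms-meta-*` HTTP headers, which is why the ABFS client restricts property values to
+ASCII and URL-encodes them before sending:
+
+```java
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+
+public class SetBlobMetadataExample {
+  public static void main(String[] args) throws Exception {
+    String sas = "sv=2021-06-08&sig=...";  // hypothetical SAS token
+    URL url = new URL("https://myaccount.blob.core.windows.net/mycontainer"
+        + "/dir1?comp=metadata&" + sas);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod("PUT");
+    conn.setRequestProperty("x-ms-version", "2021-06-08");
+    // Marker metadata the driver uses to represent a directory on a flat namespace.
+    conn.setRequestProperty("x-ms-meta-hdi_isfolder", "true");
+    // Values must be header-safe, hence the URL-encoding of arbitrary user properties.
+    conn.setRequestProperty("x-ms-meta-sample", URLEncoder.encode("a b/c", "UTF-8"));
+    conn.setDoOutput(true);
+    conn.setFixedLengthStreamingMode(0);
+    conn.getOutputStream().close();  // empty request body
+    System.out.println(conn.getResponseCode());  // 200 OK
+  }
+}
+```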
+
+## Get Blob Properties
+The Get Blob Properties operation returns all user-defined metadata, standard HTTP properties, and system properties for the blob.
+Rest API Documentation: [Get Blob Properties](https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties)
+
+## Get Blob
+The Get Blob operation reads or downloads a blob from the system, including its metadata and properties.
+Rest API Documentation: [Get Blob](https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob)
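+
+A minimal sketch of a ranged read, assuming a hypothetical SAS token and names; without the
+`Range` header the whole blob is returned:
+
+```java
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public class GetBlobExample {
+  public static void main(String[] args) throws Exception {
+    String sas = "sv=2021-06-08&sig=...";  // hypothetical SAS token
+    URL url = new URL(
+        "https://myaccount.blob.core.windows.net/mycontainer/dir1/file1?" + sas);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestProperty("x-ms-version", "2021-06-08");
+    conn.setRequestProperty("Range", "bytes=0-1023");  // server answers 206 Partial Content
+    try (InputStream in = conn.getInputStream()) {
+      System.out.println(in.readAllBytes().length);
+    }
+  }
+}
+```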
+
+## Delete Blob
+The Delete Blob operation marks the specified blob for deletion. The blob is later deleted during garbage collection.
+Rest API Documentation: [Delete Blob](https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob)
+
+## Get Block List
+The Get Block List operation retrieves the list of blocks that have been uploaded as part of a block blob.
+Rest API Documentation: [Get Block List](https://docs.microsoft.com/en-us/rest/api/storageservices/get-block-list)
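+
+A minimal sketch, assuming a hypothetical SAS token and names; `blocklisttype` may be
+`committed`, `uncommitted` or `all`:
+
+```java
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+public class GetBlockListExample {
+  public static void main(String[] args) throws Exception {
+    String sas = "sv=2021-06-08&sig=...";  // hypothetical SAS token
+    URL url = new URL("https://myaccount.blob.core.windows.net/mycontainer/dir1/file1"
+        + "?comp=blocklist&blocklisttype=committed&" + sas);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestProperty("x-ms-version", "2021-06-08");
+    try (InputStream in = conn.getInputStream()) {
+      // XML body: <BlockList><CommittedBlocks><Block><Name>...</Name><Size>...</Size>
+      System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
+    }
+  }
+}
+```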
+
+## Copy Blob
+The Copy Blob operation copies a blob to a destination within the storage account.
+Rest API Documentation: [Copy Blob](https://docs.microsoft.com/en-us/rest/api/storageservices/copy-blob)
\ No newline at end of file
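+
+A minimal sketch of the trigger-then-poll protocol, assuming a hypothetical SAS token and
+names; the first request mirrors what `AbfsBlobClient#copyBlob` issues, and the polling
+loop is the part the caller of that method is expected to own:
+
+```java
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public class CopyBlobExample {
+  public static void main(String[] args) throws Exception {
+    String sas = "sv=2021-06-08&sig=...";  // hypothetical SAS token
+    String container = "https://myaccount.blob.core.windows.net/mycontainer";
+    URL dst = new URL(container + "/dir1/renamed?" + sas);
+    // 1. Trigger the copy on the destination; the source goes in a header.
+    HttpURLConnection conn = (HttpURLConnection) dst.openConnection();
+    conn.setRequestMethod("PUT");
+    conn.setRequestProperty("x-ms-version", "2021-06-08");
+    conn.setRequestProperty("x-ms-copy-source", container + "/dir1/file1?" + sas);
+    conn.setDoOutput(true);
+    conn.setFixedLengthStreamingMode(0);
+    conn.getOutputStream().close();  // empty request body
+    String copyId = conn.getHeaderField("x-ms-copy-id");
+    String status = conn.getHeaderField("x-ms-copy-status");  // pending|success|aborted|failed
+    // 2. Poll the destination's properties until the copy completes.
+    while ("pending".equals(status)) {
+      Thread.sleep(1000);
+      HttpURLConnection head = (HttpURLConnection) dst.openConnection();
+      head.setRequestMethod("HEAD");
+      head.setRequestProperty("x-ms-version", "2021-06-08");
+      status = head.getHeaderField("x-ms-copy-status");
+    }
+    System.out.println("copy " + copyId + " finished: " + status);
+  }
+}
+```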
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/fns_blob.md b/hadoop-tools/hadoop-azure/src/site/markdown/fns_blob.md
index f93593cecfb5b..bf0835ccbe3ae 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/fns_blob.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/fns_blob.md
@@ -27,7 +27,7 @@ Refer to [WASB Deprication](./wasb.html) for more details.
## Azure Service Endpoints Used by ABFS Driver
Azure Services offers two set of endpoints for interacting with storage accounts:
-1. [Azure Blob Storage](https://learn.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api) referred as Blob Endpoint
+1. [Azure Blob Storage](./blobEndpoint.html) referred as Blob Endpoint
2. [Azure Data Lake Storage](https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/operation-groups) referred as DFS Endpoint
The ABFS Driver by default is designed to work with DFS Endpoint only which primarily
@@ -70,9 +70,9 @@ to blob for HNS Enabled Accounts, FS init will fail with InvalidConfiguration er
```
4. Service Type for Ingress Operations: This will allow an override to choose service
-type only for Ingress Related Operations like [Create](https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob?tabs=microsoft-entra-id),
-[Append](https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id)
-and [Flush](https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id). All other operations will still use the
+type only for Ingress Related Operations like [Create](./blobEndpoint.html#put-blob),
+[Append](./blobEndpoint.html#put-block),
+and [Flush](./blobEndpoint.html#put-block-list). All other operations will still use the
configured service type.
```xml
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java
index 69d4f79f8b099..b15f4c997be0e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java
@@ -113,7 +113,7 @@ public void testNoGetAclCallOnHnsConfigPresence() throws Exception {
.getAclStatus(Mockito.anyString(), any(TracingContext.class));
}
- // Todo: [FnsOverBlob] Remove this test case once Blob Endpoint Support is ready and enabled.
+ // TODO: [FnsOverBlob][HADOOP-19179] Remove this test case once Blob Endpoint Support is enabled.
@Test
public void testFileSystemInitFailsWithBlobEndpoitUrl() throws Exception {
Configuration configuration = getRawConfiguration();
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
index 81897a568763e..3eae1401998b6 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
@@ -160,7 +160,7 @@ private String getUserAgentString(AbfsConfiguration config,
boolean includeSSLProvider) throws IOException, URISyntaxException {
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build();
- // Todo : [FnsOverBlob] Update to work with Blob Endpoint as well when Fns Over Blob is ready.
+ // TODO: [FnsOverBlob][HADOOP-19234] Update to work with Blob Endpoint as well when Fns Over Blob is ready.
AbfsClient client = new AbfsDfsClient(new URL("https://azure.com"), null,
config, (AccessTokenProvider) null, null, abfsClientContext);
String sslProviderName = null;
@@ -364,7 +364,7 @@ public static AbfsClient createTestClientFromCurrentContext(
.build();
// Create test AbfsClient
- // Todo : [FnsOverBlob] Update to work with Blob Endpoint as well when Fns Over Blob is ready.
+ // TODO : [FnsOverBlob][HADOOP-19234] Update to work with Blob Endpoint as well when Fns Over Blob is ready.
AbfsClient testClient = new AbfsDfsClient(
baseAbfsClientInstance.getBaseUrl(),
(currentAuthType == AuthType.SharedKey
@@ -393,7 +393,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
(currentAuthType == AuthType.SharedKey)
|| (currentAuthType == AuthType.OAuth));
- // Todo : [FnsOverBlob] Update to work with Blob Endpoint as well when Fns Over Blob is ready.
+ // TODO : [FnsOverBlob][HADOOP-19234] Update to work with Blob Endpoint as well when Fns Over Blob is ready.
AbfsClient client = mock(AbfsDfsClient.class);
AbfsPerfTracker tracker = new AbfsPerfTracker(
"test",
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClientHandler.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClientHandler.java
new file mode 100644
index 0000000000000..169398e6e99f8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClientHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
+
+/**
+ * Test AbfsClientHandler initialization.
+ */
+public class ITestAbfsClientHandler extends AbstractAbfsIntegrationTest {
+
+  public ITestAbfsClientHandler() throws Exception {
+  }
+
+ /**
+   * Test to verify the client handler holds both types of clients, and that they can be accessed as needed.
+   * @throws Exception if the test fails.
+ */
+ @Test
+ public void testAbfsClientHandlerInitialization() throws Exception {
+ AzureBlobFileSystem fs = getFileSystem();
+ AbfsClientHandler clientHandler = fs.getAbfsStore().getClientHandler();
+ Assertions.assertThat(clientHandler.getClient()).isInstanceOf(AbfsDfsClient.class);
+ Assertions.assertThat(clientHandler.getClient(AbfsServiceType.DFS)).isInstanceOf(AbfsDfsClient.class);
+ Assertions.assertThat(clientHandler.getClient(AbfsServiceType.BLOB)).isInstanceOf(AbfsBlobClient.class);
+ }
+}