diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 8113dd6427d67..523fc74791e1e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -416,6 +416,14 @@ public class AbfsConfiguration{ private String clientProvidedEncryptionKey; private String clientProvidedEncryptionKeySHA; + /** + * Constructor for AbfsConfiguration for specified service type. + * @param rawConfig used to initialize the configuration. + * @param accountName the name of the azure storage account. + * @param fsConfiguredServiceType service type configured for the file system. + * @throws IllegalAccessException if the field is not accessible. + * @throws IOException if an I/O error occurs. + */ public AbfsConfiguration(final Configuration rawConfig, String accountName, AbfsServiceType fsConfiguredServiceType) @@ -445,6 +453,13 @@ public AbfsConfiguration(final Configuration rawConfig, } } + /** + * Constructor for AbfsConfiguration for default service type i.e. DFS. + * @param rawConfig used to initialize the configuration. + * @param accountName the name of the azure storage account. + * @throws IllegalAccessException if the field is not accessible. + * @throws IOException if an I/O error occurs. + */ public AbfsConfiguration(final Configuration rawConfig, String accountName) throws IllegalAccessException, IOException { this(rawConfig, accountName, AbfsServiceType.DFS); @@ -470,7 +485,7 @@ public Trilean getIsNamespaceEnabledAccount() { * @return the service type. */ public AbfsServiceType getFsConfiguredServiceType() { - return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceType); + return getCaseInsensitiveEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceType); } /** @@ -479,7 +494,7 @@ public AbfsServiceType getFsConfiguredServiceType() { * @return the service type. */ public AbfsServiceType getConfiguredServiceTypeForFNSAccounts() { - return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, null); + return getCaseInsensitiveEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, null); } /** @@ -488,7 +503,7 @@ public AbfsServiceType getConfiguredServiceTypeForFNSAccounts() { * @return the service type. */ public AbfsServiceType getIngressServiceType() { - return getEnum(FS_AZURE_INGRESS_SERVICE_TYPE, getFsConfiguredServiceType()); + return getCaseInsensitiveEnum(FS_AZURE_INGRESS_SERVICE_TYPE, getFsConfiguredServiceType()); } /** @@ -515,7 +530,7 @@ public void validateConfiguredServiceType(boolean isHNSEnabled) } if (isHNSEnabled && getConfiguredServiceTypeForFNSAccounts() == AbfsServiceType.BLOB) { throw new InvalidConfigurationValueException( - FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Cannot be BLOB for HNS Account"); + FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Service Type Cannot be BLOB for HNS Account"); } else if (isHNSEnabled && fsConfiguredServiceType == AbfsServiceType.BLOB) { throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY, "Blob Endpoint Url Cannot be used to initialize filesystem for HNS Account"); @@ -712,6 +727,28 @@ public > T getEnum(String name, T defaultValue) { rawConfig.getEnum(name, defaultValue)); } + /** + * Returns the account-specific enum value if it exists, then + * looks for an account-agnostic value in case-insensitive manner. 
+ * @param name Account-agnostic configuration key + * @param defaultValue Value returned if none is configured + * @param Enum type + * @return enum value if one exists, else null + */ + public > T getCaseInsensitiveEnum(String name, T defaultValue) { + String configValue = getString(name, null); + if (configValue != null) { + for (T enumConstant : defaultValue.getDeclaringClass().getEnumConstants()) { // Step 3: Iterate over enum constants + if (enumConstant.name().equalsIgnoreCase(configValue)) { + return enumConstant; + } + } + // No match found + throw new IllegalArgumentException("No enum constant " + defaultValue.getDeclaringClass().getCanonicalName() + "." + configValue); + } + return defaultValue; + } + /** * Returns the account-agnostic enum value if it exists, else * return default. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index c4df3e24be035..d243d95b8a9c8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -122,6 +122,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR; +import static org.apache.hadoop.fs.azurebfs.constants.FSOperationType.CREATE_FILESYSTEM; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT; import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; @@ -215,8 +216,8 @@ public void initialize(URI uri, Configuration configuration) tracingHeaderFormat = abfsConfiguration.getTracingHeaderFormat(); this.setWorkingDirectory(this.getHomeDirectory()); - TracingContext tracingContext = new TracingContext(clientCorrelationId, - fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener); + TracingContext initFSTracingContext = new TracingContext(clientCorrelationId, + fileSystemId, FSOperationType.INIT, tracingHeaderFormat, listener); /* * Validate the service type configured in the URI is valid for account type used. @@ -224,7 +225,7 @@ public void initialize(URI uri, Configuration configuration) */ try { abfsConfiguration.validateConfiguredServiceType( - tryGetIsNamespaceEnabled(new TracingContext(tracingContext))); + tryGetIsNamespaceEnabled(initFSTracingContext)); } catch (InvalidConfigurationValueException ex) { LOG.debug("File system configured with Invalid Service Type", ex); throw ex; @@ -233,34 +234,39 @@ public void initialize(URI uri, Configuration configuration) throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex); } + /* + * Non-hierarchical-namespace account can not have a customer-provided-key(CPK). + * Fail initialization of filesystem if the configs are provided. CPK is of + * two types: GLOBAL_KEY, and ENCRYPTION_CONTEXT. 
+ */ + try { + if ((isEncryptionContextCPK(abfsConfiguration) || isGlobalKeyCPK( + abfsConfiguration)) && !tryGetIsNamespaceEnabled(new TracingContext( + initFSTracingContext))) { + throw new PathIOException(uri.getPath(), + CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE); + } + } catch (InvalidConfigurationValueException ex) { + LOG.debug("Non-Hierarchical Namespace Accounts Cannot Have CPK Enabled", ex); + throw ex; + } catch (AzureBlobFileSystemException ex) { + LOG.debug("Failed to determine account type for service type validation", ex); + throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex); + } + // Create the file system if it does not exist. if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) { - if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), tracingContext) == null) { + TracingContext createFSTracingContext = new TracingContext(initFSTracingContext); + createFSTracingContext.setOperation(CREATE_FILESYSTEM); + if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), createFSTracingContext) == null) { try { - this.createFileSystem(tracingContext); + this.createFileSystem(createFSTracingContext); } catch (AzureBlobFileSystemException ex) { checkException(null, ex, AzureServiceErrorCode.FILE_SYSTEM_ALREADY_EXISTS); } } } - /* - * Non-hierarchical-namespace account can not have a customer-provided-key(CPK). - * Fail initialization of filesystem if the configs are provided. CPK is of - * two types: GLOBAL_KEY, and ENCRYPTION_CONTEXT. - */ - if ((isEncryptionContextCPK(abfsConfiguration) || isGlobalKeyCPK( - abfsConfiguration)) - && !getIsNamespaceEnabled(new TracingContext(tracingContext))) { - /* - * Close the filesystem gracefully before throwing exception. Graceful close - * will ensure that all resources are released properly. 
- */ - close(); - throw new PathIOException(uri.getPath(), - CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE); - } - LOG.trace("Initiate check for delegation token manager"); if (UserGroupInformation.isSecurityEnabled()) { this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled(); @@ -700,7 +706,7 @@ private void incrementStatistic(AbfsStatistic statistic) { private void trailingPeriodCheck(Path path) throws IllegalArgumentException { while (!path.isRoot()) { String pathToString = path.toString(); - if (pathToString.length() != 0) { + if (!pathToString.isEmpty()) { if (pathToString.charAt(pathToString.length() - 1) == '.') { throw new IllegalArgumentException( "ABFS does not allow files or directories to end with a dot."); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 4a2ecf8b2c6d6..aa4b3b95ab9c2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -150,6 +150,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; @@ -343,11 +344,13 @@ public void close() throws IOException { } byte[] encodeAttribute(String value) throws UnsupportedEncodingException { - return value.getBytes(XMS_PROPERTIES_ENCODING); + // DFS Client works with ISO_8859_1 encoding, Blob Works with UTF-8. + return getClient().encodeAttribute(value); } String decodeAttribute(byte[] value) throws UnsupportedEncodingException { - return new String(value, XMS_PROPERTIES_ENCODING); + // DFS Client works with ISO_8859_1 encoding, Blob Works with UTF-8. + return getClient().decodeAttribute(value); } private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException { @@ -485,9 +488,8 @@ public Hashtable getFilesystemProperties( .getFilesystemProperties(tracingContext); perfInfo.registerResult(op.getResult()); - final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES); - - parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties); + // Handling difference in request headers formats between DFS and Blob Clients. + parsedXmsProperties = getClient().getXMSProperties(op.getResult()); perfInfo.registerSuccess(true); return parsedXmsProperties; @@ -533,10 +535,8 @@ public Hashtable getPathStatus(final Path path, perfInfo.registerResult(op.getResult()); contextEncryptionAdapter.destroy(); - final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES); - - parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties); - + // Handling difference in request headers formats between DFS and Blob Clients. 
+ parsedXmsProperties = getClient().getXMSProperties(op.getResult()); perfInfo.registerSuccess(true); return parsedXmsProperties; @@ -899,10 +899,8 @@ public AbfsInputStream openFileForRead(Path path, } else { AbfsHttpOperation op = getClient().getPathStatus(relativePath, false, tracingContext, null).getResult(); - resourceType = op.getResponseHeader( - HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - contentLength = Long.parseLong( - op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + resourceType = getClient().checkIsDir(op) ? DIRECTORY : FILE; + contentLength = extractContentLength(op); eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); /* * For file created with ENCRYPTION_CONTEXT, client shall receive @@ -983,17 +981,15 @@ public OutputStream openFileForWrite(final Path path, .getPathStatus(relativePath, false, tracingContext, null); perfInfo.registerResult(op.getResult()); - final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - final Long contentLength = Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - - if (parseIsDirectory(resourceType)) { + if (getClient().checkIsDir(op.getResult())) { throw new AbfsRestOperationException( AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), - "openFileForRead must be used with files and not directories", + "openFileForWrite must be used with files and not directories", null); } + final long contentLength = extractContentLength(op.getResult()); final long offset = overwrite ? 0 : contentLength; perfInfo.registerSuccess(true); @@ -1180,8 +1176,8 @@ public FileStatus getFileStatus(final Path path, contentLength = 0; resourceIsDir = true; } else { - contentLength = parseContentLength(result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - resourceIsDir = parseIsDirectory(result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE)); + contentLength = extractContentLength(result); + resourceIsDir = getClient().checkIsDir(result); } final String transformedOwner = identityTransformer.transformIdentityForGetRequest( @@ -1256,10 +1252,16 @@ public String listStatus(final Path path, final String startFrom, startFrom); final String relativePath = getRelativePath(path); + AbfsClient listingClient = getClient(); if (continuation == null || continuation.isEmpty()) { // generate continuation token if a valid startFrom is provided. if (startFrom != null && !startFrom.isEmpty()) { + /* + * Blob Endpoint Does not support startFrom yet. Fallback to DFS Client. + * startFrom remains null for all HDFS APIs. This is only for internal use. + */ + listingClient = getClient(AbfsServiceType.DFS); continuation = getIsNamespaceEnabled(tracingContext) ? 
generateContinuationTokenForXns(startFrom) : generateContinuationTokenForNonXns(relativePath, startFrom); @@ -1268,11 +1270,11 @@ public String listStatus(final Path path, final String startFrom, do { try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) { - AbfsRestOperation op = getClient().listPath(relativePath, false, + AbfsRestOperation op = listingClient.listPath(relativePath, false, abfsConfiguration.getListMaxResults(), continuation, tracingContext); perfInfo.registerResult(op.getResult()); - continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); + continuation = listingClient.getContinuationFromResponse(op.getResult()); ListResultSchema retrievedSchema = op.getResult().getListResultSchema(); if (retrievedSchema == null) { throw new AbfsRestOperationException( @@ -1465,7 +1467,7 @@ public void modifyAclEntries(final Path path, final List aclSpec, final AbfsRestOperation op = getClient() .getAclStatus(relativePath, useUpn, tracingContext); perfInfoGet.registerResult(op.getResult()); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + final String eTag = extractEtagHeader(op.getResult()); final Map aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); @@ -1508,7 +1510,7 @@ public void removeAclEntries(final Path path, final List aclSpec, final AbfsRestOperation op = getClient() .getAclStatus(relativePath, isUpnFormat, tracingContext); perfInfoGet.registerResult(op.getResult()); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + final String eTag = extractEtagHeader(op.getResult()); final Map aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); @@ -1546,7 +1548,7 @@ public void removeDefaultAcl(final Path path, TracingContext tracingContext) final AbfsRestOperation op = getClient() .getAclStatus(relativePath, tracingContext); perfInfoGet.registerResult(op.getResult()); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + final String eTag = extractEtagHeader(op.getResult()); final Map aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); final Map defaultAclEntries = new HashMap<>(); @@ -1590,7 +1592,7 @@ public void removeAcl(final Path path, TracingContext tracingContext) final AbfsRestOperation op = getClient() .getAclStatus(relativePath, tracingContext); perfInfoGet.registerResult(op.getResult()); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + final String eTag = extractEtagHeader(op.getResult()); final Map aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); final Map newAclEntries = new HashMap<>(); @@ -1636,7 +1638,7 @@ public void setAcl(final Path path, final List aclSpec, final AbfsRestOperation op = getClient() .getAclStatus(relativePath, isUpnFormat, tracingContext); perfInfoGet.registerResult(op.getResult()); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + final String eTag = extractEtagHeader(op.getResult()); final Map getAclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); @@ -1859,12 +1861,24 @@ public String getRelativePath(final Path path) { return relPath; } - private long parseContentLength(final String 
contentLength) { - if (contentLength == null) { - return -1; + /** + * Extracts the content length from the HTTP operation's response headers. + * + * @param op The AbfsHttpOperation instance from which to extract the content length. + * This operation contains the HTTP response headers. + * @return The content length as a long value. If the Content-Length header is + * not present or is empty, returns 0. + */ + private long extractContentLength(AbfsHttpOperation op) { + long contentLength; + String contentLengthHeader = op.getResponseHeader( + HttpHeaderConfigurations.CONTENT_LENGTH); + if (!contentLengthHeader.equals(EMPTY_STRING)) { + contentLength = Long.parseLong(contentLengthHeader); + } else { + contentLength = 0; } - - return Long.parseLong(contentLength); + return contentLength; } private boolean parseIsDirectory(final String resourceType) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index fb5cb58937220..4e64442a35b69 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -50,6 +50,7 @@ public final class AbfsHttpConstants { public static final String DEFAULT_LEASE_BREAK_PERIOD = "0"; public static final String DEFAULT_TIMEOUT = "90"; public static final String APPEND_BLOB_TYPE = "appendblob"; + public static final String LIST = "list"; //Abfs Http Client Constants for Blob Endpoint APIs. @@ -202,6 +203,40 @@ public static ApiVersion getCurrentVersion() { @Deprecated public static final String DECEMBER_2019_API_VERSION = ApiVersion.DEC_12_2019.toString(); + /** + * List of Constants Used by Blob Endpoint Rest APIs. 
+ */ + public static final String XML_TAG_NAME = "Name"; + public static final String XML_TAG_BLOB = "Blob"; + public static final String XML_TAG_NEXT_MARKER = "NextMarker"; + public static final String XML_TAG_METADATA = "Metadata"; + public static final String XML_TAG_PROPERTIES = "Properties"; + public static final String XML_TAG_BLOB_PREFIX = "BlobPrefix"; + public static final String XML_TAG_CONTENT_LEN = "Content-Length"; + public static final String XML_TAG_RESOURCE_TYPE = "ResourceType"; + public static final String XML_TAG_INVALID_XML = "Invalid XML"; + public static final String XML_TAG_HDI_ISFOLDER = "hdi_isfolder"; + public static final String XML_TAG_ETAG = "Etag"; + public static final String XML_TAG_LAST_MODIFIED_TIME = "Last-Modified"; + public static final String XML_TAG_CREATION_TIME = "Creation-Time"; + public static final String XML_TAG_OWNER = "Owner"; + public static final String XML_TAG_GROUP = "Group"; + public static final String XML_TAG_PERMISSIONS = "Permissions"; + public static final String XML_TAG_ACL = "Acl"; + public static final String XML_TAG_COPY_ID = "CopyId"; + public static final String XML_TAG_COPY_STATUS = "CopyStatus"; + public static final String XML_TAG_COPY_SOURCE = "CopySource"; + public static final String XML_TAG_COPY_PROGRESS = "CopyProgress"; + public static final String XML_TAG_COPY_COMPLETION_TIME = "CopyCompletionTime"; + public static final String XML_TAG_COPY_STATUS_DESCRIPTION = "CopyStatusDescription"; + public static final String XML_TAG_BLOB_ERROR_CODE_START_XML = ""; + public static final String XML_TAG_BLOB_ERROR_CODE_END_XML = ""; + public static final String XML_TAG_BLOB_ERROR_MESSAGE_START_XML = ""; + public static final String XML_TAG_BLOB_ERROR_MESSAGE_END_XML = ""; + public static final String XML_TAG_COMMITTED_BLOCKS = "CommittedBlocks"; + public static final String XML_TAG_BLOCK_NAME = "Block"; + public static final String PUT_BLOCK_LIST = "PutBlockList"; + /** * Value that differentiates categories of the http_status. *
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java
index 6b6e98c9c7082..8c9c8af75b53d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java
@@ -45,7 +45,8 @@ public enum FSOperationType {
     SET_OWNER("SO"),
     SET_ACL("SA"),
     TEST_OP("TS"),
-    WRITE("WR");
+    WRITE("WR"),
+    INIT("IN");
 
     private final String opCode;
 
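The new INIT("IN") opcode gives filesystem initialization its own identity in the tracing header instead of reusing CREATE_FILESYSTEM's code. A rough sketch of how a two-letter opcode typically surfaces in the correlation header; the exact field order and separators are owned by TracingContext, so the layout below is an assumption for illustration only:

```java
// A rough sketch, not the real TracingContext internals: shows how a
// two-letter opcode such as INIT("IN") could land in the
// x-ms-client-request-id correlation header.
public final class TracingHeaderSketch {
  static String header(String clientCorrelationId, String fileSystemId,
      String opCode) {
    // Assumed layout: <client-correlation-id>:<filesystem-id>:<operation-code>
    return clientCorrelationId + ":" + fileSystemId + ":" + opCode;
  }

  public static void main(String[] args) {
    // Initialization now reports "IN" instead of reusing "CF" (CREATE_FILESYSTEM).
    System.out.println(header("corr-1234", "fs-5678", "IN"));
  }
}
```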
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemUriSchemes.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemUriSchemes.java
index 0b5cba72f126d..ef268faef1322 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemUriSchemes.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemUriSchemes.java
@@ -38,8 +38,8 @@ public final class FileSystemUriSchemes {
   public static final String WASB_SECURE_SCHEME = "wasbs";
   public static final String WASB_DNS_PREFIX = "blob";
 
-  public static final String ABFS_DFS_DOMAIN_NAME = "dfs.core.windows.net";
-  public static final String ABFS_BLOB_DOMAIN_NAME = "blob.core.windows.net";
+  public static final String ABFS_DFS_DOMAIN_NAME = ".dfs.";
+  public static final String ABFS_BLOB_DOMAIN_NAME = ".blob.";
 
   private FileSystemUriSchemes() {}
 }
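Narrowing these constants to the ".dfs." / ".blob." infix, rather than the full public-cloud suffix, lets endpoint detection and DFS-to-Blob URI conversion work for any cloud domain, not just core.windows.net. A minimal sketch of the substring swap this enables; the helper is illustrative, not the client's actual conversion routine:

```java
// A minimal sketch assuming the intent of the narrowed constants: matching
// on the ".dfs." / ".blob." infix lets the same swap work for
// sovereign-cloud domains as well as the public cloud.
public final class EndpointSwapSketch {
  private static final String ABFS_DFS_DOMAIN_NAME = ".dfs.";
  private static final String ABFS_BLOB_DOMAIN_NAME = ".blob.";

  static String toBlobEndpoint(String dfsUrl) {
    return dfsUrl.replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME);
  }

  public static void main(String[] args) {
    // Works for core.windows.net and equally for e.g. core.chinacloudapi.cn.
    System.out.println(toBlobEndpoint("https://acct.dfs.core.windows.net/container"));
    System.out.println(toBlobEndpoint("https://acct.dfs.core.chinacloudapi.cn/container"));
  }
}
```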
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
index f4dd38585f5ee..6f13f86dd37ab 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
@@ -67,6 +67,11 @@ public final class HttpQueryParams {
    * {@value}
    */
   public static final String QUERY_PARAM_BLOCKLISTTYPE = "blocklisttype";
+  public static final String QUERY_PARAM_INCLUDE = "include";
+  public static final String QUERY_PARAM_PREFIX = "prefix";
+  public static final String QUERY_PARAM_MARKER = "marker";
+  public static final String QUERY_PARAM_DELIMITER = "delimiter";
+  public static final String QUERY_PARAM_MAX_RESULTS = "maxresults";
 
   //query params for SAS
   public static final String QUERY_PARAM_SAOID = "saoid";
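The five new parameters correspond to the Blob endpoint List Blobs REST API. A hedged sketch of how they might combine into a listing request URL; the account, container, and parameter values are placeholders, and the client assembles real URLs through its own query builder rather than string concatenation:

```java
// Hedged sketch of a List Blobs request assembled from the new constants.
public final class ListBlobsUrlSketch {
  public static void main(String[] args) {
    String url = "https://acct.blob.core.windows.net/container?restype=container&comp=list"
        + "&prefix=dir1/"       // QUERY_PARAM_PREFIX: restrict listing to a directory subtree
        + "&delimiter=/"        // QUERY_PARAM_DELIMITER: fold children into BlobPrefix entries
        + "&marker=token123"    // QUERY_PARAM_MARKER: continuation token from a prior NextMarker
        + "&maxresults=500"     // QUERY_PARAM_MAX_RESULTS: page size
        + "&include=metadata";  // QUERY_PARAM_INCLUDE: return metadata such as hdi_isfolder
    System.out.println(url);
  }
}
```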
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobListResultEntrySchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobListResultEntrySchema.java
new file mode 100644
index 0000000000000..495e650954f4e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobListResultEntrySchema.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.services;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * List Result Entry Schema for the Blob Endpoint List Blobs API.
+ */
+public class BlobListResultEntrySchema implements ListResultEntrySchema {
+
+  private String name;
+  private Path path;
+  private String url;
+  private Boolean isDirectory = false;
+  private String eTag;
+  private String lastModifiedTime;
+  private String creationTime;
+  private String owner;
+  private String group;
+  private String permission;
+  private String acl;
+  private Long contentLength = 0L;
+  private String copyId;
+  private String copyStatus;
+  private String copySourceUrl;
+  private String copyProgress;
+  private String copyStatusDescription;
+  private long copyCompletionTime;
+  private Map<String, String> metadata = new HashMap<>();
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  public Path path() {
+    return path;
+  }
+
+  public String url() {
+    return url;
+  }
+
+  @Override
+  public Boolean isDirectory() {
+    return isDirectory;
+  }
+
+  @Override
+  public String eTag() {
+    return eTag;
+  }
+
+  @Override
+  public String lastModified() {
+    return String.valueOf(lastModifiedTime);
+  }
+
+  public String creation() {
+    return String.valueOf(creationTime);
+  }
+
+  public String lastModifiedTime() {
+    return lastModifiedTime;
+  }
+
+  public String creationTime() {
+    return creationTime;
+  }
+
+  @Override
+  public Long contentLength() {
+    return contentLength;
+  }
+
+  public String copyId() {
+    return copyId;
+  }
+
+  public String copyStatus() {
+    return copyStatus;
+  }
+
+  public String copySourceUrl() {
+    return copySourceUrl;
+  }
+
+  public String copyProgress() {
+    return copyProgress;
+  }
+
+  public String copyStatusDescription() {
+    return copyStatusDescription;
+  }
+
+  public long copyCompletionTime() {
+    return copyCompletionTime;
+  }
+
+  public Map<String, String> metadata() {
+    return metadata;
+  }
+
+  @Override
+  public String owner() {
+    return owner;
+  }
+
+  @Override
+  public String group() {
+    return group;
+  }
+
+  @Override
+  public String permissions() {
+    return permission;
+  }
+
+  @Override
+  public String getXMsEncryptionContext() {
+    return null;
+  }
+
+  @Override
+  public String getCustomerProvidedKeySha256() {
+    return null;
+  }
+
+  @Override
+  public ListResultEntrySchema withName(final String name) {
+    this.name = name;
+    return this;
+  }
+
+  public void setName(final String name) {
+    this.name = name;
+  }
+
+  public void setPath(final Path path) {
+    this.path = path;
+  }
+
+  public void setUrl(final String url) {
+    this.url = url;
+  }
+
+  public void setIsDirectory(final Boolean isDirectory) {
+    this.isDirectory = isDirectory;
+  }
+
+  public void setETag(final String eTag) {
+    this.eTag = eTag;
+  }
+
+  public void setLastModifiedTime(final String lastModifiedTime) {
+    this.lastModifiedTime = lastModifiedTime;
+  }
+
+  public void setCreationTime(final String creationTime) {
+    this.creationTime = creationTime;
+  }
+
+  public void setOwner(final String owner) {
+    this.owner = owner;
+  }
+
+  public void setGroup(final String group) {
+    this.group = group;
+  }
+
+  public void setPermission(final String permission) {
+    this.permission = permission;
+  }
+
+  public void setAcl(final String acl) {
+    this.acl = acl;
+  }
+
+  public void setContentLength(final Long contentLength) {
+    this.contentLength = contentLength;
+  }
+
+  public void setCopyId(final String copyId) {
+    this.copyId = copyId;
+  }
+
+  public void setCopyStatus(final String copyStatus) {
+    this.copyStatus = copyStatus;
+  }
+
+  public void setCopyProgress(final String copyProgress) {
+    this.copyProgress = copyProgress;
+  }
+
+  public void setCopySourceUrl(final String copySourceUrl) {
+    this.copySourceUrl = copySourceUrl;
+  }
+
+  public void setCopyStatusDescription(final String copyStatusDescription) {
+    this.copyStatusDescription = copyStatusDescription;
+  }
+
+  public void setCopyCompletionTime(final long copyCompletionTime) {
+    this.copyCompletionTime = copyCompletionTime;
+  }
+
+  public void setMetadata(final Map<String, String> metadata) {
+    this.metadata = metadata;
+  }
+
+  public void addMetadata(final String key, final String value) {
+    this.metadata.put(key, value);
+  }
+
+  public String getAcl() {
+    return acl;
+  }
+}
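A short usage sketch of the new entry schema with hypothetical values, showing how an explicit directory (a zero-byte marker blob flagged by hdi_isfolder metadata) and a plain file might be populated by the listing path:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultEntrySchema;

public final class EntrySchemaSketch {
  public static void main(String[] args) {
    // Explicit directory: a zero-byte marker blob carrying hdi_isfolder=true.
    BlobListResultEntrySchema dir = new BlobListResultEntrySchema();
    dir.setName("data/dir1");
    dir.setPath(new Path("/data/dir1"));
    dir.addMetadata("hdi_isfolder", "true");
    dir.setIsDirectory(true);

    // Plain file entry with its parsed content length.
    BlobListResultEntrySchema file = new BlobListResultEntrySchema();
    file.setName("data/dir1/part-0000");
    file.setPath(new Path("/data/dir1/part-0000"));
    file.setContentLength(4096L);

    System.out.println(dir.name() + " isDirectory=" + dir.isDirectory());
    System.out.println(file.name() + " length=" + file.contentLength());
  }
}
```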
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobListResultSchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobListResultSchema.java
new file mode 100644
index 0000000000000..7506ebdda5012
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobListResultSchema.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.services;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The ListResultSchema model for Blob Endpoint Listing.
+ */
+public class BlobListResultSchema implements ListResultSchema {
+
+  // List of paths returned by Blob Endpoint Listing.
+  private List<BlobListResultEntrySchema> paths;
+
+  // Continuation token for the next page of results.
+  private String nextMarker;
+
+  public BlobListResultSchema() {
+    this.paths = new ArrayList<>();
+    nextMarker = null;
+  }
+
+  /**
+   * Return the list of paths returned by Blob Endpoint Listing.
+   * @return the paths value
+   */
+  @Override
+  public List<BlobListResultEntrySchema> paths() {
+    return paths;
+  }
+
+  /**
+   * Set the paths value to the list of paths returned by Blob Endpoint Listing.
+   * @param paths the paths value to set
+   * @return the ListSchema object itself.
+   */
+  @Override
+  public ListResultSchema withPaths(final List<? extends ListResultEntrySchema> paths) {
+    this.paths = (List<BlobListResultEntrySchema>) paths;
+    return this;
+  }
+
+  public void addBlobListEntry(final BlobListResultEntrySchema blobListEntry) {
+    this.paths.add(blobListEntry);
+  }
+
+  public String getNextMarker() {
+    return nextMarker;
+  }
+
+  public void setNextMarker(String nextMarker) {
+    this.nextMarker = nextMarker;
+  }
+}
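BlobListResultSchema pairs one page of entries with the NextMarker continuation token. A minimal paging sketch under the assumption of a hypothetical fetchPage() helper standing in for one List Blobs round trip:

```java
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultSchema;

public final class PagingSketch {
  // Hypothetical stand-in for one List Blobs call that parses the XML
  // response into a BlobListResultSchema (e.g. via BlobListXmlParser).
  static BlobListResultSchema fetchPage(String marker) {
    BlobListResultSchema page = new BlobListResultSchema();
    page.setNextMarker(null); // a real response may carry a NextMarker value
    return page;
  }

  public static void main(String[] args) {
    String marker = null;
    do {
      BlobListResultSchema page = fetchPage(marker);
      page.paths().forEach(entry -> System.out.println(entry.name()));
      marker = page.getNextMarker();
    } while (marker != null && !marker.isEmpty());
  }
}
```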
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobListXmlParser.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobListXmlParser.java
new file mode 100644
index 0000000000000..9cfbf23789884
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobListXmlParser.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.services;
+
+import java.util.Stack;
+
+import org.xml.sax.Attributes;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.DefaultHandler;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+
+/**
+ * Parses the response inputStream and populates an object of {@link BlobListResultSchema}. Parsing
+ * creates a list of {@link BlobListResultEntrySchema}.
+ * <p>
+ * BlobList API XML response example
+ * <pre>
+ *   {@code
+ *   <?xml version="1.0" encoding="utf-8"?>
+ *   <EnumerationResults ServiceEndpoint="http://myaccount.blob.core.windows.net/" ContainerName="mycontainer">
+ *    <Prefix>string-value</Prefix>
+ *    <Marker>string-value</Marker>
+ *    <MaxResults>int-value</MaxResults>
+ *    <Delimiter>string-value</Delimiter>
+ *    <Blobs>
+ *       <Blob>...</Blob>
+ *       <BlobPrefix>...</BlobPrefix>
+ *    </Blobs>
+ *    <NextMarker />
+ *   </EnumerationResults>
+ *   }
+ * </pre>
+ */ + + +public class BlobListXmlParser extends DefaultHandler { + /** + * Object that contains the parsed response. + */ + private final BlobListResultSchema listResultSchema; + private final String url; + /** + * {@link BlobListResultEntrySchema} for which at a given moment, the parsing is going on. + * + * Following XML elements will be parsed and added in currentBlobEntry. + * 1. Blob: for explicit files and directories + * + * blob-name + * + * + * value + * + * + * + * 2. BlobPrefix: for directories both explicit and implicit + * + * blob-prefix + * + */ + private BlobListResultEntrySchema currentBlobEntry; + /** + * Maintains the value in a given XML-element. + */ + private StringBuilder bld = new StringBuilder(); + /** + * Maintains the stack of XML-elements in memory at a given moment. + */ + private final Stack elements = new Stack<>(); + + /** + * Set an object of {@link BlobListResultSchema} to populate from the parsing. + * Set the url for which GetBlobList API is called. + * @param listResultSchema Object to populate from the parsing. + * @param url URL for which GetBlobList API is called. + */ + public BlobListXmlParser(final BlobListResultSchema listResultSchema, final String url) { + this.listResultSchema = listResultSchema; + this.url = url; + } + + /** + *
Receive notification of the start of an element.
+ * If the xml start tag is "Blob", it defines that a new BlobProperty information + * is going to be parsed. + */ + @Override + public void startElement(final String uri, + final String localName, + final String qName, + final Attributes attributes) throws SAXException { + elements.push(localName); + if (AbfsHttpConstants.XML_TAG_BLOB.equals(localName) || AbfsHttpConstants.XML_TAG_BLOB_PREFIX.equals(localName)) { + currentBlobEntry = new BlobListResultEntrySchema(); + } + } + + /** + *
Receive notification of the end of an element.
+ * Whenever an XML-tag is closed, the parent-tag and current-tag shall be + * checked and correct property shall be set in the active {@link #currentBlobEntry}. + * If the current-tag is "Blob", it means that there are no more properties to + * be set in the the active {@link #currentBlobEntry}, and it shall be the + * {@link #listResultSchema}. + */ + @Override + public void endElement(final String uri, + final String localName, + final String qName) + throws SAXException { + String currentNode = elements.pop(); + + // Check if the ending tag is correct to the starting tag in the stack. + if (!currentNode.equals(localName)) { + throw new SAXException(AbfsHttpConstants.XML_TAG_INVALID_XML); + } + + String parentNode = EMPTY_STRING; + if (elements.size() > 0) { + parentNode = elements.peek(); + } + + String value = bld.toString(); + if (value.isEmpty()) { + value = null; + } + + /* + * If the closing tag is Blob, there are no more properties to be set in + * currentBlobEntry. + */ + if (AbfsHttpConstants.XML_TAG_BLOB.equals(currentNode)) { + listResultSchema.addBlobListEntry(currentBlobEntry); + currentBlobEntry = null; + } + + /* + * If the closing tag is BlobPrefix, there are no more properties to be set in + * currentBlobEntry and this is a directory (implicit or explicit) + * If implicit, it will be added only once/ + * If explicit it will be added with Blob Tag as well. + */ + if (AbfsHttpConstants.XML_TAG_BLOB_PREFIX.equals(currentNode)) { + currentBlobEntry.setIsDirectory(true); + listResultSchema.addBlobListEntry(currentBlobEntry); + currentBlobEntry = null; + } + + /* + * If the closing tag is Next Marker, it needs to be saved with the + * list of blobs currently fetched + */ + if (AbfsHttpConstants.XML_TAG_NEXT_MARKER.equals(currentNode)) { + listResultSchema.setNextMarker(value); + } + + /* + * If the closing tag is Name, then it is either for a blob + * or for a blob prefix denoting a directory. We will save this + * in current BlobProperty for both + */ + if (currentNode.equals(AbfsHttpConstants.XML_TAG_NAME) + && (parentNode.equals(AbfsHttpConstants.XML_TAG_BLOB) + || parentNode.equals(AbfsHttpConstants.XML_TAG_BLOB_PREFIX))) { + // In case of BlobPrefix Name will have a slash at the end + // Remove the "/" at the end of name + if (value.endsWith(AbfsHttpConstants.FORWARD_SLASH)) { + value = value.substring(0, value.length() - 1); + } + + currentBlobEntry.setName(value); + currentBlobEntry.setPath(new Path(AbfsHttpConstants.ROOT_PATH + value)); + currentBlobEntry.setUrl(url + AbfsHttpConstants.ROOT_PATH + value); + } + + /* + * For case: + * + * ... + * + * value + * value + * + * ... + * + * ParentNode will be Metadata for all key1, key2, ... , keyN. + */ + if (parentNode.equals(AbfsHttpConstants.XML_TAG_METADATA)) { + currentBlobEntry.addMetadata(currentNode, value); + // For Marker blobs hdi_isFolder will be present as metadata + if (AbfsHttpConstants.XML_TAG_HDI_ISFOLDER.equals(currentNode)) { + currentBlobEntry.setIsDirectory(Boolean.valueOf(value)); + } + } + + /* + * For case: + * + * ... + * + * date-time-value + * date-time-value + * Etag + * owner user id + * owning group id + * permission string + * access control list + * file | directory + * size-in-bytes + * id + * pending | success | aborted | failed + * source url + * bytes copied/bytes total + * datetime + * error string + * + * ... + * + * ParentNode will be Properties for Content-Length, ResourceType. 
+ */ + if (parentNode.equals(AbfsHttpConstants.XML_TAG_PROPERTIES)) { + if (currentNode.equals(AbfsHttpConstants.XML_TAG_CREATION_TIME)) { + currentBlobEntry.setCreationTime(value); + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_LAST_MODIFIED_TIME)) { + currentBlobEntry.setLastModifiedTime(value); + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_ETAG)) { + currentBlobEntry.setETag(value); + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_OWNER)) { + currentBlobEntry.setOwner(value); + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_GROUP)) { + currentBlobEntry.setGroup(value); + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_PERMISSIONS)) { + currentBlobEntry.setPermission(value); + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_ACL)) { + currentBlobEntry.setAcl(value); + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_RESOURCE_TYPE)) { + if (AbfsHttpConstants.DIRECTORY.equals(value)) { + currentBlobEntry.setIsDirectory(true); + } + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_CONTENT_LEN)) { + currentBlobEntry.setContentLength(Long.parseLong(value)); + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_COPY_ID)) { + currentBlobEntry.setCopyId(value); + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_COPY_STATUS)) { + currentBlobEntry.setCopyStatus(value); + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_COPY_SOURCE)) { + currentBlobEntry.setCopySourceUrl(value); + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_COPY_PROGRESS)) { + currentBlobEntry.setCopyProgress(value); + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_COPY_COMPLETION_TIME)) { + currentBlobEntry.setCopyCompletionTime(DateTimeUtils.parseLastModifiedTime(value)); + } + if (currentNode.equals(AbfsHttpConstants.XML_TAG_COPY_STATUS_DESCRIPTION)) { + currentBlobEntry.setCopyStatusDescription(value); + } + } + /* + * refresh bld for the next XML-tag value + */ + bld = new StringBuilder(); + } + + /** + * Receive notification of character data inside an element. No heuristics to + * apply. Just append the {@link #bld}. + */ + @Override + public void characters(final char[] ch, final int start, final int length) + throws SAXException { + bld.append(ch, start, length); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/DfsListResultEntrySchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/DfsListResultEntrySchema.java new file mode 100644 index 0000000000000..51d9d530756ed --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/DfsListResultEntrySchema.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contracts.services; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * The ListResultEntrySchema model for DFS Endpoint + */ +@InterfaceStability.Evolving +@JsonIgnoreProperties(ignoreUnknown = true) +public class DfsListResultEntrySchema implements ListResultEntrySchema { + /** + * The name property. + */ + @JsonProperty(value = "name") + private String name; + + /** + * The isDirectory property. + */ + @JsonProperty(value = "isDirectory") + private Boolean isDirectory; + + /** + * The lastModified property. + */ + @JsonProperty(value = "lastModified") + private String lastModified; + + /** + * The eTag property. + */ + @JsonProperty(value = "etag") + private String eTag; + + /** + * The contentLength property. + */ + @JsonProperty(value = "contentLength") + private Long contentLength; + + /** + * The owner property. + */ + @JsonProperty(value = "owner") + private String owner; + + /** + * The group property. + */ + @JsonProperty(value = "group") + private String group; + + /** + * The permissions property. + */ + @JsonProperty(value = "permissions") + private String permissions; + + /** + * The encryption context property + */ + @JsonProperty(value = "EncryptionContext") + private String xMsEncryptionContext; + + /** + * The customer-provided encryption-256 value + * */ + @JsonProperty(value = "CustomerProvidedKeySha256") + private String customerProvidedKeySha256; + + /** + * Get the name value. + * @return the name value + */ + @Override + public String name() { + return name; + } + + /** + * Set the name value. + * @param name the name value to set + * @return the ListEntrySchema object itself. + */ + @Override + public DfsListResultEntrySchema withName(String name) { + this.name = name; + return this; + } + + /** + * Get the isDirectory value. + * @return the isDirectory value + */ + @Override + public Boolean isDirectory() { + return isDirectory; + } + + /** + * Set the isDirectory value. + * + * @param isDirectory the isDirectory value to set + * @return the ListEntrySchema object itself. + */ + public DfsListResultEntrySchema withIsDirectory(final Boolean isDirectory) { + this.isDirectory = isDirectory; + return this; + } + + /** + * Get the lastModified value. + * @return the lastModified value + */ + @Override + public String lastModified() { + return lastModified; + } + + /** + * Set the lastModified value. + * + * @param lastModified the lastModified value to set + * @return the ListEntrySchema object itself. + */ + public DfsListResultEntrySchema withLastModified(String lastModified) { + this.lastModified = lastModified; + return this; + } + + /** + * Get the etag value. + * @return the etag value + */ + @Override + public String eTag() { + return eTag; + } + + /** + * Set the eTag value. + * @param eTag the eTag value to set + * @return the ListEntrySchema object itself. + */ + public DfsListResultEntrySchema withETag(final String eTag) { + this.eTag = eTag; + return this; + } + + /** + * Get the contentLength value. 
+ * @return the contentLength value + */ + @Override + public Long contentLength() { + return contentLength; + } + + /** + * Set the contentLength value. + * + * @param contentLength the contentLength value to set + * @return the ListEntrySchema object itself. + */ + public DfsListResultEntrySchema withContentLength(final Long contentLength) { + this.contentLength = contentLength; + return this; + } + + /** + * Get the owner value. + * @return the owner value + */ + @Override + public String owner() { + return owner; + } + + /** + * Set the owner value. + * + * @param owner the owner value to set + * @return the ListEntrySchema object itself. + */ + public DfsListResultEntrySchema withOwner(final String owner) { + this.owner = owner; + return this; + } + + /** + * Get the group value. + * @return the group value + */ + @Override + public String group() { + return group; + } + + /** + * Set the group value. + * + * @param group the group value to set + * @return the ListEntrySchema object itself. + */ + public DfsListResultEntrySchema withGroup(final String group) { + this.group = group; + return this; + } + + /** + * Get the permissions value. + * @return the permissions value + */ + @Override + public String permissions() { + return permissions; + } + + /** + * Set the permissions value. + * + * @param permissions the permissions value to set + * @return the ListEntrySchema object itself. + */ + public DfsListResultEntrySchema withPermissions(final String permissions) { + this.permissions = permissions; + return this; + } + + /** + * Get the x-ms-encryption-context value. + * @return the x-ms-encryption-context value. + */ + @Override + public String getXMsEncryptionContext() { + return xMsEncryptionContext; + } + + /** + * Get the customer-provided sha-256 key + * @return the x-ms-encryption-key-sha256 value used by client. + */ + @Override + public String getCustomerProvidedKeySha256() { + return customerProvidedKeySha256; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/DfsListResultSchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/DfsListResultSchema.java new file mode 100644 index 0000000000000..c5d16e82550bd --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/DfsListResultSchema.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contracts.services; + +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * The ListResultSchema model for DFS Endpoint Listing. + */ +@InterfaceStability.Evolving +@JsonIgnoreProperties(ignoreUnknown = true) +public class DfsListResultSchema implements ListResultSchema { + /** + * List of paths returned by DFS Endpoint Listing. + */ + @JsonProperty(value = "paths") + private List paths; + + /** + * Get the list of paths returned by DFS Endpoint Listing. + * @return the paths value + */ + public List paths() { + return this.paths; + } + + /** + * Set the paths value to list of paths returned by DFS Endpoint Listing. + * @param paths the paths value to set + * @return the ListSchema object itself. + */ + public ListResultSchema withPaths(final List paths) { + this.paths = (List) paths; + return this; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java index 77f52e4a2bbd1..c5126d05f8ba5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java @@ -18,251 +18,75 @@ package org.apache.hadoop.fs.azurebfs.contracts.services; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; - -import org.apache.hadoop.classification.InterfaceStability; - /** * The ListResultEntrySchema model. */ -@InterfaceStability.Evolving -@JsonIgnoreProperties(ignoreUnknown = true) -public class ListResultEntrySchema { - /** - * The name property. - */ - @JsonProperty(value = "name") - private String name; - - /** - * The isDirectory property. - */ - @JsonProperty(value = "isDirectory") - private Boolean isDirectory; - - /** - * The lastModified property. - */ - @JsonProperty(value = "lastModified") - private String lastModified; - - /** - * The eTag property. - */ - @JsonProperty(value = "etag") - private String eTag; - - /** - * The contentLength property. - */ - @JsonProperty(value = "contentLength") - private Long contentLength; - - /** - * The owner property. - */ - @JsonProperty(value = "owner") - private String owner; - - /** - * The group property. - */ - @JsonProperty(value = "group") - private String group; - - /** - * The permissions property. - */ - @JsonProperty(value = "permissions") - private String permissions; - - /** - * The encryption context property - */ - @JsonProperty(value = "EncryptionContext") - private String xMsEncryptionContext; - - /** - * The customer-provided encryption-256 value - * */ - @JsonProperty(value = "CustomerProvidedKeySha256") - private String customerProvidedKeySha256; +public interface ListResultEntrySchema { /** * Get the name value. 
- * * @return the name value */ - public String name() { - return name; - } + String name(); /** * Set the name value. - * * @param name the name value to set - * @return the ListEntrySchema object itself. + * @return the ListResultEntrySchema object itself. */ - public ListResultEntrySchema withName(String name) { - this.name = name; - return this; - } + ListResultEntrySchema withName(String name); /** * Get the isDirectory value. - * * @return the isDirectory value */ - public Boolean isDirectory() { - return isDirectory; - } - - /** - * Set the isDirectory value. - * - * @param isDirectory the isDirectory value to set - * @return the ListEntrySchema object itself. - */ - public ListResultEntrySchema withIsDirectory(final Boolean isDirectory) { - this.isDirectory = isDirectory; - return this; - } + Boolean isDirectory(); /** * Get the lastModified value. - * * @return the lastModified value */ - public String lastModified() { - return lastModified; - } - - /** - * Set the lastModified value. - * - * @param lastModified the lastModified value to set - * @return the ListEntrySchema object itself. - */ - public ListResultEntrySchema withLastModified(String lastModified) { - this.lastModified = lastModified; - return this; - } - - /** - * Get the etag value. - * - * @return the etag value - */ - public String eTag() { - return eTag; - } + String lastModified(); /** - * Set the eTag value. - * - * @param eTag the eTag value to set - * @return the ListEntrySchema object itself. + * Get the eTag value. + * @return the eTag value */ - public ListResultEntrySchema withETag(final String eTag) { - this.eTag = eTag; - return this; - } + String eTag(); /** * Get the contentLength value. - * * @return the contentLength value */ - public Long contentLength() { - return contentLength; - } + Long contentLength(); /** - * Set the contentLength value. - * - * @param contentLength the contentLength value to set - * @return the ListEntrySchema object itself. - */ - public ListResultEntrySchema withContentLength(final Long contentLength) { - this.contentLength = contentLength; - return this; - } - - /** - * - Get the owner value. - * + * Get the owner value. * @return the owner value */ - public String owner() { - return owner; - } - - /** - * Set the owner value. - * - * @param owner the owner value to set - * @return the ListEntrySchema object itself. - */ - public ListResultEntrySchema withOwner(final String owner) { - this.owner = owner; - return this; - } + String owner(); /** * Get the group value. - * * @return the group value */ - public String group() { - return group; - } - - /** - * Set the group value. - * - * @param group the group value to set - * @return the ListEntrySchema object itself. - */ - public ListResultEntrySchema withGroup(final String group) { - this.group = group; - return this; - } + String group(); /** * Get the permissions value. - * * @return the permissions value */ - public String permissions() { - return permissions; - } + String permissions(); /** - * Set the permissions value. - * - * @param permissions the permissions value to set - * @return the ListEntrySchema object itself. + * Get the encryption context value. + * @return the encryption context value */ - public ListResultEntrySchema withPermissions(final String permissions) { - this.permissions = permissions; - return this; - } + String getXMsEncryptionContext(); /** - * Get the x-ms-encryption-context value. - * @return the x-ms-encryption-context value. 
- * */ - public String getXMsEncryptionContext() { - return xMsEncryptionContext; - } - - /** - * Get the customer-provided sha-256 key - * @return the x-ms-encryption-key-sha256 value used by client. - * */ - public String getCustomerProvidedKeySha256() { - return customerProvidedKeySha256; - } + * Get the customer-provided encryption-256 value. + * @return the customer-provided encryption-256 value + */ + String getCustomerProvidedKeySha256(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultSchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultSchema.java index dc7da04b5bd4f..2fe0aef1433c5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultSchema.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultSchema.java @@ -20,41 +20,20 @@ import java.util.List; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; - -import org.apache.hadoop.classification.InterfaceStability; - /** * The ListResultSchema model. */ -@InterfaceStability.Evolving -@JsonIgnoreProperties(ignoreUnknown = true) -public class ListResultSchema { - /** - * The paths property. - */ - @JsonProperty(value = "paths") - private List paths; - +public interface ListResultSchema { /** - * * Get the paths value. - * + * Get the paths value. * @return the paths value */ - public List paths() { - return this.paths; - } + List paths(); /** * Set the paths value. - * * @param paths the paths value to set * @return the ListSchema object itself. */ - public ListResultSchema withPaths(final List paths) { - this.paths = paths; - return this; - } - + ListResultSchema withPaths(List paths); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/StorageErrorResponseSchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/StorageErrorResponseSchema.java new file mode 100644 index 0000000000000..4f312e7e797ea --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/StorageErrorResponseSchema.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contracts.services; + +/** + * Response Schema for Storage Error Parsing. + * Common schema for both endpoints. + */ +public class StorageErrorResponseSchema { + + public StorageErrorResponseSchema(final String storageErrorCode, + final String storageErrorMessage, + final String expectedAppendPos) { + this.storageErrorCode = storageErrorCode; + this.storageErrorMessage = storageErrorMessage; + this.expectedAppendPos = expectedAppendPos; + } + + private String storageErrorCode; + private String storageErrorMessage; + private String expectedAppendPos; + + public String getStorageErrorCode() { + return storageErrorCode; + } + + public void setStorageErrorCode(final String storageErrorCode) { + this.storageErrorCode = storageErrorCode; + } + + public String getStorageErrorMessage() { + return storageErrorMessage; + } + + public void setStorageErrorMessage(final String storageErrorMessage) { + this.storageErrorMessage = storageErrorMessage; + } + + public String getExpectedAppendPos() { + return expectedAppendPos; + } + + public void setExpectedAppendPos(final String expectedAppendPos) { + this.expectedAppendPos = expectedAppendPos; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java index 3e8c6d22637fb..3ed70965db923 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java @@ -58,6 +58,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH; import static org.apache.http.entity.ContentType.TEXT_PLAIN; /** @@ -95,8 +96,9 @@ public AbfsAHCHttpOperation(final URL url, final List requestHeaders, final Duration connectionTimeout, final Duration readTimeout, - final AbfsApacheHttpClient abfsApacheHttpClient) throws IOException { - super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout); + final AbfsApacheHttpClient abfsApacheHttpClient, + final AbfsClient abfsClient) throws IOException { + super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout, abfsClient); this.isPayloadRequest = HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method) || HTTP_METHOD_POST.equals(method); @@ -310,6 +312,20 @@ public String getResponseHeader(final String headerName) { return null; } + /**{@inheritDoc}*/ + @Override + public Map> getResponseHeaders() { + Map> headers = new HashMap<>(); + if (httpResponse == null) { + return headers; + } + for (Header header : httpResponse.getAllHeaders()) { + headers.computeIfAbsent(header.getName(), k -> new ArrayList<>()) + .add(header.getValue()); + } + return headers; + } + /**{@inheritDoc}*/ @Override protected 
InputStream getContentInputStream() @@ -370,7 +386,12 @@ public void sendPayload(final byte[] buffer, * Sets the header on the request. */ private void prepareRequest() { + final boolean isEntityBasedRequest + = httpRequestBase instanceof HttpEntityEnclosingRequestBase; for (AbfsHttpHeader header : getRequestHeaders()) { + if (CONTENT_LENGTH.equals(header.getName()) && isEntityBasedRequest) { + continue; + } httpRequestBase.setHeader(header.getName(), header.getValue()); } } @@ -380,7 +401,12 @@ private void prepareRequest() { public String getRequestProperty(String name) { for (AbfsHttpHeader header : getRequestHeaders()) { if (header.getName().equals(name)) { - return header.getValue(); + String val = header.getValue(); + val = val == null ? EMPTY_STRING : val; + if (EMPTY_STRING.equals(val)) { + continue; + } + return val; } } return EMPTY_STRING; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index 07c25b32483d3..e3552b777045c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -18,22 +18,35 @@ package org.apache.hadoop.fs.azurebfs.services; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.parsers.SAXParser; +import javax.xml.parsers.SAXParserFactory; import java.io.IOException; +import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; import java.net.URL; -import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Hashtable; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.UUID; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; @@ -45,6 +58,11 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultEntrySchema; +import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultSchema; +import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListXmlParser; +import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema; import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; @@ -52,6 +70,7 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import 
static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_JSON; @@ -65,18 +84,29 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CONTAINER; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_LEASE_BREAK_PERIOD; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LEASE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LIST; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.METADATA; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RELEASE_LEASE_ACTION; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RENEW_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOB_ERROR_CODE_END_XML; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOB_ERROR_CODE_START_XML; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOB_ERROR_MESSAGE_END_XML; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOB_ERROR_MESSAGE_START_XML; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOCK_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_COMMITTED_BLOCKS; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_HDI_ISFOLDER; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_NAME; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_UNICODE; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT; @@ -86,6 +116,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_NONE_MATCH; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.LAST_MODIFIED; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.RANGE; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT; import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_CONTENT_MD5;
@@ -103,6 +134,11 @@
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOCKLISTTYPE;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_DELIMITER;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_INCLUDE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_MARKER;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_MAX_RESULTS;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_PREFIX;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESTYPE;

 /**
@@ -271,14 +307,56 @@ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext)
    * @return executed rest operation containing response from server.
    * @throws AzureBlobFileSystemException if rest operation or response parsing fails.
    */
-  @Override
-  public AbfsRestOperation listPath(final String relativePath,
-      final boolean recursive,
-      final int listMaxResults,
-      final String continuation,
-      TracingContext tracingContext) throws AzureBlobFileSystemException {
-    // TODO: [FnsOverBlob][HADOOP-19207] To be implemented as part of response handling of blob endpoint APIs.
-    throw new NotImplementedException("Blob Endpoint Support is not yet implemented");
+  public AbfsRestOperation listPath(final String relativePath, final boolean recursive,
+      final int listMaxResults, final String continuation, TracingContext tracingContext)
+      throws IOException {
+    return listPath(relativePath, recursive, listMaxResults, continuation, tracingContext, true);
+  }
+
+  public AbfsRestOperation listPath(final String relativePath, final boolean recursive,
+      final int listMaxResults, final String continuation, TracingContext tracingContext,
+      boolean is404CheckRequired) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LIST);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_INCLUDE, METADATA);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_PREFIX, getDirectoryQueryParameter(relativePath));
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_MARKER, continuation);
+    if (!recursive) {
+      abfsUriQueryBuilder.addQuery(QUERY_PARAM_DELIMITER, FORWARD_SLASH);
+    }
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAX_RESULTS, String.valueOf(listMaxResults));
+    appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, abfsUriQueryBuilder);
+
+    final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.ListBlobs,
+        HTTP_METHOD_GET,
+        url,
+        requestHeaders);
+
+    op.execute(tracingContext);
+    if (isEmptyListResults(op.getResult()) && is404CheckRequired) {
+      // If the list operation returns no paths, we need to check if the path is a file.
+      // If it is a file, we need to return the file in the list.
+      // If it is a non-existing path, we need to throw a FileNotFoundException.
+      if (relativePath.equals(ROOT_PATH)) {
+        // Root always exists as a directory. It can be an empty listing.
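+        // Hence, the successful empty listing can be returned as-is.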
+ return op; + } + AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false); + BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus); + AbfsRestOperation listOp = getAbfsRestOperation( + AbfsRestOperationType.ListBlobs, + HTTP_METHOD_GET, + url, + requestHeaders); + listOp.hardSetGetListStatusResult(HTTP_OK, listResultSchema); + return listOp; + } + return op; } /** @@ -299,7 +377,7 @@ public AbfsRestOperation createPath(final String path, final String eTag, final ContextEncryptionAdapter contextEncryptionAdapter, final TracingContext tracingContext) throws AzureBlobFileSystemException { - // TODO: [FnsOverBlob][HADOOP-19207] To be implemented as part of response handling of blob endpoint APIs. + // TODO: [FnsOverBlob][HADOOP-19232] To be implemented as part of ingress support. throw new NotImplementedException("Create Path operation on Blob endpoint yet to be implemented."); } @@ -668,6 +746,26 @@ public AbfsRestOperation setPathProperties(final String path, AbfsRestOperationType.SetPathProperties, HTTP_METHOD_PUT, url, requestHeaders); op.execute(tracingContext); + try { + op.execute(tracingContext); + } catch (AbfsRestOperationException ex) { + // If we have no HTTP response, throw the original exception. + if (!op.hasResult()) { + throw ex; + } + // This path could be present as an implicit directory in FNS. + if (op.getResult().getStatusCode() == HTTP_NOT_FOUND && isNonEmptyListing(path, tracingContext)) { + // Implicit path found, create a marker blob at this path and set properties. + this.createPath(path, false, false, null, false, null, contextEncryptionAdapter, tracingContext); + // Make sure hdi_isFolder is added to the list of properties to be set. + boolean hdiIsFolderExists = properties.containsKey(XML_TAG_HDI_ISFOLDER); + if (!hdiIsFolderExists) { + properties.put(XML_TAG_HDI_ISFOLDER, TRUE); + } + return this.setPathProperties(path, properties, tracingContext, contextEncryptionAdapter); + } + throw ex; + } return op; } @@ -719,7 +817,7 @@ public AbfsRestOperation getPathStatus(final String path, final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.GetPathStatus, + AbfsRestOperationType.GetBlobProperties, HTTP_METHOD_HEAD, url, requestHeaders); try { op.execute(tracingContext); @@ -728,9 +826,15 @@ public AbfsRestOperation getPathStatus(final String path, if (!op.hasResult()) { throw ex; } - if (op.getResult().getStatusCode() == HTTP_NOT_FOUND && isImplicitCheckRequired) { - // This path could be present as an implicit directory in FNS. - // TODO: [FnsOverBlob][HADOOP-19207] To be implemented as part of implicit directory handling over blob endpoint. + // This path could be present as an implicit directory in FNS. + if (op.getResult().getStatusCode() == HTTP_NOT_FOUND + && isImplicitCheckRequired && isNonEmptyListing(path, tracingContext)) { + // Implicit path found. + AbfsRestOperation successOp = getAbfsRestOperation( + AbfsRestOperationType.GetPathStatus, + HTTP_METHOD_HEAD, url, requestHeaders); + successOp.hardSetGetFileStatusResult(HTTP_OK); + return successOp; } if (op.getResult().getStatusCode() == HTTP_NOT_FOUND) { /* @@ -913,31 +1017,6 @@ public AbfsRestOperation checkAccess(String path, "CheckAccess operation is only supported on HNS enabled Accounts."); } - /** - * Checks if the rest operation results indicate if the path is a directory. 
- * @param result executed rest operation containing response from server. - * @return True if the path is a directory, False otherwise. - */ - @Override - public boolean checkIsDir(AbfsHttpOperation result) { - String resourceType = result.getResponseHeader(X_MS_META_HDI_ISFOLDER); - return resourceType != null && resourceType.equals(TRUE); - } - - /** - * Returns true if the status code lies in the range of user error. - * In the case of HTTP_CONFLICT for PutBlockList we fall back to DFS and hence - * this retry handling is not needed. - * @param responseStatusCode http response status code. - * @return True or False. - */ - @Override - public boolean checkUserError(int responseStatusCode) { - return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST - && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR - && responseStatusCode != HttpURLConnection.HTTP_CONFLICT); - } - /** * Get Rest Operation for API * Get Block List. @@ -1004,8 +1083,10 @@ public AbfsRestOperation copyBlob(Path sourceBlobPath, requestHeaders.add(new AbfsHttpHeader(X_MS_COPY_SOURCE, sourcePathUrl)); requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); - return getAbfsRestOperation(AbfsRestOperationType.CopyBlob, HTTP_METHOD_PUT, + final AbfsRestOperation op = getAbfsRestOperation(AbfsRestOperationType.CopyBlob, HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; } /** @@ -1037,16 +1118,204 @@ public AbfsRestOperation deleteBlobPath(final Path blobPath, return op; } - private static String encodeMetadataAttribute(String value) - throws UnsupportedEncodingException { - return value == null ? null - : URLEncoder.encode(value, XMS_PROPERTIES_ENCODING_UNICODE); + /** + * Checks if the rest operation results indicate if the path is a directory. + * @param result executed rest operation containing response from server. + * @return True if the path is a directory, False otherwise. + */ + @Override + public boolean checkIsDir(AbfsHttpOperation result) { + String resourceType = result.getResponseHeader(X_MS_META_HDI_ISFOLDER); + return resourceType != null && resourceType.equals(TRUE); } - private static String decodeMetadataAttribute(String encoded) - throws UnsupportedEncodingException { - return encoded == null ? null - : URLDecoder.decode(encoded, XMS_PROPERTIES_ENCODING_UNICODE); + /** + * Returns true if the status code lies in the range of user error. + * In the case of HTTP_CONFLICT for PutBlockList we fall back to DFS and hence + * this retry handling is not needed. + * @param responseStatusCode http response status code. + * @return True or False. + */ + @Override + public boolean checkUserError(int responseStatusCode) { + return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST + && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR + && responseStatusCode != HttpURLConnection.HTTP_CONFLICT); + } + + /** + * Get the continuation token from the response from BLOB Endpoint Listing. + * Continuation Token will be present in XML List response body. + * @param result The response from the server. + * @return The continuation token. + */ + @Override + public String getContinuationFromResponse(AbfsHttpOperation result) { + BlobListResultSchema listResultSchema = (BlobListResultSchema) result.getListResultSchema(); + return listResultSchema.getNextMarker(); + } + + /** + * Get the User-defined metadata on a path from response headers of + * GetBlobProperties API on Blob Endpoint. + * Blob Endpoint returns each metadata as a separate header. 
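+   * For example, a response header "x-ms-meta-key1: value1" is returned
+   * here as the map entry ("key1", "value1") (illustrative key name).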
+ * @param result The response of GetBlobProperties call from the server. + * @return Hashtable containing metadata key-value pairs. + * @throws InvalidAbfsRestOperationException if parsing fails. + */ + @Override + public Hashtable getXMSProperties(AbfsHttpOperation result) + throws InvalidAbfsRestOperationException { + Hashtable properties = new Hashtable<>(); + Map> responseHeaders = result.getResponseHeaders(); + for (Map.Entry> entry : responseHeaders.entrySet()) { + String name = entry.getKey(); + if (name != null && name.startsWith(X_MS_METADATA_PREFIX)) { + String value; + try { + value = decodeMetadataAttribute(entry.getValue().get(0)); + } catch (UnsupportedEncodingException e) { + throw new InvalidAbfsRestOperationException(e); + } + properties.put(name.substring(X_MS_METADATA_PREFIX.length()), value); + } + } + return properties; + } + + /** + * Parse the XML response body returned by ListBlob API on Blob Endpoint. + * @param stream InputStream contains the response from server. + * @return BlobListResultSchema containing the list of entries. + * @throws IOException if parsing fails. + */ + @Override + public ListResultSchema parseListPathResults(final InputStream stream) throws IOException { + if (stream == null) { + return null; + } + BlobListResultSchema listResultSchema; + try { + final SAXParser saxParser = saxParserThreadLocal.get(); + saxParser.reset(); + listResultSchema = new BlobListResultSchema(); + saxParser.parse(stream, new BlobListXmlParser(listResultSchema, getBaseUrl().toString())); + } catch (SAXException | IOException e) { + throw new RuntimeException(e); + } + + return removeDuplicateEntries(listResultSchema); + } + + /** + * Parse the XML response body returned by GetBlockList API on Blob Endpoint. + * @param stream InputStream contains the response from server. + * @return List of blockIds. + * @throws IOException if parsing fails. + */ + @Override + public List parseBlockListResponse(final InputStream stream) throws IOException { + List blockIdList = new ArrayList<>(); + // Convert the input stream to a Document object + + try { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + Document doc = factory.newDocumentBuilder().parse(stream); + + // Find the CommittedBlocks element and extract the list of block IDs + NodeList committedBlocksList = doc.getElementsByTagName( + XML_TAG_COMMITTED_BLOCKS); + if (committedBlocksList.getLength() > 0) { + Node committedBlocks = committedBlocksList.item(0); + NodeList blockList = committedBlocks.getChildNodes(); + for (int i = 0; i < blockList.getLength(); i++) { + Node block = blockList.item(i); + if (block.getNodeName().equals(XML_TAG_BLOCK_NAME)) { + NodeList nameList = block.getChildNodes(); + for (int j = 0; j < nameList.getLength(); j++) { + Node name = nameList.item(j); + if (name.getNodeName().equals(XML_TAG_NAME)) { + String blockId = name.getTextContent(); + blockIdList.add(blockId); + } + } + } + } + } + } catch (ParserConfigurationException | SAXException e) { + throw new IOException(e); + } + + return blockIdList; + } + + /** + * Parse the XML response body returned by error stream for all blob endpoint APIs. + * @param stream ErrorStream contains the response from server. + * @return StorageErrorResponseSchema containing the error code and message. + * @throws IOException if parsing fails. 
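+   *
+   * For Blob endpoint, the error response body is XML, typically of the form
+   * (an illustrative sketch, not the exhaustive schema):
+   *
+   * <Error>
+   *   <Code>string</Code>
+   *   <Message>string</Message>
+   * </Error>
+   *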
+ */ + @Override + public StorageErrorResponseSchema processStorageErrorResponse(final InputStream stream) throws IOException { + final String data = IOUtils.toString(stream, StandardCharsets.UTF_8); + String storageErrorCode = EMPTY_STRING; + String storageErrorMessage = EMPTY_STRING; + String expectedAppendPos = EMPTY_STRING; + int codeStartFirstInstance = data.indexOf(XML_TAG_BLOB_ERROR_CODE_START_XML); + int codeEndFirstInstance = data.indexOf(XML_TAG_BLOB_ERROR_CODE_END_XML); + if (codeEndFirstInstance != -1 && codeStartFirstInstance != -1) { + storageErrorCode = data.substring(codeStartFirstInstance, + codeEndFirstInstance).replace(XML_TAG_BLOB_ERROR_CODE_START_XML, ""); + } + + int msgStartFirstInstance = data.indexOf(XML_TAG_BLOB_ERROR_MESSAGE_START_XML); + int msgEndFirstInstance = data.indexOf(XML_TAG_BLOB_ERROR_MESSAGE_END_XML); + if (msgEndFirstInstance != -1 && msgStartFirstInstance != -1) { + storageErrorMessage = data.substring(msgStartFirstInstance, + msgEndFirstInstance).replace(XML_TAG_BLOB_ERROR_MESSAGE_START_XML, ""); + } + return new StorageErrorResponseSchema(storageErrorCode, storageErrorMessage, expectedAppendPos); + } + + /** + * Encode the value of the attribute to be set as metadata. + * Blob Endpoint support Unicode characters in metadata values. + * @param value to be encoded. + * @return encoded value. + * @throws UnsupportedEncodingException if encoding fails. + */ + @Override + public byte[] encodeAttribute(String value) throws UnsupportedEncodingException { + return value.getBytes(XMS_PROPERTIES_ENCODING_UNICODE); + } + + /** + * Decode the value of the attribute from the metadata. + * Blob Endpoint support Unicode characters in metadata values. + * @param value to be decoded. + * @return decoded value. + * @throws UnsupportedEncodingException if decoding fails. + */ + @Override + public String decodeAttribute(byte[] value) throws UnsupportedEncodingException { + return new String(value, XMS_PROPERTIES_ENCODING_UNICODE); + } + + /** + * Blob Endpoint Supports Delimiter based listing where the + * directory path to be listed must end with a Forward Slash. + * @param path directory path to be listed. + * @return directory path with forward slash at end. + */ + public static String getDirectoryQueryParameter(final String path) { + String directory = AbfsClient.getDirectoryQueryParameter(path); + if (directory.isEmpty()) { + return directory; + } + if (!directory.endsWith(FORWARD_SLASH)) { + directory = directory + FORWARD_SLASH; + } + return directory; } /** @@ -1066,6 +1335,14 @@ private boolean isPureASCII(String value) throws CharacterCodingException { return true; } + /** + * Get the list of headers to be set for metadata properties. + * Blob Endpoint accepts each metadata as a separate header. + * @param properties to be set as metadata + * @return List of headers to be set. + * @throws AbfsRestOperationException if encoding fails. + * @throws CharacterCodingException if value is not pure ASCII. 
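+   * For example, the property ("key1", "value1") is sent as the request
+   * header "x-ms-meta-key1: value1" (illustrative key name).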
+ */ private List getMetadataHeadersList(final Hashtable properties) throws AbfsRestOperationException, CharacterCodingException { List metadataRequestHeaders = new ArrayList<>(); @@ -1084,4 +1361,112 @@ private List getMetadataHeadersList(final Hashtable saxParserThreadLocal = ThreadLocal.withInitial(() -> { + SAXParserFactory factory = SAXParserFactory.newInstance(); + factory.setNamespaceAware(true); + try { + return factory.newSAXParser(); + } catch (SAXException e) { + throw new RuntimeException("Unable to create SAXParser", e); + } catch (ParserConfigurationException e) { + throw new RuntimeException("Check parser configuration", e); + } + }); + + /** + * This is to handle duplicate listing entries returned by Blob Endpoint for + * implicit paths that also has a marker file created for them. + * This will retain entry corresponding to marker file and remove the BlobPrefix entry. + * @param listResultSchema List of entries returned by Blob Endpoint. + * @return List of entries after removing duplicates. + */ + private BlobListResultSchema removeDuplicateEntries(BlobListResultSchema listResultSchema) { + List uniqueEntries = new ArrayList<>(); + TreeMap nameToEntryMap = new TreeMap<>(); + + for (BlobListResultEntrySchema entry : listResultSchema.paths()) { + if (StringUtils.isNotEmpty(entry.eTag())) { + // This is a blob entry. It is either a file or a marker blob. + // In both cases we will add this. + nameToEntryMap.put(entry.name(), entry); + } else { + // This is a BlobPrefix entry. It is a directory with file inside + // This might have already been added as a marker blob. + if (!nameToEntryMap.containsKey(entry.name())) { + nameToEntryMap.put(entry.name(), entry); + } + } + } + + uniqueEntries.addAll(nameToEntryMap.values()); + listResultSchema.withPaths(uniqueEntries); + return listResultSchema; + } + + /** + * When listing is done on a file, Blob Endpoint returns the empty listing + * but DFS Endpoint returns the file status as one of the entries. + * This is to convert file status into ListResultSchema. + * @param relativePath + * @param pathStatus + * @return + */ + private BlobListResultSchema getListResultSchemaFromPathStatus(String relativePath, AbfsRestOperation pathStatus) { + BlobListResultSchema listResultSchema = new BlobListResultSchema(); + + BlobListResultEntrySchema entrySchema = new BlobListResultEntrySchema(); + entrySchema.setUrl(pathStatus.getUrl().toString()); + entrySchema.setPath(new Path(relativePath)); + entrySchema.setName(relativePath.charAt(0) == '/' ? relativePath.substring(1) : relativePath); + entrySchema.setIsDirectory(checkIsDir(pathStatus.getResult())); + entrySchema.setContentLength(Long.parseLong(pathStatus.getResult().getResponseHeader(CONTENT_LENGTH))); + entrySchema.setLastModifiedTime( + pathStatus.getResult().getResponseHeader(LAST_MODIFIED)); + entrySchema.setETag(AzureBlobFileSystemStore.extractEtagHeader(pathStatus.getResult())); + + // If listing is done on explicit directory, do not include directory in the listing. + if (!entrySchema.isDirectory()) { + listResultSchema.paths().add(entrySchema); + } + return listResultSchema; + } + + private static String encodeMetadataAttribute(String value) + throws UnsupportedEncodingException { + return value == null ? null + : URLEncoder.encode(value, StandardCharsets.UTF_8.name()); + } + + private static String decodeMetadataAttribute(String encoded) + throws UnsupportedEncodingException { + return encoded == null ? 
null + : java.net.URLDecoder.decode(encoded, StandardCharsets.UTF_8.name()); + } + + private boolean isNonEmptyListing(String path, + TracingContext tracingContext) throws AzureBlobFileSystemException { + AbfsRestOperation listOp = listPath(path, false, 1, null, tracingContext, false); + return !isEmptyListResults(listOp.getResult()); + } + + /** + * Check if the list call returned empty results without any continuation token. + * @param result The response of listing API from the server. + * @return True if empty results without continuation token. + */ + private boolean isEmptyListResults(AbfsHttpOperation result) { + boolean isEmptyList = result != null && result.getStatusCode() == HTTP_OK && // List Call was successful + result.getListResultSchema() != null && // Parsing of list response was successful + result.getListResultSchema().paths().isEmpty() && // No paths were returned + result.getListResultSchema() instanceof BlobListResultSchema && // It is safe to typecast to BlobListResultSchema + ((BlobListResultSchema) result.getListResultSchema()).getNextMarker() == null; // No continuation token was returned + if (isEmptyList) { + LOG.debug("List call returned empty results without any continuation token."); + return true; + } else if (result != null && !(result.getListResultSchema() instanceof BlobListResultSchema)) { + throw new RuntimeException("List call returned unexpected results over Blob Endpoint."); + } + return false; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index b51131b959854..0eedeeea45968 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.io.IOException; +import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; import java.net.InetAddress; @@ -48,7 +49,11 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema; import org.apache.hadoop.fs.azurebfs.utils.MetricFormat; import org.apache.hadoop.fs.store.LogExactlyOnce; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions; @@ -383,7 +388,7 @@ AbfsThrottlingIntercept getIntercept() { * @return common request headers */ protected List createCommonHeaders(ApiVersion xMsVersion) { - final List requestHeaders = new ArrayList(); + final List requestHeaders = new ArrayList<>(); requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion.toString())); requestHeaders.add(new AbfsHttpHeader(ACCEPT_CHARSET, UTF_8)); requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, EMPTY_STRING)); @@ -624,7 +629,7 @@ public abstract AbfsClientRenameResult renamePath( * @param result executed rest operation 
containing response from server. * @return True if the path is a directory, False otherwise. */ - protected abstract boolean checkIsDir(AbfsHttpOperation result); + public abstract boolean checkIsDir(AbfsHttpOperation result); /** * Creates a rest operation for rename. @@ -1616,4 +1621,62 @@ protected String getUserAgent() { protected boolean isRenameResilience() { return renameResilience; } + + /** + * Parses response of Listing API from server based on Endpoint used. + * @param stream InputStream of the response + * @return ListResultSchema + * @throws IOException if parsing fails + */ + public abstract ListResultSchema parseListPathResults(InputStream stream) throws IOException; + + /** + * Parses response of Get Block List from server based on Endpoint used. + * @param stream InputStream of the response + * @return List of block IDs + * @throws IOException if parsing fails + */ + public abstract List parseBlockListResponse(InputStream stream) throws IOException; + + /** + * Parses response from ErrorStream returned by server based on Endpoint used. + * @param stream InputStream of the response + * @return StorageErrorResponseSchema + * @throws IOException if parsing fails + */ + public abstract StorageErrorResponseSchema processStorageErrorResponse(InputStream stream) throws IOException; + + /** + * Returns continuation token from server response based on Endpoint used. + * @param result response from server + * @return continuation token + */ + public abstract String getContinuationFromResponse(AbfsHttpOperation result); + + /** + * Returns user-defined metadata from server response based on Endpoint used. + * @param result response from server + * @return user-defined metadata key-value pairs + * @throws InvalidFileSystemPropertyException if parsing fails + * @throws InvalidAbfsRestOperationException if parsing fails + */ + public abstract Hashtable getXMSProperties(AbfsHttpOperation result) + throws InvalidFileSystemPropertyException, + InvalidAbfsRestOperationException; + + /** + * Encode attribute with encoding based on Endpoint used. + * @param value to be encoded + * @return encoded value + * @throws UnsupportedEncodingException if encoding fails + */ + public abstract byte[] encodeAttribute(String value) throws UnsupportedEncodingException; + + /** + * Decode attribute with decoding based on Endpoint used. 
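+   * (The DFS client decodes with ASCII, the Blob client with a Unicode
+   * charset; see the endpoint-specific overrides.)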
+ * @param value to be decoded + * @return decoded value + * @throws UnsupportedEncodingException if decoding fails + */ + public abstract String decodeAttribute(byte[] value) throws UnsupportedEncodingException; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java index 05173443cdb23..39aaf34db0d57 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java @@ -154,6 +154,7 @@ public void updateMetrics(AbfsRestOperationType operationType, switch (operationType) { case Append: + case PutBlock: contentLength = abfsHttpOperation.getBytesSent(); if (contentLength == 0) { /* @@ -171,6 +172,7 @@ public void updateMetrics(AbfsRestOperationType operationType, } break; case ReadFile: + case GetBlob: String range = abfsHttpOperation.getRequestProperty(HttpHeaderConfigurations.RANGE); contentLength = getContentLengthIfKnown(range); if (contentLength > 0) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index 7d50260f7bad3..75081a44c25a3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -19,17 +19,26 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; import java.net.URL; +import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; import java.util.Hashtable; import java.util.List; import java.util.Map; import java.util.UUID; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; @@ -40,7 +49,11 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException; import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; +import org.apache.hadoop.fs.azurebfs.contracts.services.DfsListResultSchema; +import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema; import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; @@ -1268,9 +1281,142 @@ 
public boolean checkIsDir(AbfsHttpOperation result) {
   @Override
   public boolean checkUserError(int responseStatusCode) {
     return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
-        && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR);
+        && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR
+        && responseStatusCode != HttpURLConnection.HTTP_CONFLICT);
+  }
+
+  /**
+   * Get the continuation token from the response from DFS Endpoint Listing.
+   * Continuation Token will be present as a response header.
+   * @param result The response from the server.
+   * @return The continuation token.
+   */
+  @Override
+  public String getContinuationFromResponse(AbfsHttpOperation result) {
+    return result.getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
+  }
+
+  /**
+   * Get the user-defined metadata from the response from DFS Endpoint API.
+   * @param result response from server
+   * @return user-defined metadata as key-value pairs
+   * @throws InvalidFileSystemPropertyException if parsing fails
+   * @throws InvalidAbfsRestOperationException if decoding fails
+   */
+  @Override
+  public Hashtable<String, String> getXMSProperties(AbfsHttpOperation result)
+      throws InvalidFileSystemPropertyException, InvalidAbfsRestOperationException {
+    return parseCommaSeparatedXmsProperties(result.getResponseHeader(X_MS_PROPERTIES));
+  }
+
+  /**
+   * Parse the list file response from DFS ListPath API in JSON format.
+   * @param stream InputStream containing the list results.
+   * @return ListResultSchema containing the list of entries.
+   * @throws IOException if parsing fails.
+   */
+  @Override
+  public ListResultSchema parseListPathResults(final InputStream stream) throws IOException {
+    DfsListResultSchema listResultSchema;
+    try {
+      final ObjectMapper objectMapper = new ObjectMapper();
+      listResultSchema = objectMapper.readValue(stream, DfsListResultSchema.class);
+    } catch (IOException ex) {
+      LOG.error("Unable to deserialize list results", ex);
+      throw ex;
+    }
+    return listResultSchema;
+  }
+
+  @Override
+  public List<String> parseBlockListResponse(final InputStream stream) throws IOException {
+    return null;
+  }
+
+  /**
+   * When the request fails, this function is used to parse the response
+   * and extract the storageErrorCode and storageErrorMessage. Any errors
+   * encountered while attempting to process the error response are logged,
+   * but otherwise ignored.
+ * + * For storage errors, the response body *usually* has the following format: + * + * { + * "error": + * { + * "code": "string", + * "message": "string" + * } + * } + * + */ + @Override + public StorageErrorResponseSchema processStorageErrorResponse(final InputStream stream) throws IOException { + String storageErrorCode = "", storageErrorMessage = "", expectedAppendPos = ""; + try { + JsonFactory jf = new JsonFactory(); + try (JsonParser jp = jf.createParser(stream)) { + String fieldName, fieldValue; + jp.nextToken(); // START_OBJECT - { + jp.nextToken(); // FIELD_NAME - "error": + jp.nextToken(); // START_OBJECT - { + jp.nextToken(); + while (jp.hasCurrentToken()) { + if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { + fieldName = jp.getCurrentName(); + jp.nextToken(); + fieldValue = jp.getText(); + switch (fieldName) { + case "code": + storageErrorCode = fieldValue; + break; + case "message": + storageErrorMessage = fieldValue; + break; + case "ExpectedAppendPos": + expectedAppendPos = fieldValue; + break; + default: + break; + } + } + jp.nextToken(); + } + } + } catch (IOException e) { + throw e; + } + return new StorageErrorResponseSchema(storageErrorCode, storageErrorMessage, expectedAppendPos); + } + + /** + * Encode the value using ASCII encoding. + * @param value to be encoded + * @return encoded value + * @throws UnsupportedEncodingException if encoding fails + */ + @Override + public byte[] encodeAttribute(String value) throws + UnsupportedEncodingException { + return value.getBytes(XMS_PROPERTIES_ENCODING_ASCII); + } + + /** + * Decode the value using ASCII encoding. + * @param value to be decoded + * @return decoded value + * @throws UnsupportedEncodingException if encoding fails + */ + @Override + public String decodeAttribute(byte[] value) throws UnsupportedEncodingException { + return new String(value, XMS_PROPERTIES_ENCODING_ASCII); } + /** + * Convert the properties hashtable to a comma separated string. 
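+   * Values are Base64-encoded, e.g. a property ("key1", "value1") serializes
+   * to "key1=dmFsdWUx" (illustrative).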
+ * @param properties hashtable containing the properties key value pairs + * @return comma separated string containing the properties key value pairs + * @throws CharacterCodingException if encoding fails + */ private String convertXmsPropertiesToCommaSeparatedString(final Map properties) throws CharacterCodingException { StringBuilder commaSeparatedProperties = new StringBuilder(); @@ -1301,4 +1447,49 @@ private String convertXmsPropertiesToCommaSeparatedString(final Map parseCommaSeparatedXmsProperties(String xMsProperties) throws + InvalidFileSystemPropertyException, InvalidAbfsRestOperationException { + Hashtable properties = new Hashtable<>(); + + final CharsetDecoder decoder = Charset.forName(XMS_PROPERTIES_ENCODING_ASCII).newDecoder(); + + if (xMsProperties != null && !xMsProperties.isEmpty()) { + String[] userProperties = xMsProperties.split(AbfsHttpConstants.COMMA); + + if (userProperties.length == 0) { + return properties; + } + + for (String property : userProperties) { + if (property.isEmpty()) { + throw new InvalidFileSystemPropertyException(xMsProperties); + } + + String[] nameValue = property.split(AbfsHttpConstants.EQUAL, 2); + if (nameValue.length != 2) { + throw new InvalidFileSystemPropertyException(xMsProperties); + } + + byte[] decodedValue = Base64.decode(nameValue[1]); + + final String value; + try { + value = decoder.decode(ByteBuffer.wrap(decodedValue)).toString(); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } + properties.put(nameValue[0], value); + } + } + + return properties; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index e2ce5c628a4b6..b179023953468 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -24,13 +24,10 @@ import java.net.URL; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,8 +36,16 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCKLIST; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EQUAL; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_META_HDI_ISFOLDER; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP; + /** * Base Http operation class for orchestrating server IO calls. 
Child classes would * define the certain orchestration implementation on the basis of network library used. @@ -61,6 +66,7 @@ public abstract class AbfsHttpOperation implements AbfsPerfLoggable { private final String method; private final URL url; private String maskedUrl; + private AbfsClient client; private String maskedEncodedUrl; private int statusCode; private String statusDescription; @@ -69,6 +75,7 @@ public abstract class AbfsHttpOperation implements AbfsPerfLoggable { private String requestId = ""; private String expectedAppendPos = ""; private ListResultSchema listResultSchema = null; + private List blockIdList = null; // metrics private int bytesSent; @@ -113,13 +120,14 @@ public AbfsHttpOperation( final String method, final List requestHeaders, final Duration connectionTimeout, - final Duration readTimeout) { + final Duration readTimeout, AbfsClient abfsClient) { this.log = log; this.url = url; this.method = method; this.requestHeaders = requestHeaders; this.connectionTimeout = (int) connectionTimeout.toMillis(); this.readTimeout = (int) readTimeout.toMillis(); + this.client = abfsClient; } /** @@ -220,6 +228,8 @@ public ListResultSchema getListResultSchema() { */ public abstract String getResponseHeader(String httpHeader); + public abstract Map> getResponseHeaders(); + // Returns a trace message for the request @Override public String toString() { @@ -381,9 +391,12 @@ final void parseResponse(final byte[] buffer, // this is a list operation and need to retrieve the data // need a better solution - if (AbfsHttpConstants.HTTP_METHOD_GET.equals(this.method) - && buffer == null) { - parseListFilesResponse(stream); + if (AbfsHttpConstants.HTTP_METHOD_GET.equals(this.method) && buffer == null) { + if (url.toString().contains(QUERY_PARAM_COMP + EQUAL + BLOCKLIST)) { + parseBlockListResponse(stream); + } else { + parseListFilesResponse(stream); + } } else { if (buffer != null) { while (totalBytesRead < length) { @@ -447,35 +460,10 @@ private void processStorageErrorResponse() { if (stream == null) { return; } - JsonFactory jf = new JsonFactory(); - try (JsonParser jp = jf.createParser(stream)) { - String fieldName, fieldValue; - jp.nextToken(); // START_OBJECT - { - jp.nextToken(); // FIELD_NAME - "error": - jp.nextToken(); // START_OBJECT - { - jp.nextToken(); - while (jp.hasCurrentToken()) { - if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { - fieldName = jp.getCurrentName(); - jp.nextToken(); - fieldValue = jp.getText(); - switch (fieldName) { - case "code": - storageErrorCode = fieldValue; - break; - case "message": - storageErrorMessage = fieldValue; - break; - case "ExpectedAppendPos": - expectedAppendPos = fieldValue; - break; - default: - break; - } - } - jp.nextToken(); - } - } + StorageErrorResponseSchema storageErrorResponse = client.processStorageErrorResponse(stream); + storageErrorCode = storageErrorResponse.getStorageErrorCode(); + storageErrorMessage = storageErrorResponse.getStorageErrorMessage(); + expectedAppendPos = storageErrorResponse.getExpectedAppendPos(); } catch (IOException ex) { // Ignore errors that occur while attempting to parse the storage // error, since the response may have been handled by the HTTP driver @@ -497,25 +485,22 @@ private void processStorageErrorResponse() { * @param stream InputStream contains the list results. * @throws IOException if the response cannot be deserialized. 
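+   * Parsing is delegated to the endpoint-specific
+   * {@link AbfsClient#parseListPathResults(InputStream)} implementation.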
*/ - private void parseListFilesResponse(final InputStream stream) - throws IOException { - if (stream == null) { + private void parseListFilesResponse(final InputStream stream) throws IOException { + if (stream == null || listResultSchema != null) { return; } + listResultSchema = client.parseListPathResults(stream); + } - if (listResultSchema != null) { - // already parse the response + private void parseBlockListResponse(final InputStream stream) throws IOException { + if (stream == null || blockIdList != null) { return; } + blockIdList = client.parseBlockListResponse(stream); + } - try { - final ObjectMapper objectMapper = new ObjectMapper(); - this.listResultSchema = objectMapper.readValue(stream, - ListResultSchema.class); - } catch (IOException ex) { - log.error("Unable to deserialize list results", ex); - throw ex; - } + public List getBlockIdList() { + return blockIdList; } /** @@ -620,7 +605,7 @@ protected void setRequestId() { requestId = getResponseHeader( HttpHeaderConfigurations.X_MS_REQUEST_ID); if (requestId == null) { - requestId = AbfsHttpConstants.EMPTY_STRING; + requestId = EMPTY_STRING; } } @@ -761,6 +746,11 @@ public String getResponseHeader(final String httpHeader) { return ""; } + @Override + public Map> getResponseHeaders() { + return new HashMap<>(); + } + @Override public void sendPayload(final byte[] buffer, final int offset, @@ -769,4 +759,195 @@ public void sendPayload(final byte[] buffer, } } + + /** + * Dummy Result to be returned for getFileStatus for implicit directory paths. + * Blob Endpoint is not capable of understanding implicit paths and handling + * on client side is needed to make sure HDFS compatibility holds. + */ + public static class AbfsHttpOperationWithFixedResultForGetFileStatus extends AbfsHttpOperation { + + public AbfsHttpOperationWithFixedResultForGetFileStatus(final URL url, + final String method, + final int httpStatus) { + super(url, method, httpStatus); + } + + @Override + public String getResponseHeader(final String httpHeader) { + // Directories on FNS-Blob are identified by a special metadata header. + if (httpHeader.equals(X_MS_META_HDI_ISFOLDER)) { + return TRUE; + } + return EMPTY_STRING; + } + + @Override + public Map> getResponseHeaders() { + return new HashMap<>(); + } + + @Override + public void processResponse(final byte[] buffer, + final int offset, + final int length) + throws IOException { + + } + + @Override + public void setRequestProperty(final String key, final String value) { + + } + + @Override + protected InputStream getContentInputStream() throws IOException { + return null; + } + + @Override + protected InputStream getErrorStream() throws IOException { + return null; + } + + @Override + String getConnProperty(final String key) { + return null; + } + + @Override + URL getConnUrl() { + return null; + } + + @Override + Integer getConnResponseCode() throws IOException { + return null; + } + + @Override + String getConnResponseMessage() throws IOException { + return null; + } + + @Override + Map> getRequestProperties() { + return null; + } + + @Override + String getRequestProperty(final String headerName) { + return null; + } + + @Override + public String getTracingContextSuffix() { + return null; + } + + @Override + public void sendPayload(final byte[] buffer, + final int offset, + final int length) + throws IOException { + + } + } + + /** + * Dummy Result to be returned for listBlobs for paths existing as files. 
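+   * (The result is hard-set via AbfsRestOperation#hardSetGetListStatusResult
+   * from AbfsBlobClient#listPath.)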
+
+  /**
+   * Dummy result to be returned for listBlobs on paths existing as files.
+   * The Blob endpoint listing returns empty results for such paths, so
+   * client-side handling is needed to make sure HDFS compatibility holds.
+   */
+  public static class AbfsHttpOperationWithFixedResultForGetListStatus extends AbfsHttpOperation {
+    private final ListResultSchema hardSetListResultSchema;
+
+    public AbfsHttpOperationWithFixedResultForGetListStatus(final URL url,
+        final String method,
+        final int httpStatus,
+        final ListResultSchema listResult) {
+      super(url, method, httpStatus);
+      hardSetListResultSchema = listResult;
+    }
+
+    @Override
+    public ListResultSchema getListResultSchema() {
+      return hardSetListResultSchema;
+    }
+
+    @Override
+    public void processResponse(final byte[] buffer,
+        final int offset,
+        final int length)
+        throws IOException {
+
+    }
+
+    @Override
+    public void setRequestProperty(final String key, final String value) {
+
+    }
+
+    @Override
+    protected InputStream getContentInputStream() throws IOException {
+      return null;
+    }
+
+    @Override
+    protected InputStream getErrorStream() throws IOException {
+      return null;
+    }
+
+    @Override
+    String getConnProperty(final String key) {
+      return null;
+    }
+
+    @Override
+    URL getConnUrl() {
+      return null;
+    }
+
+    @Override
+    Integer getConnResponseCode() throws IOException {
+      return null;
+    }
+
+    @Override
+    String getConnResponseMessage() throws IOException {
+      return null;
+    }
+
+    @Override
+    Map<String, List<String>> getRequestProperties() {
+      return null;
+    }
+
+    @Override
+    String getRequestProperty(final String headerName) {
+      return null;
+    }
+
+    @Override
+    public String getTracingContextSuffix() {
+      return null;
+    }
+
+    @Override
+    public String getResponseHeader(final String httpHeader) {
+      return "";
+    }
+
+    @Override
+    public Map<String, List<String>> getResponseHeaders() {
+      return new HashMap<>();
+    }
+
+    @Override
+    public void sendPayload(final byte[] buffer,
+        final int offset,
+        final int length)
+        throws IOException {
+    }
+  }
 }
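Same idea for listings: when listBlobs is issued against a path that exists as a file, the server returns an empty result, and the client can substitute a one-entry schema. A sketch built from the schema builders this patch's tests use; the actual Blob-side call site is not shown in this section, and the Dfs-flavoured schema classes stand in for their Blob counterparts:

    // Hypothetical client-side handling when listBlobs on a file path returns
    // nothing: synthesize a one-entry listing so listStatus(file) behaves as on DFS.
    ListResultEntrySchema entry = new DfsListResultEntrySchema()  // Blob counterpart assumed analogous
        .withName(path.getName())
        .withIsDirectory(false);
    List<ListResultEntrySchema> paths = new ArrayList<>();
    paths.add(entry);
    ListResultSchema schema = new DfsListResultSchema().withPaths(paths);
    op.hardSetGetListStatusResult(HttpURLConnection.HTTP_OK, schema);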
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java
index 9628e8e338028..20fe215e03954 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java
@@ -62,15 +62,17 @@ public class AbfsJdkHttpOperation extends AbfsHttpOperation {
    * @param requestHeaders The HTTP request headers.
    * @param connectionTimeout The connection timeout value to be used while establishing the http connection
    * @param readTimeout The read timeout value to be used with the http connection while making a request
+   * @param abfsClient The AbfsClient instance.
    * @throws IOException if an error occurs.
    */
   public AbfsJdkHttpOperation(final URL url,
       final String method,
       final List<AbfsHttpHeader> requestHeaders,
       final Duration connectionTimeout,
-      final Duration readTimeout)
+      final Duration readTimeout,
+      final AbfsClient abfsClient)
       throws IOException {
-    super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+    super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout, abfsClient);
 
     this.connection = openConnection();
     if (this.connection instanceof HttpsURLConnection) {
@@ -96,6 +98,12 @@ public String getResponseHeader(String httpHeader) {
     return connection.getHeaderField(httpHeader);
   }
 
+  /**{@inheritDoc}*/
+  @Override
+  public Map<String, List<String>> getResponseHeaders() {
+    return connection.getHeaderFields();
+  }
+
   /**{@inheritDoc}*/
   public void sendPayload(byte[] buffer, int offset, int length)
       throws IOException {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 8533d37f83e6d..3ed3420a90603 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
@@ -47,6 +48,7 @@
 import org.apache.hadoop.fs.azurebfs.AbfsBackoffMetrics;
 import org.apache.http.impl.execchain.RequestAbortedException;
 
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.PUT_BLOCK_LIST;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
 import static org.apache.hadoop.util.Time.now;
 
@@ -130,6 +132,25 @@ public void hardSetResult(int httpStatus) {
         this.method, httpStatus);
   }
 
+  /**
+   * For setting dummy result of getFileStatus for implicit paths.
+   * @param httpStatus http status code to be set.
+   */
+  public void hardSetGetFileStatusResult(int httpStatus) {
+    result = new AbfsHttpOperation.AbfsHttpOperationWithFixedResultForGetFileStatus(this.url,
+        this.method, httpStatus);
+  }
+
+  /**
+   * For setting dummy result of listPathStatus for file paths.
+   * @param httpStatus http status code to be set.
+   * @param listResultSchema list result schema to be set.
+   */
+  public void hardSetGetListStatusResult(int httpStatus, final ListResultSchema listResultSchema) {
+    result = new AbfsHttpOperation.AbfsHttpOperationWithFixedResultForGetListStatus(this.url,
+        this.method, httpStatus, listResultSchema);
+  }
+
   public URL getUrl() {
     return url;
   }
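The two hardSet helpers above plug the fixed results in; the new blockIdList field on AbfsHttpOperation, by contrast, is filled through AbfsClient.parseBlockListResponse, whose body is not shown in this diff. Under the assumption that it consumes the standard Blob Get Block List XML body (BlockList/CommittedBlocks/Block/Name elements), a minimal stand-alone decoder could look like this:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;
    import javax.xml.parsers.DocumentBuilder;
    import javax.xml.parsers.DocumentBuilderFactory;
    import javax.xml.parsers.ParserConfigurationException;
    import org.w3c.dom.Document;
    import org.w3c.dom.NodeList;
    import org.xml.sax.SAXException;

    // Illustrative decoder for the Blob Get Block List XML body; the patch's
    // real implementation lives behind AbfsClient.parseBlockListResponse.
    public static List<String> parseBlockIds(InputStream stream) throws IOException {
      List<String> blockIds = new ArrayList<>();
      try {
        DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        Document doc = builder.parse(stream);
        // Every <Name> element under <Block> holds one base64-encoded block id.
        NodeList names = doc.getElementsByTagName("Name");
        for (int i = 0; i < names.getLength(); i++) {
          blockIds.add(names.item(i).getTextContent());
        }
      } catch (ParserConfigurationException | SAXException e) {
        throw new IOException("Unable to parse block list response", e);
      }
      return blockIds;
    }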
@@ -217,7 +238,7 @@ String getSasToken() {
    * @param requestHeaders The HTTP request headers.
    * @param buffer For uploads, this is the request entity body. For downloads,
    *               this will hold the response entity body.
-   * @param bufferOffset An offset into the buffer where the data beings.
+   * @param bufferOffset An offset into the buffer where the data begins.
    * @param bufferLength The length of the data in the buffer.
    * @param sasToken A sasToken for optional re-use by AbfsInputStream/AbfsOutputStream.
    */
@@ -387,7 +408,9 @@ private boolean executeHttpOperation(final int retryCount,
       if (hasRequestBody) {
         httpOperation.sendPayload(buffer, bufferOffset, bufferLength);
         incrementCounter(AbfsStatistic.SEND_REQUESTS, 1);
-        incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength);
+        if (!(operationType.name().equals(PUT_BLOCK_LIST))) {
+          incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength);
+        }
       }
       httpOperation.processResponse(buffer, bufferOffset, bufferLength);
       if (!isThrottledRequest && httpOperation.getStatusCode()
@@ -573,7 +596,7 @@ private boolean isApacheClientUsable() {
   AbfsJdkHttpOperation createAbfsHttpOperation() throws IOException {
     return new AbfsJdkHttpOperation(url, method, requestHeaders,
         Duration.ofMillis(client.getAbfsConfiguration().getHttpConnectionTimeout()),
-        Duration.ofMillis(client.getAbfsConfiguration().getHttpReadTimeout()));
+        Duration.ofMillis(client.getAbfsConfiguration().getHttpReadTimeout()), client);
   }
 
   @VisibleForTesting
@@ -581,7 +604,7 @@ AbfsAHCHttpOperation createAbfsAHCHttpOperation() throws IOException {
     return new AbfsAHCHttpOperation(url, method, requestHeaders,
         Duration.ofMillis(client.getAbfsConfiguration().getHttpConnectionTimeout()),
         Duration.ofMillis(client.getAbfsConfiguration().getHttpReadTimeout()),
-        client.getAbfsApacheHttpClient());
+        client.getAbfsApacheHttpClient(), client);
   }
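Both factory methods now hand the owning AbfsClient to the HTTP operation. For the JDK path, getResponseHeaders() (shown earlier) is a one-liner over connection.getHeaderFields(); the Apache-HttpClient path is not shown in this section, but it presumably has to assemble the same multi-valued map by hand, roughly as below (httpResponse is assumed to be the org.apache.http.HttpResponse held by AbfsAHCHttpOperation):

    // Illustrative only: how an Apache-HttpClient implementation could satisfy
    // the new getResponseHeaders() contract.
    private Map<String, List<String>> toHeaderMap(org.apache.http.HttpResponse httpResponse) {
      Map<String, List<String>> headers = new HashMap<>();
      for (org.apache.http.Header header : httpResponse.getAllHeaders()) {
        headers.computeIfAbsent(header.getName(), k -> new ArrayList<>())
            .add(header.getValue());
      }
      return headers;
    }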
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
index dc070a1d405d8..f3c08c4a30036 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
@@ -75,7 +75,7 @@ public interface ListingSupport {
    *                       result.
    * @param continuation   Continuation token. null means start from the beginning.
    * @param tracingContext TracingContext instance to track identifiers
-   * @return Continuation tokem
+   * @return Continuation token
    * @throws IOException in case of error
    */
   String listStatus(Path path, String startFrom, List<FileStatus> fileStatuses,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java
index c769186692b07..5af049176e855 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java
@@ -182,7 +182,7 @@ public static String getMaskedUrl(URL url) {
    */
   public static URL changeUrlFromBlobToDfs(URL url) throws InvalidUriException {
     try {
-      url = new URL(url.toString().replace(ABFS_BLOB_DOMAIN_NAME, ABFS_DFS_DOMAIN_NAME));
+      url = new URL(replacedUrl(url.toString(), ABFS_BLOB_DOMAIN_NAME, ABFS_DFS_DOMAIN_NAME));
     } catch (MalformedURLException ex) {
       throw new InvalidUriException(url.toString());
     }
@@ -198,13 +198,38 @@ public static URL changeUrlFromBlobToDfs(URL url) throws InvalidUriException {
    */
   public static URL changeUrlFromDfsToBlob(URL url) throws InvalidUriException {
     try {
-      url = new URL(url.toString().replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME));
+      url = new URL(replacedUrl(url.toString(), ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME));
     } catch (MalformedURLException ex) {
       throw new InvalidUriException(url.toString());
     }
     return url;
   }
 
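The String.replace calls removed above operate on the whole URL, so a matching substring anywhere in the string gets rewritten. Assuming the domain constants are the literal substrings ".dfs." and ".blob." (their exact values are not shown in this section), a container name that embeds one of them demonstrates the problem the helper below avoids:

    // Hypothetical account URL whose container name contains ".blob."
    String url = "https://account.blob.core.windows.net/archive.blob.data";

    // Plain replace rewrites both the host and the container segment:
    url.replace(".blob.", ".dfs.");
    // -> "https://account.dfs.core.windows.net/archive.dfs.data"   (container corrupted)

    // replacedUrl() below confines the search to the authority, between "//"
    // and the following "/", so only the host is rewritten:
    // -> "https://account.dfs.core.windows.net/archive.blob.data"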
+  /**
+   * Replaces oldString with newString in baseUrl. Only the authority
+   * (account) part of the URL is searched, to make sure a matching string
+   * in the blob path or any other part of the URL is left untouched.
+   * @param baseUrl the URL to be updated.
+   * @param oldString the string to be replaced.
+   * @param newString the string to replace with.
+   * @return updated URL
+   */
+  private static String replacedUrl(String baseUrl, String oldString, String newString) {
+    int startIndex = baseUrl.indexOf("//") + 2;
+    int endIndex = baseUrl.indexOf("/", startIndex);
+    if (oldString == null || newString == null || startIndex < 0
+        || endIndex > baseUrl.length() || startIndex > endIndex) {
+      throw new IllegalArgumentException("Invalid input or indices");
+    }
+    StringBuilder sb = new StringBuilder(baseUrl);
+    int targetIndex = sb.indexOf(oldString, startIndex);
+    if (targetIndex == -1 || targetIndex >= endIndex) {
+      return baseUrl; // target not found within the authority
+    }
+    sb.replace(targetIndex, targetIndex + oldString.length(), newString);
+    return sb.toString();
+  }
+
   private UriUtils() {
   }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java
index aef5482248bd0..baa57da288160 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java
@@ -108,7 +108,7 @@ public void testListPathWithValidListMaxResultsValues()
             directory.toString(), false, getListMaxResults(), null,
             getTestTracingContext(getFileSystem(), true));
-    List<ListResultEntrySchema> list = op.getResult().getListResultSchema().paths();
+    List<? extends ListResultEntrySchema> list = op.getResult().getListResultSchema().paths();
     String continuationToken
         = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
     if (continuationToken == null) {
@@ -143,7 +143,7 @@ public void testListPathWithValueGreaterThanServerMaximum()
             directory.toString(), false, getListMaxResults(), null,
             getTestTracingContext(getFileSystem(), true));
-    List<ListResultEntrySchema> list = op.getResult().getListResultSchema().paths();
+    List<? extends ListResultEntrySchema> list = op.getResult().getListResultSchema().paths();
     String continuationToken
         = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
     if (continuationToken == null) {
@@ -175,7 +175,7 @@ public void testListPathWithInvalidListMaxResultsValues() throws Exception {
     }
   }
 
-  private List<ListResultEntrySchema> listPath(String directory)
+  private List<? extends ListResultEntrySchema> listPath(String directory)
       throws IOException {
     return getFileSystem().getAbfsClient()
         .listPath(directory, false, getListMaxResults(), null,
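Both the AbfsHttpOperation refactor earlier and the delete test that follows traffic in StorageErrorResponseSchema, a new contract class this diff references but never defines. Judging from its three-argument constructor and the getters used, it is presumably a plain value holder along these lines (reconstructed from usage, not copied from the patch):

    // Presumed shape of the new contract class.
    public class StorageErrorResponseSchema {
      private String storageErrorCode;
      private String storageErrorMessage;
      private String expectedAppendPos;

      public StorageErrorResponseSchema(String storageErrorCode,
          String storageErrorMessage, String expectedAppendPos) {
        this.storageErrorCode = storageErrorCode;
        this.storageErrorMessage = storageErrorMessage;
        this.expectedAppendPos = expectedAppendPos;
      }

      public String getStorageErrorCode() { return storageErrorCode; }
      public String getStorageErrorMessage() { return storageErrorMessage; }
      public String getExpectedAppendPos() { return expectedAppendPos; }
    }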
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
index e559a03454bf8..fdc7e0a3dafe1 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
@@ -261,6 +262,9 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
 
     // Case 2: Mimic retried case
     // Idempotency check on Delete always returns success
+    StorageErrorResponseSchema storageErrorResponse = new StorageErrorResponseSchema(
+        "NotFound", "NotFound", "NotFound");
+    Mockito.doReturn(storageErrorResponse).when(mockClient).processStorageErrorResponse(any());
     AbfsRestOperation idempotencyRetOp = Mockito.spy(ITestAbfsClient.getRestOp(
         DeletePath, mockClient, HTTP_METHOD_DELETE,
         ITestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
index 8cdd355e00791..29eb05ef97899 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
@@ -40,6 +40,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.services.DfsListResultEntrySchema;
+import org.apache.hadoop.fs.azurebfs.contracts.services.DfsListResultSchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
@@ -136,18 +138,18 @@ public void testListPathTracingContext() throws Exception {
     AbfsClientTestUtil.setMockAbfsRestOperationForListPathOperation(spiedClient,
         (httpOperation) -> {
-          ListResultEntrySchema entry = new ListResultEntrySchema()
+          ListResultEntrySchema entry = new DfsListResultEntrySchema()
               .withName("a")
               .withIsDirectory(true);
           List<ListResultEntrySchema> paths = new ArrayList<>();
           paths.add(entry);
           paths.clear();
-          entry = new ListResultEntrySchema()
+          entry = new DfsListResultEntrySchema()
               .withName("abc.txt")
               .withIsDirectory(false);
           paths.add(entry);
-          ListResultSchema schema1 = new ListResultSchema().withPaths(paths);
-          ListResultSchema schema2 = new ListResultSchema().withPaths(paths);
+          ListResultSchema schema1 = new DfsListResultSchema().withPaths(paths);
+          ListResultSchema schema2 = new DfsListResultSchema().withPaths(paths);
 
           when(httpOperation.getListResultSchema()).thenReturn(schema1)
               .thenReturn(schema2);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
index 1a663ec3c93c5..36c38e80b79ce 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
@@ -108,7 +108,7 @@ private byte[] getUserDelegationKey(String accountName, String appID, String app
     requestBody.append("");
 
     AbfsJdkHttpOperation op = new AbfsJdkHttpOperation(url, method, requestHeaders,
-        Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT));
+        Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null);
 
     byte[] requestBuffer = requestBody.toString().getBytes(StandardCharsets.UTF_8.toString());
     op.sendPayload(requestBuffer, 0, requestBuffer.length);
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index ed1bfc38cbbf1..7a05bd4129d58 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception; @@ -160,9 +161,14 @@ private String getUserAgentString(AbfsConfiguration config, boolean includeSSLProvider) throws IOException, URISyntaxException { AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build(); - // TODO: [FnsOverBlob][HADOOP-19234] Update to work with Blob Endpoint as well when Fns Over Blob is ready. - AbfsClient client = new AbfsDfsClient(new URL("https://azure.com"), null, - config, (AccessTokenProvider) null, null, abfsClientContext); + AbfsClient client; + if (AbfsServiceType.DFS.equals(config.getFsConfiguredServiceType())) { + client = new AbfsDfsClient(new URL("https://azure.com"), null, + config, (AccessTokenProvider) null, null, abfsClientContext); + } else { + client = new AbfsBlobClient(new URL("https://azure.com"), null, + config, (AccessTokenProvider) null, null, abfsClientContext); + } String sslProviderName = null; if (includeSSLProvider) { sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory() @@ -364,8 +370,61 @@ public static AbfsClient createTestClientFromCurrentContext( .build(); // Create test AbfsClient - // TODO : [FnsOverBlob][HADOOP-19234] Update to work with Blob Endpoint as well when Fns Over Blob is ready. - AbfsClient testClient = new AbfsDfsClient( + AbfsClient testClient; + if (AbfsServiceType.DFS.equals(abfsConfig.getFsConfiguredServiceType())) { + testClient = new AbfsDfsClient( + baseAbfsClientInstance.getBaseUrl(), + (currentAuthType == AuthType.SharedKey + ? new SharedKeyCredentials( + abfsConfig.getAccountName().substring(0, + abfsConfig.getAccountName().indexOf(DOT)), + abfsConfig.getStorageAccountKey()) + : null), + abfsConfig, + (currentAuthType == AuthType.OAuth + ? abfsConfig.getTokenProvider() + : null), + null, + abfsClientContext); + } else { + testClient = new AbfsBlobClient( + baseAbfsClientInstance.getBaseUrl(), + (currentAuthType == AuthType.SharedKey + ? new SharedKeyCredentials( + abfsConfig.getAccountName().substring(0, + abfsConfig.getAccountName().indexOf(DOT)), + abfsConfig.getStorageAccountKey()) + : null), + abfsConfig, + (currentAuthType == AuthType.OAuth + ? 
abfsConfig.getTokenProvider() + : null), + null, + abfsClientContext); + } + + return testClient; + } + + public static AbfsClient createBlobClientFromCurrentContext( + AbfsClient baseAbfsClientInstance, + AbfsConfiguration abfsConfig) throws IOException, URISyntaxException { + AuthType currentAuthType = abfsConfig.getAuthType( + abfsConfig.getAccountName()); + + AbfsPerfTracker tracker = new AbfsPerfTracker("test", + abfsConfig.getAccountName(), + abfsConfig); + AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); + + AbfsClientContext abfsClientContext = + new AbfsClientContextBuilder().withAbfsPerfTracker(tracker) + .withExponentialRetryPolicy( + new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries())) + .withAbfsCounters(abfsCounters) + .build(); + + AbfsClient testClient = new AbfsBlobClient( baseAbfsClientInstance.getBaseUrl(), (currentAuthType == AuthType.SharedKey ? new SharedKeyCredentials( @@ -377,7 +436,7 @@ public static AbfsClient createTestClientFromCurrentContext( (currentAuthType == AuthType.OAuth ? abfsConfig.getTokenProvider() : null), - null, + null, abfsClientContext); return testClient; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java index f45a333fae1f7..7ff95c6565f98 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java @@ -77,7 +77,7 @@ public void verifyDisablingOfTracker() throws Exception { try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "disablingCaller", "disablingCallee")) { AbfsJdkHttpOperation op = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null); tracker.registerResult(op).registerSuccess(true); } @@ -96,7 +96,7 @@ public void verifyTrackingForSingletonLatencyRecords() throws Exception { List> tasks = new ArrayList<>(); AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -136,7 +136,7 @@ public void verifyTrackingForAggregateLatencyRecords() throws Exception { List> tasks = new ArrayList<>(); AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -176,7 +176,7 @@ public void verifyRecordingSingletonLatencyIsCheapWhenDisabled() throws Exceptio AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); List> tasks = new ArrayList<>(); final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); + 
Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -212,7 +212,7 @@ public void verifyRecordingAggregateLatencyIsCheapWhenDisabled() throws Exceptio AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); List> tasks = new ArrayList<>(); final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -277,7 +277,7 @@ public void verifyRecordingSingletonLatencyIsCheapWhenEnabled() throws Exception AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); List> tasks = new ArrayList<>(); final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -312,7 +312,7 @@ public void verifyRecordingAggregateLatencyIsCheapWhenEnabled() throws Exception AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); List> tasks = new ArrayList<>(); final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(), - Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -373,7 +373,7 @@ public void verifyNoExceptionOnInvalidInput() throws Exception { AbfsPerfTracker abfsPerfTrackerDisabled = new AbfsPerfTracker(accountName, filesystemName, false); AbfsPerfTracker abfsPerfTrackerEnabled = new AbfsPerfTracker(accountName, filesystemName, true); final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList(), - Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null); verifyNoException(abfsPerfTrackerDisabled); verifyNoException(abfsPerfTrackerEnabled); @@ -382,7 +382,7 @@ public void verifyNoExceptionOnInvalidInput() throws Exception { private void verifyNoException(AbfsPerfTracker abfsPerfTracker) throws Exception { Instant testInstant = Instant.now(); final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList(), - Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT)); + Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null); try ( AbfsPerfInfo tracker01 = new AbfsPerfInfo(abfsPerfTracker, null, null); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestUriUtils.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestUriUtils.java index 25d3f7caa4f39..80d2f70766ab4 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestUriUtils.java +++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestUriUtils.java
@@ -18,12 +18,14 @@
 package org.apache.hadoop.fs.azurebfs.utils;
 
+import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -31,6 +33,8 @@
 import org.apache.http.client.utils.URLEncodedUtils;
 import org.apache.http.message.BasicNameValuePair;
 
+import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromBlobToDfs;
+import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromDfsToBlob;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
@@ -132,4 +136,60 @@ public void testMaskUrlQueryParameters() throws Exception {
         .maskUrlQueryParameters(keyValueList, fullMask, partialMask)); //no mask
   }
+
+  @Test
+  public void testConvertUrlFromDfsToBlob() throws Exception {
+    List<String> inputUrls = Arrays.asList(
+        "https://accountName.dfs.core.windows.net/containerName",
+        "https://accountName.blob.core.windows.net/containerName", // Already blob will remain blob
+        "https:/accountName.dfs.core.windows.net/containerName", // Invalid URL will be returned as is
+        "https://accountNamedfs.dfs.core.windows.net/containerName",
+        "https://accountNameblob.dfs.core.windows.net/containerName",
+        "https://accountName.dfs.core.windows.net/dfsContainer",
+        "https://accountName.dfs.core.windows.net/blobcontainerName",
+        "https://accountName.dfs.core.windows.net/dfs.Container",
+        "https://accountName.dfs.core.windows.net/blob.containerName");
+    List<String> expectedUrls = Arrays.asList(
+        "https://accountName.blob.core.windows.net/containerName",
+        "https://accountName.blob.core.windows.net/containerName", // Already blob will remain blob
+        "https:/accountName.dfs.core.windows.net/containerName", // Invalid URL will be returned as is
+        "https://accountNamedfs.blob.core.windows.net/containerName",
+        "https://accountNameblob.blob.core.windows.net/containerName",
+        "https://accountName.blob.core.windows.net/dfsContainer",
+        "https://accountName.blob.core.windows.net/blobcontainerName",
+        "https://accountName.blob.core.windows.net/dfs.Container",
+        "https://accountName.blob.core.windows.net/blob.containerName");
+
+    for (int i = 0; i < inputUrls.size(); i++) {
+      Assertions.assertThat(changeUrlFromDfsToBlob(new URL(inputUrls.get(i))).toString())
+          .describedAs("URL conversion not as expected")
+          .isEqualTo(expectedUrls.get(i));
+    }
+  }
+
+  @Test
+  public void testConvertUrlFromBlobToDfs() throws Exception {
+    List<String> inputUrls = Arrays.asList(
+        "https://accountName.blob.core.windows.net/containerName",
+        "https://accountName.dfs.core.windows.net/containerName",
+        "https://accountNamedfs.blob.core.windows.net/containerName",
+        "https://accountNameblob.blob.core.windows.net/containerName",
+        "https://accountName.blob.core.windows.net/dfsContainer",
+        "https://accountName.blob.core.windows.net/blobcontainerName",
+        "https://accountName.blob.core.windows.net/dfs.Container",
+        "https://accountName.blob.core.windows.net/blob.containerName");
+    List<String> expectedUrls = Arrays.asList(
+        "https://accountName.dfs.core.windows.net/containerName",
+        "https://accountName.dfs.core.windows.net/containerName",
+        "https://accountNamedfs.dfs.core.windows.net/containerName",
+        "https://accountNameblob.dfs.core.windows.net/containerName",
+        "https://accountName.dfs.core.windows.net/dfsContainer",
+        "https://accountName.dfs.core.windows.net/blobcontainerName",
+        "https://accountName.dfs.core.windows.net/dfs.Container",
+        "https://accountName.dfs.core.windows.net/blob.containerName");
+
+    for (int i = 0; i < inputUrls.size(); i++) {
+      Assertions.assertThat(changeUrlFromBlobToDfs(new URL(inputUrls.get(i))).toString())
+          .describedAs("URL conversion not as expected")
+          .isEqualTo(expectedUrls.get(i));
+    }
+  }
 }