diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 875a5cbc0ecce..2f04202433d58 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -1363,10 +1363,14 @@ public String listStatus(final Path path, final String startFrom,
         startFrom);
 
     final String relativePath = getRelativePath(path);
+    AbfsClient listingClient = getClient();
 
     if (continuation == null || continuation.isEmpty()) {
       // generate continuation token if a valid startFrom is provided.
       if (startFrom != null && !startFrom.isEmpty()) {
+        // Blob Endpoint Does not support startFrom yet.
+        // Fallback to DFS Client for listing with startFrom.
+        listingClient = clientHandler.getDfsClient();
         continuation = getIsNamespaceEnabled(tracingContext)
             ? generateContinuationTokenForXns(startFrom)
             : generateContinuationTokenForNonXns(relativePath, startFrom);
@@ -1375,11 +1379,11 @@ public String listStatus(final Path path, final String startFrom,
 
     do {
       try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
-        AbfsRestOperation op = client.listPath(relativePath, false,
+        AbfsRestOperation op = listingClient.listPath(relativePath, false,
             abfsConfiguration.getListMaxResults(), continuation,
             tracingContext);
         perfInfo.registerResult(op.getResult());
-        continuation = client.getContinuationFromResponse(op.getResult());
+        continuation = listingClient.getContinuationFromResponse(op.getResult());
         ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
         if (retrievedSchema == null) {
           throw new AbfsRestOperationException(
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 81002da5d2c6d..e18ab39877550 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -114,8 +114,8 @@ public final class AbfsHttpConstants {
   public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
   public static final String APPLICATION_XML = "application/xml";
 
-  public static final String XMS_PROPERTIES_ENCODING_DFS = "ISO-8859-1";
-  public static final String XMS_PROPERTIES_ENCODING_BLOB = "UTF-8";
+  public static final String XMS_PROPERTIES_ENCODING_ASCII = "ISO-8859-1";
+  public static final String XMS_PROPERTIES_ENCODING_UNICODE = "UTF-8";
 
   public static final String ROOT_PATH = "/";
   public static final String ACCESS_MASK = "mask:";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index 33e1ab4bdf274..42d0efc11bfa5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -31,6 +31,9 @@
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URLEncoder;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Hashtable;
@@ -108,7 +111,8 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOCK_NAME;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_COMMITTED_BLOCKS;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_NAME;
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_BLOB;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_UNICODE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ZERO;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH;
@@ -228,8 +232,13 @@ public AbfsRestOperation createFilesystem(TracingContext tracingContext)
   public AbfsRestOperation setFilesystemProperties(final Hashtable<String, String> properties,
       TracingContext tracingContext) throws AzureBlobFileSystemException {
     List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
-    List<AbfsHttpHeader> metadataRequestHeaders = getMetadataHeadersList(properties);
-    requestHeaders.addAll(metadataRequestHeaders);
+    // Exception handling to match behavior of this API across service types.
+    try {
+      List<AbfsHttpHeader> metadataRequestHeaders = getMetadataHeadersList(properties);
+      requestHeaders.addAll(metadataRequestHeaders);
+    } catch (CharacterCodingException ex) {
+      throw new InvalidAbfsRestOperationException(ex);
+    }
 
     AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
@@ -499,20 +508,9 @@ public AbfsRestOperation appendBlock(final String path,
     return op;
   }
 
-  /**
-   * Get Rest Operation for API.
-   * @param relativePath to return only blobs with names that begin with the specified prefix.
-   * @param recursive to return all blobs in the path, including those in subdirectories.
-   * @param listMaxResults maximum number of blobs to return.
-   * @param continuation marker to specify the continuation token.
-   * @param tracingContext
-   * @return executed rest operation containing response from server.
-   * @throws AzureBlobFileSystemException if rest operation or response parsing fails.
-   */
-  @Override
   public AbfsRestOperation listPath(final String relativePath, final boolean recursive,
-      final int listMaxResults, final String continuation, TracingContext tracingContext)
-      throws AzureBlobFileSystemException {
+      final int listMaxResults, final String continuation, TracingContext tracingContext,
+      boolean is404CheckRequired) throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
 
     AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
@@ -537,7 +535,7 @@ public AbfsRestOperation listPath(final String relativePath, final boolean recur
         requestHeaders);
     op.execute(tracingContext);
 
-    if (op.getResult().getListResultSchema().paths().isEmpty()) {
+    if (op.getResult().getListResultSchema().paths().isEmpty() && is404CheckRequired) {
       // If the list operation returns no paths, we need to check if the path is a file.
       // If it is a file, we need to return the file in the list.
       // If it is a non-existing path, we need to throw a FileNotFoundException.
@@ -558,6 +556,23 @@ public AbfsRestOperation listPath(final String relativePath, final boolean recur
     return op;
   }
 
+  /**
+   * Get Rest Operation for API.
+   * @param relativePath to return only blobs with names that begin with the specified prefix.
+   * @param recursive to return all blobs in the path, including those in subdirectories.
+   * @param listMaxResults maximum number of blobs to return.
+   * @param continuation marker to specify the continuation token.
+   * @param tracingContext
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation or response parsing fails.
+   */
+  @Override
+  public AbfsRestOperation listPath(final String relativePath, final boolean recursive,
+      final int listMaxResults, final String continuation, TracingContext tracingContext)
+      throws AzureBlobFileSystemException {
+    return listPath(relativePath, recursive, listMaxResults, continuation, tracingContext, true);
+  }
+
   /**
    * Get Rest Operation for API.
    * @param path on which lease has to be acquired.
@@ -918,8 +933,12 @@ public AbfsRestOperation setPathProperties(final String path,
       final ContextEncryptionAdapter contextEncryptionAdapter)
       throws AzureBlobFileSystemException {
     List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
-    List<AbfsHttpHeader> metadataRequestHeaders = getMetadataHeadersList(properties);
-    requestHeaders.addAll(metadataRequestHeaders);
+    try {
+      List<AbfsHttpHeader> metadataRequestHeaders = getMetadataHeadersList(properties);
+      requestHeaders.addAll(metadataRequestHeaders);
+    } catch (CharacterCodingException ex) {
+      throw new InvalidAbfsRestOperationException(ex);
+    }
 
     AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, METADATA);
@@ -957,16 +976,20 @@ public AbfsRestOperation getPathStatus(final String path,
     }
     if (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND
        && isImplicitCheckRequired) {
      // This path could be present as an implicit directory in FNS.
-      AbfsRestOperation listOp = listPath(path, false, 1, null, tracingContext);
+      AbfsRestOperation listOp = listPath(path, false, 1, null,
+          tracingContext, false);
      BlobListResultSchema listResultSchema =
          (BlobListResultSchema) listOp.getResult().getListResultSchema();
-      if (listResultSchema.paths() != null && listResultSchema.paths().size() > 0) {
+      if (listResultSchema.paths() != null
+          && listResultSchema.paths().size() > 0) {
        AbfsRestOperation successOp = getAbfsRestOperation(
            AbfsRestOperationType.GetPathStatus, HTTP_METHOD_HEAD, url, requestHeaders);
        successOp.hardSetGetFileStatusResult(HTTP_OK);
        return successOp;
      }
+    }
+    if (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
      /*
       * Exception handling at AzureBlobFileSystem happens as per the error-code.
       * In case of HEAD call that gets 4XX status, error code is not parsed from the response.
@@ -1317,25 +1340,45 @@ public Hashtable<String, String> getXMSProperties(AbfsHttpOperation result)
 
   @Override
   public byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
-    return value.getBytes(XMS_PROPERTIES_ENCODING_BLOB);
+    return value.getBytes(XMS_PROPERTIES_ENCODING_UNICODE);
   }
 
   @Override
   public String decodeAttribute(byte[] value) throws UnsupportedEncodingException {
-    return new String(value, XMS_PROPERTIES_ENCODING_BLOB);
+    return new String(value, XMS_PROPERTIES_ENCODING_UNICODE);
   }
 
-  private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, String> properties) throws AbfsRestOperationException {
+  /**
+   * Checks if the value contains pure ASCII characters or not.
+   * @param value
+   * @return true if pureASCII.
+   * @throws CharacterCodingException if not pure ASCII.
+   */
+  private boolean isPureASCII(String value) throws CharacterCodingException {
+    final CharsetEncoder encoder = Charset.forName(
+        XMS_PROPERTIES_ENCODING_ASCII).newEncoder();
+    boolean canEncodeValue = encoder.canEncode(value);
+    if (!canEncodeValue) {
+      throw new CharacterCodingException();
+    }
+    return canEncodeValue;
+  }
+
+  private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, String> properties)
+      throws AbfsRestOperationException, CharacterCodingException {
     List<AbfsHttpHeader> metadataRequestHeaders = new ArrayList<>();
     for(Map.Entry<String, String> entry : properties.entrySet()) {
       String key = X_MS_METADATA_PREFIX + entry.getKey();
-      String value = null;
-      try {
-        value = encodeMetadataAttribute(entry.getValue());
-      } catch (UnsupportedEncodingException e) {
-        throw new InvalidAbfsRestOperationException(e);
+      String value = entry.getValue();
+      // AzureBlobFileSystem supports only ASCII Characters in property values.
+      if (isPureASCII(value)) {
+        try {
+          value = encodeMetadataAttribute(value);
+        } catch (UnsupportedEncodingException e) {
+          throw new InvalidAbfsRestOperationException(e);
+        }
+        metadataRequestHeaders.add(new AbfsHttpHeader(key, value));
       }
-      metadataRequestHeaders.add(new AbfsHttpHeader(key, value));
     }
     return metadataRequestHeaders;
   }
@@ -1388,7 +1431,10 @@ private BlobListResultSchema getListResultSchemaFromPathStatus(String relativePa
         pathStatus.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED));
     entrySchema.setETag(pathStatus.getResult().getResponseHeader(ETAG));
 
-    listResultSchema.paths().add(entrySchema);
+    // If listing is done on explicit directory, do not include directory in the listing.
+    if (!entrySchema.isDirectory()) {
+      listResultSchema.paths().add(entrySchema);
+    }
 
     return listResultSchema;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index 6985a89166069..cb730079cc1ca 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -95,7 +95,7 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_DFS;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH;
@@ -1344,19 +1344,20 @@ public Hashtable<String, String> getXMSProperties(AbfsHttpOperation result)
 
   @Override
   public byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
-    return value.getBytes(XMS_PROPERTIES_ENCODING_DFS);
+    return value.getBytes(XMS_PROPERTIES_ENCODING_ASCII);
   }
 
   @Override
   public String decodeAttribute(byte[] value) throws UnsupportedEncodingException {
-    return new String(value, XMS_PROPERTIES_ENCODING_DFS);
+    return new String(value, XMS_PROPERTIES_ENCODING_ASCII);
   }
 
   private String convertXmsPropertiesToCommaSeparatedString(
       final Map<String, String> properties) throws CharacterCodingException {
     StringBuilder commaSeparatedProperties = new StringBuilder();
-    final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING_DFS).newEncoder();
+    final CharsetEncoder encoder = Charset.forName(
+        XMS_PROPERTIES_ENCODING_ASCII).newEncoder();
 
     for (Map.Entry<String, String> propertyEntry : properties.entrySet()) {
       String key = propertyEntry.getKey();
@@ -1386,7 +1387,8 @@ private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsPro
       InvalidFileSystemPropertyException, InvalidAbfsRestOperationException {
     Hashtable<String, String> properties = new Hashtable<>();
 
-    final CharsetDecoder decoder = Charset.forName(XMS_PROPERTIES_ENCODING_DFS).newDecoder();
+    final CharsetDecoder decoder = Charset.forName(
+        XMS_PROPERTIES_ENCODING_ASCII).newDecoder();
 
     if (xMsProperties != null && !xMsProperties.isEmpty()) {
       String[] userProperties = xMsProperties.split(AbfsHttpConstants.COMMA);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index 05c1f5db3149a..ba6f42d726a7e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
@@ -59,6 +60,7 @@
 
 import static org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.WASB_ACCOUNT_NAME_DOMAIN_SUFFIX;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.FILE_SYSTEM_NOT_FOUND;
 import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.*;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -103,7 +105,7 @@ protected AbstractAbfsIntegrationTest() throws Exception {
     assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT_NAME,
         accountName != null && !accountName.isEmpty());
 
-    abfsConfig = new AbfsConfiguration(rawConfig, accountName);
+    abfsConfig = new AbfsConfiguration(rawConfig, accountName, identifyAbfsServiceType(accountName));
 
     authType = abfsConfig.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
     assumeValidAuthConfigsPresent();
@@ -461,6 +463,15 @@ private static String convertTestUrls(
     return data;
   }
 
+  private AbfsServiceType identifyAbfsServiceType(String accountName) {
+    if (accountName.toString().contains(ABFS_BLOB_DOMAIN_NAME)) {
+      return AbfsServiceType.BLOB;
+    }
+    // In case of DFS Domain name or any other custom endpoint, the service
+    // type is to be identified as default DFS.
+    return AbfsServiceType.DFS;
+  }
+
   public Path getTestPath() {
     Path path = new Path(UriUtils.generateUniqueTestPath());
     return path;
@@ -584,4 +595,8 @@ protected void assumeValidAuthConfigsPresent() {
   protected boolean isAppendBlobEnabled() {
     return getRawConfiguration().getBoolean(FS_AZURE_TEST_APPENDBLOB_ENABLED, false);
   }
+
+  protected AbfsServiceType getAbfsServiceType() {
+    return abfsConfig.getFsConfiguredServiceType();
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
index d31f662e97ba4..b6f7c86008a73 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.junit.Assume;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Stubber;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.contracts.services.DfsListResultEntrySchema;
@@ -123,6 +125,7 @@ public Void call() throws Exception {
   @Test
   public void testListPathTracingContext() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
+    Assume.assumeTrue(getAbfsServiceType() == AbfsServiceType.DFS);
     final AzureBlobFileSystem spiedFs = Mockito.spy(fs);
     final AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore());
     final AbfsClient spiedClient = Mockito.spy(fs.getAbfsClient());
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
index 2428b4b6749b7..c0c0c046a0f11 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
@@ -61,6 +61,8 @@ public class ITestGetNameSpaceEnabled extends AbstractAbfsIntegrationTest {
 
   private static final String TRUE_STR = "true";
   private static final String FALSE_STR = "false";
+  private static final String FILESYSTEM_NOT_FOUND_ERROR = "The specified filesystem does not exist.";
+  private static final String CONTAINER_NOT_FOUND_ERROR = "The specified container does not exist.";
   private boolean isUsingXNSAccount;
 
   public ITestGetNameSpaceEnabled() throws Exception {
@@ -87,6 +89,8 @@ public void testNonXNSAccount() throws IOException {
   @Test
   public void testGetIsNamespaceEnabledWhenConfigIsTrue() throws Exception {
     assumeValidTestConfigPresent(getRawConfiguration(), FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT);
+    Assume.assumeTrue("Blob Endpoint Does not Allow FS init on HNS Account",
+        getAbfsServiceType() == AbfsServiceType.DFS);
     AzureBlobFileSystem fs = getNewFSWithHnsConf(TRUE_STR);
     Assertions.assertThat(getIsNamespaceEnabled(fs)).describedAs(
         "getIsNamespaceEnabled should return true when the "
@@ -148,11 +152,17 @@ public void testFailedRequestWhenFSNotExist() throws Exception {
     AzureBlobFileSystem fs = this.getFileSystem(nonExistingFsUrl);
     fs.getAbfsStore().setNamespaceEnabled(Trilean.UNKNOWN);
 
-    intercept(FileNotFoundException.class,
-        "\"The specified filesystem does not exist.\", 404",
-        ()-> {
-          fs.getFileStatus(new Path("/")); // Run a dummy FS call
-        });
+    FileNotFoundException ex = intercept(FileNotFoundException.class, ()-> {
+      fs.getFileStatus(new Path("/")); // Run a dummy FS call
+    });
+
+    String expectedExceptionMessage = getAbfsServiceType() == AbfsServiceType.DFS
+        ? FILESYSTEM_NOT_FOUND_ERROR
+        : CONTAINER_NOT_FOUND_ERROR;
+
+    Assertions.assertThat(ex.getMessage()).describedAs(
+        "Expecting FileNotFoundException with message: " + expectedExceptionMessage)
+        .contains(expectedExceptionMessage);
   }
 
   @Test