Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Empty listing Bug Fix and Other Pending Changes #144

Merged
merged 10 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.FSOperationType.CREATE_FILESYSTEM;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
Expand Down Expand Up @@ -222,16 +223,16 @@ public void initialize(URI uri, Configuration configuration)
this.configuredServiceType = abfsStore.getConfiguredServiceType().equals(
AbfsServiceType.BLOB) ? "BLOB": "DFS";

TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
TracingContext initTracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.INIT, tracingHeaderFormat, listener);
if (configuredServiceType.equals("BLOB")) {
tracingContext.setPrimaryRequestIDBlob();
initTracingContext.setPrimaryRequestIDBlob();
}

// Check if valid service type is configured even before creating the file system.
try {
abfsConfiguration.validateConfiguredServiceType(
tryGetIsNamespaceEnabled(new TracingContext(tracingContext)));
tryGetIsNamespaceEnabled(initTracingContext));
} catch (InvalidConfigurationValueException ex) {
LOG.debug("File system configured with Invalid Service Type", ex);
throw ex;
Expand All @@ -240,34 +241,31 @@ public void initialize(URI uri, Configuration configuration)
throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}

// Create the file system if it does not exist.
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), tracingContext) == null) {
try {
this.createFileSystem(tracingContext);
} catch (AzureBlobFileSystemException ex) {
checkException(null, ex, AzureServiceErrorCode.FILE_SYSTEM_ALREADY_EXISTS);
}
}
}

/*
* Non-hierarchical-namespace account can not have a customer-provided-key(CPK).
* Fail initialization of filesystem if the configs are provided. CPK is of
* two types: GLOBAL_KEY, and ENCRYPTION_CONTEXT.
*/
if ((isEncryptionContextCPK(abfsConfiguration) || isGlobalKeyCPK(
abfsConfiguration))
&& !getIsNamespaceEnabled(new TracingContext(tracingContext))) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we removing the close() method call here ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We needed close because we were checking it after creating filesystem.
Now we are checking it before creating file system and failing, so no need to close.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a significant change will trigger a CI.
We can wait for CI results before merging this.

/*
* Close the filesystem gracefully before throwing exception. Graceful close
* will ensure that all resources are released properly.
*/
close();
&& !tryGetIsNamespaceEnabled(new TracingContext(initTracingContext))) {
throw new PathIOException(uri.getPath(),
CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE);
}

// Create the file system if it does not exist.
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
TracingContext createTracingContext = new TracingContext(initTracingContext);
createTracingContext.setOperation(CREATE_FILESYSTEM);
if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), createTracingContext) == null) {
try {
this.createFileSystem(createTracingContext);
} catch (AzureBlobFileSystemException ex) {
checkException(null, ex, AzureServiceErrorCode.FILE_SYSTEM_ALREADY_EXISTS);
}
}
}

LOG.trace("Initiate check for delegation token manager");
if (UserGroupInformation.isSecurityEnabled()) {
this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public enum FSOperationType {
SET_OWNER("SO"),
SET_ACL("SA"),
TEST_OP("TS"),
WRITE("WR");
WRITE("WR"),
INIT("IN");

private final String opCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public final class FileSystemUriSchemes {
public static final String WASB_SECURE_SCHEME = "wasbs";
public static final String WASB_DNS_PREFIX = "blob";

public static final String ABFS_DFS_DOMAIN_NAME = "dfs.core.windows.net";
public static final String ABFS_BLOB_DOMAIN_NAME = "blob.core.windows.net";
public static final String ABFS_DFS_DOMAIN_NAME = ".dfs.";
public static final String ABFS_BLOB_DOMAIN_NAME = ".blob.";

private FileSystemUriSchemes() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -647,12 +647,10 @@ public AbfsRestOperation listPath(final String relativePath, final boolean recur
abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LIST);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_INCLUDE, METADATA);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_PREFIX, getDirectoryQueryParameter(relativePath));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_MARKER, continuation);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (continuation != null) {
  abfsUriQueryBuilder.addQuery(QUERY_PARAM_MARKER, continuation);
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a redundant check. Same check is already added in AbfsUriQueryBuilder.java

public void addQuery(final String name, final String value) {
    if (value != null && !value.isEmpty()) {
      this.parameters.put(name, value);
    }
  }

if (!recursive) {
abfsUriQueryBuilder.addQuery(QUERY_PARAM_DELIMITER, FORWARD_SLASH);
}
if (continuation != null) {
abfsUriQueryBuilder.addQuery(QUERY_PARAM_MARKER, continuation);
}
abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAX_RESULTS, String.valueOf(listMaxResults));
appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_BLOB_OPERATION, abfsUriQueryBuilder);

Expand Down Expand Up @@ -714,12 +712,6 @@ private void fixAtomicEntriesInListResults(final AbfsRestOperation op,
listResultSchema.withPaths(filteredEntries);
}

private boolean isEmptyListResults(AbfsHttpOperation result) {
return result != null && result.getStatusCode() == HTTP_OK &&
result.getListResultSchema() != null &&
result.getListResultSchema().paths().isEmpty();
}

/**
* Get Rest Operation for API <a href = https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob></a>.
* @param path on which lease has to be acquired.
Expand Down Expand Up @@ -1789,13 +1781,24 @@ private static String decodeMetadataAttribute(String encoded)
return encoded == null ? null :
java.net.URLDecoder.decode(encoded, StandardCharsets.UTF_8.name());
}

private boolean isNonEmptyListing(String path,
TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsRestOperation listOp = listPath(path, false, 1, null, tracingContext, false);
BlobListResultSchema listResultSchema =
(BlobListResultSchema) listOp.getResult().getListResultSchema();
return listResultSchema.paths() != null && !listResultSchema.paths()
.isEmpty();
return !isEmptyListResults(listOp.getResult());
}

private boolean isEmptyListResults(AbfsHttpOperation result) {
boolean isEmptyList = result != null && result.getStatusCode() == HTTP_OK && // List Call was successful
result.getListResultSchema() != null && // Parsing of list response was successful
result.getListResultSchema().paths().isEmpty() && // No paths were returned
result.getListResultSchema() instanceof BlobListResultSchema && // It is safe to typecast to BlobListResultSchema
((BlobListResultSchema) result.getListResultSchema()).getNextMarker() == null; // No continuation token was returned
if (isEmptyList) {
LOG.debug("List call returned empty results without any continuation token.");
return true;
} else if (result != null && !(result.getListResultSchema() instanceof BlobListResultSchema)) {
throw new RuntimeException("List call returned unexpected results over Blob Endpoint.");
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.util.Preconditions;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromBlobToDfs;
import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromDfsToBlob;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public static String getMaskedUrl(URL url) {

public static URL changeUrlFromBlobToDfs(URL url) throws InvalidUriException {
try {
url = new URL(url.toString().replace(ABFS_BLOB_DOMAIN_NAME, ABFS_DFS_DOMAIN_NAME));
url = new URL(url.toString().replaceFirst(ABFS_BLOB_DOMAIN_NAME, ABFS_DFS_DOMAIN_NAME));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to have more than one .blob. in the url ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated code as per the discussion offline

} catch (MalformedURLException ex) {
throw new InvalidUriException(url.toString());
}
Expand All @@ -184,7 +184,7 @@ public static URL changeUrlFromBlobToDfs(URL url) throws InvalidUriException {

public static URL changeUrlFromDfsToBlob(URL url) throws InvalidUriException {
try {
url = new URL(url.toString().replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME));
url = new URL(url.toString().replaceFirst(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME));
} catch (MalformedURLException ex) {
throw new InvalidUriException(url.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -672,7 +674,7 @@ protected void assumeBlobServiceType() {
*/
protected void assertPathDns(Path path) {
String expectedDns = getAbfsServiceType() == AbfsServiceType.BLOB
? ABFS_BLOB_DOMAIN_NAME : ABFS_DFS_DOMAIN_NAME;
? ABFS_BLOB_DOMAIN_NAME : ABFS_DFS_DOMAIN_NAME;;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double semicolon

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken

Assertions.assertThat(path.toString()).contains(expectedDns);
}
}