Skip to content

Commit

Permalink
[fix](Azure) Enhance the glob list's logic for azure file system in FE (
Browse files Browse the repository at this point in the history
apache#37490)

Previously, in the FE, glob listing on Azure did not work for patterns such as
`s3://qa-build/regression/tpcds/sf100_split/catalog_sales.dat.*.gz`.
  • Loading branch information
ByteYue authored Jul 9, 2024
1 parent 1772f78 commit 84fcb73
Showing 1 changed file with 48 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,47 +299,81 @@ private String constructS3Path(String fileName, String bucket) throws UserExcept
return String.format("s3://%s/%s", bucket, fileName);
}

/**
 * Returns the literal leading portion of a glob pattern.
 *
 * <p>The pattern is scanned from the left and cut immediately before the first
 * occurrence of any of the characters {@code *}, {@code ?}, {@code [}, {@code {}
 * or backslash. When none of them occurs, the whole pattern is returned.
 *
 * @param globPattern the glob pattern to inspect; must not be {@code null}
 * @return the substring of {@code globPattern} preceding its first special character
 */
public static String getLongestPrefix(String globPattern) {
    // Characters that end the literal part of the pattern.
    final String metaChars = "*?[{\\";

    for (int pos = 0; pos < globPattern.length(); pos++) {
        boolean isMeta = metaChars.indexOf(globPattern.charAt(pos)) >= 0;
        if (isMeta) {
            return globPattern.substring(0, pos);
        }
    }
    // No special character found: the entire pattern is literal.
    return globPattern;
}

public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
long roundCnt = 0;
long elementCnt = 0;
long matchCnt = 0;
long startTime = System.nanoTime();
Status st = Status.OK;
try {
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
String globPath = uri.getKey();
String bucket = uri.getBucket();
LOG.info("try to glob list for azure, remote path {}, orig {}", globPath, remotePath);
BlobContainerClient client = getClient().getBlobContainerClient(uri.getBucket());
BlobContainerClient client = getClient().getBlobContainerClient(bucket);
java.nio.file.Path pathPattern = Paths.get(globPath);
LOG.info("path pattern {}", pathPattern.toString());
PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern.toString());

ListBlobsOptions options = new ListBlobsOptions().setPrefix(globPath);
String listPrefix = getLongestPrefix(globPath);
LOG.info("azure glob list prefix is {}", listPrefix);
ListBlobsOptions options = new ListBlobsOptions().setPrefix(listPrefix);
String newContinuationToken = null;
do {
roundCnt++;
PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, newContinuationToken, null);
PagedResponse<BlobItem> pagedResponse = pagedBlobs.iterableByPage().iterator().next();

for (BlobItem blobItem : pagedResponse.getElements()) {
elementCnt++;
java.nio.file.Path blobPath = Paths.get(blobItem.getName());

if (matcher.matches(blobPath)) {
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? blobPath.getFileName().toString() : constructS3Path(blobPath.toString(),
uri.getBucket()),
!blobItem.isPrefix(),
blobItem.isPrefix() ? -1 : blobItem.getProperties().getContentLength(),
blobItem.getProperties().getContentLength(),
blobItem.getProperties().getLastModified().getSecond());
result.add(remoteFile);
if (!matcher.matches(blobPath)) {
continue;
}
matchCnt++;
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? blobPath.getFileName().toString() : constructS3Path(blobPath.toString(),
uri.getBucket()),
!blobItem.isPrefix(),
blobItem.isPrefix() ? -1 : blobItem.getProperties().getContentLength(),
blobItem.getProperties().getContentLength(),
blobItem.getProperties().getLastModified().getSecond());
result.add(remoteFile);
}
newContinuationToken = pagedResponse.getContinuationToken();
} while (newContinuationToken != null);

} catch (BlobStorageException e) {
LOG.warn("glob file " + remotePath + " failed because azure error: " + e.getMessage());
return new Status(Status.ErrCode.COMMON_ERROR, "glob file " + remotePath
st = new Status(Status.ErrCode.COMMON_ERROR, "glob file " + remotePath
+ " failed because azure error: " + e.getMessage());
} catch (Exception e) {
LOG.warn("errors while glob file " + remotePath, e);
return new Status(Status.ErrCode.COMMON_ERROR, "errors while glob file " + remotePath + e.getMessage());
st = new Status(Status.ErrCode.COMMON_ERROR, "errors while glob file " + remotePath + e.getMessage());
} finally {
long endTime = System.nanoTime();
long duration = endTime - startTime;
LOG.info("process {} elements under prefix {} for {} round, match {} elements, take {} micro second",
remotePath, elementCnt, matchCnt, roundCnt,
duration / 1000);
}
return Status.OK;
return st;
}
}

0 comments on commit 84fcb73

Please sign in to comment.