From 65a5bf3b20c46207b4d6f6947eef32c33999119a Mon Sep 17 00:00:00 2001 From: Anuj Modi Date: Tue, 26 Nov 2024 20:02:59 +0530 Subject: [PATCH] HADOOP-19226: [ABFS][FNSOverBlob] Implementing Azure Rest APIs on Blob Endpoint for AbfsBlobClient (#6944) Contributed by Anuj Modi --- .../src/config/checkstyle-suppressions.xml | 4 + .../hadoop/fs/azurebfs/AbfsConfiguration.java | 2 +- .../fs/azurebfs/AzureBlobFileSystem.java | 2 +- .../fs/azurebfs/AzureBlobFileSystemStore.java | 148 +-- .../azurebfs/constants/AbfsHttpConstants.java | 45 +- .../azurebfs/constants/FSOperationType.java | 3 +- .../constants/HttpHeaderConfigurations.java | 31 +- .../azurebfs/constants/HttpQueryParams.java | 26 + .../services/AppendRequestParameters.java | 59 + .../services/AzureServiceErrorCode.java | 2 + .../services/BlobAppendRequestParameters.java | 46 + .../fs/azurebfs/services/AbfsBlobClient.java | 1087 +++++++++++++++++ .../fs/azurebfs/services/AbfsClient.java | 2 + .../azurebfs/services/AbfsClientHandler.java | 62 +- .../fs/azurebfs/services/AbfsDfsClient.java | 2 + .../azurebfs/services/AbfsRestOperation.java | 4 + .../services/AbfsRestOperationType.java | 15 + .../src/site/markdown/blobEndpoint.md | 102 ++ .../src/site/markdown/fns_blob.md | 8 +- ...ITestAzureBlobFileSystemInitAndCreate.java | 2 +- .../fs/azurebfs/services/ITestAbfsClient.java | 6 +- .../services/ITestAbfsClientHandler.java | 49 + 22 files changed, 1609 insertions(+), 98 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobAppendRequestParameters.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java create mode 100644 hadoop-tools/hadoop-azure/src/site/markdown/blobEndpoint.md create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClientHandler.java diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml 
b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml index 27ab4329043f4..07aa26d238127 100644 --- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml +++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml @@ -46,6 +46,10 @@ files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/> + + future = client.submit(() -> lease.free()); + ListenableFuture future = getClient().submit(() -> lease.free()); futures.add(future); } try { @@ -338,7 +338,7 @@ public void close() throws IOException { } catch (ExecutionException e) { LOG.error("Error freeing leases", e); } finally { - IOUtils.cleanupWithLogger(LOG, client); + IOUtils.cleanupWithLogger(LOG, getClient()); } } @@ -477,11 +477,11 @@ public Hashtable getFilesystemProperties( try (AbfsPerfInfo perfInfo = startTracking("getFilesystemProperties", "getFilesystemProperties")) { LOG.debug("getFilesystemProperties for filesystem: {}", - client.getFileSystem()); + getClient().getFileSystem()); final Hashtable parsedXmsProperties; - final AbfsRestOperation op = client + final AbfsRestOperation op = getClient() .getFilesystemProperties(tracingContext); perfInfo.registerResult(op.getResult()); @@ -503,7 +503,7 @@ public void setFilesystemProperties( } LOG.debug("setFilesystemProperties for filesystem: {} with properties: {}", - client.getFileSystem(), + getClient().getFileSystem(), properties); try (AbfsPerfInfo perfInfo = startTracking("setFilesystemProperties", @@ -519,7 +519,7 @@ public Hashtable getPathStatus(final Path path, TracingContext tracingContext) throws IOException { try (AbfsPerfInfo perfInfo = startTracking("getPathStatus", "getPathStatus")){ LOG.debug("getPathStatus for filesystem: {} path: {}", - client.getFileSystem(), + getClient().getFileSystem(), path); final Hashtable parsedXmsProperties; @@ -527,7 +527,7 @@ public Hashtable getPathStatus(final Path path, final ContextEncryptionAdapter contextEncryptionAdapter = 
createEncryptionAdapterFromServerStoreContext(relativePath, tracingContext); - final AbfsRestOperation op = client + final AbfsRestOperation op = getClient() .getPathStatus(relativePath, true, tracingContext, contextEncryptionAdapter); perfInfo.registerResult(op.getResult()); @@ -564,10 +564,10 @@ public Hashtable getPathStatus(final Path path, */ private ContextEncryptionAdapter createEncryptionAdapterFromServerStoreContext(final String path, final TracingContext tracingContext) throws IOException { - if (client.getEncryptionType() != EncryptionType.ENCRYPTION_CONTEXT) { + if (getClient().getEncryptionType() != EncryptionType.ENCRYPTION_CONTEXT) { return NoContextEncryptionAdapter.getInstance(); } - final String responseHeaderEncryptionContext = client.getPathStatus(path, + final String responseHeaderEncryptionContext = getClient().getPathStatus(path, false, tracingContext, null).getResult() .getResponseHeader(X_MS_ENCRYPTION_CONTEXT); if (responseHeaderEncryptionContext == null) { @@ -578,7 +578,7 @@ private ContextEncryptionAdapter createEncryptionAdapterFromServerStoreContext(f StandardCharsets.UTF_8); try { - return new ContextProviderEncryptionAdapter(client.getEncryptionContextProvider(), + return new ContextProviderEncryptionAdapter(getClient().getEncryptionContextProvider(), new Path(path).toUri().getPath(), encryptionContext); } catch (IOException e) { LOG.debug("Could not initialize EncryptionAdapter"); @@ -591,7 +591,7 @@ public void setPathProperties(final Path path, throws IOException { try (AbfsPerfInfo perfInfo = startTracking("setPathProperties", "setPathProperties")){ LOG.debug("setPathProperties for filesystem: {} path: {} with properties: {}", - client.getFileSystem(), + getClient().getFileSystem(), path, properties); @@ -600,7 +600,7 @@ public void setPathProperties(final Path path, final ContextEncryptionAdapter contextEncryptionAdapter = createEncryptionAdapterFromServerStoreContext(relativePath, tracingContext); - final AbfsRestOperation op 
= client + final AbfsRestOperation op = getClient() .setPathProperties(getRelativePath(path), properties, tracingContext, contextEncryptionAdapter); contextEncryptionAdapter.destroy(); @@ -612,9 +612,9 @@ public void createFilesystem(TracingContext tracingContext) throws AzureBlobFileSystemException { try (AbfsPerfInfo perfInfo = startTracking("createFilesystem", "createFilesystem")){ LOG.debug("createFilesystem for filesystem: {}", - client.getFileSystem()); + getClient().getFileSystem()); - final AbfsRestOperation op = client.createFilesystem(tracingContext); + final AbfsRestOperation op = getClient().createFilesystem(tracingContext); perfInfo.registerResult(op.getResult()).registerSuccess(true); } } @@ -623,9 +623,9 @@ public void deleteFilesystem(TracingContext tracingContext) throws AzureBlobFileSystemException { try (AbfsPerfInfo perfInfo = startTracking("deleteFilesystem", "deleteFilesystem")) { LOG.debug("deleteFilesystem for filesystem: {}", - client.getFileSystem()); + getClient().getFileSystem()); - final AbfsRestOperation op = client.deleteFilesystem(tracingContext); + final AbfsRestOperation op = getClient().deleteFilesystem(tracingContext); perfInfo.registerResult(op.getResult()).registerSuccess(true); } } @@ -637,7 +637,7 @@ public OutputStream createFile(final Path path, try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) { boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext); LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}", - client.getFileSystem(), + getClient().getFileSystem(), path, overwrite, permission, @@ -660,9 +660,9 @@ public OutputStream createFile(final Path path, } final ContextEncryptionAdapter contextEncryptionAdapter; - if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { + if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { contextEncryptionAdapter = new ContextProviderEncryptionAdapter( - 
client.getEncryptionContextProvider(), getRelativePath(path)); + getClient().getEncryptionContextProvider(), getRelativePath(path)); } else { contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance(); } @@ -677,7 +677,7 @@ public OutputStream createFile(final Path path, ); } else { - op = client.createPath(relativePath, true, + op = getClient().createPath(relativePath, true, overwrite, new Permissions(isNamespaceEnabled, permission, umask), isAppendBlob, @@ -725,14 +725,14 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa // Trigger a create with overwrite=false first so that eTag fetch can be // avoided for cases when no pre-existing file is present (major portion // of create file traffic falls into the case of no pre-existing file). - op = client.createPath(relativePath, true, false, permissions, + op = getClient().createPath(relativePath, true, false, permissions, isAppendBlob, null, contextEncryptionAdapter, tracingContext); } catch (AbfsRestOperationException e) { if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { // File pre-exists, fetch eTag try { - op = client.getPathStatus(relativePath, false, tracingContext, null); + op = getClient().getPathStatus(relativePath, false, tracingContext, null); } catch (AbfsRestOperationException ex) { if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { // Is a parallel access case, as file which was found to be @@ -750,7 +750,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa try { // overwrite only if eTag matches with the file properties fetched befpre - op = client.createPath(relativePath, true, true, permissions, + op = getClient().createPath(relativePath, true, true, permissions, isAppendBlob, eTag, contextEncryptionAdapter, tracingContext); } catch (AbfsRestOperationException ex) { if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) { @@ -831,7 +831,7 @@ public void createDirectory(final Path path, final 
FsPermission permission, try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) { boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext); LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}", - client.getFileSystem(), + getClient().getFileSystem(), path, permission, umask, @@ -841,7 +841,7 @@ public void createDirectory(final Path path, final FsPermission permission, !isNamespaceEnabled || abfsConfiguration.isEnabledMkdirOverwrite(); Permissions permissions = new Permissions(isNamespaceEnabled, permission, umask); - final AbfsRestOperation op = client.createPath(getRelativePath(path), + final AbfsRestOperation op = getClient().createPath(getRelativePath(path), false, overwrite, permissions, false, null, null, tracingContext); perfInfo.registerResult(op.getResult()).registerSuccess(true); } @@ -861,7 +861,7 @@ public AbfsInputStream openFileForRead(Path path, try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) { LOG.debug("openFileForRead filesystem: {} path: {}", - client.getFileSystem(), path); + getClient().getFileSystem(), path); FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus) .orElse(null); @@ -878,7 +878,7 @@ public AbfsInputStream openFileForRead(Path path, * ENCRYPTION_CONTEXT. 
*/ if ((fileStatus instanceof VersionedFileStatus) && ( - client.getEncryptionType() != EncryptionType.ENCRYPTION_CONTEXT + getClient().getEncryptionType() != EncryptionType.ENCRYPTION_CONTEXT || ((VersionedFileStatus) fileStatus).getEncryptionContext() != null)) { path = path.makeQualified(this.uri, path); @@ -891,13 +891,13 @@ public AbfsInputStream openFileForRead(Path path, eTag = ((VersionedFileStatus) fileStatus).getVersion(); final String encryptionContext = ((VersionedFileStatus) fileStatus).getEncryptionContext(); - if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { + if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { contextEncryptionAdapter = new ContextProviderEncryptionAdapter( - client.getEncryptionContextProvider(), getRelativePath(path), + getClient().getEncryptionContextProvider(), getRelativePath(path), encryptionContext.getBytes(StandardCharsets.UTF_8)); } } else { - AbfsHttpOperation op = client.getPathStatus(relativePath, false, + AbfsHttpOperation op = getClient().getPathStatus(relativePath, false, tracingContext, null).getResult(); resourceType = op.getResponseHeader( HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); @@ -908,7 +908,7 @@ public AbfsInputStream openFileForRead(Path path, * For file created with ENCRYPTION_CONTEXT, client shall receive * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT. 
*/ - if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { + if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { final String fileEncryptionContext = op.getResponseHeader( HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT); if (fileEncryptionContext == null) { @@ -917,7 +917,7 @@ public AbfsInputStream openFileForRead(Path path, "EncryptionContext not present in GetPathStatus response headers"); } contextEncryptionAdapter = new ContextProviderEncryptionAdapter( - client.getEncryptionContextProvider(), getRelativePath(path), + getClient().getEncryptionContextProvider(), getRelativePath(path), fileEncryptionContext.getBytes(StandardCharsets.UTF_8)); } } @@ -973,13 +973,13 @@ public OutputStream openFileForWrite(final Path path, TracingContext tracingContext) throws IOException { try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) { LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}", - client.getFileSystem(), + getClient().getFileSystem(), path, overwrite); String relativePath = getRelativePath(path); - final AbfsRestOperation op = client + final AbfsRestOperation op = getClient() .getPathStatus(relativePath, false, tracingContext, null); perfInfo.registerResult(op.getResult()); @@ -1005,7 +1005,7 @@ public OutputStream openFileForWrite(final Path path, AbfsLease lease = maybeCreateLease(relativePath, tracingContext); final ContextEncryptionAdapter contextEncryptionAdapter; - if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { + if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { final String encryptionContext = op.getResult() .getResponseHeader( HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT); @@ -1014,7 +1014,7 @@ public OutputStream openFileForWrite(final Path path, "File doesn't have encryptionContext."); } contextEncryptionAdapter = new ContextProviderEncryptionAdapter( - client.getEncryptionContextProvider(), getRelativePath(path), + 
getClient().getEncryptionContextProvider(), getRelativePath(path), encryptionContext.getBytes(StandardCharsets.UTF_8)); } else { contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance(); @@ -1024,7 +1024,7 @@ public OutputStream openFileForWrite(final Path path, populateAbfsOutputStreamContext( isAppendBlob, lease, - client, + getClient(), statistics, relativePath, offset, @@ -1043,7 +1043,7 @@ public OutputStream openFileForWrite(final Path path, public void breakLease(final Path path, final TracingContext tracingContext) throws AzureBlobFileSystemException { LOG.debug("lease path: {}", path); - client.breakLease(getRelativePath(path), tracingContext); + getClient().breakLease(getRelativePath(path), tracingContext); } /** @@ -1073,7 +1073,7 @@ public boolean rename(final Path source, } LOG.debug("renameAsync filesystem: {} source: {} destination: {}", - client.getFileSystem(), + getClient().getFileSystem(), source, destination); @@ -1088,7 +1088,7 @@ public boolean rename(final Path source, try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) { boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext); final AbfsClientRenameResult abfsClientRenameResult = - client.renamePath(sourceRelativePath, destinationRelativePath, + getClient().renamePath(sourceRelativePath, destinationRelativePath, continuation, tracingContext, sourceEtag, false, isNamespaceEnabled); @@ -1116,7 +1116,7 @@ public void delete(final Path path, final boolean recursive, boolean shouldContinue = true; LOG.debug("delete filesystem: {} path: {} recursive: {}", - client.getFileSystem(), + getClient().getFileSystem(), path, String.valueOf(recursive)); @@ -1126,7 +1126,7 @@ public void delete(final Path path, final boolean recursive, do { try (AbfsPerfInfo perfInfo = startTracking("delete", "deletePath")) { - AbfsRestOperation op = client.deletePath(relativePath, recursive, + AbfsRestOperation op = getClient().deletePath(relativePath, recursive, continuation, 
tracingContext, getIsNamespaceEnabled(tracingContext)); perfInfo.registerResult(op.getResult()); continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); @@ -1146,7 +1146,7 @@ public FileStatus getFileStatus(final Path path, try (AbfsPerfInfo perfInfo = startTracking("getFileStatus", "undetermined")) { boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext); LOG.debug("getFileStatus filesystem: {} path: {} isNamespaceEnabled: {}", - client.getFileSystem(), + getClient().getFileSystem(), path, isNamespaceEnabled); @@ -1154,14 +1154,14 @@ public FileStatus getFileStatus(final Path path, if (path.isRoot()) { if (isNamespaceEnabled) { perfInfo.registerCallee("getAclStatus"); - op = client.getAclStatus(getRelativePath(path), tracingContext); + op = getClient().getAclStatus(getRelativePath(path), tracingContext); } else { perfInfo.registerCallee("getFilesystemProperties"); - op = client.getFilesystemProperties(tracingContext); + op = getClient().getFilesystemProperties(tracingContext); } } else { perfInfo.registerCallee("getPathStatus"); - op = client.getPathStatus(getRelativePath(path), false, tracingContext, null); + op = getClient().getPathStatus(getRelativePath(path), false, tracingContext, null); } perfInfo.registerResult(op.getResult()); @@ -1251,7 +1251,7 @@ public String listStatus(final Path path, final String startFrom, boolean shouldContinue = true; LOG.debug("listStatus filesystem: {} path: {}, startFrom: {}", - client.getFileSystem(), + getClient().getFileSystem(), path, startFrom); @@ -1268,7 +1268,7 @@ public String listStatus(final Path path, final String startFrom, do { try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) { - AbfsRestOperation op = client.listPath(relativePath, false, + AbfsRestOperation op = getClient().listPath(relativePath, false, abfsConfiguration.getListMaxResults(), continuation, tracingContext); perfInfo.registerResult(op.getResult()); @@ -1401,7 +1401,7 @@ public 
void setOwner(final Path path, final String owner, final String group, LOG.debug( "setOwner filesystem: {} path: {} owner: {} group: {}", - client.getFileSystem(), + getClient().getFileSystem(), path, owner, group); @@ -1409,7 +1409,7 @@ public void setOwner(final Path path, final String owner, final String group, final String transformedOwner = identityTransformer.transformUserOrGroupForSetRequest(owner); final String transformedGroup = identityTransformer.transformUserOrGroupForSetRequest(group); - final AbfsRestOperation op = client.setOwner(getRelativePath(path), + final AbfsRestOperation op = getClient().setOwner(getRelativePath(path), transformedOwner, transformedGroup, tracingContext); @@ -1429,11 +1429,11 @@ public void setPermission(final Path path, final FsPermission permission, LOG.debug( "setPermission filesystem: {} path: {} permission: {}", - client.getFileSystem(), + getClient().getFileSystem(), path, permission); - final AbfsRestOperation op = client.setPermission(getRelativePath(path), + final AbfsRestOperation op = getClient().setPermission(getRelativePath(path), String.format(AbfsHttpConstants.PERMISSION_FORMAT, permission.toOctal()), tracingContext); @@ -1452,7 +1452,7 @@ public void modifyAclEntries(final Path path, final List aclSpec, LOG.debug( "modifyAclEntries filesystem: {} path: {} aclSpec: {}", - client.getFileSystem(), + getClient().getFileSystem(), path, AclEntry.aclSpecToString(aclSpec)); @@ -1462,7 +1462,7 @@ public void modifyAclEntries(final Path path, final List aclSpec, String relativePath = getRelativePath(path); - final AbfsRestOperation op = client + final AbfsRestOperation op = getClient() .getAclStatus(relativePath, useUpn, tracingContext); perfInfoGet.registerResult(op.getResult()); final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); @@ -1474,7 +1474,7 @@ public void modifyAclEntries(final Path path, final List aclSpec, perfInfoGet.registerSuccess(true).finishTracking(); try (AbfsPerfInfo 
perfInfoSet = startTracking("modifyAclEntries", "setAcl")) { - final AbfsRestOperation setAclOp = client + final AbfsRestOperation setAclOp = getClient() .setAcl(relativePath, AbfsAclHelper.serializeAclSpec(aclEntries), eTag, tracingContext); perfInfoSet.registerResult(setAclOp.getResult()) @@ -1495,7 +1495,7 @@ public void removeAclEntries(final Path path, final List aclSpec, LOG.debug( "removeAclEntries filesystem: {} path: {} aclSpec: {}", - client.getFileSystem(), + getClient().getFileSystem(), path, AclEntry.aclSpecToString(aclSpec)); @@ -1505,7 +1505,7 @@ public void removeAclEntries(final Path path, final List aclSpec, String relativePath = getRelativePath(path); - final AbfsRestOperation op = client + final AbfsRestOperation op = getClient() .getAclStatus(relativePath, isUpnFormat, tracingContext); perfInfoGet.registerResult(op.getResult()); final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); @@ -1517,7 +1517,7 @@ public void removeAclEntries(final Path path, final List aclSpec, perfInfoGet.registerSuccess(true).finishTracking(); try (AbfsPerfInfo perfInfoSet = startTracking("removeAclEntries", "setAcl")) { - final AbfsRestOperation setAclOp = client + final AbfsRestOperation setAclOp = getClient() .setAcl(relativePath, AbfsAclHelper.serializeAclSpec(aclEntries), eTag, tracingContext); perfInfoSet.registerResult(setAclOp.getResult()) @@ -1538,12 +1538,12 @@ public void removeDefaultAcl(final Path path, TracingContext tracingContext) LOG.debug( "removeDefaultAcl filesystem: {} path: {}", - client.getFileSystem(), + getClient().getFileSystem(), path); String relativePath = getRelativePath(path); - final AbfsRestOperation op = client + final AbfsRestOperation op = getClient() .getAclStatus(relativePath, tracingContext); perfInfoGet.registerResult(op.getResult()); final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); @@ -1561,7 +1561,7 @@ public void removeDefaultAcl(final Path path, TracingContext 
tracingContext) perfInfoGet.registerSuccess(true).finishTracking(); try (AbfsPerfInfo perfInfoSet = startTracking("removeDefaultAcl", "setAcl")) { - final AbfsRestOperation setAclOp = client + final AbfsRestOperation setAclOp = getClient() .setAcl(relativePath, AbfsAclHelper.serializeAclSpec(aclEntries), eTag, tracingContext); perfInfoSet.registerResult(setAclOp.getResult()) @@ -1582,12 +1582,12 @@ public void removeAcl(final Path path, TracingContext tracingContext) LOG.debug( "removeAcl filesystem: {} path: {}", - client.getFileSystem(), + getClient().getFileSystem(), path); String relativePath = getRelativePath(path); - final AbfsRestOperation op = client + final AbfsRestOperation op = getClient() .getAclStatus(relativePath, tracingContext); perfInfoGet.registerResult(op.getResult()); final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); @@ -1602,7 +1602,7 @@ public void removeAcl(final Path path, TracingContext tracingContext) perfInfoGet.registerSuccess(true).finishTracking(); try (AbfsPerfInfo perfInfoSet = startTracking("removeAcl", "setAcl")) { - final AbfsRestOperation setAclOp = client + final AbfsRestOperation setAclOp = getClient() .setAcl(relativePath, AbfsAclHelper.serializeAclSpec(newAclEntries), eTag, tracingContext); perfInfoSet.registerResult(setAclOp.getResult()) @@ -1623,7 +1623,7 @@ public void setAcl(final Path path, final List aclSpec, LOG.debug( "setAcl filesystem: {} path: {} aclspec: {}", - client.getFileSystem(), + getClient().getFileSystem(), path, AclEntry.aclSpecToString(aclSpec)); @@ -1633,7 +1633,7 @@ public void setAcl(final Path path, final List aclSpec, String relativePath = getRelativePath(path); - final AbfsRestOperation op = client + final AbfsRestOperation op = getClient() .getAclStatus(relativePath, isUpnFormat, tracingContext); perfInfoGet.registerResult(op.getResult()); final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); @@ -1646,7 +1646,7 @@ public void 
setAcl(final Path path, final List aclSpec, try (AbfsPerfInfo perfInfoSet = startTracking("setAcl", "setAcl")) { final AbfsRestOperation setAclOp = - client.setAcl(relativePath, + getClient().setAcl(relativePath, AbfsAclHelper.serializeAclSpec(aclEntries), eTag, tracingContext); perfInfoSet.registerResult(setAclOp.getResult()) .registerSuccess(true) @@ -1666,10 +1666,10 @@ public AclStatus getAclStatus(final Path path, TracingContext tracingContext) LOG.debug( "getAclStatus filesystem: {} path: {}", - client.getFileSystem(), + getClient().getFileSystem(), path); - AbfsRestOperation op = client + AbfsRestOperation op = getClient() .getAclStatus(getRelativePath(path), tracingContext); AbfsHttpOperation result = op.getResult(); perfInfo.registerResult(result); @@ -1706,7 +1706,7 @@ public AclStatus getAclStatus(final Path path, TracingContext tracingContext) public void access(final Path path, final FsAction mode, TracingContext tracingContext) throws AzureBlobFileSystemException { LOG.debug("access for filesystem: {}, path: {}, mode: {}", - this.client.getFileSystem(), path, mode); + this.getClient().getFileSystem(), path, mode); if (!this.abfsConfiguration.isCheckAccessEnabled() || !getIsNamespaceEnabled(tracingContext)) { LOG.debug("Returning; either check access is not enabled or the account" @@ -1714,7 +1714,7 @@ public void access(final Path path, final FsAction mode, return; } try (AbfsPerfInfo perfInfo = startTracking("access", "checkAccess")) { - final AbfsRestOperation op = this.client + final AbfsRestOperation op = this.getClient() .checkAccess(getRelativePath(path), mode.SYMBOL, tracingContext); perfInfo.registerResult(op.getResult()).registerSuccess(true); } @@ -1745,7 +1745,7 @@ public boolean isInfiniteLeaseKey(String key) { private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure) throws IOException { - if (this.client != null) { + if (this.getClient() != null) { return; } @@ -1819,7 +1819,7 @@ private void 
initializeClient(URI uri, String fileSystemName, populateAbfsClientContext()); } - this.client = getClientHandler().getClient(); + this.setClient(getClientHandler().getClient()); LOG.trace("AbfsClient init complete"); } @@ -2189,7 +2189,7 @@ private AbfsLease maybeCreateLease(String relativePath, TracingContext tracingCo if (!enableInfiniteLease) { return null; } - AbfsLease lease = new AbfsLease(client, relativePath, tracingContext); + AbfsLease lease = new AbfsLease(getClient(), relativePath, tracingContext); leaseRefs.put(lease, null); return lease; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index 26106a717c94f..fb5cb58937220 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -50,7 +50,44 @@ public final class AbfsHttpConstants { public static final String DEFAULT_LEASE_BREAK_PERIOD = "0"; public static final String DEFAULT_TIMEOUT = "90"; public static final String APPEND_BLOB_TYPE = "appendblob"; - public static final String TOKEN_VERSION = "2"; + + //Abfs Http Client Constants for Blob Endpoint APIs. + + /** + * HTTP Header Value to denote resource type as container. + * {@value}. + */ + public static final String CONTAINER = "container"; + + /** + * HTTP Header Value to denote component as metadata. + * {@value}. + */ + public static final String METADATA = "metadata"; + + /** + * HTTP Header Value to denote component as block. + * {@value}. + */ + public static final String BLOCK = "block"; + + /** + * HTTP Header Value to denote component as blocklist. + * {@value}. + */ + public static final String BLOCKLIST = "blocklist"; + + /** + * HTTP Header Value to denote component as lease. + * {@value}. 
+ */ + public static final String LEASE = "lease"; + + /** + * HTTP Header Value to denote block list type as committed. + * {@value}. + */ + public static final String BLOCK_TYPE_COMMITTED = "committed"; public static final String JAVA_VENDOR = "java.vendor"; public static final String JAVA_VERSION = "java.version"; @@ -60,6 +97,10 @@ public final class AbfsHttpConstants { public static final String APN_VERSION = "APN/1.0"; public static final String CLIENT_VERSION = "Azure Blob FS/" + VersionInfo.getVersion(); + /** + * {@value}. + */ + public static final String TOKEN_VERSION = "2"; // Abfs Http Verb public static final String HTTP_METHOD_DELETE = "DELETE"; @@ -92,6 +133,7 @@ public final class AbfsHttpConstants { public static final String HTTP_HEADER_PREFIX = "x-ms-"; public static final String HASH = "#"; public static final String TRUE = "true"; + public static final String ZERO = "0"; public static final String PLUS_ENCODE = "%20"; public static final String FORWARD_SLASH_ENCODE = "%2F"; @@ -101,6 +143,7 @@ public final class AbfsHttpConstants { public static final String GMT_TIMEZONE = "GMT"; public static final String APPLICATION_JSON = "application/json"; public static final String APPLICATION_OCTET_STREAM = "application/octet-stream"; + public static final String APPLICATION_XML = "application/xml"; public static final String XMS_PROPERTIES_ENCODING_ASCII = "ISO-8859-1"; public static final String XMS_PROPERTIES_ENCODING_UNICODE = "UTF-8"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java index 8c9c8af75b53d..6b6e98c9c7082 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java @@ -45,8 +45,7 @@ public enum FSOperationType { 
SET_OWNER("SO"), SET_ACL("SA"), TEST_OP("TS"), - WRITE("WR"), - INIT("IN"); + WRITE("WR"); private final String opCode; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java index b3c2b21d3c277..53020750ab310 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java @@ -59,7 +59,6 @@ public final class HttpHeaderConfigurations { public static final String X_MS_ACL = "x-ms-acl"; public static final String X_MS_PERMISSIONS = "x-ms-permissions"; public static final String X_MS_UMASK = "x-ms-umask"; - public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled"; public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency"; public static final String X_MS_ENCRYPTION_KEY = "x-ms-encryption-key"; public static final String X_MS_ENCRYPTION_KEY_SHA256 = "x-ms-encryption-key-sha256"; @@ -70,10 +69,40 @@ public final class HttpHeaderConfigurations { public static final String X_MS_LEASE_ACTION = "x-ms-lease-action"; public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration"; public static final String X_MS_LEASE_ID = "x-ms-lease-id"; + + /** + * Http Request Header for denoting the lease id of source in copy operation. + * {@value} + */ + public static final String X_MS_SOURCE_LEASE_ID = "x-ms-source-lease-id"; public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id"; public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period"; public static final String EXPECT = "Expect"; public static final String X_MS_RANGE_GET_CONTENT_MD5 = "x-ms-range-get-content-md5"; + /** + * Http Response Header for denoting directory. 
+ * {@value} + */ + public static final String X_MS_META_HDI_ISFOLDER = "x-ms-meta-hdi_isfolder"; + + /** + * Http Response Header prefix for user-defined properties. + * {@value} + */ + public static final String X_MS_METADATA_PREFIX = "x-ms-meta-"; + + /** + * Http Request Header for denoting the source of copy operation. + * {@value} + */ + public static final String X_MS_COPY_SOURCE = "x-ms-copy-source"; + + /** + * Http Request Header for denoting MD5 hash of the blob content. + * {@value} + */ + public static final String X_MS_BLOB_CONTENT_MD5 = "x-ms-blob-content-md5"; + private HttpHeaderConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java index f7e37dcb6d50d..f4dd38585f5ee 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java @@ -42,6 +42,32 @@ public final class HttpQueryParams { public static final String QUERY_PARAM_BLOBTYPE = "blobtype"; public static final String QUERY_PARAM_PAGINATED = "paginated"; + // query parameters for Blob Endpoint Rest APIs + + /** + * Http Query parameter for specifying resource type. + * {@value} + */ + public static final String QUERY_PARAM_RESTYPE = "restype"; + + /** + * Http Query parameter for specifying component. + * {@value} + */ + public static final String QUERY_PARAM_COMP = "comp"; + + /** + * Http Query parameter for specifying blockId. + * {@value} + */ + public static final String QUERY_PARAM_BLOCKID = "blockid"; + + /** + * Http Query parameter for specifying block list type. 
+ * {@value} + */ + public static final String QUERY_PARAM_BLOCKLISTTYPE = "blocklisttype"; + //query params for SAS public static final String QUERY_PARAM_SAOID = "saoid"; public static final String QUERY_PARAM_SKOID = "skoid"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java index 9da6427d65c2c..f0510d7ac441a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java @@ -36,7 +36,19 @@ public enum Mode { private final String leaseId; private boolean isExpectHeaderEnabled; private boolean isRetryDueToExpect; + private final BlobAppendRequestParameters blobParams; + + /** + * Constructor to be used for interacting with AbfsDfsClient. + * @param position position in remote blob at which append should happen + * @param offset position in the buffer to be appended + * @param length length of the data to be appended + * @param mode mode of the append operation + * @param isAppendBlob true if the blob is append-blob + * @param leaseId leaseId of the blob to be appended + * @param isExpectHeaderEnabled true if the expect header is enabled + */ public AppendRequestParameters(final long position, final int offset, final int length, @@ -52,6 +64,37 @@ public AppendRequestParameters(final long position, this.leaseId = leaseId; this.isExpectHeaderEnabled = isExpectHeaderEnabled; this.isRetryDueToExpect = false; + this.blobParams = null; + } + + /** + * Constructor to be used for interacting with AbfsBlobClient. 
+ * @param position position in remote blob at which append should happen + * @param offset position in the buffer to be appended + * @param length length of the data to be appended + * @param mode mode of the append operation + * @param isAppendBlob true if the blob is append-blob + * @param leaseId leaseId of the blob to be appended + * @param isExpectHeaderEnabled true if the expect header is enabled + * @param blobParams parameters specific to append operation on Blob Endpoint. + */ + public AppendRequestParameters(final long position, + final int offset, + final int length, + final Mode mode, + final boolean isAppendBlob, + final String leaseId, + final boolean isExpectHeaderEnabled, + final BlobAppendRequestParameters blobParams) { + this.position = position; + this.offset = offset; + this.length = length; + this.mode = mode; + this.isAppendBlob = isAppendBlob; + this.leaseId = leaseId; + this.isExpectHeaderEnabled = isExpectHeaderEnabled; + this.isRetryDueToExpect = false; + this.blobParams = blobParams; } public long getPosition() { @@ -86,6 +129,22 @@ public boolean isRetryDueToExpect() { return isRetryDueToExpect; } + /** + * Returns BlockId of the block blob to be appended. + * @return blockId + */ + public String getBlockId() { + return blobParams.getBlockId(); + } + + /** + * Returns ETag of the block blob. 
+ * @return eTag + */ + public String getETag() { + return blobParams.getETag(); + } + public void setRetryDueToExpect(boolean retryDueToExpect) { isRetryDueToExpect = retryDueToExpect; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java index 439caabe2327f..db1560d541430 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java @@ -34,10 +34,12 @@ public enum AzureServiceErrorCode { FILE_SYSTEM_ALREADY_EXISTS("FilesystemAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null), PATH_ALREADY_EXISTS("PathAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null), + BLOB_ALREADY_EXISTS("BlobAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null), INTERNAL_OPERATION_ABORT("InternalOperationAbortError", HttpURLConnection.HTTP_CONFLICT, null), PATH_CONFLICT("PathConflict", HttpURLConnection.HTTP_CONFLICT, null), FILE_SYSTEM_NOT_FOUND("FilesystemNotFound", HttpURLConnection.HTTP_NOT_FOUND, null), PATH_NOT_FOUND("PathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null), + BLOB_PATH_NOT_FOUND("BlobNotFound", HttpURLConnection.HTTP_NOT_FOUND, null), PRE_CONDITION_FAILED("PreconditionFailed", HttpURLConnection.HTTP_PRECON_FAILED, null), SOURCE_PATH_NOT_FOUND("SourcePathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null), INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE("InvalidSourceOrDestinationResourceType", HttpURLConnection.HTTP_CONFLICT, null), diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobAppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobAppendRequestParameters.java new 
file mode 100644 index 0000000000000..25e3118265d6d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/BlobAppendRequestParameters.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contracts.services; + +/** + * Following parameters are used by AbfsBlobClient only. + * Blob Endpoint Append API requires blockId and eTag to be passed in the request. + */ +public class BlobAppendRequestParameters { + private String blockId; + private String eTag; + + /** + * Constructor to be used for interacting with AbfsBlobClient. 
+ * @param blockId blockId of the block to be appended + * @param eTag eTag of the blob being appended + */ + public BlobAppendRequestParameters(String blockId, String eTag) { + this.blockId = blockId; + this.eTag = eTag; + } + + public String getBlockId() { + return blockId; + } + + public String getETag() { + return eTag; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java new file mode 100644 index 0000000000000..07c25b32483d3 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -0,0 +1,1087 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.util.ArrayList; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion; +import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; +import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; +import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; +import 
org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_JSON; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_OCTET_STREAM; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_XML; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCK; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCKLIST; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCK_TYPE_COMMITTED; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BREAK_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CONTAINER; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_LEASE_BREAK_PERIOD; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LEASE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.METADATA; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RELEASE_LEASE_ACTION; +import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RENEW_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_UNICODE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_MD5; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_NONE_MATCH; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.RANGE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_CONTENT_MD5; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_BREAK_PERIOD; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_DURATION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ID; +import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_METADATA_PREFIX; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_META_HDI_ISFOLDER; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RANGE_GET_CONTENT_MD5; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_SOURCE_LEASE_ID; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOCKID; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOCKLISTTYPE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESTYPE; + +/** + * AbfsClient interacting with Blob endpoint. 
+ */ +public class AbfsBlobClient extends AbfsClient { + + public AbfsBlobClient(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final AccessTokenProvider tokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, + encryptionContextProvider, abfsClientContext); + } + + public AbfsBlobClient(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final SASTokenProvider sasTokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider, + encryptionContextProvider, abfsClientContext); + } + + /** + * Create request headers for Rest Operation using the default API version. + * @return default request headers. + */ + @Override + public List createDefaultHeaders() { + return this.createDefaultHeaders(getxMsVersion()); + } + + /** + * Create request headers for Rest Operation using the specified API version. + * Blob Endpoint API responses are in JSON/XML format. + * @param xMsVersion API version to be used. + * @return default request headers + */ + @Override + public List createDefaultHeaders(ApiVersion xMsVersion) { + List requestHeaders = super.createCommonHeaders(xMsVersion); + requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON + + COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM + + COMMA + SINGLE_WHITE_SPACE + APPLICATION_XML)); + return requestHeaders; + } + + /** + * Get Rest Operation for API + * Create Container. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
+ */ + @Override + public AbfsRestOperation createFilesystem(TracingContext tracingContext) + throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.CreateContainer, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * Set Container Metadata. + * @param properties metadata key-value pairs to be set on the container. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation setFilesystemProperties(final Hashtable properties, + TracingContext tracingContext) throws AzureBlobFileSystemException { + List requestHeaders = createDefaultHeaders(); + /* + * Blob Endpoint supports Unicode characters but DFS Endpoint only allows ASCII. + * To match the behavior across endpoints, driver throws exception if non-ASCII characters are found. 
+ */ + try { + List metadataRequestHeaders = getMetadataHeadersList(properties); + requestHeaders.addAll(metadataRequestHeaders); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } + + AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, METADATA); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.SetContainerMetadata, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * Get Container Metadata. + * Gets all the properties of the filesystem. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + * */ + @Override + public AbfsRestOperation getFilesystemProperties(TracingContext tracingContext) + throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.GetContainerProperties, + HTTP_METHOD_HEAD, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * Delete Container. + * Deletes the Container acting as current filesystem. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
+ */ + @Override + public AbfsRestOperation deleteFilesystem(TracingContext tracingContext) + throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.DeleteContainer, + HTTP_METHOD_DELETE, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * List Blobs. + * @param relativePath to return only blobs with names that begin with the specified prefix. + * @param recursive to return all blobs in the path, including those in subdirectories. + * @param listMaxResults maximum number of blobs to return. + * @param continuation marker to specify the continuation token. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation or response parsing fails. + */ + @Override + public AbfsRestOperation listPath(final String relativePath, + final boolean recursive, + final int listMaxResults, + final String continuation, + TracingContext tracingContext) throws AzureBlobFileSystemException { + // TODO: [FnsOverBlob][HADOOP-19207] To be implemented as part of response handling of blob endpoint APIs. + throw new NotImplementedException("Blob Endpoint Support is not yet implemented"); + } + + /** + * Get Rest Operation for API + * Put Blob. + * Creates a file or directory(marker file) at specified path. + * @param path of the directory to be created. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
+ */ + @Override + public AbfsRestOperation createPath(final String path, + final boolean isFile, + final boolean overwrite, + final AzureBlobFileSystemStore.Permissions permissions, + final boolean isAppendBlob, + final String eTag, + final ContextEncryptionAdapter contextEncryptionAdapter, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + // TODO: [FnsOverBlob][HADOOP-19207] To be implemented as part of response handling of blob endpoint APIs. + throw new NotImplementedException("Create Path operation on Blob endpoint yet to be implemented."); + } + + /** + * Get Rest Operation for API + * Lease Blob. + * @param path on which lease has to be acquired. + * @param duration for which lease has to be acquired. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation acquireLease(final String path, final int duration, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration))); + requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID, UUID.randomUUID().toString())); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.LeaseBlob, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * Lease Blob. + * @param path on which lease has to be renewed. + * @param leaseId of the lease to be renewed. 
+ * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation renewLease(final String path, final String leaseId, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.LeaseBlob, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * Lease Blob. + * @param path on which lease has to be released. + * @param leaseId of the lease to be released. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
+ */ + @Override + public AbfsRestOperation releaseLease(final String path, final String leaseId, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.LeaseBlob, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * Lease Blob. + * @param path on which lease has to be broken. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation breakLease(final String path, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, BREAK_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD, DEFAULT_LEASE_BREAK_PERIOD)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.LeaseBlob, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get results for the rename operation. + * @param source path to source file + * @param destination destination of rename. + * @param continuation continuation. 
+ * @param tracingContext trace context + * @param sourceEtag etag of source file. may be null or empty + * @param isMetadataIncompleteState was there a rename failure due to + * incomplete metadata state? + * @param isNamespaceEnabled whether namespace enabled account or not + * @return result of rename operation + * @throws IOException if rename operation fails. + */ + @Override + public AbfsClientRenameResult renamePath(final String source, + final String destination, + final String continuation, + final TracingContext tracingContext, + final String sourceEtag, + final boolean isMetadataIncompleteState, + final boolean isNamespaceEnabled) throws IOException { + /** + * TODO: [FnsOverBlob] To be implemented as part of rename-delete over blob endpoint work. HADOOP-19233. + */ + throw new NotImplementedException("Rename operation on Blob endpoint yet to be implemented."); + } + + /** + * Get Rest Operation for API + * Put Block. + * Uploads data to be appended to a file. + * @param path to which data has to be appended. + * @param buffer containing data to be appended. + * @param reqParams containing parameters for append operation like offset, length etc. + * @param cachedSasToken to be used for the authenticating operation. + * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
+ */ + @Override + public AbfsRestOperation append(final String path, + final byte[] buffer, + final AppendRequestParameters reqParams, + final String cachedSasToken, + final ContextEncryptionAdapter contextEncryptionAdapter, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + addEncryptionKeyRequestHeaders(path, requestHeaders, false, + contextEncryptionAdapter, tracingContext); + requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, String.valueOf(buffer.length))); + requestHeaders.add(new AbfsHttpHeader(IF_MATCH, reqParams.getETag())); + if (reqParams.getLeaseId() != null) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId())); + } + if (reqParams.isExpectHeaderEnabled()) { + requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE)); + } + if (isChecksumValidationEnabled()) { + addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer); + } + if (reqParams.isRetryDueToExpect()) { + String userAgentRetry = getUserAgent(); + userAgentRetry = userAgentRetry.replace(HUNDRED_CONTINUE_USER_AGENT, EMPTY_STRING); + requestHeaders.removeIf(header -> header.getName().equalsIgnoreCase(USER_AGENT)); + requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgentRetry)); + } + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCK); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOCKID, reqParams.getBlockId()); + + String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, + abfsUriQueryBuilder, cachedSasToken); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.PutBlock, + HTTP_METHOD_PUT, url, requestHeaders, + buffer, reqParams.getoffset(), reqParams.getLength(), + sasTokenForReuse); + + try { + op.execute(tracingContext); + } catch 
(AbfsRestOperationException e) { + /* + If the http response code indicates a user error we retry + the same append request with expect header being disabled. + When "100-continue" header is enabled but a non Http 100 response comes, + the response message might not get set correctly by the server. + So, this handling is to avoid breaking of backward compatibility + if someone has taken dependency on the exception message, + which is created using the error string present in the response header. + */ + int responseStatusCode = ((AbfsRestOperationException) e).getStatusCode(); + if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) { + LOG.debug("User error, retrying without 100 continue enabled for the given path {}", path); + reqParams.setExpectHeaderEnabled(false); + reqParams.setRetryDueToExpect(true); + return this.append(path, buffer, reqParams, cachedSasToken, + contextEncryptionAdapter, tracingContext); + } + // If we have no HTTP response, throw the original exception. + if (!op.hasResult()) { + throw e; + } + + if (isMd5ChecksumError(e)) { + throw new AbfsInvalidChecksumException(e); + } + + throw e; + } + catch (AzureBlobFileSystemException e) { + // Any server side issue will be returned as AbfsRestOperationException and will be handled above. + LOG.debug("Append request failed with non server issues for path: {}, offset: {}, position: {}", + path, reqParams.getoffset(), reqParams.getPosition()); + throw e; + } + return op; + } + + /** + * Blob Endpoint needs blockIds to flush the data. + * This method is not supported on Blob Endpoint. + * @param path on which data has to be flushed. + * @param position to which data has to be flushed. + * @param retainUncommittedData whether to retain uncommitted data after flush. + * @param isClose specify if this is the last flush to the file. + * @param cachedSasToken to be used for the authenticating operation. + * @param leaseId if there is an active lease on the path. 
+ * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the server calls. + * @return exception as this operation is not supported on Blob Endpoint. + * @throws UnsupportedOperationException always. + */ + @Override + public AbfsRestOperation flush(final String path, + final long position, + final boolean retainUncommittedData, + final boolean isClose, + final String cachedSasToken, + final String leaseId, + final ContextEncryptionAdapter contextEncryptionAdapter, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + throw new UnsupportedOperationException( + "Flush without blockIds not supported on Blob Endpoint"); + } + + /** + * Get Rest Operation for API + * Put Block List. + * The flush operation to commit the blocks. + * @param buffer This has the xml in byte format with the blockIds to be flushed. + * @param path The path to flush the data to. + * @param isClose True when the stream is closed. + * @param cachedSasToken The cachedSasToken if available. + * @param leaseId The leaseId of the blob if available. + * @param eTag The etag of the blob. + * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
+ */ + @Override + public AbfsRestOperation flush(byte[] buffer, + final String path, + boolean isClose, + final String cachedSasToken, + final String leaseId, + final String eTag, + ContextEncryptionAdapter contextEncryptionAdapter, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + addEncryptionKeyRequestHeaders(path, requestHeaders, false, + contextEncryptionAdapter, tracingContext); + requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, String.valueOf(buffer.length))); + requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, APPLICATION_XML)); + requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + if (leaseId != null) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + } + String md5Hash = computeMD5Hash(buffer, 0, buffer.length); + requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, md5Hash)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose)); + String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, + abfsUriQueryBuilder, cachedSasToken); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.PutBlockList, + HTTP_METHOD_PUT, url, requestHeaders, + buffer, 0, buffer.length, + sasTokenForReuse); + try { + op.execute(tracingContext); + } catch (AbfsRestOperationException ex) { + // If 412 Condition Not Met error is seen on retry it means it's either a + // parallel write case or the previous request has failed due to network + // issue and flush has actually succeeded in the backend. If MD5 hash of + // blockIds matches with what was set by previous request, it means the + // previous request itself was successful, else request will fail with 412 itself. 
+ if (op.getRetryCount() >= 1 && ex.getStatusCode() == HTTP_PRECON_FAILED) { + AbfsRestOperation op1 = getPathStatus(path, true, tracingContext, + contextEncryptionAdapter); + String metadataMd5 = op1.getResult().getResponseHeader(CONTENT_MD5); + if (!md5Hash.equals(metadataMd5)) { + throw ex; + } + return op; + } + throw ex; + } + return op; + } + + /** + * Get Rest Operation for API + * Set Blob Metadata. + * Set the properties of a file or directory. + * @param path on which properties have to be set. + * @param properties comma separated list of metadata key-value pairs. + * @param tracingContext for tracing the service call. + * @param contextEncryptionAdapter to provide encryption context. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation setPathProperties(final String path, + final Hashtable properties, + final TracingContext tracingContext, + final ContextEncryptionAdapter contextEncryptionAdapter) + throws AzureBlobFileSystemException { + List requestHeaders = createDefaultHeaders(); + /* + * Blob Endpoint supports Unicode characters but DFS Endpoint only allow ASCII. + * To match the behavior across endpoints, driver throws exception if non-ASCII characters are found. 
+ */ + try { + List metadataRequestHeaders = getMetadataHeadersList(properties); + requestHeaders.addAll(metadataRequestHeaders); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } + + AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, METADATA); + appendSASTokenToQuery(path, SASTokenProvider.SET_PROPERTIES_OPERATION, abfsUriQueryBuilder); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.SetPathProperties, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * Get Blob Properties. + * Get the properties of a file or directory. + * @param path of which properties have to be fetched. + * @param includeProperties to include user defined properties. + * @param tracingContext for tracing the service call. + * @param contextEncryptionAdapter to provide encryption context. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation getPathStatus(final String path, + final boolean includeProperties, + final TracingContext tracingContext, + final ContextEncryptionAdapter contextEncryptionAdapter) + throws AzureBlobFileSystemException { + return this.getPathStatus(path, tracingContext, + contextEncryptionAdapter, true); + + } + + /** + * Get Rest Operation for API + * Get Blob Properties. + * Get the properties of a file or directory. + * @param path of which properties have to be fetched. + * @param tracingContext for tracing the service call. + * @param contextEncryptionAdapter to provide encryption context. + * @param isImplicitCheckRequired specify if implicit check is required. + * @return executed rest operation containing response from server. 
+ * @throws AzureBlobFileSystemException if rest operation fails. + */ + public AbfsRestOperation getPathStatus(final String path, + final TracingContext tracingContext, + final ContextEncryptionAdapter contextEncryptionAdapter, + final boolean isImplicitCheckRequired) + throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, + String.valueOf(getAbfsConfiguration().isUpnUsed())); + appendSASTokenToQuery(path, SASTokenProvider.GET_PROPERTIES_OPERATION, + abfsUriQueryBuilder); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.GetPathStatus, + HTTP_METHOD_HEAD, url, requestHeaders); + try { + op.execute(tracingContext); + } catch (AzureBlobFileSystemException ex) { + // If we have no HTTP response, throw the original exception. + if (!op.hasResult()) { + throw ex; + } + if (op.getResult().getStatusCode() == HTTP_NOT_FOUND && isImplicitCheckRequired) { + // This path could be present as an implicit directory in FNS. + // TODO: [FnsOverBlob][HADOOP-19207] To be implemented as part of implicit directory handling over blob endpoint. + } + if (op.getResult().getStatusCode() == HTTP_NOT_FOUND) { + /* + * Exception handling at AzureBlobFileSystem happens as per the error-code. + * In case of HEAD call that gets 4XX status, error code is not parsed from the response. + * Hence, we are throwing a new exception with error code and message. + */ + throw new AbfsRestOperationException(HTTP_NOT_FOUND, + AzureServiceErrorCode.BLOB_PATH_NOT_FOUND.getErrorCode(), + ex.getMessage(), ex); + } + throw ex; + } + return op; + } + + /** + * Get Rest Operation for API + * Get Blob. + * Read the contents of the file at specified path + * @param path of the file to be read. 
+ * @param position in the file from where data has to be read. + * @param buffer to store the data read. + * @param bufferOffset offset in the buffer to start storing the data. + * @param bufferLength length of data to be read. + * @param eTag to specify conditional headers. + * @param cachedSasToken to be used for the authenticating operation. + * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation read(final String path, + final long position, + final byte[] buffer, + final int bufferOffset, + final int bufferLength, + final String eTag, + final String cachedSasToken, + final ContextEncryptionAdapter contextEncryptionAdapter, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE, String.format( + "bytes=%d-%d", position, position + bufferLength - 1)); + requestHeaders.add(rangeHeader); + requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + + // Add request header to fetch MD5 Hash of data returned by server. 
+ if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) { + requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE)); + } + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION, + abfsUriQueryBuilder, cachedSasToken); + + URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.GetBlob, + HTTP_METHOD_GET, url, requestHeaders, + buffer, bufferOffset, bufferLength, + sasTokenForReuse); + op.execute(tracingContext); + + // Verify the MD5 hash returned by server holds valid on the data received. + if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) { + verifyCheckSumForRead(buffer, op.getResult(), bufferOffset); + } + + return op; + } + + /** + * Orchestration for delete operation to be implemented. + * @param path to be deleted. + * @param recursive if the path is a directory, delete recursively. + * @param continuation to specify continuation token. + * @param tracingContext for tracing the server calls. + * @param isNamespaceEnabled specify if the namespace is enabled. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation deletePath(final String path, + final boolean recursive, + final String continuation, + TracingContext tracingContext, + final boolean isNamespaceEnabled) throws AzureBlobFileSystemException { + // TODO: [FnsOverBlob][HADOOP-19233] To be implemented as part of rename-delete over blob endpoint work. + throw new NotImplementedException("Delete operation on Blob endpoint will be implemented in future."); + } + + /** + * Set the owner of the file or directory. + * Not supported for HNS-Disabled Accounts. + * @param path on which owner has to be set. 
+ * @param owner to be set. + * @param group to be set. + * @param tracingContext for tracing the server calls. + * @return exception as this operation is not supported on Blob Endpoint. + * @throws UnsupportedOperationException always. + */ + @Override + public AbfsRestOperation setOwner(final String path, + final String owner, + final String group, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + throw new UnsupportedOperationException( + "SetOwner operation is only supported on HNS enabled Accounts."); + } + + /** + * Set the permission of the file or directory. + * Not supported for HNS-Disabled Accounts. + * @param path on which permission has to be set. + * @param permission to be set. + * @param tracingContext for tracing the server calls. + * @return exception as this operation is not supported on Blob Endpoint. + * @throws UnsupportedOperationException always. + */ + @Override + public AbfsRestOperation setPermission(final String path, + final String permission, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + throw new UnsupportedOperationException( + "SetPermission operation is only supported on HNS enabled Accounts."); + } + + /** + * Set the ACL of the file or directory. + * Not supported for HNS-Disabled Accounts. + * @param path on which ACL has to be set. + * @param aclSpecString to be set. + * @param eTag to specify conditional headers. Set only if etag matches. + * @param tracingContext for tracing the server calls. + * @return exception as this operation is not supported on Blob Endpoint. + * @throws UnsupportedOperationException always. 
+ */ + @Override + public AbfsRestOperation setAcl(final String path, + final String aclSpecString, + final String eTag, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + throw new UnsupportedOperationException( + "SetAcl operation is only supported on HNS enabled Accounts."); + } + + /** + * Get the ACL of the file or directory. + * Not supported for HNS-Disabled Accounts. + * @param path of which properties have to be fetched. + * @param useUPN whether to use UPN with rest operation. + * @param tracingContext for tracing the server calls. + * @return exception as this operation is not supported on Blob Endpoint. + * @throws UnsupportedOperationException always. + */ + @Override + public AbfsRestOperation getAclStatus(final String path, + final boolean useUPN, + TracingContext tracingContext) throws AzureBlobFileSystemException { + throw new UnsupportedOperationException( + "GetAclStatus operation is only supported on HNS enabled Accounts."); + } + + /** + * Check the access of the file or directory. + * Not supported for HNS-Disabled Accounts. + * @param path Path for which access check needs to be performed + * @param rwx The permission to be checked on the path + * @param tracingContext Tracks identifiers for request header + * @return exception as this operation is not supported on Blob Endpoint. + * @throws UnsupportedOperationException always. + */ + @Override + public AbfsRestOperation checkAccess(String path, + String rwx, + TracingContext tracingContext) throws AzureBlobFileSystemException { + throw new UnsupportedOperationException( + "CheckAccess operation is only supported on HNS enabled Accounts."); + } + + /** + * Checks if the rest operation results indicate if the path is a directory. + * @param result executed rest operation containing response from server. + * @return True if the path is a directory, False otherwise. 
+ */ + @Override + public boolean checkIsDir(AbfsHttpOperation result) { + String resourceType = result.getResponseHeader(X_MS_META_HDI_ISFOLDER); + return resourceType != null && resourceType.equals(TRUE); + } + + /** + * Returns true if the status code lies in the range of user error. + * In the case of HTTP_CONFLICT for PutBlockList we fall back to DFS and hence + * this retry handling is not needed. + * @param responseStatusCode http response status code. + * @return True or False. + */ + @Override + public boolean checkUserError(int responseStatusCode) { + return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST + && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR + && responseStatusCode != HttpURLConnection.HTTP_CONFLICT); + } + + /** + * Get Rest Operation for API + * Get Block List. + * Get the list of committed block ids of the blob. + * @param path The path to get the list of blockId's. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public AbfsRestOperation getBlockList(final String path, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + String operation = SASTokenProvider.READ_OPERATION; + appendSASTokenToQuery(path, operation, abfsUriQueryBuilder); + + abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOCKLISTTYPE, BLOCK_TYPE_COMMITTED); + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.GetBlockList, HTTP_METHOD_GET, url, + requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * Copy Blob. 
+ * This is an asynchronous API, it returns copyId and expects client + * to poll the server on the destination and check the copy-progress. + * This method owns the logic of triggering copyBlob API: it builds the + * request and executes it before returning. The caller of this method has + * to own the logic of polling the destination with the copyId returned in + * the response from this method. + * @param sourceBlobPath path of source to be copied. + * @param destinationBlobPath path of the destination. + * @param srcLeaseId if source path has an active lease. + * @param tracingContext for tracing the service call. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public AbfsRestOperation copyBlob(Path sourceBlobPath, + Path destinationBlobPath, + final String srcLeaseId, + TracingContext tracingContext) throws AzureBlobFileSystemException { + AbfsUriQueryBuilder abfsUriQueryBuilderDst = createDefaultUriQueryBuilder(); + AbfsUriQueryBuilder abfsUriQueryBuilderSrc = new AbfsUriQueryBuilder(); + String dstBlobRelativePath = destinationBlobPath.toUri().getPath(); + String srcBlobRelativePath = sourceBlobPath.toUri().getPath(); + appendSASTokenToQuery(dstBlobRelativePath, + SASTokenProvider.WRITE_OPERATION, abfsUriQueryBuilderDst); + appendSASTokenToQuery(srcBlobRelativePath, + SASTokenProvider.READ_OPERATION, abfsUriQueryBuilderSrc); + final URL url = createRequestUrl(dstBlobRelativePath, + abfsUriQueryBuilderDst.toString()); + final String sourcePathUrl = createRequestUrl(srcBlobRelativePath, + abfsUriQueryBuilderSrc.toString()).toString(); + List requestHeaders = createDefaultHeaders(); + if (srcLeaseId != null) { + requestHeaders.add(new AbfsHttpHeader(X_MS_SOURCE_LEASE_ID, srcLeaseId)); + } + requestHeaders.add(new AbfsHttpHeader(X_MS_COPY_SOURCE, sourcePathUrl)); + requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); + + // Send the request before returning: callers are documented to receive an + // executed operation whose response they use for polling the copy status. + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.CopyBlob, HTTP_METHOD_PUT, url, + requestHeaders); + op.execute(tracingContext); + return op;
+ } + + /** + * Get Rest Operation for API + * Delete Blob. + * Deletes the blob at the given path. + * @param blobPath path of the blob to be deleted. + * @param leaseId if path has an active lease. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public AbfsRestOperation deleteBlobPath(final Path blobPath, + final String leaseId, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + String blobRelativePath = blobPath.toUri().getPath(); + appendSASTokenToQuery(blobRelativePath, + SASTokenProvider.DELETE_OPERATION, abfsUriQueryBuilder); + final URL url = createRequestUrl(blobRelativePath, abfsUriQueryBuilder.toString()); + final List requestHeaders = createDefaultHeaders(); + if (leaseId != null) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + } + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.DeleteBlob, HTTP_METHOD_DELETE, url, + requestHeaders); + op.execute(tracingContext); + return op; + } + + private static String encodeMetadataAttribute(String value) + throws UnsupportedEncodingException { + return value == null ? null + : URLEncoder.encode(value, XMS_PROPERTIES_ENCODING_UNICODE); + } + + private static String decodeMetadataAttribute(String encoded) + throws UnsupportedEncodingException { + return encoded == null ? null + : URLDecoder.decode(encoded, XMS_PROPERTIES_ENCODING_UNICODE); + } + + /** + * Checks if the value contains pure ASCII characters or not. + * @param value to be checked. + * @return true if pureASCII. 
+ * @throws CharacterCodingException if not pure ASCII + */ + private boolean isPureASCII(String value) throws CharacterCodingException { + final CharsetEncoder encoder = Charset.forName( + XMS_PROPERTIES_ENCODING_ASCII).newEncoder(); + boolean canEncodeValue = encoder.canEncode(value); + if (!canEncodeValue) { + LOG.debug("Value {} for ne of the metadata is not pure ASCII.", value); + throw new CharacterCodingException(); + } + return true; + } + + private List getMetadataHeadersList(final Hashtable properties) + throws AbfsRestOperationException, CharacterCodingException { + List metadataRequestHeaders = new ArrayList<>(); + for (Map.Entry entry : properties.entrySet()) { + String key = X_MS_METADATA_PREFIX + entry.getKey(); + String value = entry.getValue(); + // AzureBlobFileSystem supports only ASCII Characters in property values. + if (isPureASCII(value)) { + try { + value = encodeMetadataAttribute(value); + } catch (UnsupportedEncodingException e) { + throw new InvalidAbfsRestOperationException(e); + } + metadataRequestHeaders.add(new AbfsHttpHeader(key, value)); + } + } + return metadataRequestHeaders; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 59b78cbf6195f..b51131b959854 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -794,6 +794,7 @@ public abstract AbfsRestOperation flush(String path, long position, * @param cachedSasToken to be used for the authenticating operation. * @param leaseId if there is an active lease on the path. * @param eTag to specify conditional headers. + * @param contextEncryptionAdapter to provide encryption context. * @param tracingContext for tracing the server calls. 
* @return executed rest operation containing response from server. * @throws AzureBlobFileSystemException if rest operation fails. @@ -804,6 +805,7 @@ public abstract AbfsRestOperation flush(byte[] buffer, String cachedSasToken, String leaseId, String eTag, + ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext) throws AzureBlobFileSystemException; /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java index 12d800939ae95..e0be9cbc8a82d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromBlobToDfs; +import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromDfsToBlob; /** * AbfsClientHandler is a class that provides a way to get the AbfsClient @@ -41,6 +42,7 @@ public class AbfsClientHandler { private AbfsServiceType defaultServiceType; private final AbfsDfsClient dfsAbfsClient; + private final AbfsBlobClient blobAbfsClient; public AbfsClientHandler(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, @@ -51,6 +53,9 @@ public AbfsClientHandler(final URL baseUrl, this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, null, encryptionContextProvider, abfsClientContext); + this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials, + abfsConfiguration, tokenProvider, null, encryptionContextProvider, + abfsClientContext); initServiceType(abfsConfiguration); } @@ -63,6 +68,9 @@ public AbfsClientHandler(final URL baseUrl, this.dfsAbfsClient = createDfsClient(baseUrl, 
sharedKeyCredentials, abfsConfiguration, null, sasTokenProvider, encryptionContextProvider, abfsClientContext); + this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials, + abfsConfiguration, null, sasTokenProvider, encryptionContextProvider, + abfsClientContext); initServiceType(abfsConfiguration); } @@ -84,24 +92,24 @@ public AbfsClient getClient() { /** * Get the AbfsClient based on the service type. - * @param serviceType AbfsServiceType + * @param serviceType AbfsServiceType. * @return AbfsClient */ public AbfsClient getClient(AbfsServiceType serviceType) { - return serviceType == AbfsServiceType.DFS ? dfsAbfsClient : null; + return serviceType == AbfsServiceType.DFS ? dfsAbfsClient : blobAbfsClient; } /** * Create the AbfsDfsClient using the url used to configure file system. * If URL is for Blob endpoint, it will be converted to DFS endpoint. - * @param baseUrl URL - * @param creds SharedKeyCredentials - * @param abfsConfiguration AbfsConfiguration - * @param tokenProvider AccessTokenProvider - * @param sasTokenProvider SASTokenProvider - * @param encryptionContextProvider EncryptionContextProvider - * @param abfsClientContext AbfsClientContext - * @return AbfsDfsClient with DFS endpoint URL + * @param baseUrl URL. + * @param creds SharedKeyCredentials. + * @param abfsConfiguration AbfsConfiguration. + * @param tokenProvider AccessTokenProvider. + * @param sasTokenProvider SASTokenProvider. + * @param encryptionContextProvider EncryptionContextProvider. + * @param abfsClientContext AbfsClientContext. + * @return AbfsDfsClient with DFS endpoint URL. * @throws IOException if URL conversion fails. */ private AbfsDfsClient createDfsClient(final URL baseUrl, @@ -124,4 +132,38 @@ private AbfsDfsClient createDfsClient(final URL baseUrl, abfsClientContext); } } + + /** + * Create the AbfsBlobClient using the url used to configure file system. + * If URL is for DFS endpoint, it will be converted to Blob endpoint. + * @param baseUrl URL. 
+ * @param creds SharedKeyCredentials. + * @param abfsConfiguration AbfsConfiguration. + * @param tokenProvider AccessTokenProvider. + * @param sasTokenProvider SASTokenProvider. + * @param encryptionContextProvider EncryptionContextProvider. + * @param abfsClientContext AbfsClientContext. + * @return AbfsBlobClient with Blob endpoint URL. + * @throws IOException if URL conversion fails. + */ + private AbfsBlobClient createBlobClient(final URL baseUrl, + final SharedKeyCredentials creds, + final AbfsConfiguration abfsConfiguration, + final AccessTokenProvider tokenProvider, + final SASTokenProvider sasTokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + URL blobUrl = changeUrlFromDfsToBlob(baseUrl); + if (tokenProvider != null) { + LOG.debug("Creating AbfsBlobClient with access token provider using the URL: {}", blobUrl); + return new AbfsBlobClient(blobUrl, creds, abfsConfiguration, + tokenProvider, encryptionContextProvider, + abfsClientContext); + } else { + LOG.debug("Creating AbfsBlobClient with SAS token provider using the URL: {}", blobUrl); + return new AbfsBlobClient(blobUrl, creds, abfsConfiguration, + sasTokenProvider, encryptionContextProvider, + abfsClientContext); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index f2eebd8800f15..7d50260f7bad3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -837,6 +837,7 @@ public AbfsRestOperation flush(byte[] buffer, final String cachedSasToken, final String leaseId, final String eTag, + final ContextEncryptionAdapter contextEncryptionAdapter, final TracingContext tracingContext) throws 
AzureBlobFileSystemException { throw new UnsupportedOperationException( "Flush with blockIds not supported on DFS Endpoint"); @@ -1282,6 +1283,7 @@ private String convertXmsPropertiesToCommaSeparatedString(final Map + +# Azure Blob Storage REST API (Blob Endpoint) + +## Introduction +The REST API for Blob Storage defines HTTP operations against the storage account, containers (filesystems), and blobs (files). +The API includes the operations listed in the following table. + +| Operation | Resource Type | Description | +|-------------------------------------------------------|---------------|---------------------------------------------------------------------------------------------| +| [Create Container](#create-container) | Filesystem | Creates a new azure storage container to be used as a Hadoop filesystem. | +| [Delete Container](#delete-container) | Filesystem | Deletes the specified container acting as hadoop filesystem. | +| [Set Container Metadata](#set-container-metadata) | Filesystem | Sets the metadata of the specified container acting as hadoop filesystem. | +| [Get Container Properties](#get-container-properties) | Filesystem | Gets the metadata of the specified container acting as hadoop filesystem. | +| [List Blobs](#list-blobs) | Filesystem | Lists the paths under the specified directory inside container acting as hadoop filesystem. | +| [Put Blob](#put-blob) | Path | Creates a new path or updates an existing path under the specified filesystem (container). | +| [Lease Blob](#lease-blob) | Path | Establishes and manages a lease on the specified path. | +| [Put Block](#put-block) | Path | Appends data to an already created blob at specified path. | +| [Put Block List](#put-block-list) | Path | Flushes the appended data to the blob at specified path. | +| [Set Blob Metadata](#set-blob-metadata) | Path | Sets the user-defined attributes of the blob at specified path. 
| +| [Get Blob Properties](#get-blob-properties) | Path | Gets the user-defined attributes of the blob at specified path. | +| [Get Blob](#get-blob) | Path | Reads data from the blob at specified path. | +| [Delete Blob](#delete-blob) | Path | Deletes the blob at specified path. | +| [Get Block List](#get-block-list) | Path | Retrieves the list of blocks that have been uploaded as part of a block blob. | +| [Copy Blob](#copy-blob) | Path | Copies a blob to a destination within the storage account. | + +## Create Container +The Create Container operation creates a new container under the specified account. If the container with the same name +already exists, the operation fails. +Rest API Documentation: [Create Container](https://docs.microsoft.com/en-us/rest/api/storageservices/create-container) + +## Delete Container +The Delete Container operation marks the specified container for deletion. The container and any blobs contained within it are later deleted during garbage collection. +Rest API Documentation: [Delete Container](https://docs.microsoft.com/en-us/rest/api/storageservices/delete-container) + +## Set Container Metadata +The Set Container Metadata operation sets user-defined metadata for the specified container as one or more name-value pairs. +Rest API Documentation: [Set Container Metadata](https://docs.microsoft.com/en-us/rest/api/storageservices/set-container-metadata) + +## Get Container Properties +The Get Container Properties operation returns all user-defined metadata and system properties for the specified container. The returned data doesn't include the container's list of blobs. +Rest API Documentation: [Get Container Properties](https://docs.microsoft.com/en-us/rest/api/storageservices/get-container-properties) + +## List Blobs +The List Blobs operation returns a list of the blobs under the specified container.
+Rest API Documentation: [List Blobs](https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs) + +## Put Blob +The Put Blob operation creates a new block blob, or updates the content of an existing block blob. +The Put Blob operation will overwrite all contents of an existing blob with the same name. +When you update an existing block blob, you overwrite any existing metadata on the blob. +The content of the existing blob is overwritten with the content of the new blob. +Partial updates are not supported with Put Blob. +Rest API Documentation: [Put Blob](https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob) + +## Lease Blob +The Lease Blob operation creates and manages a lock on a blob for write and delete operations. The lock duration can be 15 to 60 seconds, or can be infinite. +Rest API Documentation: [Lease Blob](https://docs.microsoft.com/en-us/rest/api/storageservices/lease-blob) + +## Put Block +The Put Block operation creates a new block to be committed as part of a blob. +Rest API Documentation: [Put Block](https://docs.microsoft.com/en-us/rest/api/storageservices/put-block) + +## Put Block List +The Put Block List operation writes a blob by specifying the list of block IDs that make up the blob. To be written as part of a blob, a block must have been successfully written to the server in an earlier Put Block operation. You can call Put Block List to update a blob by uploading only those blocks that have changed and then committing the new and existing blocks together. +Rest API Documentation: [Put Block List](https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list) + +## Set Blob Metadata +The Set Blob Metadata operation sets user-defined metadata for the specified blob as one or more name-value pairs.
+Rest API Documentation: [Set Blob Metadata](https://docs.microsoft.com/en-us/rest/api/storageservices/set-blob-metadata) + +## Get Blob Properties +The Get Blob Properties operation returns all user-defined metadata, standard HTTP properties, and system properties for the blob. +Rest API Documentation: [Get Blob Properties](https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties) + +## Get Blob +The Get Blob operation reads or downloads a blob from the system, including its metadata and properties. +Rest API Documentation: [Get Blob](https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob) + +## Delete Blob +The Delete Blob operation marks the specified blob for deletion. The blob is later deleted during garbage collection. +Rest API Documentation: [Delete Blob](https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob) + +## Get Block List +The Get Block List operation retrieves the list of blocks that have been uploaded as part of a block blob. +Rest API Documentation: [Get Block List](https://docs.microsoft.com/en-us/rest/api/storageservices/get-block-list) + +## Copy Blob +The Copy Blob operation copies a blob to a destination within the storage account. +Rest API Documentation: [Copy Blob](https://docs.microsoft.com/en-us/rest/api/storageservices/copy-blob) \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/fns_blob.md b/hadoop-tools/hadoop-azure/src/site/markdown/fns_blob.md index f93593cecfb5b..bf0835ccbe3ae 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/fns_blob.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/fns_blob.md @@ -27,7 +27,7 @@ Refer to [WASB Deprication](./wasb.html) for more details. ## Azure Service Endpoints Used by ABFS Driver Azure Services offers two set of endpoints for interacting with storage accounts: -1. [Azure Blob Storage](https://learn.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api) referred as Blob Endpoint +1. 
[Azure Blob Storage](./blobEndpoint.html) referred as Blob Endpoint 2. [Azure Data Lake Storage](https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/operation-groups) referred as DFS Endpoint The ABFS Driver by default is designed to work with DFS Endpoint only which primarily @@ -70,9 +70,9 @@ to blob for HNS Enabled Accounts, FS init will fail with InvalidConfiguration er ``` 4. Service Type for Ingress Operations: This will allow an override to choose service -type only for Ingress Related Operations like [Create](https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob?tabs=microsoft-entra-id), -[Append](https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id) -and [Flush](https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id). All other operations will still use the +type only for Ingress Related Operations like [Create](./blobEndpoint.html#put-blob), +[Append](./blobEndpoint.html#put-block), +and [Flush](./blobEndpoint.html#put-block-list). All other operations will still use the configured service type. ```xml diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java index 69d4f79f8b099..b15f4c997be0e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java @@ -113,7 +113,7 @@ public void testNoGetAclCallOnHnsConfigPresence() throws Exception { .getAclStatus(Mockito.anyString(), any(TracingContext.class)); } - // Todo: [FnsOverBlob] Remove this test case once Blob Endpoint Support is ready and enabled.
+ // TODO: [FnsOverBlob][HADOOP-19179] Remove this test case once Blob Endpoint Support is enabled. @Test public void testFileSystemInitFailsWithBlobEndpoitUrl() throws Exception { Configuration configuration = getRawConfiguration(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index 81897a568763e..3eae1401998b6 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -160,7 +160,7 @@ private String getUserAgentString(AbfsConfiguration config, boolean includeSSLProvider) throws IOException, URISyntaxException { AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build(); - // Todo : [FnsOverBlob] Update to work with Blob Endpoint as well when Fns Over Blob is ready. + // TODO: [FnsOverBlob][HADOOP-19234] Update to work with Blob Endpoint as well when Fns Over Blob is ready. AbfsClient client = new AbfsDfsClient(new URL("https://azure.com"), null, config, (AccessTokenProvider) null, null, abfsClientContext); String sslProviderName = null; @@ -364,7 +364,7 @@ public static AbfsClient createTestClientFromCurrentContext( .build(); // Create test AbfsClient - // Todo : [FnsOverBlob] Update to work with Blob Endpoint as well when Fns Over Blob is ready. + // TODO : [FnsOverBlob][HADOOP-19234] Update to work with Blob Endpoint as well when Fns Over Blob is ready. 
AbfsClient testClient = new AbfsDfsClient( baseAbfsClientInstance.getBaseUrl(), (currentAuthType == AuthType.SharedKey @@ -393,7 +393,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance, (currentAuthType == AuthType.SharedKey) || (currentAuthType == AuthType.OAuth)); - // Todo : [FnsOverBlob] Update to work with Blob Endpoint as well when Fns Over Blob is ready. + // TODO : [FnsOverBlob][HADOOP-19234] Update to work with Blob Endpoint as well when Fns Over Blob is ready. AbfsClient client = mock(AbfsDfsClient.class); AbfsPerfTracker tracker = new AbfsPerfTracker( "test", diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClientHandler.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClientHandler.java new file mode 100644 index 0000000000000..169398e6e99f8 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClientHandler.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; + +/** + * Integration test verifying that the AbfsClientHandler of a file system exposes both the DFS and the Blob client. + */ +public class ITestAbfsClientHandler extends AbstractAbfsIntegrationTest { + + public ITestAbfsClientHandler() throws Exception{ /* Nothing to initialize beyond what the superclass constructor does. */ + + } + + /** + * Verifies that the client handler holds both types of clients: the default client and the DFS client are AbfsDfsClient instances, and the Blob client is an AbfsBlobClient instance. + * @throws Exception if test fails + */ + @Test + public void testAbfsClientHandlerInitialization() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + AbfsClientHandler clientHandler = fs.getAbfsStore().getClientHandler(); + Assertions.assertThat(clientHandler.getClient()).isInstanceOf(AbfsDfsClient.class); + Assertions.assertThat(clientHandler.getClient(AbfsServiceType.DFS)).isInstanceOf(AbfsDfsClient.class); + Assertions.assertThat(clientHandler.getClient(AbfsServiceType.BLOB)).isInstanceOf(AbfsBlobClient.class); + } +}