-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AbfsoutputStream refactor #118
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some comments.
...tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
Outdated
Show resolved
Hide resolved
...tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
Outdated
Show resolved
Hide resolved
...tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
Outdated
Show resolved
Hide resolved
...tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
Show resolved
Hide resolved
...ls/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
Outdated
Show resolved
Hide resolved
...op-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
Show resolved
Hide resolved
...p-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobBlock.java
Outdated
Show resolved
Hide resolved
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlock.java
Outdated
Show resolved
Hide resolved
...tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlockStatus.java
Outdated
Show resolved
Hide resolved
…adoop into azureBlobClientOriginal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for taking the suggestions. +1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM...
Thanks
...ls/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great code! minor comments. One suggestion related to the outputStream verification in #prepareListToCommit.
APR_10_2021("2021-04-10"), | ||
AUG_06_2021("2021-08-06"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aug06 looks unused, lets remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taken
@@ -109,6 +109,7 @@ public abstract class AbfsClient implements Closeable { | |||
|
|||
private final URL baseUrl; | |||
private final SharedKeyCredentials sharedKeyCredentials; | |||
// TODO: Abstract for blob and dfs for CPK in OSS PR |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will take care of it while raising OSS PR. Currently we are planning to do scale tests with the latest version, so added a TODO as a reminder
.contains(BLOB_OPERATION_NOT_SUPPORTED); | ||
return ex.getStatusCode() == HTTP_CONFLICT && (ex.getErrorCode() | ||
.getErrorCode().equals(AzureServiceErrorCode.BLOB_OPERATION_NOT_SUPPORTED.getErrorCode()) || | ||
ex.getErrorMessage().contains(INVALID_APPEND_OPERATION)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we take dependency on error code and not on the message (message might change in future, or if that is not the case)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have written a mail to XFE folks to confirm that can we take dependency on this error code because InvalidAppend is a very generic one. We want to confirm if there are any other scenarios where they return this error code. Once they confirm will update the code as needed
if (blockManager instanceof AzureBlobBlockManager) { | ||
this.blobBlockManager = (AzureBlobBlockManager) blockManager; | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like this is redundant check. reason being, this class can be used only if the outputStream is a newly created class. From the code of switchHadler, looks like this class cant come in switching.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taken
if (blockManager instanceof AzureDFSBlockManager) { | ||
this.dfsBlockManager = (AzureDFSBlockManager) blockManager; | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also looks a redundant check, as for the two cases when this class would inited:
- outputStream init: blockManager would be null.
- in switch, the blockManager sent would be blobBlockManager.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taken
if (switchCompleted && ingressHandler != null) { | ||
return ingressHandler; // Return the existing ingress handler | ||
} | ||
this.client = clientHandler.getClient(serviceType); | ||
if (serviceType == AbfsServiceType.BLOB) { | ||
if (isDFSToBlobFallbackEnabled && serviceTypeAtInit == AbfsServiceType.BLOB) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a constraint that there can not be switch from dfsHandler to blobHandler if the config is not given. As in if the dfs appends error for inter-op, and the config is not given. Job should throw error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This happens already because we don't have switchHandler handling in case of DFS appends as they don't throw the InvalidIngressServiceException, and we switch only if that exception is caught. So DFSIngressHandler to BlobIngress is not posssible via code
} catch (InvalidIngressServiceException ex) { | ||
// If an invalid ingress service is encountered, switch handler and retry | ||
switchHandler(); | ||
op = remoteFlush(offset, retainUncommittedData, isClose, leaseId, | ||
tracingContext); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dont think that switch can happen at this this state. Or are you referring that the outputStream is closed without writing anything? If yes, then probably we can completely leave flushing if no write has happened? What you say?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So DFS today calls flush even if there are no appends, in case of Blob handling we have taken care of this by not doing any PutBlockList calls if map is empty. Since we are planning not to change the DFS behaviour, this handling is needed.
|
||
int mapEntry = 0; | ||
// If any of the entry in the map doesn't have the status of SUCCESS, fail the flush. | ||
for (Map.Entry<String, AbfsBlockStatus> entry : getBlockStatusMap().entrySet()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe that this block is to verify if the AbfsOutputStream has correctly added blockIds in correct sequence. Is there any more reason?
If its to verify, I would be more inclined towards removing this block -> this would save computation at flush + computation to set blockStatusMap. And as far as the verification is concerned, I would say our integ tests should give that confidence (plus if this is breaking, this will break at large scale).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we remove this block, I would suggest further to remove blockStatusMap totally. orderedBlockList
is the important data-structure for us.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Appreciate the suggestion here but would like to keep this code block as it is. Computation cost is not much here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe that this block is to verify if the AbfsOutputStream has correctly added blockIds in correct sequence. Is there any more reason? -> this is not just for sequence, it validates that all appends are successful before flushing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code would be firing at flushWrittenBytesToServiceInternal
which has two callers:
- flushWrittenBytesToServiceAsync
- flushWrittenBytesToService
In both the cases, flushWrittenBytesToServiceInternal can be only called when the parallel threads are completely done sucessfully.
So, technically this flush api on server would be getting called only once the writing is successful on all the threads. As per code, the data-consistency for write / flush is maintained.
Description of PR :
This Pr is in correlation to the series of work done under Parent Jira: [HADOOP-19179]
(https://issues.apache.org/jira/browse/HADOOP-19179)
Jira for this Patch: https://issues.apache.org/jira/browse/HADOOP-19187
Scope of this task is to refactor the AbfsOutputStream class to handle the ingress for DFS and Blob endpoint effectively.
Production code changes :
The
AbfsOutputStream
class is crucial for handling the data being written to Azure Storage. Its primary responsibilities include:New Additions
The new additions introduce a more modular and flexible approach to managing data ingress (data being written to storage), catering to both Azure Data Lake Storage (ADLS) and Azure Blob Storage.
AzureIngressHandler
The
AzureIngressHandler
is a new parent class designed to encapsulate common logic for data ingress operations. It simplifies the process of writing data to Azure Storage by providing a unified interface. This class has two specialized child classes:AzureDfsIngressHandler:
AzureBlobIngressHandler:
blockId
.AbfsBlock and AbfsBlobBlock
Data is managed in discrete blocks to improve efficiency and manageability.
AbfsBlock:
AbfsBlobBlock:
AbfsBlock
tailored for Blob Storage.blockId
for each block, which is necessary for the Blob Storage API.Block Managers
To manage these data blocks, new manager classes have been introduced. These classes handle the lifecycle of blocks, including creation, appending, and flushing.
AzureBlockManager:
AzureDFSBlockManager:
AbfsBlock
instances for DFS.AzureBlobBlockManager:
AbfsBlobBlock
instances for Blob Storage.blockId
.Integration with AbfsOutputStream
The
AbfsOutputStream
class has been updated to incorporate the new ingress flow logic, enhancing its ability to handle data writes to both DFS and Blob Storage. Here’s how it integrates:Configuration Selection:
AbfsOutputStream
reads the configuration parameterfs.azure.ingress.service.type
to determine whether the user has configured the system to useBLOB
orDFS
for data ingress.Handler Initialization:
AbfsOutputStream
initializes the appropriate handler (AzureBlobIngressHandler
orAzureDfsIngressHandler
).Buffering Data:
AbfsOutputStream
, it is buffered into blocks (AbfsBlock
for DFS orAbfsBlobBlock
for Blob Storage).Managing Blocks:
AzureDFSBlockManager
orAzureBlobBlockManager
) manages the lifecycle of these blocks, ensuring that data is correctly created, appended, and flushed.Block Id Management (Blob Specific):
AzureBlobBlockManager
ensures that each block has a uniqueblockId
, adhering to the requirements of the Blob Storage API.Detailed Flow
Creating Data Blocks:
AbfsOutputStream
, it is divided into blocks (AbfsBlock
for DFS orAbfsBlobBlock
for Blob Storage).Appending Data:
AzureBlobIngressHandler
orAzureDfsIngressHandler
).Flushing Data:
Lifecycle Management:
AzureDFSBlockManager
andAzureBlobBlockManager
) oversee the lifecycle of blocks, handling retries, errors, and ensuring data integrity.Test Code Changes: