HADOOP-19232: [ABFS][FNSOverBlob] Implementing Ingress Support with various Fallback Handling #7272
base: trunk
Conversation
Added some thoughts around the production code.
Will do another round of review for the test code.
Resolved review threads on the following files:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
...tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
...op-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
...ls/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
.../java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidIngressServiceException.java
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlock.java
...tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlockStatus.java
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
/**
 * Creates or retrieves an existing Azure ingress handler based on the service type and provided parameters.
 * <p>
Seems like an XML tag is not closed.
Which tag is not closed?
The <p> tag in the comments.
...ools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
🎊 +1 overall
This message was automatically generated.
Some suggestions for test code improvement.
@@ -61,4 +61,6 @@
    files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
<suppress checks="ParameterNumber"
    files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]TestAbfsOutputStream.java"/>
<suppress checks="MagicNumber"
Do we need this suppression?
If we can avoid it by defining static constants for all the magic numbers used in the class, that would be better.
taken
 * @param errorMessage the error message
 * @param innerException the inner exception
 */
public InvalidIngressServiceException(final int statusCode,
Can we also try to include the server request ID for the failed request here?
That is already included in the exception message
} else {
  return ingressClient.append(path, "val".getBytes(),
      new AppendRequestParameters(3, 0, 3, APPEND_MODE, false, null,
          true, new BlobAppendRequestParameters("MF8tNDE1MjkzOTE4AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", null)),
Maybe define a constant for this.
taken
 * If service type is blob, then path should have blob domain name.
 * @param path to be asserted.
 */
protected void assertPathDns(Path path) {
Not used anywhere in this patch; it can be removed and added later if required.
taken
@@ -244,6 +255,7 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
    Path testPath, EncryptionContextProvider ecp)
    throws Exception {
  AbfsClient client = fs.getAbfsClient();
  AbfsClient ingressClient = fs.getAbfsStore().getClientHandler().getIngressClient();
  AbfsClientUtils.setEncryptionContextProvider(client, ecp);
This should be set on the ingress client as well?
taken
AbfsClient abfsClient = fs.getAbfsStore()
    .getClientHandler()
    .getIngressClient();
Assume.assumeTrue("Skipping for DFS client",
We can use Assume.assumeTrue(getAbfsServiceType() == BLOB); we won't need the client here then. I see other places as well where this can be simplified.
This needs the check for the ingress service type.
fs.create(filePath);
FSDataOutputStream outputStream = fs.append(filePath);
outputStream.write(10);
final AzureBlobFileSystem fs1
Use try-with-resources to auto-close here and everywhere else applicable (see the sketch after this thread).
taken
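A minimal sketch of the try-with-resources pattern suggested above, applied to the quoted test snippet. This is illustrative only, not the patch's final code; fs and filePath are assumed to be set up by the surrounding test.

```java
// Illustrative only: the stream is closed automatically, even if write() throws.
try (FSDataOutputStream outputStream = fs.append(filePath)) {
  outputStream.write(10);
}
// No explicit outputStream.close() is needed after the try block.
```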
 * Verify that parallel write with same offset from different output streams will not throw exception.
 **/
@Test
public void testParallelWriteSameOffsetDifferentOutputStreams()
For this and the next two tests there is a lot of code redundancy; not sure if it is possible, but it would be good to refactor the common code into a parameterised method.
Moved the common code into a common method
    out::hsync
);
}
}
EOF error (missing newline at end of file).
taken
// Create a spy of AzureBlobFileSystemStore
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Assume.assumeTrue(store.getClient() instanceof AbfsBlobClient);
We can use the base class method assumeBlobServiceType() here and in other places.
taken
Pending Test Code Review
public AzcopyExecutionException(String message, String azcopyPath, Throwable cause) {
  super(message + SUGGESTION + azcopyPath, cause);
}
}
EOF error (missing newline at end of file) here and in other classes.
taken
import java.io.IOException;

public class AzcopyExecutionException extends IOException {
Add a Javadoc for this class, specifying that it is only used in test code.
taken
@@ -142,6 +143,8 @@ private void setTestUserConf(Configuration conf, String key, String value) {
 */
@Test
public void testRecursiveDeleteWithPagination() throws Exception {
  Assume.assumeTrue(
We can use the base class method to check the service type.
taken
});
if (spiedClient instanceof AbfsDfsClient) {
  intercept(FileNotFoundException.class, os::close);
} else {
What is the exact difference in behavior between DFS and Blob here?
Can we assert on the status code as well?
DFS throws FileNotFoundException while Blob throws IOException
AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
Assume.assumeTrue(!getIsNamespaceEnabled(fs));
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Assume.assumeTrue(store.getClient() instanceof AbfsBlobClient);
assumeBlobEndpoint()
taken
Mockito.doThrow(exception).when(blobClient).getBlockList(Mockito.anyString(), Mockito.any(TracingContext.class));

// Create a non-empty file
os.write(10);
Use the TEN constant instead of the magic number 10.
taken
@@ -61,6 +65,12 @@ public ITestAbfsHttpClientRequestExecutor() throws Exception {
public void testExpect100ContinueHandling() throws Exception {
  AzureBlobFileSystem fs = getFileSystem();
  Path path = new Path("/testExpect100ContinueHandling");
  if (isAppendBlobEnabled()) {
    Assume.assumeFalse("Not valid for AppendBlob with blob endpoint",
We can check using getAbfsServiceType().
Since the service type and the ingress type can be different, this might not work. We could add a getIngressServiceType() method in the base class?
@@ -1042,6 +1043,7 @@ public void testSetPermissionOnlyDefault() throws Exception {
public void testDefaultAclNewFile() throws Exception {
  final AzureBlobFileSystem fs = this.getFileSystem();
  assumeTrue(getIsNamespaceEnabled(fs));
  Assume.assumeTrue(fs.getAbfsStore().getClientHandler().getIngressClient() instanceof AbfsDfsClient);
Same here as well; we can assert on getAbfsServiceType().
same as above
@@ -51,6 +51,8 @@ public final class AbfsHttpConstants {
  public static final String DEFAULT_TIMEOUT = "90";
  public static final String APPEND_BLOB_TYPE = "appendblob";
  public static final String LIST = "list";
  public static final String BLOCK_BLOB_TYPE = "BlockBlob";
BlockBlob should be in camel case.
This is how it is defined in the backend
@@ -51,6 +51,8 @@ public final class AbfsHttpConstants {
  public static final String DEFAULT_TIMEOUT = "90";
  public static final String APPEND_BLOB_TYPE = "appendblob";
  public static final String LIST = "list";
  public static final String BLOCK_BLOB_TYPE = "BlockBlob";
  public static final String APPEND_BLOCK = "appendblock";
Same as above: appendBlock
same as above
@@ -1140,7 +1387,7 @@ public boolean checkIsDir(AbfsHttpOperation result) {
public boolean checkUserError(int responseStatusCode) {
  return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
      && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR
      && responseStatusCode != HttpURLConnection.HTTP_CONFLICT);
      && responseStatusCode != HTTP_CONFLICT);
We can make the same change for the two codes above (HTTP_INTERNAL_ERROR, HTTP_BAD_REQUEST) as well.
reverted this change
this.outputStream = outputStream;
this.offset = offset;
DataBlocks.BlockFactory blockFactory = outputStream.getBlockManager().getBlockFactory();
long blockCount = outputStream.getBlockManager().getBlockCount();
Since we are using these variables (blockCount, blockSize) only in one place, it would be better to call them in place.
Better to store them as variables for readability.
+ "exists and its resource type is invalid for this operation."; | ||
public static final String BLOB_OPERATION_NOT_SUPPORTED = "Blob operation is not supported."; | ||
public static final String INVALID_APPEND_OPERATION = "The resource was created or modified by the Azure Blob Service API " | ||
+ "and cannot be appended to by the Azure Data Lake Storage Service API"; |
Typo error: "and cannot be appended by the Azure Data Lake Storage Service API".
This is how the error message comes from the backend
boolean hasActiveBlockDataToUpload() {
  AzureBlockManager blockManager = getBlockManager();
  AbfsBlock activeBlock = blockManager.getActiveBlock();
  return blockManager.hasActiveBlock() && activeBlock.hasData();
Can blockManager or activeBlock be null in any case?
hasActiveBlock() checks activeBlock for null, and blockManager cannot be null.
if (hasActiveBlock()) {
  clearActiveBlock();
}
getBlockManager().clearActiveBlock();
Should we not check hasActiveBlock() before calling clearActiveBlock()?
taken
@VisibleForTesting
AbfsClient getClient() {
synchronized AbfsClient getClient() {
What is the need for this change?
It was reported as a SpotBugs warning, hence I added it, but I don't see the need for it, so reverting.
AbfsBlobClient blobClient = abfsOutputStream.getClientHandler().getBlobClient();
final AbfsRestOperation op = blobClient
    .getBlockList(abfsOutputStream.getPath(), tracingContext);
committedBlockIdList = op.getResult().getBlockIdList();
A check on op and op.getResult() before calling getBlockIdList() is needed (see the sketch after this thread).
taken
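A minimal sketch of the null guard suggested above, reusing the names from the quoted snippet. This is illustrative only and not the patch's final code.

```java
// Illustrative guard: only read the block ID list when the operation
// actually returned a result. Names follow the snippet quoted above.
final AbfsRestOperation op = blobClient
    .getBlockList(abfsOutputStream.getPath(), tracingContext);
if (op != null && op.getResult() != null) {
  committedBlockIdList = op.getResult().getBlockIdList();
}
```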
BlobAppendRequestParameters blobParams = new BlobAppendRequestParameters(blockToUpload.getBlockId(), getETag());
reqParams.setBlobParams(blobParams);
AbfsRestOperation op;
long threadId = Thread.currentThread().getId();
Can we get Thread.currentThread().getId() in place, as we are only using it once?
taken
    TracingContext tracingContext) throws IOException {
  TracingContext tracingContextAppend = new TracingContext(tracingContext);
  long threadId = Thread.currentThread().getId();
  String threadIdStr = String.valueOf(threadId);
Same as above: if threadId is used in only one place, can we keep it in place?
taken
  IOUtils.closeStreams(uploadData, activeBlock);
}
}
Please remove 1 extra empty line.
taken
AbfsRestOperation op;
TracingContext tracingContextAppend = new TracingContext(tracingContext);
long threadId = Thread.currentThread().getId();
String threadIdStr = String.valueOf(threadId);
Same as above.
taken
    throws IOException {
  TracingContext tracingContextFlush = new TracingContext(tracingContext);
  if (tracingContextFlush.getIngressHandler().equals(EMPTY_STRING)) {
    tracingContextFlush.setIngressHandler("DFlush");
Can we create a constant for "DFlush" and use it wherever required?
taken
// Perform the upload within a performance tracking context.
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(
    dfsClient.getAbfsPerfTracker(),
    "writeCurrentBufferToService", "append")) {
Can we create a constant for "append"?
Already present; added it here.
}
try {
  TracingContext tracingContextFlush = new TracingContext(tracingContext);
  tracingContextFlush.setIngressHandler("FBFlush");
Same as above, a constant for "FBFlush".
taken
// Perform the upload within a performance tracking context.
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(
    getClient().getAbfsPerfTracker(),
    "writeCurrentBufferToService", "append")) {
Same as above, a constant for "append" (a sketch of such constants follows after this thread).
taken
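A small sketch of the kind of constants suggested in the threads above. The holder class name and constant names are assumptions for illustration; the patch may define them elsewhere and under different names.

```java
// Hypothetical constant definitions for the repeated string literals noted above.
public final class IngressHandlerTags {
  public static final String DFS_FLUSH = "DFlush";
  public static final String FALLBACK_FLUSH = "FBFlush";
  public static final String APPEND_ACTION = "append";

  private IngressHandlerTags() {
    // Constants holder; no instances.
  }
}
```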
💔 -1 overall
This message was automatically generated.
💔 -1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
Description of PR:
This PR is part of the series of work done under the parent Jira [HADOOP-19179](https://issues.apache.org/jira/browse/HADOOP-19179).
Jira for this patch: https://issues.apache.org/jira/browse/HADOOP-19232
The scope of this task is to refactor the AbfsOutputStream class to handle ingress for the DFS and Blob endpoints effectively.
Production code changes:
The AbfsOutputStream class is crucial for handling the data being written to Azure Storage; its primary responsibility is buffering written data and uploading it to the storage service.
New Additions
The new additions introduce a more modular and flexible approach to managing data ingress (data being written to storage), catering to both Azure Data Lake Storage (ADLS) and Azure Blob Storage.
AzureIngressHandler
The AzureIngressHandler is a new parent class designed to encapsulate common logic for data ingress operations. It simplifies the process of writing data to Azure Storage by providing a unified interface. This class has two specialized child classes (see the sketch after this section):
AzureDfsIngressHandler: handles ingress through the DFS endpoint.
AzureBlobIngressHandler: handles ingress through the Blob endpoint, where each uploaded block is identified by a blockId
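A minimal sketch of the handler split described above. The method names and signatures are illustrative assumptions; the actual AzureIngressHandler API in the patch is richer and may differ.

```java
import java.io.IOException;

// Illustrative sketch only; not the patch's actual class bodies.
abstract class AzureIngressHandler {
  // Upload one buffered block of data starting at the given file offset.
  abstract void uploadBlock(byte[] data, long offset) throws IOException;

  // Make everything uploaded so far durable and visible on the service.
  abstract void flush(boolean isClose) throws IOException;
}

class AzureDfsIngressHandler extends AzureIngressHandler {
  @Override
  void uploadBlock(byte[] data, long offset) throws IOException {
    // DFS endpoint: append bytes at the file offset.
  }

  @Override
  void flush(boolean isClose) throws IOException {
    // DFS endpoint: flush the appended data.
  }
}

class AzureBlobIngressHandler extends AzureIngressHandler {
  @Override
  void uploadBlock(byte[] data, long offset) throws IOException {
    // Blob endpoint: stage the bytes as a block with a unique blockId.
  }

  @Override
  void flush(boolean isClose) throws IOException {
    // Blob endpoint: commit the staged blockIds as the blob's block list.
  }
}
```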
AbfsBlock and AbfsBlobBlock
Data is managed in discrete blocks to improve efficiency and manageability.
AbfsBlock: represents a block of data buffered for upload.
AbfsBlobBlock: a variant of AbfsBlock tailored for Blob Storage; it generates a blockId for each block, which is necessary for the Blob Storage API (a sketch of such ID generation follows below).
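For illustration only, here is one way block IDs could be generated; the actual scheme used by AbfsBlobBlock may differ. What the Blob Storage API does require is that block IDs are Base64-encoded and of equal length within a blob.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical block-ID generation; not the patch's actual algorithm.
final class BlockIdSketch {

  private BlockIdSketch() {
  }

  // Zero-pad the index so every raw ID has the same length before encoding,
  // which Azure Blob Storage requires for the blocks of a single blob.
  static String blockIdFor(long blockIndex) {
    String raw = String.format("block-%010d", blockIndex);
    return Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
  }
}
```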
Block Managers
To manage these data blocks, new manager classes have been introduced. These classes handle the lifecycle of blocks, including creation, appending, and flushing; a minimal sketch follows below.
AzureBlockManager: the common parent class for block lifecycle management.
AzureDFSBlockManager: manages AbfsBlock instances for DFS.
AzureBlobBlockManager: manages AbfsBlobBlock instances for Blob Storage and tracks each block's blockId
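A minimal sketch of the block-manager split described above. The class shapes and method names are assumptions for illustration, not the patch's exact API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; the real AzureBlockManager hierarchy is richer.
abstract class AzureBlockManagerSketch<B> {
  private B activeBlock;          // block currently being filled, if any

  boolean hasActiveBlock() {
    return activeBlock != null;
  }

  void setActiveBlock(B block) {
    activeBlock = block;
  }

  void clearActiveBlock() {
    activeBlock = null;
  }
}

// DFS flavour: nothing extra to track beyond the active block.
class AzureDfsBlockManagerSketch<B> extends AzureBlockManagerSketch<B> {
}

// Blob flavour: additionally remembers every blockId it has handed out,
// so the final flush can commit them via the block list.
class AzureBlobBlockManagerSketch<B> extends AzureBlockManagerSketch<B> {
  private final List<String> blockIds = new ArrayList<>();

  void trackBlockId(String blockId) {
    blockIds.add(blockId);
  }

  List<String> trackedBlockIds() {
    return blockIds;
  }
}
```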
Integration with AbfsOutputStream
The AbfsOutputStream class has been updated to incorporate the new ingress flow logic, enhancing its ability to handle data writes to both DFS and Blob Storage. Here is how it integrates:
Configuration Selection: AbfsOutputStream reads the configuration parameter fs.azure.ingress.service.type to determine whether the user has configured the system to use BLOB or DFS for data ingress (see the configuration sketch after this list).
Handler Initialization: based on this, AbfsOutputStream initializes the appropriate handler (AzureBlobIngressHandler or AzureDfsIngressHandler).
Buffering Data: as data is written to AbfsOutputStream, it is buffered into blocks (AbfsBlock for DFS or AbfsBlobBlock for Blob Storage).
Managing Blocks: the corresponding block manager (AzureDFSBlockManager or AzureBlobBlockManager) manages the lifecycle of these blocks, ensuring that data is correctly created, appended, and flushed.
Block Id Management (Blob Specific): AzureBlobBlockManager ensures that each block has a unique blockId, adhering to the requirements of the Blob Storage API.
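A hedged example of selecting the ingress service type. The configuration key and its BLOB/DFS values are quoted from this description; the surrounding class is just a generic Hadoop Configuration usage sketch, not code from the patch.

```java
import org.apache.hadoop.conf.Configuration;

// Sketch: choose which endpoint AbfsOutputStream uses for ingress.
public final class IngressConfigExample {

  private IngressConfigExample() {
  }

  public static Configuration withBlobIngress() {
    Configuration conf = new Configuration();
    // Key named in the PR description; valid values per the description are BLOB or DFS.
    conf.set("fs.azure.ingress.service.type", "BLOB"); // or "DFS"
    return conf;
  }
}
```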
Detailed Flow
Creating Data Blocks: as data is written to AbfsOutputStream, it is divided into blocks (AbfsBlock for DFS or AbfsBlobBlock for Blob Storage).
Appending Data: buffered blocks are uploaded through the appropriate ingress handler (AzureBlobIngressHandler or AzureDfsIngressHandler).
Flushing Data: on flush or close, the handler commits the uploaded data so that it becomes visible on the service.
Lifecycle Management: the block managers (AzureDFSBlockManager and AzureBlobBlockManager) oversee the lifecycle of blocks, handling retries and errors, and ensuring data integrity.
Test Code Changes: