Skip to content

Commit

Permalink
Merge branch 'azureBlobClient' of https://github.com/ABFSDriver/AbfsH…
Browse files Browse the repository at this point in the history
…adoop into azureBlobClientOutputStreamChanges
  • Loading branch information
anmolanmol1234 committed Jun 7, 2024
2 parents 3d4584b + 3fee064 commit 337830c
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 301 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,11 @@ public void close() throws IOException {
}

byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
return value.getBytes(XMS_PROPERTIES_ENCODING);
return client.encodeAttribute(value);
}

String decodeAttribute(byte[] value) throws UnsupportedEncodingException {
return new String(value, XMS_PROPERTIES_ENCODING);
return client.decodeAttribute(value);
}

private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException {
Expand Down Expand Up @@ -455,9 +455,7 @@ public Hashtable<String, String> getFilesystemProperties(
.getFilesystemProperties(tracingContext);
perfInfo.registerResult(op.getResult());

final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);

parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
parsedXmsProperties = client.getXMSProperties(op.getResult());
perfInfo.registerSuccess(true);

return parsedXmsProperties;
Expand All @@ -478,15 +476,8 @@ public void setFilesystemProperties(

try (AbfsPerfInfo perfInfo = startTracking("setFilesystemProperties",
"setFilesystemProperties")) {
final String commaSeparatedProperties;
try {
commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
} catch (CharacterCodingException ex) {
throw new InvalidAbfsRestOperationException(ex);
}

final AbfsRestOperation op = client
.setFilesystemProperties(commaSeparatedProperties, tracingContext);
.setFilesystemProperties(properties, tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
Expand All @@ -509,10 +500,7 @@ public Hashtable<String, String> getPathStatus(final Path path,
perfInfo.registerResult(op.getResult());
contextEncryptionAdapter.destroy();

final String xMsProperties = client.getXMSProperties(op.getResult());

parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);

parsedXmsProperties = client.getXMSProperties(op.getResult());
perfInfo.registerSuccess(true);

return parsedXmsProperties;
Expand Down Expand Up @@ -571,18 +559,12 @@ public void setPathProperties(final Path path,
path,
properties);

final String commaSeparatedProperties;
try {
commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
} catch (CharacterCodingException ex) {
throw new InvalidAbfsRestOperationException(ex);
}
final String relativePath = getRelativePath(path);
final ContextEncryptionAdapter contextEncryptionAdapter
= createEncryptionAdapterFromServerStoreContext(relativePath,
tracingContext);
final AbfsRestOperation op = client
.setPathProperties(getRelativePath(path), commaSeparatedProperties,
.setPathProperties(getRelativePath(path), properties,
tracingContext, contextEncryptionAdapter);
contextEncryptionAdapter.destroy();
perfInfo.registerResult(op.getResult()).registerSuccess(true);
Expand Down Expand Up @@ -1187,7 +1169,7 @@ public FileStatus getFileStatus(final Path path,
resourceIsDir = true;
} else {
contentLength = parseContentLength(result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
resourceIsDir = parseIsDirectory(client.checkIsDir(op.getResult()) ? DIRECTORY : FILE);
resourceIsDir = client.checkIsDir(result);
}

final String transformedOwner = identityTransformer.transformIdentityForGetRequest(
Expand Down Expand Up @@ -1918,81 +1900,6 @@ private boolean parseIsDirectory(final String resourceType) {
&& resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
}

/**
* Convert properties stored in a Map into a comma separated string. For map
* <key1:value1; key2:value2: keyN:valueN>, method would convert to:
* key1=value1,key2=value,...,keyN=valueN
* */
@VisibleForTesting
String convertXmsPropertiesToCommaSeparatedString(final Map<String,
String> properties) throws
CharacterCodingException {
StringBuilder commaSeparatedProperties = new StringBuilder();

final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING).newEncoder();

for (Map.Entry<String, String> propertyEntry : properties.entrySet()) {
String key = propertyEntry.getKey();
String value = propertyEntry.getValue();

Boolean canEncodeValue = encoder.canEncode(value);
if (!canEncodeValue) {
throw new CharacterCodingException();
}

String encodedPropertyValue = Base64.encode(encoder.encode(CharBuffer.wrap(value)).array());
commaSeparatedProperties.append(key)
.append(AbfsHttpConstants.EQUAL)
.append(encodedPropertyValue);

commaSeparatedProperties.append(AbfsHttpConstants.COMMA);
}

if (commaSeparatedProperties.length() != 0) {
commaSeparatedProperties.deleteCharAt(commaSeparatedProperties.length() - 1);
}

return commaSeparatedProperties.toString();
}

private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsProperties) throws
InvalidFileSystemPropertyException, InvalidAbfsRestOperationException {
Hashtable<String, String> properties = new Hashtable<>();

final CharsetDecoder decoder = Charset.forName(XMS_PROPERTIES_ENCODING).newDecoder();

if (xMsProperties != null && !xMsProperties.isEmpty()) {
String[] userProperties = xMsProperties.split(AbfsHttpConstants.COMMA);

if (userProperties.length == 0) {
return properties;
}

for (String property : userProperties) {
if (property.isEmpty()) {
throw new InvalidFileSystemPropertyException(xMsProperties);
}

String[] nameValue = property.split(AbfsHttpConstants.EQUAL, 2);
if (nameValue.length != 2) {
throw new InvalidFileSystemPropertyException(xMsProperties);
}

byte[] decodedValue = Base64.decode(nameValue[1]);

final String value;
try {
value = decoder.decode(ByteBuffer.wrap(decodedValue)).toString();
} catch (CharacterCodingException ex) {
throw new InvalidAbfsRestOperationException(ex);
}
properties.put(nameValue[0], value);
}
}

return properties;
}

private boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
for (String dir : dirSet) {
if (dir.isEmpty() || key.startsWith(dir + AbfsHttpConstants.FORWARD_SLASH)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ public final class AbfsHttpConstants {
public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
public static final String APPLICATION_XML = "application/xml";

public static final String XMS_PROPERTIES_ENCODING_DFS = "ISO-8859-1";
public static final String XMS_PROPERTIES_ENCODING_BLOB = "UTF-8";

public static final String ROOT_PATH = "/";
public static final String ACCESS_MASK = "mask:";
public static final String ACCESS_USER = "user:";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ public final class FileSystemUriSchemes {
public static final String ABFS_BLOB = "BLOB";

private FileSystemUriSchemes() {}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,4 @@ public void characters(final char[] ch, final int start, final int length)
throws SAXException {
bld.append(ch, start, length);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand All @@ -50,7 +52,7 @@
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.UnsupportedAbfsOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultEntrySchema;
Expand Down Expand Up @@ -153,7 +155,7 @@ public AbfsRestOperation createFilesystem(TracingContext tracingContext)
* @throws AzureBlobFileSystemException if rest operation fails.
*/
@Override
public AbfsRestOperation setFilesystemProperties(final String properties,
public AbfsRestOperation setFilesystemProperties(final Hashtable<String, String> properties,
TracingContext tracingContext) throws AzureBlobFileSystemException {
List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
List<AbfsHttpHeader> metadataRequestHeaders = getMetadataHeadersList(properties);
Expand Down Expand Up @@ -813,7 +815,7 @@ public AbfsRestOperation flush(byte[] buffer,
*/
@Override
public AbfsRestOperation setPathProperties(final String path,
final String properties,
final Hashtable<String, String> properties,
final TracingContext tracingContext,
final ContextEncryptionAdapter contextEncryptionAdapter)
throws AzureBlobFileSystemException {
Expand Down Expand Up @@ -919,15 +921,15 @@ public AbfsRestOperation setOwner(final String path,
final String owner,
final String group,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
throw new UnsupportedAbfsOperationException(
throw new UnsupportedOperationException(
"SetOwner operation is only supported on HNS enabled Accounts.");
}

@Override
public AbfsRestOperation setPermission(final String path,
final String permission,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
throw new UnsupportedAbfsOperationException(
throw new UnsupportedOperationException(
"SetPermission operation is only supported on HNS enabled Accounts.");
}

Expand All @@ -936,21 +938,21 @@ public AbfsRestOperation setAcl(final String path,
final String aclSpecString,
final String eTag,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
throw new UnsupportedAbfsOperationException(
throw new UnsupportedOperationException(
"SetAcl operation is only supported on HNS enabled Accounts.");
}

@Override
public AbfsRestOperation getAclStatus(final String path, final boolean useUPN,
TracingContext tracingContext) throws AzureBlobFileSystemException {
throw new UnsupportedAbfsOperationException(
throw new UnsupportedOperationException(
"GetAclStatus operation is only supported on HNS enabled Accounts.");
}

@Override
public AbfsRestOperation checkAccess(String path, String rwx, TracingContext tracingContext)
throws AzureBlobFileSystemException {
throw new UnsupportedAbfsOperationException(
throw new UnsupportedOperationException(
"CheckAccess operation is only supported on HNS enabled Accounts.");
}

Expand Down Expand Up @@ -1181,31 +1183,46 @@ public StorageErrorResponseSchema processStorageErrorResponse(final InputStream
}

@Override
public String getXMSProperties(final AbfsHttpOperation result) {
boolean firstProperty = true;
StringBuilder xmsProperties = new StringBuilder();
public Hashtable<String, String> getXMSProperties(AbfsHttpOperation result)
throws InvalidAbfsRestOperationException {
Hashtable<String, String> properties = new Hashtable<>();
Map<String, List<String>> responseHeaders = result.getResponseHeaders();
for (Map.Entry<String, List<String>> entry : responseHeaders.entrySet()) {
if (entry.getKey()!= null && entry.getKey().startsWith(X_MS_METADATA_PREFIX)) {
if (!firstProperty) {
xmsProperties.append(",");
firstProperty = false;
String name = entry.getKey();
if (name != null && name.startsWith(X_MS_METADATA_PREFIX)) {
String value = null;
try {
value = decodeMetadataAttribute(entry.getValue().get(0));
} catch (UnsupportedEncodingException e) {
throw new InvalidAbfsRestOperationException(e);
}
xmsProperties.append(entry.getKey().substring(X_MS_METADATA_PREFIX.length()));
xmsProperties.append("=");
xmsProperties.append(entry.getValue().get(0));
properties.put(name.substring(X_MS_METADATA_PREFIX.length()), value);
}
}
return xmsProperties.toString();
return properties;
}

private List<AbfsHttpHeader> getMetadataHeadersList(final String properties) {
List<AbfsHttpHeader> metadataRequestHeaders = new ArrayList<AbfsHttpHeader>();
String[] propertiesArray = properties.split(",");
for (String property : propertiesArray) {
String key = property.substring(0, property.indexOf('='));
String value = property.substring(property.indexOf('=') + 1);
metadataRequestHeaders.add(new AbfsHttpHeader(X_MS_METADATA_PREFIX + key, value));
@Override
public byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
return value.getBytes(XMS_PROPERTIES_ENCODING_BLOB);
}

@Override
public String decodeAttribute(byte[] value) throws UnsupportedEncodingException {
return new String(value, XMS_PROPERTIES_ENCODING_BLOB);
}

private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, String> properties) throws AbfsRestOperationException {
List<AbfsHttpHeader> metadataRequestHeaders = new ArrayList<>();
for(Map.Entry<String,String> entry : properties.entrySet()) {
String key = X_MS_METADATA_PREFIX + entry.getKey();
String value = null;
try {
value = encodeMetadataAttribute(entry.getValue());
} catch (UnsupportedEncodingException e) {
throw new InvalidAbfsRestOperationException(e);
}
metadataRequestHeaders.add(new AbfsHttpHeader(key, value));
}
return metadataRequestHeaders;
}
Expand Down Expand Up @@ -1261,4 +1278,16 @@ private BlobListResultSchema getListResultSchemaFromPathStatus(String relativePa
listResultSchema.paths().add(entrySchema);
return listResultSchema;
}

private static String encodeMetadataAttribute(String value)
throws UnsupportedEncodingException {
return value == null ? null
: URLEncoder.encode(value, StandardCharsets.UTF_8.name());
}

private static String decodeMetadataAttribute(String encoded)
throws UnsupportedEncodingException {
return encoded == null ? null :
java.net.URLDecoder.decode(encoded, StandardCharsets.UTF_8.name());
}
}
Loading

0 comments on commit 337830c

Please sign in to comment.