Skip to content

Commit

Permalink
javadoc
Browse files Browse the repository at this point in the history
  • Loading branch information
saxenapranav committed Jun 5, 2024
1 parent 33a5d56 commit 45815ab
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public class AbfsConfiguration{
private int maxApacheHttpClientIoExceptionsRetries;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE,
DefaultValue = DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_CONNECTIONS)
DefaultValue = DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS)
private int maxApacheHttpClientCacheConnections;

@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public final class FileSystemConfigurations {

public static final long DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME = 5_000L;

public static final int DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_CONNECTIONS = 5;
public static final int DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS = 5;

private FileSystemConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -61,7 +60,6 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
import static org.apache.http.entity.ContentType.TEXT_PLAIN;

/**
Expand All @@ -84,8 +82,8 @@ public class AbfsAHCHttpOperation extends AbfsHttpOperation {
private HttpResponse httpResponse;

/**
* Flag to indicate if the request is a payload request. API methods PUT, POST,
* PATCH are payload requests.
* Flag to indicate if the request is a payload request. HTTP methods PUT, POST,
* PATCH qualify for payload requests.
*/
private final boolean isPayloadRequest;

Expand All @@ -101,28 +99,35 @@ public AbfsAHCHttpOperation(final URL url,
final Duration readTimeout,
final AbfsApacheHttpClient abfsApacheHttpClient) throws IOException {
super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
this.isPayloadRequest = isPayloadRequest(method);
this.isPayloadRequest = HTTP_METHOD_PUT.equals(method)
|| HTTP_METHOD_PATCH.equals(method)
|| HTTP_METHOD_POST.equals(method);
this.abfsApacheHttpClient = abfsApacheHttpClient;


final URI requestUri;
try {
requestUri = url.toURI();
} catch (URISyntaxException e) {
throw new IOException(e);
}
switch (getMethod()) {
case HTTP_METHOD_PUT:
httpRequestBase = new HttpPut(getUri());
httpRequestBase = new HttpPut(requestUri);
break;
case HTTP_METHOD_PATCH:
httpRequestBase = new HttpPatch(getUri());
httpRequestBase = new HttpPatch(requestUri);
break;
case HTTP_METHOD_POST:
httpRequestBase = new HttpPost(getUri());
httpRequestBase = new HttpPost(requestUri);
break;
case HTTP_METHOD_GET:
httpRequestBase = new HttpGet(getUri());
httpRequestBase = new HttpGet(requestUri);
break;
case HTTP_METHOD_DELETE:
httpRequestBase = new HttpDelete(getUri());
httpRequestBase = new HttpDelete(requestUri);
break;
case HTTP_METHOD_HEAD:
httpRequestBase = new HttpHead(getUri());
httpRequestBase = new HttpHead(requestUri);
break;
default:
/*
Expand All @@ -134,16 +139,16 @@ public AbfsAHCHttpOperation(final URL url,
}
}

/**
* @return AbfsManagedHttpClientContext instance that captures latencies at
* different phases of network call.
*/
@VisibleForTesting
AbfsManagedHttpClientContext setFinalAbfsClientContext() {
AbfsManagedHttpClientContext getHttpClientContext() {
return new AbfsManagedHttpClientContext();
}

private boolean isPayloadRequest(final String method) {
return HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method)
|| HTTP_METHOD_POST.equals(method);
}

/**{@inheritDoc}*/
@Override
protected InputStream getErrorStream() throws IOException {
HttpEntity entity = httpResponse.getEntity();
Expand All @@ -153,6 +158,7 @@ protected InputStream getErrorStream() throws IOException {
return entity.getContent();
}

/**{@inheritDoc}*/
@Override
String getConnProperty(final String key) {
for (AbfsHttpHeader header : getRequestHeaders()) {
Expand All @@ -163,26 +169,31 @@ String getConnProperty(final String key) {
return null;
}

/**{@inheritDoc}*/
@Override
URL getConnUrl() {
return getUrl();
}

/**{@inheritDoc}*/
@Override
String getConnRequestMethod() {
return getMethod();
}

/**{@inheritDoc}*/
@Override
Integer getConnResponseCode() throws IOException {
return getStatusCode();
}

/**{@inheritDoc}*/
@Override
String getConnResponseMessage() throws IOException {
return getStatusDescription();
}

/**{@inheritDoc}*/
@Override
public void processResponse(final byte[] buffer,
final int offset,
Expand All @@ -206,6 +217,15 @@ public void processResponse(final byte[] buffer,
}
}

/**
* Parse response stream for headers and body.
*
* @param buffer byte array to store response body.
* @param offset offset in the buffer to start storing the response body.
* @param length length of the response body.
*
* @throws IOException network error while read response stream
*/
@VisibleForTesting
void parseResponseHeaderAndBody(final byte[] buffer,
final int offset,
Expand All @@ -229,15 +249,27 @@ void parseResponseHeaderAndBody(final byte[] buffer,
parseResponse(buffer, offset, length);
}

/**
* Parse status code from response
*
* @param httpResponse response object
* @return status code
*/
@VisibleForTesting
int parseStatusCode(HttpResponse httpResponse) {
return httpResponse.getStatusLine().getStatusCode();
}

/**
* Execute network call for the request
*
* @return response object
* @throws IOException network error while executing the request
*/
@VisibleForTesting
HttpResponse executeRequest() throws IOException {
AbfsManagedHttpClientContext abfsHttpClientContext
= setFinalAbfsClientContext();
= getHttpClientContext();
try {
LOG.debug("Executing request: {}", httpRequestBase);
HttpResponse response = abfsApacheHttpClient.execute(httpRequestBase,
Expand All @@ -252,23 +284,16 @@ HttpResponse executeRequest() throws IOException {
}
}

private Map<String, List<String>> getResponseHeaders(final HttpResponse httpResponse) {
if (httpResponse == null || httpResponse.getAllHeaders() == null) {
return new HashMap<>();
}
Map<String, List<String>> map = new HashMap<>();
for (Header header : httpResponse.getAllHeaders()) {
map.put(header.getName(), new ArrayList<String>(
Collections.singleton(header.getValue())));
}
return map;
}

/**{@inheritDoc}*/
@Override
public void setRequestProperty(final String key, final String value) {
setHeader(key, value);
List<AbfsHttpHeader> headers = getRequestHeaders();
if(headers != null) {
headers.add(new AbfsHttpHeader(key, value));
}
}

/**{@inheritDoc}*/
@Override
Map<String, List<String>> getRequestProperties() {
Map<String, List<String>> map = new HashMap<>();
Expand All @@ -281,6 +306,7 @@ Map<String, List<String>> getRequestProperties() {
return map;
}

/**{@inheritDoc}*/
@Override
public String getResponseHeader(final String headerName) {
if (httpResponse == null) {
Expand All @@ -293,6 +319,7 @@ public String getResponseHeader(final String headerName) {
return null;
}

/**{@inheritDoc}*/
@Override
InputStream getContentInputStream()
throws IOException {
Expand All @@ -302,6 +329,8 @@ InputStream getContentInputStream()
return httpResponse.getEntity().getContent();
}

/**{@inheritDoc}*/
@Override
public void sendPayload(final byte[] buffer,
final int offset,
final int length)
Expand All @@ -318,7 +347,7 @@ public void sendPayload(final byte[] buffer,
httpEntity);
}

translateHeaders(httpRequestBase, getRequestHeaders());
prepareRequest();
try {
httpResponse = executeRequest();
} catch (AbfsApacheHttpExpect100Exception ex) {
Expand All @@ -337,29 +366,16 @@ public void sendPayload(final byte[] buffer,
}
}

/**
* Sets the header on the request.
*/
private void prepareRequest() {
translateHeaders(httpRequestBase, getRequestHeaders());
}

private URI getUri() throws IOException {
try {
return getUrl().toURI();
} catch (URISyntaxException e) {
throw new IOException(e);
}
}

private void translateHeaders(final HttpRequestBase httpRequestBase,
final List<AbfsHttpHeader> requestHeaders) {
for (AbfsHttpHeader header : requestHeaders) {
for (AbfsHttpHeader header : getRequestHeaders()) {
httpRequestBase.setHeader(header.getName(), header.getValue());
}
}

private void setHeader(String name, String val) {
addHeaderToRequestHeaderList(new AbfsHttpHeader(name, val));
}

/**{@inheritDoc}*/
@Override
public String getRequestProperty(String name) {
for (AbfsHttpHeader header : getRequestHeaders()) {
Expand All @@ -370,6 +386,7 @@ public String getRequestProperty(String name) {
return EMPTY_STRING;
}

/**{@inheritDoc}*/
@Override
public String getTracingContextSuffix() {
return APACHE_IMPL;
Expand Down
Loading

0 comments on commit 45815ab

Please sign in to comment.