[POC] [AbfsInputStream] Add position and reader type in tracing context #129

Draft: wants to merge 8 commits into base branch wasbDepCodeReview
@@ -417,6 +417,10 @@ public class AbfsConfiguration{
FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_DELETE_THREAD)
private int blobDeleteDirConsumptionParallelism;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ENABLE_READ_CALLS_METRIC, DefaultValue = DEFAULT_ENABLE_READ_CALLS_METRIC)
private boolean isReadCallsMetricEnabled;

private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

@@ -1453,4 +1457,8 @@ public int getBlobRenameDirConsumptionParallelism() {
public int getBlobDeleteDirConsumptionParallelism() {
return blobDeleteDirConsumptionParallelism;
}

public boolean isReadCallsMetricEnabled() {
return isReadCallsMetricEnabled;
}
}
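For orientation, here is a minimal sketch of how the new switch could be flipped from client code, assuming the standard Hadoop Configuration/FileSystem API; the key name and default come from this PR, while the class name and account URI below are purely illustrative.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class ReadCallsMetricToggle {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Default is true (FileSystemConfigurations.DEFAULT_ENABLE_READ_CALLS_METRIC);
    // set to false to skip collecting and uploading read call metrics.
    conf.setBoolean("fs.azure.enable.read.calls.metric", false);
    FileSystem fs = FileSystem.get(
        new URI("abfs://container@account.dfs.core.windows.net/"), conf);
    fs.close();
  }
}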
@@ -314,6 +314,11 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_ENABLE_PAGINATED_DELETE = "fs.azure.enable.paginated.delete";

/**
* Specify whether the client should collect and send read call metrics. {@value}
*/
public static final String FS_AZURE_ENABLE_READ_CALLS_METRIC = "fs.azure.enable.read.calls.metric";

/** Add extra layer of verification of the integrity of the request content during transport: {@value}. */
public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION = "fs.azure.enable.checksum.validation";

@@ -137,6 +137,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;
public static final boolean DEFAULT_ENABLE_PAGINATED_DELETE = false;
public static final boolean DEFAULT_ENABLE_READ_CALLS_METRIC = true;
public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION = false;

/**
@@ -117,6 +117,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_UNICODE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ZERO;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE;
@@ -841,7 +842,9 @@ public AbfsRestOperation read(final String path,
final String cachedSasToken,
final ContextEncryptionAdapter contextEncryptionAdapter,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
LOG.debug("Read request coming from reader type: {}", tracingContext.getReaderID());
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
tracingContext.setPosition(getNormalizedValue(position, 4 * ONE_MB));
AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE, String.format(
"bytes=%d-%d", position, position + bufferLength - 1));
requestHeaders.add(rangeHeader);
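As a worked example (not part of the diff), for position = 4194304 and bufferLength = 4194304 the header built here would be:

// String.format("bytes=%d-%d", position, position + bufferLength - 1)
// Range: bytes=4194304-8388607
// and setPosition(...) would record the normalized value "4.0", since 4 MB sits at the threshold.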
@@ -88,6 +88,7 @@
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.concurrent.HadoopExecutors;

import static java.lang.System.currentTimeMillis;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
@@ -142,6 +143,8 @@ public abstract class AbfsClient implements Closeable {
private TimerTask runningTimerTask;
private boolean isSendMetricCall;
private SharedKeyCredentials metricSharedkeyCredentials = null;
public StringBuilder readMetricData = new StringBuilder();
public int readMetricFileIdx = 0;

/**
* logging the rename failure if metadata is in an incomplete state.
@@ -1209,11 +1212,100 @@ public void getMetricCall(TracingContext tracingContext) throws IOException {
requestHeaders);
try {
op.execute(tracingContext);
sendReadMetrics(tracingContext);
} finally {
this.isSendMetricCall = false;
}
}

public void updateReadMetrics(String inputStreamId, int bufferLength, int requestedLength, long contentLength, long nextReadPos, boolean firstRead) {
Collaborator: Let's have it synchronized.

if (abfsConfiguration.isReadCallsMetricEnabled()) {
if (readMetricData.length() == 0) {
readMetricData.append("TimeStamp,StreamId,BufferLength,RequestedLength,ContentLength,NextReadPos,FirstRead\n");
}
readMetricData.append(currentTimeMillis()).append(COMMA)
.append(inputStreamId).append(COMMA)
.append(bufferLength).append(COMMA)
.append(requestedLength).append(COMMA)
.append(contentLength).append(COMMA)
.append(nextReadPos).append(COMMA)
.append(firstRead).append("\n");
}
}
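For illustration, the buffer above accumulates one CSV row per read call under the header written on first use; a couple of rows might look like the following (timestamp, stream id, and sizes are made-up values). Marking the method synchronized, as the reviewer suggests above, would keep rows from concurrent streams from interleaving.

TimeStamp,StreamId,BufferLength,RequestedLength,ContentLength,NextReadPos,FirstRead
1717171717171,3f9d2c41ab7e4f60,8388608,65536,134217728,0,true
1717171717629,3f9d2c41ab7e4f60,8388608,65536,134217728,65536,false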

public void sendReadMetrics(TracingContext tracingContext) throws IOException {
if (!abfsConfiguration.isReadCallsMetricEnabled() || readMetricData.length() == 0) {
Collaborator: Since readMetricData is initialized with some string, its length should not be 0. I think we can remove it.

return;
}
String readMetricFileName = String.format("/%s_%d_%s.csv", tracingContext.getFileSystemID(), readMetricFileIdx, currentTimeMillis());
TracingContext tracingContext1 = new TracingContext(tracingContext);
tracingContext1.setMetricResults("");
createReadMetricFile(readMetricFileName, tracingContext1);
appendToReadMetricFile(readMetricFileName, tracingContext1);
flushReadMetricFile(readMetricFileName, tracingContext1);
Collaborator (on lines +1243 to +1245): Since we are at the client level, what if we directly call createPath, append, and flush?

readMetricData = new StringBuilder();
readMetricFileIdx++;
}
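As a concrete illustration of the naming scheme above, the metric file path combines the filesystem id, the running file index, and the current time, so the first upload would target a path of roughly this shape (id and timestamp made up):

// String.format("/%s_%d_%s.csv", tracingContext.getFileSystemID(), readMetricFileIdx, currentTimeMillis())
// e.g. "/c6f2f4f0-9f3a-4a11-8d2e-5b6c7d8e9f01_0_1717171717171.csv"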

public void createReadMetricFile(String path, TracingContext tracingContext) throws IOException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILE);

final URL url = createRequestUrl(new URL(abfsMetricUrl), path, abfsUriQueryBuilder.toString());

final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.CreatePath,
HTTP_METHOD_PUT, url, requestHeaders);
try {
op.execute(tracingContext);
} catch (AzureBlobFileSystemException e) {
LOG.error("Failed to create read metric file", e);
}
}

public void appendToReadMetricFile(String path, TracingContext tracingContext) throws IOException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(0));

final URL url = createRequestUrl(new URL(abfsMetricUrl), path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.Append,
HTTP_METHOD_PUT, url, requestHeaders,
readMetricData.toString().getBytes(), 0, readMetricData.length(), null);
try {
op.execute(tracingContext);
} catch (AzureBlobFileSystemException e) {
LOG.error("Failed to append to read metric file", e);
}
}

public void flushReadMetricFile(String path, TracingContext tracingContext) throws IOException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, String.valueOf(readMetricData.length()));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(true));

final URL url = createRequestUrl(new URL(abfsMetricUrl), path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.Flush,
HTTP_METHOD_PUT, url, requestHeaders, null);
try {
op.execute(tracingContext);
} catch (AzureBlobFileSystemException e) {
LOG.error("Failed to flush read metric file", e);
}
}

public boolean isSendMetricCall() {
return isSendMetricCall;
}
@@ -1333,4 +1425,12 @@ public abstract Hashtable<String, String> getXMSProperties(AbfsHttpOperation res
public abstract byte[] encodeAttribute(String value) throws UnsupportedEncodingException;

public abstract String decodeAttribute(byte[] value) throws UnsupportedEncodingException;

public static String getNormalizedValue(long value, int maxValue) {
if (value < maxValue) {
return String.valueOf(value);
} else {
return String.format("%.1f", (float)value/(ONE_MB));
}
}
}
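A quick sketch of what getNormalizedValue yields with the 4 * ONE_MB threshold used by the read paths in this PR: positions below the threshold come back as raw byte offsets, anything at or above it is reported in megabytes with one decimal place.

// Assuming ONE_MB = 1024 * 1024, as in FileSystemConfigurations:
AbfsClient.getNormalizedValue(2048L, 4 * ONE_MB);               // "2048"  (raw bytes, below threshold)
AbfsClient.getNormalizedValue(8L * ONE_MB, 4 * ONE_MB);         // "8.0"   (reported in MB)
AbfsClient.getNormalizedValue(100L * ONE_MB + 512, 4 * ONE_MB); // "100.0"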
@@ -96,6 +96,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH;
@@ -946,9 +947,11 @@ public AbfsRestOperation read(final String path,
String cachedSasToken,
ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext) throws AzureBlobFileSystemException {
LOG.debug("Read request coming from reader type: {}", tracingContext.getReaderID());
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addEncryptionKeyRequestHeaders(path, requestHeaders, false,
contextEncryptionAdapter, tracingContext);
tracingContext.setPosition(getNormalizedValue(position, 4 * ONE_MB));
AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
String.format("bytes=%d-%d", position, position + bufferLength - 1));
requestHeaders.add(rangeHeader);
@@ -51,6 +51,7 @@
import static java.lang.Math.min;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.util.StringUtils.toLowerCase;
@@ -211,7 +212,9 @@ public int read(long position, byte[] buffer, int offset, int length)
if (streamStatistics != null) {
streamStatistics.readOperationStarted();
}
int bytesRead = readRemote(position, buffer, offset, length, tracingContext);
LOG.debug("Direct read with position called, no Optimizations");
tracingContext.setReaderID("NR");
Collaborator: We should have a new TracingContext object in the method, else a parallel thread would use the same tc and metric corruption could probably happen.

int bytesRead = readRemote(position, buffer, offset, length, new TracingContext(tracingContext));
if (statistics != null) {
statistics.incrementBytesRead(bytesRead);
}
@@ -238,8 +241,10 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
if (b != null) {
LOG.debug("read requested b.length = {} offset = {} len = {}", b.length,
off, len);
client.updateReadMetrics(inputStreamId, b.length, len, contentLength, nextReadPos, firstRead);
} else {
LOG.debug("read requested b = null offset = {} len = {}", off, len);
client.updateReadMetrics(inputStreamId, -1, len, contentLength, nextReadPos, firstRead);
}

int currentOff = off;
@@ -337,9 +342,10 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
LOG.debug("Sequential read with read ahead size of {}", bufferSize);
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
// Enabling read ahead for random reads as well to reduce number of remote calls.
// Disabling read ahead for random reads as well to reduce number of remote calls.
int lengthWithReadAhead = Math.min(b.length + readAheadRange, bufferSize);
LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead);
LOG.debug("Random reads detected with read ahead size of {}", lengthWithReadAhead);
tracingContext.setReaderID("RR");
bytesRead = readInternal(fCursor, buffer, 0, lengthWithReadAhead, true);
}
}
@@ -369,6 +375,8 @@ private int readFileCompletely(final byte[] b, final int off, final int len)
// data need to be copied to user buffer from index bCursor, bCursor has
// to be the current fCusor
bCursor = (int) fCursor;
LOG.debug("Read Qualified for Small File Optimization");
tracingContext.setReaderID("SF");
return optimisedRead(b, off, len, 0, contentLength);
}

@@ -389,6 +397,8 @@ private int readLastBlock(final byte[] b, final int off, final int len)
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
long actualLenToRead = min(footerReadSize, contentLength);
LOG.debug("Read Qualified for Footer Optimization");
tracingContext.setReaderID("FR");
return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
}

@@ -504,6 +514,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
TracingContext readAheadTracingContext = new TracingContext(tracingContext);
readAheadTracingContext.setPrimaryRequestID();
readAheadTracingContext.setReaderID("PF");
while (numReadAheads > 0 && nextOffset < contentLength) {
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
nextOffset, nextSize);
@@ -528,10 +539,12 @@
}

// got nothing from read-ahead, do our own read now
LOG.debug("got nothing from read-ahead, do our own read now");
tracingContext.setReaderID("PN");
receivedBytes = readRemote(position, b, offset, length, new TracingContext(tracingContext));
return receivedBytes;
} else {
LOG.debug("read ahead disabled, reading remote");
LOG.debug("read ahead disabled, reading remote" + tracingContext.getReaderID());
return readRemote(position, b, offset, length, new TracingContext(tracingContext));
}
}
@@ -562,6 +575,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
streamStatistics.remoteReadOperation();
}
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
LOG.debug("Reading for reader Type: " + tracingContext.getReaderID());
op = client.read(path, position, b, offset, length,
tolerateOobAppends ? "*" : eTag, cachedSasToken.get(),
contextEncryptionAdapter, tracingContext);
@@ -608,7 +622,14 @@ private void incrementReadOps() {
*/
@Override
public synchronized void seek(long n) throws IOException {
LOG.debug("requested seek to position {}", n);
if (firstRead && n > 0) {
tracingContext.setFirstReadPosition(AbfsClient.getNormalizedValue(n, 4 * ONE_MB));
tracingContext.setFirstReadPositionFromEnd(AbfsClient.getNormalizedValue(contentLength - n, 4 * ONE_MB));
} else {
tracingContext.setFirstReadPosition("");
Collaborator (on lines +625 to +629): This might reflect on all the calls post seek. Is there any way we can reset it?
tracingContext.setFirstReadPositionFromEnd("");
}
LOG.debug("requested seek to position {}, firstRead {}", n, firstRead);
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
@@ -23,6 +23,7 @@

import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

class ReadBufferWorker implements Runnable {

@@ -63,6 +64,8 @@ public void run() {
if (buffer != null) {
try {
// do the actual read, from the file.
TracingContext tracingContext = buffer.getTracingContext();
tracingContext.setReaderID("PF");
int bytesRead = buffer.getStream().readRemote(
buffer.getOffset(),
buffer.getBuffer(),
@@ -71,7 +74,7 @@
// read-ahead buffer size, make sure a valid length is passed
// for remote read
Math.min(buffer.getRequestedLength(), buffer.getBuffer().length),
buffer.getTracingContext());
tracingContext);

bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
} catch (IOException ex) {
@@ -32,4 +32,7 @@ public interface Listener {
void setOperation(FSOperationType operation);
void updateIngressHandler(String ingressHandler);
void updatePosition(String position);
void updateReaderId(String readerId);
void updateFirstReadPosition(String firstReadPosition);
void updateFirstReadPositionFromEnd(String firstReadPositionFromEnd);
}
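Finally, for readers of this diff, the reader-type tags set throughout the PR appear to map as follows. The constant names below are purely illustrative (the PR passes the raw string literals inline), and the meanings are inferred from the surrounding log messages and call sites:

public final class ReaderTypes {                    // illustrative summary only
  public static final String NORMAL_READ   = "NR";  // positional read(), no optimization
  public static final String RANDOM_READ   = "RR";  // random-read branch in readOneBlock()
  public static final String SMALL_FILE    = "SF";  // readFileCompletely() small-file optimization
  public static final String FOOTER_READ   = "FR";  // readLastBlock() footer optimization
  public static final String PREFETCH      = "PF";  // read-ahead queue / ReadBufferWorker reads
  public static final String PREFETCH_MISS = "PN";  // read-ahead returned nothing, direct remote read
}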