diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 03cc14a75ff0d..92269357d803c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -417,6 +417,10 @@ public class AbfsConfiguration{ FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_DELETE_THREAD) private int blobDeleteDirConsumptionParallelism; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_ENABLE_READ_CALLS_METRIC, DefaultValue = DEFAULT_ENABLE_READ_CALLS_METRIC) + private boolean isReadCallsMetricEnabled; + private String clientProvidedEncryptionKey; private String clientProvidedEncryptionKeySHA; @@ -1453,4 +1457,8 @@ public int getBlobRenameDirConsumptionParallelism() { public int getBlobDeleteDirConsumptionParallelism() { return blobDeleteDirConsumptionParallelism; } + + public boolean isReadCallsMetricEnabled() { + return isReadCallsMetricEnabled; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 3268f2c4b168e..4223482c9abce 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -314,6 +314,11 @@ public final class ConfigurationKeys { */ public static final String FS_AZURE_ENABLE_PAGINATED_DELETE = "fs.azure.enable.paginated.delete"; + /** + * Specify whether read call metrics should be collected and reported. {@value} + */ + public static final String FS_AZURE_ENABLE_READ_CALLS_METRIC = "fs.azure.enable.read.calls.metric"; + /** Add extra layer of verification of the integrity of the request content during transport: {@value}.
*/ public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION = "fs.azure.enable.checksum.validation"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index c67331f4785fb..a0929481c7041 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -137,6 +137,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true; public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true; public static final boolean DEFAULT_ENABLE_PAGINATED_DELETE = false; + public static final boolean DEFAULT_ENABLE_READ_CALLS_METRIC = true; public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION = false; /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index 7700036866ee9..a2b65b2f29fa7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -117,6 +117,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_UNICODE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ZERO; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE; @@ -841,7 +842,9 @@ public AbfsRestOperation read(final String path, final String cachedSasToken, final ContextEncryptionAdapter contextEncryptionAdapter, final TracingContext tracingContext) throws AzureBlobFileSystemException { + LOG.debug("Read request coming from reader type: {}", tracingContext.getReaderID()); final List requestHeaders = createDefaultHeaders(); + tracingContext.setPosition(getNormalizedValue(position, 4 * ONE_MB)); AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE, String.format( "bytes=%d-%d", position, position + bufferLength - 1)); requestHeaders.add(rangeHeader); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index c093103ecdaa1..a084e1bc41fe2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -88,6 +88,7 @@ import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import org.apache.hadoop.util.concurrent.HadoopExecutors; +import static java.lang.System.currentTimeMillis; import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static 
org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS; import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; @@ -142,6 +143,8 @@ public abstract class AbfsClient implements Closeable { private TimerTask runningTimerTask; private boolean isSendMetricCall; private SharedKeyCredentials metricSharedkeyCredentials = null; + public StringBuilder readMetricData = new StringBuilder(); + public int readMetricFileIdx = 0; /** * logging the rename failure if metadata is in an incomplete state. @@ -1209,11 +1212,100 @@ public void getMetricCall(TracingContext tracingContext) throws IOException { requestHeaders); try { op.execute(tracingContext); + sendReadMetrics(tracingContext); } finally { this.isSendMetricCall = false; } } + public void updateReadMetrics(String inputStreamId, int bufferLength, int requestedLength, long contentLength, long nextReadPos, boolean firstRead) { + if (abfsConfiguration.isReadCallsMetricEnabled()) { + if (readMetricData.length() == 0) { + readMetricData.append("TimeStamp,StreamId,BufferLength,RequestedLength,ContentLength,NextReadPos,FirstRead\n"); + } + readMetricData.append(currentTimeMillis()).append(COMMA) + .append(inputStreamId).append(COMMA) + .append(bufferLength).append(COMMA) + .append(requestedLength).append(COMMA) + .append(contentLength).append(COMMA) + .append(nextReadPos).append(COMMA) + .append(firstRead).append("\n"); + } + } + + public void sendReadMetrics(TracingContext tracingContext) throws IOException { + if (!abfsConfiguration.isReadCallsMetricEnabled() || readMetricData.length() == 0) { + return; + } + String readMetricFileName = String.format("/%s_%d_%s.csv", tracingContext.getFileSystemID(), readMetricFileIdx, currentTimeMillis()); + TracingContext tracingContext1 = new TracingContext(tracingContext); + tracingContext1.setMetricResults(""); + createReadMetricFile(readMetricFileName, tracingContext1); + appendToReadMetricFile(readMetricFileName, tracingContext1); + flushReadMetricFile(readMetricFileName, tracingContext1); + readMetricData = new StringBuilder(); + readMetricFileIdx++; + } + + public void createReadMetricFile(String path, TracingContext tracingContext) throws IOException { + final List requestHeaders = createDefaultHeaders(); + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILE); + + final URL url = createRequestUrl(new URL(abfsMetricUrl), path, abfsUriQueryBuilder.toString()); + + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.CreatePath, + HTTP_METHOD_PUT, url, requestHeaders); + try { + op.execute(tracingContext); + } catch (AzureBlobFileSystemException e) { + LOG.error("Failed to create read metric file", e); + } + } + + public void appendToReadMetricFile(String path, TracingContext tracingContext) throws IOException { + final List requestHeaders = createDefaultHeaders(); + requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, + HTTP_METHOD_PATCH)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(0)); + + final URL url = createRequestUrl(new URL(abfsMetricUrl), path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.Append, + HTTP_METHOD_PUT, url, requestHeaders, + readMetricData.toString().getBytes(), 0, readMetricData.length(), null); +
try { + op.execute(tracingContext); + } catch (AzureBlobFileSystemException e) { + LOG.error("Failed to append to read metric file", e); + } + } + + public void flushReadMetricFile(String path, TracingContext tracingContext) throws IOException { + final List requestHeaders = createDefaultHeaders(); + requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, + HTTP_METHOD_PATCH)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, String.valueOf(readMetricData.length())); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(true)); + + final URL url = createRequestUrl(new URL(abfsMetricUrl), path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.Flush, + HTTP_METHOD_PUT, url, requestHeaders, null); + try { + op.execute(tracingContext); + } catch (AzureBlobFileSystemException e) { + LOG.error("Failed to flush read metric file", e); + } + } + public boolean isSendMetricCall() { return isSendMetricCall; } @@ -1333,4 +1425,12 @@ public abstract Hashtable getXMSProperties(AbfsHttpOperation res public abstract byte[] encodeAttribute(String value) throws UnsupportedEncodingException; public abstract String decodeAttribute(byte[] value) throws UnsupportedEncodingException; + + public static String getNormalizedValue(long value, int maxValue) { + if (value < maxValue) { + return String.valueOf(value); + } else { + return String.format("%.1f", (float)value/(ONE_MB)); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index f40a48a471425..3357ff3b0590c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -96,6 +96,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH; @@ -946,9 +947,11 @@ public AbfsRestOperation read(final String path, String cachedSasToken, ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext) throws AzureBlobFileSystemException { + LOG.debug("Read request coming from reader type: {}", tracingContext.getReaderID()); final List requestHeaders = createDefaultHeaders(); addEncryptionKeyRequestHeaders(path, requestHeaders, false, contextEncryptionAdapter, tracingContext); + tracingContext.setPosition(getNormalizedValue(position, 4 * ONE_MB)); AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE, String.format("bytes=%d-%d", position, position + bufferLength - 1)); requestHeaders.add(rangeHeader); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index cacd3b092eb3f..9d6c71ed0a0df 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -51,6 +51,7 @@ import static java.lang.Math.min; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN; import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; import static org.apache.hadoop.util.StringUtils.toLowerCase; @@ -211,7 +212,9 @@ public int read(long position, byte[] buffer, int offset, int length) if (streamStatistics != null) { streamStatistics.readOperationStarted(); } - int bytesRead = readRemote(position, buffer, offset, length, tracingContext); + LOG.debug("Direct read with position called, no optimizations"); + tracingContext.setReaderID("NR"); + int bytesRead = readRemote(position, buffer, offset, length, new TracingContext(tracingContext)); if (statistics != null) { statistics.incrementBytesRead(bytesRead); } @@ -238,8 +241,10 @@ public synchronized int read(final byte[] b, final int off, final int len) throw if (b != null) { LOG.debug("read requested b.length = {} offset = {} len = {}", b.length, off, len); + client.updateReadMetrics(inputStreamId, b.length, len, contentLength, nextReadPos, firstRead); } else { LOG.debug("read requested b = null offset = {} len = {}", off, len); + client.updateReadMetrics(inputStreamId, -1, len, contentLength, nextReadPos, firstRead); } int currentOff = off; @@ -337,9 +342,10 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO LOG.debug("Sequential read with read ahead size of {}", bufferSize); bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); } else { - // Enabling read ahead for random reads as well to reduce number of remote calls. + // Read ahead prefetching is bypassed for random reads, but extra bytes up to the read ahead range are still read inline to reduce the number of remote calls.
int lengthWithReadAhead = Math.min(b.length + readAheadRange, bufferSize); - LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead); + LOG.debug("Random reads detected with read ahead size of {}", lengthWithReadAhead); + tracingContext.setReaderID("RR"); bytesRead = readInternal(fCursor, buffer, 0, lengthWithReadAhead, true); } } @@ -369,6 +375,8 @@ private int readFileCompletely(final byte[] b, final int off, final int len) // data need to be copied to user buffer from index bCursor, bCursor has // to be the current fCusor bCursor = (int) fCursor; + LOG.debug("Read Qualified for Small File Optimization"); + tracingContext.setReaderID("SF"); return optimisedRead(b, off, len, 0, contentLength); } @@ -389,6 +397,8 @@ private int readLastBlock(final byte[] b, final int off, final int len) bCursor = (int) (fCursor - lastBlockStart); // 0 if contentlength is < buffersize long actualLenToRead = min(footerReadSize, contentLength); + LOG.debug("Read Qualified for Footer Optimization"); + tracingContext.setReaderID("FR"); return optimisedRead(b, off, len, lastBlockStart, actualLenToRead); } @@ -504,6 +514,7 @@ private int readInternal(final long position, final byte[] b, final int offset, LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads); TracingContext readAheadTracingContext = new TracingContext(tracingContext); readAheadTracingContext.setPrimaryRequestID(); + readAheadTracingContext.setReaderID("PF"); while (numReadAheads > 0 && nextOffset < contentLength) { LOG.debug("issuing read ahead requestedOffset = {} requested size {}", nextOffset, nextSize); @@ -528,10 +539,12 @@ private int readInternal(final long position, final byte[] b, final int offset, } // got nothing from read-ahead, do our own read now + LOG.debug("got nothing from read-ahead, do our own read now"); + tracingContext.setReaderID("PN"); receivedBytes = readRemote(position, b, offset, length, new TracingContext(tracingContext)); return receivedBytes; } else { - LOG.debug("read ahead disabled, reading remote"); + LOG.debug("read ahead disabled, reading remote, reader type: {}", tracingContext.getReaderID()); return readRemote(position, b, offset, length, new TracingContext(tracingContext)); } } @@ -562,6 +575,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t streamStatistics.remoteReadOperation(); } LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length); + LOG.debug("Reading for reader type: {}", tracingContext.getReaderID()); op = client.read(path, position, b, offset, length, tolerateOobAppends ?
"*" : eTag, cachedSasToken.get(), contextEncryptionAdapter, tracingContext); @@ -608,7 +622,14 @@ private void incrementReadOps() { */ @Override public synchronized void seek(long n) throws IOException { - LOG.debug("requested seek to position {}", n); + if (firstRead && n > 0) { + tracingContext.setFirstReadPosition(AbfsClient.getNormalizedValue(n, 4 * ONE_MB)); + tracingContext.setFirstReadPositionFromEnd(AbfsClient.getNormalizedValue(contentLength - n, 4 * ONE_MB)); + } else { + tracingContext.setFirstReadPosition(""); + tracingContext.setFirstReadPositionFromEnd(""); + } + LOG.debug("requested seek to position {}, firstRead {}", n, firstRead); if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java index a30f06261ef6f..031b3c9b49033 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; class ReadBufferWorker implements Runnable { @@ -63,6 +64,8 @@ public void run() { if (buffer != null) { try { // do the actual read, from the file. + TracingContext tracingContext = buffer.getTracingContext(); + tracingContext.setReaderID("PF"); int bytesRead = buffer.getStream().readRemote( buffer.getOffset(), buffer.getBuffer(), @@ -71,7 +74,7 @@ public void run() { // read-ahead buffer size, make sure a valid length is passed // for remote read Math.min(buffer.getRequestedLength(), buffer.getBuffer().length), - buffer.getTracingContext()); + tracingContext); bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager } catch (IOException ex) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java index d976c6f9b6617..f8e032696ad53 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java @@ -32,4 +32,7 @@ public interface Listener { void setOperation(FSOperationType operation); void updateIngressHandler(String ingressHandler); void updatePosition(String position); + void updateReaderId(String readerId); + void updateFirstReadPosition(String firstReadPosition); + void updateFirstReadPositionFromEnd(String firstReadPositionFromEnd); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index 61235bd8c15f5..ae46987f93ae1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -65,6 +65,9 @@ public class TracingContext { private String header = EMPTY_STRING; private String ingressHandler = EMPTY_STRING; private String position = EMPTY_STRING; + private String firstReadPosition =
EMPTY_STRING; + private String firstReadPositionFromEnd = EMPTY_STRING; + private String readerID = EMPTY_STRING; private String metricResults = EMPTY_STRING; private String metricHeader = EMPTY_STRING; @@ -137,6 +140,9 @@ public TracingContext(TracingContext originalTracingContext) { this.format = originalTracingContext.format; this.operatedBlobCount = originalTracingContext.operatedBlobCount; this.position = originalTracingContext.getPosition(); + this.firstReadPosition = originalTracingContext.getFirstReadPosition(); + this.firstReadPositionFromEnd = originalTracingContext.getFirstReadPositionFromEnd(); + this.readerID = originalTracingContext.readerID; this.ingressHandler = originalTracingContext.getIngressHandler(); if (originalTracingContext.listener != null) { this.listener = originalTracingContext.listener.getClone(); @@ -204,6 +210,17 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail } if (!(position.equals(EMPTY_STRING))) { header += ":" + position; + if (firstReadPosition.equals(position)) { + header += "_S"; + } else if (!firstReadPosition.equals(EMPTY_STRING)) { + header += "_" + firstReadPosition; + } + if (!firstReadPositionFromEnd.equals(EMPTY_STRING)) { + header += "_" + firstReadPositionFromEnd; + } + } + if (!(readerID.equals(EMPTY_STRING))) { + header += ":" + readerID; } if (operatedBlobCount != null) { header += (":" + operatedBlobCount); @@ -224,8 +241,9 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail } httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, header); if (!metricHeader.equals(EMPTY_STRING)) { - httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_FECLIENT_METRICS, metricHeader); + header += metricHeader; } + httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, header); /* * In case the primaryRequestId is an empty-string and if it is the first try to * API call (previousFailure shall be null), maintain the last part of clientRequestId's @@ -294,6 +312,32 @@ public String getPosition() { return position; } + /** + * Gets the first read position. + * + * @return the first read position as a String. + */ + public String getFirstReadPosition() { + return firstReadPosition; + } + + /** + * Gets the firstReadPositionFromEnd. + * + * @return the firstReadPositionFromEnd as a String. + */ + public String getFirstReadPositionFromEnd() { + return firstReadPositionFromEnd; + } + + public String getFileSystemID() { + return fileSystemID; + } + + public String getReaderID() { + return readerID; + } + public FSOperationType getOpType() { return opType; } @@ -310,6 +354,10 @@ public void setIngressHandler(final String ingressHandler) { } } + public void setMetricResults(final String metricResults) { + this.metricResults = metricResults; + } + /** * Sets the position. * @@ -322,4 +370,35 @@ public void setPosition(final String position) { } } + /** + * Sets the first read position. + * + * @param firstReadPosition the first read position to set, must not be null. + */ + public void setFirstReadPosition(final String firstReadPosition) { + this.firstReadPosition = firstReadPosition; + if (listener != null) { + listener.updateFirstReadPosition(firstReadPosition); + } + } + + /** + * Sets the first read position from end. + * + * @param firstReadPositionFromEnd the first read position from end to set, must not be null. 
+ */ + public void setFirstReadPositionFromEnd(final String firstReadPositionFromEnd) { + this.firstReadPositionFromEnd = firstReadPositionFromEnd; + if (listener != null) { + listener.updateFirstReadPositionFromEnd(firstReadPositionFromEnd); + } + } + + public void setReaderID(final String readerID) { + this.readerID = readerID; + if (listener != null) { + listener.updateReaderId(readerID); + } + } + } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java index 994c698f632ee..2fa37cf857fd7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java @@ -40,6 +40,9 @@ public class TracingHeaderValidator implements Listener { private static final String GUID_PATTERN = "^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$"; private String ingressHandler = null; private String position = null; + private String firstReadPosition = null; + private String firstReadPositionFromEnd = null; + private String readerId = null; private Integer operatedBlobCount = null; @@ -59,6 +62,9 @@ public TracingHeaderValidator getClone() { tracingHeaderValidator.operatedBlobCount = operatedBlobCount; tracingHeaderValidator.ingressHandler = ingressHandler; tracingHeaderValidator.position = position; + tracingHeaderValidator.firstReadPosition = firstReadPosition; + tracingHeaderValidator.firstReadPositionFromEnd = firstReadPositionFromEnd; + tracingHeaderValidator.readerId = readerId; return tracingHeaderValidator; } @@ -106,7 +112,7 @@ private void validateTracingHeader(String tracingContextHeader) { private void validateBasicFormat(String[] idList) { if (format == TracingHeaderFormat.ALL_ID_FORMAT) { - int expectedSize = operatedBlobCount == null ? 7 : 8; + int expectedSize = operatedBlobCount == null ? 9 : 10; if (ingressHandler != null) { expectedSize += 2; } @@ -185,4 +191,19 @@ public void updateIngressHandler(String ingressHandler) { public void updatePosition(String position) { this.position = position; } + + @Override + public void updateReaderId(String readerId) { + this.readerId = readerId; + } + + @Override + public void updateFirstReadPosition(String firstReadPosition) { + this.firstReadPosition = firstReadPosition; + } + + @Override + public void updateFirstReadPositionFromEnd(String firstReadPositionFromEnd) { + this.firstReadPositionFromEnd = firstReadPositionFromEnd; + } }