Skip to content

Commit

Permalink
HADOOP-18325: ABFS: Add correlated metric support for ABFS operations (
Browse files Browse the repository at this point in the history
…#6314)


Adds support for metric collection at the filesystem instance level.
Metrics are pushed to the store upon the closure of a filesystem instance, encompassing all operations
that utilized that specific instance.

Collected Metrics:

- Number of successful requests without any retries.
- Count of requests that succeeded after a specified number of retries (x retries).
- Request count subjected to throttling.
- Number of requests that failed despite exhausting all retry attempts, among other counters.
Implementation Details:

Incorporated logic in the AbfsClient to facilitate metric pushing through an additional request.
This occurs in scenarios where no requests are sent to the backend for a defined idle period.
These enhancements enable comprehensive monitoring and analysis of filesystem interactions, giving a deeper understanding of success rates, retry scenarios, throttling instances, and requests that fail after exhausting all retries. Additionally, the AbfsClient logic ensures that metrics are proactively pushed even during idle periods, maintaining a continuous and accurate representation of filesystem performance.

Contributed by Anmol Asrani
  • Loading branch information
anmolanmol1234 authored May 23, 2024
1 parent d876505 commit d168d3f
Show file tree
Hide file tree
Showing 26 changed files with 2,042 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THOUSAND;

/**
 * Holder for retry, backoff and throttling counters.
 *
 * <p>The class is used in two roles:
 * <ul>
 *   <li>An instance created with {@link #AbfsBackoffMetrics()} is the
 *   aggregate holder. Its constructor fills {@link #getMetricsMap()} with one
 *   child instance per retry bucket ("1", "2", "3", "4", "5_15", "15_25",
 *   "25AndAbove").</li>
 *   <li>An instance created with {@link #AbfsBackoffMetrics(String)} is one
 *   such child, labelled with its retry bucket. Its metrics map stays
 *   empty.</li>
 * </ul>
 *
 * <p>Every counter is an {@link AtomicLong} and the bucket map is a
 * {@link ConcurrentHashMap}, so individual updates are atomic. All counters
 * are initialised in both roles, so no accessor can fail with a
 * {@link NullPointerException} because of the constructor that was used.
 * {@link #toString()} reads the counters one at a time and is therefore not a
 * consistent snapshot while other threads are updating them.
 */
public class AbfsBackoffMetrics {

  /** Requests that succeeded (tracked per retry bucket). */
  private final AtomicLong numberOfRequestsSucceeded = new AtomicLong();

  /**
   * Smallest backoff recorded; starts at {@link Long#MAX_VALUE} so that any
   * recorded value compares lower.
   */
  private final AtomicLong minBackoff = new AtomicLong(Long.MAX_VALUE);

  /** Largest backoff recorded. */
  private final AtomicLong maxBackoff = new AtomicLong();

  /** Request count used as the divisor for the average backoff. */
  private final AtomicLong totalRequests = new AtomicLong();

  /** Sum of all recorded backoff values. */
  private final AtomicLong totalBackoff = new AtomicLong();

  /** Retry bucket label; {@code null} on the aggregate instance. */
  private final String retryCount;

  private final AtomicLong numberOfIOPSThrottledRequests = new AtomicLong();

  private final AtomicLong numberOfBandwidthThrottledRequests = new AtomicLong();

  private final AtomicLong numberOfOtherThrottledRequests = new AtomicLong();

  private final AtomicLong numberOfNetworkFailedRequests = new AtomicLong();

  private final AtomicLong maxRetryCount = new AtomicLong();

  private final AtomicLong totalNumberOfRequests = new AtomicLong();

  private final AtomicLong numberOfRequestsSucceededWithoutRetrying = new AtomicLong();

  private final AtomicLong numberOfRequestsFailed = new AtomicLong();

  /** Per-retry-bucket children; populated only on the aggregate instance. */
  private final Map<String, AbfsBackoffMetrics> metricsMap
      = new ConcurrentHashMap<>();

  /**
   * Creates the aggregate holder and registers one child instance per retry
   * bucket in the metrics map.
   */
  public AbfsBackoffMetrics() {
    this.retryCount = null;
    initializeMap();
  }

  /**
   * Creates the holder for a single retry bucket. No children are registered
   * in its metrics map.
   *
   * @param retryCount label of the retry bucket this instance represents
   */
  public AbfsBackoffMetrics(String retryCount) {
    this.retryCount = retryCount;
  }

  /** Registers one child metrics instance for every retry bucket. */
  private void initializeMap() {
    ArrayList<String> retryCountList = new ArrayList<>(
        Arrays.asList("1", "2", "3", "4", "5_15", "15_25", "25AndAbove"));
    for (String bucket : retryCountList) {
      metricsMap.put(bucket, new AbfsBackoffMetrics(bucket));
    }
  }

  /** @return number of requests that succeeded */
  public long getNumberOfRequestsSucceeded() {
    return this.numberOfRequestsSucceeded.get();
  }

  /** @param numberOfRequestsSucceeded new value of the counter */
  public void setNumberOfRequestsSucceeded(long numberOfRequestsSucceeded) {
    this.numberOfRequestsSucceeded.set(numberOfRequestsSucceeded);
  }

  /** Adds one to the succeeded-requests counter. */
  public void incrementNumberOfRequestsSucceeded() {
    this.numberOfRequestsSucceeded.getAndIncrement();
  }

  /** @return smallest backoff stored, {@link Long#MAX_VALUE} if none was set */
  public long getMinBackoff() {
    return this.minBackoff.get();
  }

  /** @param minBackoff new minimum backoff value */
  public void setMinBackoff(long minBackoff) {
    this.minBackoff.set(minBackoff);
  }

  /** @return largest backoff stored */
  public long getMaxBackoff() {
    return this.maxBackoff.get();
  }

  /** @param maxBackoff new maximum backoff value */
  public void setMaxBackoff(long maxBackoff) {
    this.maxBackoff.set(maxBackoff);
  }

  /** @return request count used for the average backoff */
  public long getTotalRequests() {
    return this.totalRequests.get();
  }

  /** Adds one to the request count used for the average backoff. */
  public void incrementTotalRequests() {
    this.totalRequests.incrementAndGet();
  }

  /** @param totalRequests new value of the counter */
  public void setTotalRequests(long totalRequests) {
    this.totalRequests.set(totalRequests);
  }

  /** @return sum of all stored backoff values */
  public long getTotalBackoff() {
    return this.totalBackoff.get();
  }

  /** @param totalBackoff new value of the backoff sum */
  public void setTotalBackoff(long totalBackoff) {
    this.totalBackoff.set(totalBackoff);
  }

  /** @return retry bucket label, or {@code null} on the aggregate instance */
  public String getRetryCount() {
    return this.retryCount;
  }

  /** @return number of IOPS throttled requests */
  public long getNumberOfIOPSThrottledRequests() {
    return this.numberOfIOPSThrottledRequests.get();
  }

  /** @param numberOfIOPSThrottledRequests new value of the counter */
  public void setNumberOfIOPSThrottledRequests(long numberOfIOPSThrottledRequests) {
    this.numberOfIOPSThrottledRequests.set(numberOfIOPSThrottledRequests);
  }

  /** Adds one to the IOPS-throttled counter. */
  public void incrementNumberOfIOPSThrottledRequests() {
    this.numberOfIOPSThrottledRequests.getAndIncrement();
  }

  /** @return number of bandwidth throttled requests */
  public long getNumberOfBandwidthThrottledRequests() {
    return this.numberOfBandwidthThrottledRequests.get();
  }

  /** @param numberOfBandwidthThrottledRequests new value of the counter */
  public void setNumberOfBandwidthThrottledRequests(long numberOfBandwidthThrottledRequests) {
    this.numberOfBandwidthThrottledRequests.set(numberOfBandwidthThrottledRequests);
  }

  /** Adds one to the bandwidth-throttled counter. */
  public void incrementNumberOfBandwidthThrottledRequests() {
    this.numberOfBandwidthThrottledRequests.getAndIncrement();
  }

  /** @return number of requests throttled for other reasons */
  public long getNumberOfOtherThrottledRequests() {
    return this.numberOfOtherThrottledRequests.get();
  }

  /** @param numberOfOtherThrottledRequests new value of the counter */
  public void setNumberOfOtherThrottledRequests(long numberOfOtherThrottledRequests) {
    this.numberOfOtherThrottledRequests.set(numberOfOtherThrottledRequests);
  }

  /** Adds one to the other-throttled counter. */
  public void incrementNumberOfOtherThrottledRequests() {
    this.numberOfOtherThrottledRequests.getAndIncrement();
  }

  /** @return maximum retry count stored */
  public long getMaxRetryCount() {
    return this.maxRetryCount.get();
  }

  /** @param maxRetryCount new value of the maximum retry count */
  public void setMaxRetryCount(long maxRetryCount) {
    this.maxRetryCount.set(maxRetryCount);
  }

  /** Adds one to the stored maximum retry count. */
  public void incrementMaxRetryCount() {
    this.maxRetryCount.getAndIncrement();
  }

  /** @return total number of requests made */
  public long getTotalNumberOfRequests() {
    return this.totalNumberOfRequests.get();
  }

  /** @param totalNumberOfRequests new value of the counter */
  public void setTotalNumberOfRequests(long totalNumberOfRequests) {
    this.totalNumberOfRequests.set(totalNumberOfRequests);
  }

  /** Adds one to the total-requests counter. */
  public void incrementTotalNumberOfRequests() {
    this.totalNumberOfRequests.getAndIncrement();
  }

  /**
   * @return the live (not copied) map from retry bucket label to the metrics
   *     instance of that bucket; empty on per-bucket instances
   */
  public Map<String, AbfsBackoffMetrics> getMetricsMap() {
    return metricsMap;
  }

  /** @return number of requests that succeeded without retrying */
  public long getNumberOfRequestsSucceededWithoutRetrying() {
    return this.numberOfRequestsSucceededWithoutRetrying.get();
  }

  /** @param numberOfRequestsSucceededWithoutRetrying new value of the counter */
  public void setNumberOfRequestsSucceededWithoutRetrying(long numberOfRequestsSucceededWithoutRetrying) {
    this.numberOfRequestsSucceededWithoutRetrying.set(numberOfRequestsSucceededWithoutRetrying);
  }

  /** Adds one to the succeeded-without-retrying counter. */
  public void incrementNumberOfRequestsSucceededWithoutRetrying() {
    this.numberOfRequestsSucceededWithoutRetrying.getAndIncrement();
  }

  /** @return number of requests that failed */
  public long getNumberOfRequestsFailed() {
    return this.numberOfRequestsFailed.get();
  }

  /** @param numberOfRequestsFailed new value of the counter */
  public void setNumberOfRequestsFailed(long numberOfRequestsFailed) {
    this.numberOfRequestsFailed.set(numberOfRequestsFailed);
  }

  /** Adds one to the failed-requests counter. */
  public void incrementNumberOfRequestsFailed() {
    this.numberOfRequestsFailed.getAndIncrement();
  }

  /** @return number of requests that failed due to network errors */
  public long getNumberOfNetworkFailedRequests() {
    return this.numberOfNetworkFailedRequests.get();
  }

  /** @param numberOfNetworkFailedRequests new value of the counter */
  public void setNumberOfNetworkFailedRequests(long numberOfNetworkFailedRequests) {
    this.numberOfNetworkFailedRequests.set(numberOfNetworkFailedRequests);
  }

  /** Adds one to the network-failed counter. */
  public void incrementNumberOfNetworkFailedRequests() {
    this.numberOfNetworkFailedRequests.getAndIncrement();
  }

  /**
   * Formats a value with exactly three decimals. {@link Locale#ROOT} is used
   * so the decimal separator does not depend on the JVM's default locale.
   *
   * @param value the value to format
   * @return {@code value} rendered as {@code "%.3f"}
   */
  private static String formatDecimal(double value) {
    return String.format(Locale.ROOT, "%.3f", value);
  }

  /**
   * Renders all counters as a single {@code $KEY=value} string.
   *
   * <p>Acronyms used as keys:
   * <ol>
   *   <li>RCTSI :- Request count that succeeded in x retries</li>
   *   <li>MMA :- Min Max Average (This refers to the backoff or sleep time
   *   between 2 requests)</li>
   *   <li>s :- seconds</li>
   *   <li>BWT :- Number of Bandwidth throttled requests</li>
   *   <li>IT :- Number of IOPS throttled requests</li>
   *   <li>OT :- Number of Other throttled requests</li>
   *   <li>NFR :- Number of requests which failed due to network errors</li>
   *   <li>RT :- Percentage of requests that are throttled</li>
   *   <li>TRNR :- Total number of requests which succeeded without
   *   retrying</li>
   *   <li>TRF :- Total number of requests which failed</li>
   *   <li>TR :- Total number of requests which were made</li>
   *   <li>MRC :- Max retry count across all requests</li>
   * </ol>
   *
   * <p>Per-bucket entries are emitted in the iteration order of the
   * underlying map. The throttled percentage is reported as {@code 0.000}
   * when no requests have been counted, instead of dividing by zero.
   *
   * @return the formatted metric string
   */
  @Override
  public String toString() {
    StringBuilder metricString = new StringBuilder();
    long totalRequestsThrottled = getNumberOfBandwidthThrottledRequests()
        + getNumberOfIOPSThrottledRequests()
        + getNumberOfOtherThrottledRequests();
    long totalRequestsMade = getTotalNumberOfRequests();
    // A zero divisor would yield NaN/Infinity in the emitted string.
    double percentageOfRequestsThrottled = totalRequestsMade == 0
        ? 0.0
        : ((double) totalRequestsThrottled / totalRequestsMade) * HUNDRED;
    for (Map.Entry<String, AbfsBackoffMetrics> entry : metricsMap.entrySet()) {
      String bucket = entry.getKey();
      AbfsBackoffMetrics bucketMetrics = entry.getValue();
      metricString.append("$RCTSI$_").append(bucket)
          .append("R_").append("=")
          .append(bucketMetrics.getNumberOfRequestsSucceeded());
      long totalRequests = bucketMetrics.getTotalRequests();
      metricString.append("$MMA$_").append(bucket).append("R_");
      if (totalRequests > 0) {
        // Min, max and average are concatenated, each terminated by "s".
        metricString.append("=")
            .append(formatDecimal(
                (double) bucketMetrics.getMinBackoff() / THOUSAND))
            .append("s")
            .append(formatDecimal(
                (double) bucketMetrics.getMaxBackoff() / THOUSAND))
            .append("s")
            .append(formatDecimal(
                ((double) bucketMetrics.getTotalBackoff() / totalRequests)
                    / THOUSAND))
            .append("s");
      } else {
        metricString.append("=0s");
      }
    }
    metricString.append("$BWT=")
        .append(getNumberOfBandwidthThrottledRequests())
        .append("$IT=")
        .append(getNumberOfIOPSThrottledRequests())
        .append("$OT=")
        .append(getNumberOfOtherThrottledRequests())
        .append("$RT=")
        .append(formatDecimal(percentageOfRequestsThrottled))
        .append("$NFR=")
        .append(getNumberOfNetworkFailedRequests())
        .append("$TRNR=")
        .append(getNumberOfRequestsSucceededWithoutRetrying())
        .append("$TRF=")
        .append(getNumberOfRequestsFailed())
        .append("$TR=")
        .append(totalRequestsMade)
        .append("$MRC=")
        .append(getMaxRetryCount());

    return metricString.toString();
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.reflect.Field;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.util.Preconditions;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -291,6 +292,26 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
private boolean enableAutoThrottling;

// Bound to configuration key FS_AZURE_METRIC_IDLE_TIMEOUT with default
// DEFAULT_METRIC_IDLE_TIMEOUT_MS.
// NOTE(review): the "_MS" suffix suggests milliseconds - confirm at the constant.
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_IDLE_TIMEOUT,
DefaultValue = DEFAULT_METRIC_IDLE_TIMEOUT_MS)
private int metricIdleTimeout;

// Bound to configuration key FS_AZURE_METRIC_ANALYSIS_TIMEOUT with default
// DEFAULT_METRIC_ANALYSIS_TIMEOUT_MS.
// NOTE(review): the "_MS" suffix suggests milliseconds - confirm at the constant.
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ANALYSIS_TIMEOUT,
DefaultValue = DEFAULT_METRIC_ANALYSIS_TIMEOUT_MS)
private int metricAnalysisTimeout;

// Bound to configuration key FS_AZURE_METRIC_URI; defaults to EMPTY_STRING.
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_URI,
DefaultValue = EMPTY_STRING)
private String metricUri;

// Bound to configuration key FS_AZURE_METRIC_ACCOUNT_NAME; defaults to
// EMPTY_STRING.
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ACCOUNT_NAME,
DefaultValue = EMPTY_STRING)
private String metricAccount;

// Bound to configuration key FS_AZURE_METRIC_ACCOUNT_KEY; defaults to
// EMPTY_STRING.
// NOTE(review): presumably a credential - avoid logging this value; confirm.
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ACCOUNT_KEY,
DefaultValue = EMPTY_STRING)
private String metricAccountKey;

// Bound to configuration key FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT with
// default DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS.
// NOTE(review): the "_MS" suffix suggests milliseconds - confirm at the constant.
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT,
DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
private int accountOperationIdleTimeout;
Expand Down Expand Up @@ -818,6 +839,26 @@ public boolean isAutoThrottlingEnabled() {
return this.enableAutoThrottling;
}

/**
 * Accessor for the {@code metricIdleTimeout} setting.
 *
 * @return the value currently held in {@code metricIdleTimeout}
 */
public int getMetricIdleTimeout() {
  return metricIdleTimeout;
}

/**
 * Accessor for the {@code metricAnalysisTimeout} setting.
 *
 * @return the value currently held in {@code metricAnalysisTimeout}
 */
public int getMetricAnalysisTimeout() {
  return metricAnalysisTimeout;
}

/**
 * Accessor for the {@code metricUri} setting.
 *
 * @return the value currently held in {@code metricUri}
 */
public String getMetricUri() {
  return this.metricUri;
}

/**
 * Accessor for the {@code metricAccount} setting.
 *
 * @return the value currently held in {@code metricAccount}
 */
public String getMetricAccount() {
  return this.metricAccount;
}

/**
 * Accessor for the {@code metricAccountKey} setting.
 *
 * @return the value currently held in {@code metricAccountKey}
 */
public String getMetricAccountKey() {
  return this.metricAccountKey;
}

/**
 * Accessor for the {@code accountOperationIdleTimeout} setting.
 *
 * @return the value currently held in {@code accountOperationIdleTimeout}
 */
public int getAccountOperationIdleTimeout() {
  return this.accountOperationIdleTimeout;
}
Expand Down Expand Up @@ -854,6 +895,10 @@ public TracingHeaderFormat getTracingHeaderFormat() {
return getEnum(FS_AZURE_TRACINGHEADER_FORMAT, TracingHeaderFormat.ALL_ID_FORMAT);
}

/**
 * Looks up the metric format through {@code getEnum}, passing
 * {@code FS_AZURE_METRIC_FORMAT} as the key and {@link MetricFormat#EMPTY}
 * as the second argument.
 * NOTE(review): presumably the second argument is the fallback used when the
 * key is not configured - confirm against {@code getEnum}.
 *
 * @return the {@link MetricFormat} produced by {@code getEnum}
 */
public MetricFormat getMetricFormat() {
  final MetricFormat fallback = MetricFormat.EMPTY;
  return getEnum(FS_AZURE_METRIC_FORMAT, fallback);
}

/**
 * Looks up the authentication type through {@code getEnum}, passing
 * {@code FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME} as the key and
 * {@code AuthType.SharedKey} as the second argument.
 * NOTE(review): presumably the second argument is the fallback used when the
 * key is not configured - confirm against {@code getEnum}.
 *
 * @param accountName account name supplied by the caller; it is not
 *     referenced in this method body
 * @return the {@code AuthType} produced by {@code getEnum}
 */
public AuthType getAuthType(String accountName) {
  final AuthType fallback = AuthType.SharedKey;
  return getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, fallback);
}
Expand Down
Loading

0 comments on commit d168d3f

Please sign in to comment.