Merge pull request #326 from momentohq/feat/speedUpCollection2
fix: more parallelization around dynamodb metric collection
cprice404 authored Jun 3, 2024
2 parents 81f77fc + 06fb7c6 commit 8193607
Showing 7 changed files with 164 additions and 56 deletions.
7 changes: 6 additions & 1 deletion momento-cli-opts/src/lib.rs
@@ -229,6 +229,11 @@ to help find opportunities for optimizations with Momento.
help = "Opt in to check whether ddb tables have ttl enabled. If there are lots of tables, could slow down data collection"
)]
enable_ddb_ttl_check: bool,
#[arg(
long = "enable-gsi",
help = "Opt in to check metrics on dynamodb gsi's. If there are lots of tables with gsi's, could slow down data collection"
)]
enable_gsi: bool,
#[arg(
value_enum,
long = "resource",
@@ -238,7 +243,7 @@ to help find opportunities for optimizations with Momento.
#[arg(
long = "metric-collection-rate",
help = "tps at which to invoke the aws `get-metric-data` api",
default_value = "20"
default_value = "10"
)]
metric_collection_rate: u32,
},
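The new flag follows the same opt-in pattern as `--enable-ddb-ttl-check`: a plain boolean that stays false unless passed. A minimal sketch of that pattern with clap's derive API; `LinterOpts` and its fields here are illustrative stand-ins, not the CLI's actual types:

```rust
use clap::Parser;

// Hypothetical stand-in for the real cloud-linter subcommand options.
#[derive(Parser, Debug)]
struct LinterOpts {
    // Boolean flags default to false; passing `--enable-gsi` flips them on.
    #[arg(
        long = "enable-gsi",
        help = "Opt in to check metrics on dynamodb gsi's. If there are lots of tables with gsi's, could slow down data collection"
    )]
    enable_gsi: bool,

    // Mirrors the lowered default shown above: 10 tps instead of 20.
    #[arg(long = "metric-collection-rate", default_value = "10")]
    metric_collection_rate: u32,
}

fn main() {
    let opts = LinterOpts::parse();
    println!("enable_gsi={} rate={}", opts.enable_gsi, opts.metric_collection_rate);
}
```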
5 changes: 3 additions & 2 deletions momento/src/commands/cloud_linter/api_gateway.rs
@@ -125,8 +125,9 @@ async fn process_apis(
let mut resources: Vec<Resource> = Vec::with_capacity(apis.len());
let get_apis_bar =
ProgressBar::new((apis.len() * 2) as u64).with_message("Processing API Gateway resources");
get_apis_bar
.set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template"));
get_apis_bar.set_style(
ProgressStyle::with_template(" {pos:>7}/{len:7} {msg}").expect("invalid template"),
);
for api in apis {
let the_api = apig_client
.get_rest_api()
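The progress-bar change here (and the matching one in s3.rs below) swaps the `{bar} {eta}` template for a `{pos}/{len}` counter, trading an ETA estimate for an exact progress count. A standalone indicatif sketch of the new template against a fake workload:

```rust
use indicatif::{ProgressBar, ProgressStyle};

fn main() {
    let bar = ProgressBar::new(100).with_message("Processing API Gateway resources");
    // `{pos:>7}` right-aligns the current count in 7 columns; `{len:7}` is the total.
    bar.set_style(
        ProgressStyle::with_template("  {pos:>7}/{len:7} {msg}").expect("invalid template"),
    );
    for _ in 0..100 {
        std::thread::sleep(std::time::Duration::from_millis(5));
        bar.inc(1);
    }
    bar.finish_with_message("done");
}
```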
153 changes: 104 additions & 49 deletions momento/src/commands/cloud_linter/dynamodb.rs
@@ -18,7 +18,7 @@ use crate::commands::cloud_linter::resource::{DynamoDbResource, Resource, Resour
use crate::commands::cloud_linter::utils::rate_limit;
use crate::error::CliError;

const DDB_TABLE_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! {
const DDB_TABLE_PROVISIONED_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! {
"Sum" => &[
"ConsumedReadCapacityUnits",
"ConsumedWriteCapacityUnits",
@@ -27,8 +27,6 @@ const DDB_TABLE_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! {
"ReadThrottleEvents",
"WriteThrottleEvents",
"TimeToLiveDeletedItemCount",
"TransactionConflict",
"ConditionalCheckFailedRequests",
],
"Average" => &[
"ConsumedReadCapacityUnits",
@@ -46,6 +44,26 @@ const DDB_TABLE_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! {
],
};

const DDB_TABLE_PAY_PER_USE_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! {
"Sum" => &[
"ConsumedReadCapacityUnits",
"ConsumedWriteCapacityUnits",
"ReadThrottleEvents",
"WriteThrottleEvents",
"TimeToLiveDeletedItemCount",
],
"Average" => &[
"ConsumedReadCapacityUnits",
"ConsumedWriteCapacityUnits",
],
"Maximum" => &[
"ConsumedReadCapacityUnits",
"ConsumedWriteCapacityUnits",
"ReadThrottleEvents",
"WriteThrottleEvents",
],
};

const DDB_GSI_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! {
"Sum" => &[
"ConsumedReadCapacityUnits",
@@ -132,12 +150,28 @@ pub(crate) struct GsiMetadata {
impl ResourceWithMetrics for DynamoDbResource {
fn create_metric_targets(&self) -> Result<Vec<MetricTarget>, CliError> {
match self.resource_type {
ResourceType::DynamoDbTable => Ok(vec![MetricTarget {
namespace: "AWS/DynamoDB".to_string(),
expression: "".to_string(),
dimensions: HashMap::from([("TableName".to_string(), self.id.clone())]),
targets: DDB_TABLE_METRICS,
}]),
ResourceType::DynamoDbTable => {
if self
.metadata
.billing_mode
.clone()
.unwrap_or_default()
.eq("PAY_PER_REQUEST")
{
return Ok(vec![MetricTarget {
namespace: "AWS/DynamoDB".to_string(),
expression: "".to_string(),
dimensions: HashMap::from([("TableName".to_string(), self.id.clone())]),
targets: DDB_TABLE_PAY_PER_USE_METRICS,
}]);
}
Ok(vec![MetricTarget {
namespace: "AWS/DynamoDB".to_string(),
expression: "".to_string(),
dimensions: HashMap::from([("TableName".to_string(), self.id.clone())]),
targets: DDB_TABLE_PROVISIONED_METRICS,
}])
}
ResourceType::DynamoDbGsi => {
let gsi_name = self
.metadata
@@ -179,6 +213,7 @@ pub(crate) async fn process_ddb_resources(
describe_ttl_limiter: Arc<DefaultDirectRateLimiter>,
sender: Sender<Resource>,
enable_ddb_ttl_check: bool,
enable_gsi: bool,
) -> Result<(), CliError> {
let ddb_client = aws_sdk_dynamodb::Client::new(config);
let metrics_client = aws_sdk_cloudwatch::Client::new(config);
@@ -190,47 +225,56 @@

let describe_ddb_tables_bar =
ProgressBar::new(table_names.len() as u64).with_message("Processing Dynamo DB tables");
describe_ddb_tables_bar
.set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template"));

let futures = FuturesUnordered::new();

for table_name in table_names {
let sender_clone = sender.clone();
let ddb_client_clone = ddb_client.clone();
let metrics_client_clone = metrics_client.clone();
let table_name_clone = table_name.clone();
let control_plane_limiter_clone = control_plane_limiter.clone();
let metrics_limiter_clone = metrics_limiter.clone();
let describe_ttl_limiter_clone = describe_ttl_limiter.clone();
let progress_bar_clone = describe_ddb_tables_bar.clone();
let spawn = tokio::spawn(async move {
let res = process_table_resources(
&ddb_client_clone,
&metrics_client_clone,
&table_name_clone,
control_plane_limiter_clone,
metrics_limiter_clone,
describe_ttl_limiter_clone,
sender_clone,
enable_ddb_ttl_check,
)
.await;
progress_bar_clone.inc(1);
res
});
futures.push(spawn);
}
describe_ddb_tables_bar.set_style(
ProgressStyle::with_template(" {pos:>7}/{len:7} {msg}").expect("invalid template"),
);

let all_results = futures::future::join_all(futures).await;
for result in all_results {
match result {
// bubble up any cli errors that we came across
Ok(res) => res?,
Err(_) => {
return Err(CliError {
msg: "failed to wait for all dynamo resources to collect data".to_string(),
})
// we don't want to spawn 1 million processes at the same time. We chunk the tables into chunks of 10, complete 10
// at a time, describing tables as well as getting all table metrics, and then move on to the next 10
let table_chunks = table_names.chunks(10);
let vec_tables: Vec<&[String]> = table_chunks.into_iter().collect();

for table_batch in vec_tables {
let futures = FuturesUnordered::new();
for table_name in table_batch {
let sender_clone = sender.clone();
let ddb_client_clone = ddb_client.clone();
let metrics_client_clone = metrics_client.clone();
let table_name_clone = table_name.clone();
let control_plane_limiter_clone = Arc::clone(&control_plane_limiter);
let metrics_limiter_clone = Arc::clone(&metrics_limiter);
let describe_ttl_limiter_clone = Arc::clone(&describe_ttl_limiter);
let progress_bar_clone = describe_ddb_tables_bar.clone();
let spawn = tokio::spawn(async move {
let res = process_table_resources(
&ddb_client_clone,
&metrics_client_clone,
&table_name_clone,
control_plane_limiter_clone,
metrics_limiter_clone,
describe_ttl_limiter_clone,
sender_clone,
enable_ddb_ttl_check,
enable_gsi,
)
.await;
progress_bar_clone.inc(1);
res
});
futures.push(spawn);
}

let all_results = futures::future::join_all(futures).await;
for result in all_results {
match result {
// bubble up any cli errors that we came across
Ok(res) => res?,
Err(_) => {
println!("failed to wait for all dynamodb tables");
return Err(CliError {
msg: "failed to wait for all dynamo resources to collect data".to_string(),
});
}
}
}
}
@@ -323,6 +367,7 @@ async fn process_table_resources(
describe_ttl_limiter: Arc<DefaultDirectRateLimiter>,
sender: Sender<Resource>,
enable_ddb_ttl_check: bool,
enable_gsi: bool,
) -> Result<(), CliError> {
let region = ddb_client
.config()
@@ -481,6 +526,16 @@
});

for mut resource in resources {
// if we have disabled collecting gsi metrics, then forward the gsi to the sender and continue
if resource.resource_type == ResourceType::DynamoDbGsi && !enable_gsi {
sender
.send(Resource::DynamoDb(resource))
.await
.map_err(|err| CliError {
msg: format!("Failed to stream dynamodb resource to file: {}", err),
})?;
continue;
}
resource
.append_metrics(metrics_client, Arc::clone(&metrics_limiter))
.await?;
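The structural change in this file: instead of spawning one task per table all at once, the loop now takes `table_names.chunks(10)` and drives each batch to completion before starting the next, so at most ten tables are being described (and having their metrics pulled) concurrently. A self-contained sketch of that batching pattern; `process_one` is a placeholder for the real `process_table_resources`:

```rust
use futures::stream::FuturesUnordered;

// Placeholder for the real per-table work (describe + metrics collection).
async fn process_one(table: String) -> Result<(), String> {
    println!("processing {table}");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let table_names: Vec<String> = (0..25).map(|i| format!("table-{i}")).collect();

    // Bound concurrency: each chunk of 10 must finish before the next starts.
    for batch in table_names.chunks(10) {
        let futures = FuturesUnordered::new();
        for table in batch {
            let table = table.clone(); // spawned tasks need owned ('static) data
            futures.push(tokio::spawn(async move { process_one(table).await }));
        }
        for result in futures::future::join_all(futures).await {
            // Outer Err = the task panicked; inner Err = the work itself failed.
            result.map_err(|_| "task panicked".to_string())??;
        }
    }
    Ok(())
}
```

One trade-off worth noting: a slow table in one batch stalls the start of the next batch. A semaphore or `buffer_unordered` would keep ten tasks in flight continuously, but the chunked loop is simpler and fits this rate-limited workload.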
14 changes: 13 additions & 1 deletion momento/src/commands/cloud_linter/linter_cli.rs
@@ -1,8 +1,10 @@
use std::io::{copy, BufReader};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use crate::commands::cloud_linter::api_gateway::process_api_gateway_resources;
use aws_config::retry::RetryConfig;
use aws_config::{BehaviorVersion, Region};
use flate2::write::GzEncoder;
use flate2::Compression;
@@ -24,6 +26,7 @@ use super::resource::Resource;
pub async fn run_cloud_linter(
region: String,
enable_ddb_ttl_check: bool,
enable_gsi: bool,
only_collect_for_resource: Option<CloudLinterResources>,
metric_collection_rate: u32,
) -> Result<(), CliError> {
@@ -44,6 +47,7 @@
region,
tx,
enable_ddb_ttl_check,
enable_gsi,
only_collect_for_resource,
metric_collection_rate,
)
@@ -73,11 +77,17 @@ async fn process_data(
region: String,
sender: Sender<Resource>,
enable_ddb_ttl_check: bool,
enable_gsi: bool,
only_collect_for_resource: Option<CloudLinterResources>,
metric_collection_rate: u32,
) -> Result<(), CliError> {
let retry_config = RetryConfig::adaptive()
.with_initial_backoff(Duration::from_secs(1))
.with_max_attempts(20)
.with_max_backoff(Duration::from_secs(15));
let config = aws_config::defaults(BehaviorVersion::latest())
.region(Region::new(region))
.retry_config(retry_config)
.load()
.await;
check_aws_credentials(&config).await?;
@@ -88,7 +98,7 @@
let control_plane_limiter = Arc::new(RateLimiter::direct(control_plane_quota));

let describe_ttl_quota = Quota::per_second(
core::num::NonZeroU32::new(3).expect("should create non-zero describe_ttl_quota"),
core::num::NonZeroU32::new(1).expect("should create non-zero describe_ttl_quota"),
);
let describe_ttl_limiter = Arc::new(RateLimiter::direct(describe_ttl_quota));

@@ -127,6 +137,7 @@
Arc::clone(&describe_ttl_limiter),
sender.clone(),
enable_ddb_ttl_check,
enable_gsi,
)
.await?;
return Ok(());
@@ -175,6 +186,7 @@ async fn process_data(
Arc::clone(&describe_ttl_limiter),
sender.clone(),
enable_ddb_ttl_check,
enable_gsi,
)
.await?;

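Besides threading `enable_gsi` through, this file turns on the AWS SDK's adaptive retry mode with a wide backoff window, so throttled CloudWatch calls are retried with backoff instead of surfacing as errors. A minimal sketch of that loader configuration; the region string and the client built at the end are illustrative:

```rust
use std::time::Duration;

use aws_config::retry::RetryConfig;
use aws_config::{BehaviorVersion, Region};

#[tokio::main]
async fn main() {
    // Adaptive mode tracks throttling responses client-side and paces requests;
    // allow up to 20 attempts, with backoff starting at 1s and capped at 15s.
    let retry_config = RetryConfig::adaptive()
        .with_initial_backoff(Duration::from_secs(1))
        .with_max_attempts(20)
        .with_max_backoff(Duration::from_secs(15));

    let config = aws_config::defaults(BehaviorVersion::latest())
        .region(Region::new("us-east-1")) // illustrative region
        .retry_config(retry_config)
        .load()
        .await;

    // Every client built from the shared config inherits the retry policy.
    let _cloudwatch = aws_sdk_cloudwatch::Client::new(&config);
}
```

Related tuning in the same file: `describe_ttl_quota` drops from 3 to 1 request per second, further slowing the rate of `DescribeTimeToLive` calls.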
33 changes: 32 additions & 1 deletion momento/src/commands/cloud_linter/metrics.rs
@@ -45,6 +45,29 @@ pub(crate) trait AppendMetrics {
) -> Result<(), CliError>;
}

// impl<T> AppendMetrics for T
// where
// T: ResourceWithMetrics,
// {
// async fn append_metrics(
// &mut self,
// metrics_client: &Client,
// limiter: Arc<DefaultDirectRateLimiter>,
// ) -> Result<(), CliError> {
// let metric_targets = self.create_metric_targets()?;
// let mut metrics: Vec<Vec<Metric>> = Vec::new();
// for target in metric_targets {
// metrics.push(
// query_metrics_for_target(metrics_client, Arc::clone(&limiter), target).await?,
// );
// }
// self.set_metrics(metrics.into_iter().flatten().collect());
// self.set_metric_period_seconds(60 * 60 * 24);

// Ok(())
// }
// }

impl<T> AppendMetrics for T
where
T: ResourceWithMetrics,
@@ -153,7 +176,15 @@ async fn query_metrics_for_target(
.send();

while let Some(result) = rate_limit(Arc::clone(&limiter), || metric_stream.next()).await {
let result = result?;
let result = match result {
Ok(res) => res,
Err(e) => {
println!("get_metric_data_error: {:?}", e);
return Err(CliError {
msg: "error from aws api while querying metrics".to_string(),
});
}
};
if let Some(mdr_vec) = result.metric_data_results {
for mdr in mdr_vec {
let name = mdr.id.ok_or_else(|| CliError {
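For context on the surrounding code: `append_metrics` comes from a single blanket impl, `impl<T> AppendMetrics for T where T: ResourceWithMetrics` (the commented-out block in the first hunk is a near-duplicate of the live impl), and the second hunk swaps a bare `?` for a match that logs the raw `get_metric_data` error before wrapping it in a `CliError`. A stripped-down sketch of the blanket-impl pattern, with hypothetical synchronous trait shapes in place of the real async, CloudWatch-backed ones:

```rust
// Hypothetical, simplified shapes; the real traits are async and take a
// CloudWatch client plus a rate limiter.
trait ResourceWithMetrics {
    fn create_metric_targets(&self) -> Vec<String>;
    fn set_metrics(&mut self, metrics: Vec<String>);
}

trait AppendMetrics {
    fn append_metrics(&mut self);
}

// One blanket impl: every ResourceWithMetrics gets append_metrics for free,
// so per-resource types never duplicate the metric-fetching loop.
impl<T> AppendMetrics for T
where
    T: ResourceWithMetrics,
{
    fn append_metrics(&mut self) {
        let metrics = self
            .create_metric_targets()
            .iter()
            .map(|target| format!("metrics-for-{target}")) // stand-in for a CloudWatch query
            .collect();
        self.set_metrics(metrics);
    }
}

struct FakeTable {
    metrics: Vec<String>,
}

impl ResourceWithMetrics for FakeTable {
    fn create_metric_targets(&self) -> Vec<String> {
        vec!["table-a".to_string()]
    }
    fn set_metrics(&mut self, metrics: Vec<String>) {
        self.metrics = metrics;
    }
}

fn main() {
    let mut table = FakeTable { metrics: vec![] };
    table.append_metrics(); // provided by the blanket impl
    assert_eq!(table.metrics, vec!["metrics-for-table-a".to_string()]);
}
```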
6 changes: 4 additions & 2 deletions momento/src/commands/cloud_linter/s3.rs
@@ -244,8 +244,10 @@ async fn process_buckets(
) -> Result<(), CliError> {
let process_buckets_bar =
ProgressBar::new((buckets.len() * 2) as u64).with_message("Processing S3 Buckets");
process_buckets_bar
.set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template"));
process_buckets_bar.set_style(
ProgressStyle::with_template(" {pos:>7}/{len:7} {msg}").expect("invalid template"),
);

let futures = FuturesUnordered::new();
for bucket in buckets {
let s3_client_clone = s3client.clone();
2 changes: 2 additions & 0 deletions momento/src/main.rs
@@ -268,10 +268,12 @@ async fn run_momento_command(args: momento_cli_opts::Momento) -> Result<(), CliE
enable_ddb_ttl_check,
resource,
metric_collection_rate,
enable_gsi,
} => {
commands::cloud_linter::linter_cli::run_cloud_linter(
region,
enable_ddb_ttl_check,
enable_gsi,
resource,
metric_collection_rate,
)
