From 0cd646012c21421c64865ee3d6fc08b703f10d59 Mon Sep 17 00:00:00 2001 From: Matt Straathof <11823378+bruuuuuuuce@users.noreply.github.com> Date: Mon, 3 Jun 2024 16:56:13 -0700 Subject: [PATCH] feat: speed up resource collection for elasticache (#327) --- .../src/commands/cloud_linter/elasticache.rs | 244 +++++++++++------- .../src/commands/cloud_linter/linter_cli.rs | 4 +- momento/src/commands/cloud_linter/metrics.rs | 1 + momento/src/commands/cloud_linter/s3.rs | 2 +- .../cloud_linter/serverless_elasticache.rs | 216 ++++++++++------ 5 files changed, 302 insertions(+), 165 deletions(-) diff --git a/momento/src/commands/cloud_linter/elasticache.rs b/momento/src/commands/cloud_linter/elasticache.rs index 234d209..2850f37 100644 --- a/momento/src/commands/cloud_linter/elasticache.rs +++ b/momento/src/commands/cloud_linter/elasticache.rs @@ -4,8 +4,9 @@ use std::time::Duration; use aws_config::SdkConfig; use aws_sdk_elasticache::types::CacheCluster; +use futures::stream::FuturesUnordered; use governor::DefaultDirectRateLimiter; -use indicatif::ProgressBar; +use indicatif::{ProgressBar, ProgressStyle}; use phf::{phf_map, Map}; use serde::Serialize; use tokio::sync::mpsc::Sender; @@ -116,30 +117,98 @@ pub(crate) async fn process_elasticache_resources( })?; let elasticache_client = aws_sdk_elasticache::Client::new(config); - let clusters = describe_clusters(&elasticache_client, control_plane_limiter).await?; + let metrics_client = aws_sdk_cloudwatch::Client::new(config); + process_resources( + &elasticache_client, + &metrics_client, + control_plane_limiter, + metrics_limiter, + region, + sender, + ) + .await?; - write_resources(clusters, config, region, sender, metrics_limiter).await?; Ok(()) } -async fn describe_clusters( +async fn process_resources( elasticache_client: &aws_sdk_elasticache::Client, - limiter: Arc, -) -> Result, CliError> { + metrics_client: &aws_sdk_cloudwatch::Client, + control_plane_limiter: Arc, + metrics_limiter: Arc, + region: &str, + sender: Sender, +) -> Result<(), CliError> { let bar = ProgressBar::new_spinner().with_message("Describing ElastiCache clusters"); bar.enable_steady_tick(Duration::from_millis(100)); - let mut elasticache_clusters = Vec::new(); + bar.set_style( + ProgressStyle::with_template("{spinner:.green} {pos:>7} {msg}") + .expect("template should be valid") + // For more spinners check out the cli-spinners project: + // https://github.com/sindresorhus/cli-spinners/blob/master/spinners.json + .tick_strings(&[ + "▹▹▹▹▹", + "▸▹▹▹▹", + "▹▸▹▹▹", + "▹▹▸▹▹", + "▹▹▹▸▹", + "▹▹▹▹▸", + "▪▪▪▪▪", + ]), + ); let mut elasticache_stream = elasticache_client .describe_cache_clusters() .show_cache_node_info(true) .into_paginator() .send(); - while let Some(result) = rate_limit(Arc::clone(&limiter), || elasticache_stream.next()).await { + while let Some(result) = rate_limit(Arc::clone(&control_plane_limiter), || { + elasticache_stream.next() + }) + .await + { match result { Ok(result) => { - if let Some(clusters) = result.cache_clusters { - elasticache_clusters.extend(clusters); + if let Some(aws_clusters) = result.cache_clusters { + let mut chunks = Vec::new(); + for chunk in aws_clusters.chunks(10) { + chunks.push(chunk.to_owned()); + } + for clusters in chunks { + let futures = FuturesUnordered::new(); + for cluster in clusters { + let metrics_client_clone = metrics_client.clone(); + let region_clone = region.to_string().clone(); + let sender_clone = sender.clone(); + let metrics_limiter_clone = Arc::clone(&metrics_limiter); + let bar_clone = bar.clone(); + let spawn = tokio::spawn(async move { + write_resource( + cluster, + metrics_client_clone, + region_clone.as_str(), + sender_clone, + metrics_limiter_clone, + bar_clone, + ) + .await + }); + futures.push(spawn); + } + let all_results = futures::future::join_all(futures).await; + for result in all_results { + match result { + // bubble up any cli errors that we came across + Ok(res) => res?, + Err(_) => { + println!("failed to process elasticache resources"); + return Err(CliError { + msg: "failed to wait for all elasticache resources to collect data".to_string(), + }); + } + } + } + } } } Err(err) => { @@ -151,98 +220,96 @@ async fn describe_clusters( } bar.finish(); - Ok(elasticache_clusters) + Ok(()) } -async fn write_resources( - clusters: Vec, - config: &SdkConfig, +async fn write_resource( + cluster: CacheCluster, + metrics_client: aws_sdk_cloudwatch::Client, region: &str, sender: Sender, metrics_limiter: Arc, + bar: ProgressBar, ) -> Result<(), CliError> { - let metrics_client = aws_sdk_cloudwatch::Client::new(config); - let mut resources: Vec = Vec::new(); + let mut resources = Vec::new(); - for cluster in clusters { - let cache_cluster_id = cluster.cache_cluster_id.ok_or(CliError { - msg: "ElastiCache cluster has no ID".to_string(), - })?; - let cache_node_type = cluster.cache_node_type.ok_or(CliError { - msg: "ElastiCache cluster has no node type".to_string(), - })?; - let preferred_az = cluster.preferred_availability_zone.ok_or(CliError { - msg: "ElastiCache cluster has no preferred availability zone".to_string(), - })?; + let cache_cluster_id = cluster.cache_cluster_id.ok_or(CliError { + msg: "ElastiCache cluster has no ID".to_string(), + })?; + let cache_node_type = cluster.cache_node_type.ok_or(CliError { + msg: "ElastiCache cluster has no node type".to_string(), + })?; + let preferred_az = cluster.preferred_availability_zone.ok_or(CliError { + msg: "ElastiCache cluster has no preferred availability zone".to_string(), + })?; - let engine = cluster.engine.ok_or(CliError { - msg: "ElastiCache cluster has no engine type".to_string(), - })?; - match engine.as_str() { - "redis" => { - let (cluster_id, cluster_mode_enabled) = cluster - .replication_group_id - .map(|replication_group_id| { - let trimmed_cluster_id = cache_cluster_id.clone(); - let trimmed_cluster_id = trimmed_cluster_id - .trim_start_matches(&format!("{}-", replication_group_id)); - let parts_len = trimmed_cluster_id.split('-').count(); - (replication_group_id, parts_len == 2) - }) - .unwrap_or_else(|| (cache_cluster_id.clone(), false)); + let engine = cluster.engine.ok_or(CliError { + msg: "ElastiCache cluster has no engine type".to_string(), + })?; + match engine.as_str() { + "redis" => { + let (cluster_id, cluster_mode_enabled) = cluster + .replication_group_id + .map(|replication_group_id| { + let trimmed_cluster_id = cache_cluster_id.clone(); + let trimmed_cluster_id = trimmed_cluster_id + .trim_start_matches(&format!("{}-", replication_group_id)); + let parts_len = trimmed_cluster_id.split('-').count(); + (replication_group_id, parts_len == 2) + }) + .unwrap_or_else(|| (cache_cluster_id.clone(), false)); - let metadata = ElastiCacheMetadata { - cluster_id, - engine, - cache_node_type, - preferred_az, - cluster_mode_enabled, - }; + let metadata = ElastiCacheMetadata { + cluster_id, + engine, + cache_node_type, + preferred_az, + cluster_mode_enabled, + }; - let resource = Resource::ElastiCache(ElastiCacheResource { - resource_type: ResourceType::ElastiCacheRedisNode, - region: region.to_string(), - id: cache_cluster_id.clone(), - metrics: vec![], - metric_period_seconds: 0, - metadata, - }); + let resource = Resource::ElastiCache(ElastiCacheResource { + resource_type: ResourceType::ElastiCacheRedisNode, + region: region.to_string(), + id: cache_cluster_id.clone(), + metrics: vec![], + metric_period_seconds: 0, + metadata, + }); - resources.push(resource); - } - "memcached" => { - let metadata = ElastiCacheMetadata { - cluster_id: cache_cluster_id, - engine, - cache_node_type, - preferred_az, - cluster_mode_enabled: false, - }; + resources.push(resource); + } + "memcached" => { + let metadata = ElastiCacheMetadata { + cluster_id: cache_cluster_id, + engine, + cache_node_type, + preferred_az, + cluster_mode_enabled: false, + }; - if let Some(cache_nodes) = cluster.cache_nodes { - for node in cache_nodes { - let cache_node_id = node.cache_node_id.ok_or(CliError { - msg: "Cache node has no ID".to_string(), - })?; - let resource = Resource::ElastiCache(ElastiCacheResource { - resource_type: ResourceType::ElastiCacheMemcachedNode, - region: region.to_string(), - id: cache_node_id, - metrics: vec![], - metric_period_seconds: 0, - metadata: metadata.clone(), - }); - resources.push(resource) - } + if let Some(cache_nodes) = cluster.cache_nodes { + for node in cache_nodes { + let cache_node_id = node.cache_node_id.ok_or(CliError { + msg: "Cache node has no ID".to_string(), + })?; + let resource = Resource::ElastiCache(ElastiCacheResource { + resource_type: ResourceType::ElastiCacheMemcachedNode, + region: region.to_string(), + id: cache_node_id, + metrics: vec![], + metric_period_seconds: 0, + metadata: metadata.clone(), + }); + resources.push(resource) } } - _ => { - return Err(CliError { - msg: format!("Unsupported engine: {}", engine), - }); - } - }; - } + } + _ => { + return Err(CliError { + msg: format!("Unsupported engine: {}", engine), + }); + } + }; for resource in resources { match resource { @@ -255,6 +322,7 @@ async fn write_resources( .map_err(|err| CliError { msg: format!("Failed to send elasticache resource: {}", err), })?; + bar.inc(1); } _ => { return Err(CliError { diff --git a/momento/src/commands/cloud_linter/linter_cli.rs b/momento/src/commands/cloud_linter/linter_cli.rs index a785ef6..1856ac1 100644 --- a/momento/src/commands/cloud_linter/linter_cli.rs +++ b/momento/src/commands/cloud_linter/linter_cli.rs @@ -82,9 +82,9 @@ async fn process_data( metric_collection_rate: u32, ) -> Result<(), CliError> { let retry_config = RetryConfig::adaptive() - .with_initial_backoff(Duration::from_secs(1)) + .with_initial_backoff(Duration::from_millis(250)) .with_max_attempts(20) - .with_max_backoff(Duration::from_secs(15)); + .with_max_backoff(Duration::from_secs(5)); let config = aws_config::defaults(BehaviorVersion::latest()) .region(Region::new(region)) .retry_config(retry_config) diff --git a/momento/src/commands/cloud_linter/metrics.rs b/momento/src/commands/cloud_linter/metrics.rs index 006afc1..4049619 100644 --- a/momento/src/commands/cloud_linter/metrics.rs +++ b/momento/src/commands/cloud_linter/metrics.rs @@ -185,6 +185,7 @@ async fn query_metrics_for_target( }); } }; + // let result = result?; if let Some(mdr_vec) = result.metric_data_results { for mdr in mdr_vec { let name = mdr.id.ok_or_else(|| CliError { diff --git a/momento/src/commands/cloud_linter/s3.rs b/momento/src/commands/cloud_linter/s3.rs index 296e993..3337870 100644 --- a/momento/src/commands/cloud_linter/s3.rs +++ b/momento/src/commands/cloud_linter/s3.rs @@ -243,7 +243,7 @@ async fn process_buckets( control_plane_limiter: Arc, ) -> Result<(), CliError> { let process_buckets_bar = - ProgressBar::new((buckets.len() * 2) as u64).with_message("Processing S3 Buckets"); + ProgressBar::new((buckets.len()) as u64).with_message("Processing S3 Buckets"); process_buckets_bar.set_style( ProgressStyle::with_template(" {pos:>7}/{len:7} {msg}").expect("invalid template"), ); diff --git a/momento/src/commands/cloud_linter/serverless_elasticache.rs b/momento/src/commands/cloud_linter/serverless_elasticache.rs index 7650dc6..5696101 100644 --- a/momento/src/commands/cloud_linter/serverless_elasticache.rs +++ b/momento/src/commands/cloud_linter/serverless_elasticache.rs @@ -6,8 +6,9 @@ use aws_config::SdkConfig; use aws_sdk_elasticache::types::{ CacheUsageLimits, DataStorage, DataStorageUnit, EcpuPerSecond, ServerlessCache, }; +use futures::stream::FuturesUnordered; use governor::DefaultDirectRateLimiter; -use indicatif::ProgressBar; +use indicatif::{ProgressBar, ProgressStyle}; use phf::{phf_map, Map}; use serde::Serialize; use tokio::sync::mpsc::Sender; @@ -129,30 +130,98 @@ pub(crate) async fn process_serverless_elasticache_resources( })?; let elasticache_client = aws_sdk_elasticache::Client::new(config); - let clusters = describe_clusters(&elasticache_client, control_plane_limiter).await?; + let metrics_client = aws_sdk_cloudwatch::Client::new(config); + process_resources( + &elasticache_client, + &metrics_client, + control_plane_limiter, + metrics_limiter, + region, + sender, + ) + .await?; - write_resources(clusters, config, region, sender, metrics_limiter).await?; Ok(()) } -async fn describe_clusters( +async fn process_resources( elasticache_client: &aws_sdk_elasticache::Client, - limiter: Arc, -) -> Result, CliError> { + metrics_client: &aws_sdk_cloudwatch::Client, + control_plane_limiter: Arc, + metrics_limiter: Arc, + region: &str, + sender: Sender, +) -> Result<(), CliError> { let bar = ProgressBar::new_spinner().with_message("Describing Serverless ElastiCache resources"); bar.enable_steady_tick(Duration::from_millis(100)); - let mut serverless_elasticache = Vec::new(); + bar.set_style( + ProgressStyle::with_template("{spinner:.green} {pos:>7} {msg}") + .expect("template should be valid") + // For more spinners check out the cli-spinners project: + // https://github.com/sindresorhus/cli-spinners/blob/master/spinners.json + .tick_strings(&[ + "▹▹▹▹▹", + "▸▹▹▹▹", + "▹▸▹▹▹", + "▹▹▸▹▹", + "▹▹▹▸▹", + "▹▹▹▹▸", + "▪▪▪▪▪", + ]), + ); let mut elasticache_stream = elasticache_client .describe_serverless_caches() .into_paginator() .send(); - while let Some(result) = rate_limit(Arc::clone(&limiter), || elasticache_stream.next()).await { + while let Some(result) = rate_limit(Arc::clone(&control_plane_limiter), || { + elasticache_stream.next() + }) + .await + { match result { Ok(result) => { - if let Some(caches) = result.serverless_caches { - serverless_elasticache.extend(caches); + if let Some(aws_caches) = result.serverless_caches { + let mut chunks = Vec::new(); + for chunk in aws_caches.chunks(10) { + chunks.push(chunk.to_owned()); + } + for clusters in chunks { + let futures = FuturesUnordered::new(); + for cluster in clusters { + let metrics_client_clone = metrics_client.clone(); + let region_clone = region.to_string().clone(); + let sender_clone = sender.clone(); + let metrics_limiter_clone = Arc::clone(&metrics_limiter); + let bar_clone = bar.clone(); + let spawn = tokio::spawn(async move { + write_resource( + cluster, + metrics_client_clone, + region_clone.as_str(), + sender_clone, + metrics_limiter_clone, + bar_clone, + ) + .await + }); + futures.push(spawn); + } + let all_results = futures::future::join_all(futures).await; + for result in all_results { + match result { + // bubble up any cli errors that we came across + Ok(res) => res?, + Err(_) => { + println!("failed to process serverless elasticache resources"); + return Err(CliError { + msg: "failed to wait for all serverless elasticache resources to collect data".to_string(), + }); + } + } + } + } } } Err(err) => { @@ -164,76 +233,75 @@ async fn describe_clusters( } bar.finish(); - Ok(serverless_elasticache) + Ok(()) } -async fn write_resources( - caches: Vec, - config: &SdkConfig, +async fn write_resource( + cache: ServerlessCache, + metrics_client: aws_sdk_cloudwatch::Client, region: &str, sender: Sender, metrics_limiter: Arc, + bar: ProgressBar, ) -> Result<(), CliError> { - let metrics_client = aws_sdk_cloudwatch::Client::new(config); + let cache_name = cache.serverless_cache_name.unwrap_or_default(); + let engine = cache.engine.unwrap_or_default(); + let user_group_id = cache.user_group_id.unwrap_or_default(); + let snapshot_retention_limit = cache.snapshot_retention_limit.unwrap_or(0); + let daily_snapshot_time = cache.daily_snapshot_time.unwrap_or_default(); + + let cache_usage_limits = cache + .cache_usage_limits + .unwrap_or(CacheUsageLimits::builder().build()); + // By default, every Serverless cache can scale to a maximum of 5 TBs of data storage and 15,000,000 ECPUs per second. To control costs, you can choose to set lower usage limits so that your cache will scale to a lower maximum. + // + // When a maximum Memory usage limit is set and your cache hits that limit, then ElastiCache Serverless will begin to evict data, to reject new writes with an Out of Memory error, or both. + // + // When a maximum ECPUs/second limit is set and your cache hits that limit, then ElastiCache Serverless will begin throttling or rejecting requests. + let data_storage = cache_usage_limits.data_storage.unwrap_or( + DataStorage::builder() + .set_maximum(Some(5_000)) + .set_unit(Some(DataStorageUnit::Gb)) + .build(), + ); + + let ecpu = cache_usage_limits.ecpu_per_second.unwrap_or( + EcpuPerSecond::builder() + .set_maximum(Some(15_000_000)) + .build(), + ); + let max_data_storage_gb = data_storage.maximum.unwrap_or(5_000); + let data_storage_unit = data_storage.unit.unwrap_or(DataStorageUnit::Gb); + + let metadata = ServerlessElastiCacheMetadata { + name: cache_name.clone(), + engine, + max_data_storage_gb, + max_ecpu_per_second: ecpu.maximum.unwrap_or_default(), + snapshot_retention_limit, + daily_snapshot_time, + user_group_id, + data_storage_unit: data_storage_unit.to_string(), + engine_version: cache.full_engine_version.unwrap_or_default(), + }; + + let mut serverless_ec_resource = ServerlessElastiCacheResource { + resource_type: ResourceType::ServerlessElastiCache, + region: region.to_string(), + id: cache_name, + metrics: vec![], + metric_period_seconds: 0, + metadata, + }; + serverless_ec_resource + .append_metrics(&metrics_client, Arc::clone(&metrics_limiter)) + .await?; + + let resource = Resource::ServerlessElastiCache(serverless_ec_resource); + sender.send(resource).await.map_err(|err| CliError { + msg: format!("Failed to send serverless elasticache resource: {}", err), + })?; + bar.inc(1); - for cache in caches { - let cache_name = cache.serverless_cache_name.unwrap_or_default(); - let engine = cache.engine.unwrap_or_default(); - let user_group_id = cache.user_group_id.unwrap_or_default(); - let snapshot_retention_limit = cache.snapshot_retention_limit.unwrap_or(0); - let daily_snapshot_time = cache.daily_snapshot_time.unwrap_or_default(); - - let cache_usage_limits = cache - .cache_usage_limits - .unwrap_or(CacheUsageLimits::builder().build()); - // By default, every Serverless cache can scale to a maximum of 5 TBs of data storage and 15,000,000 ECPUs per second. To control costs, you can choose to set lower usage limits so that your cache will scale to a lower maximum. - // - // When a maximum Memory usage limit is set and your cache hits that limit, then ElastiCache Serverless will begin to evict data, to reject new writes with an Out of Memory error, or both. - // - // When a maximum ECPUs/second limit is set and your cache hits that limit, then ElastiCache Serverless will begin throttling or rejecting requests. - let data_storage = cache_usage_limits.data_storage.unwrap_or( - DataStorage::builder() - .set_maximum(Some(5_000)) - .set_unit(Some(DataStorageUnit::Gb)) - .build(), - ); - - let ecpu = cache_usage_limits.ecpu_per_second.unwrap_or( - EcpuPerSecond::builder() - .set_maximum(Some(15_000_000)) - .build(), - ); - let max_data_storage_gb = data_storage.maximum.unwrap_or(5_000); - let data_storage_unit = data_storage.unit.unwrap_or(DataStorageUnit::Gb); - - let metadata = ServerlessElastiCacheMetadata { - name: cache_name.clone(), - engine, - max_data_storage_gb, - max_ecpu_per_second: ecpu.maximum.unwrap_or_default(), - snapshot_retention_limit, - daily_snapshot_time, - user_group_id, - data_storage_unit: data_storage_unit.to_string(), - engine_version: cache.full_engine_version.unwrap_or_default(), - }; - - let mut serverless_ec_resource = ServerlessElastiCacheResource { - resource_type: ResourceType::ServerlessElastiCache, - region: region.to_string(), - id: cache_name, - metrics: vec![], - metric_period_seconds: 0, - metadata, - }; - serverless_ec_resource - .append_metrics(&metrics_client, Arc::clone(&metrics_limiter)) - .await?; - - let resource = Resource::ServerlessElastiCache(serverless_ec_resource); - sender.send(resource).await.map_err(|err| CliError { - msg: format!("Failed to send serverless elasticache resource: {}", err), - })?; - } Ok(()) }