feat: speed up resource collection for elasticache (#327)
bruuuuuuuce authored Jun 3, 2024
1 parent bd007ac commit 0cd6460
Showing 5 changed files with 302 additions and 165 deletions.
244 changes: 156 additions & 88 deletions momento/src/commands/cloud_linter/elasticache.rs
@@ -4,8 +4,9 @@ use std::time::Duration;
 
 use aws_config::SdkConfig;
 use aws_sdk_elasticache::types::CacheCluster;
+use futures::stream::FuturesUnordered;
 use governor::DefaultDirectRateLimiter;
-use indicatif::ProgressBar;
+use indicatif::{ProgressBar, ProgressStyle};
 use phf::{phf_map, Map};
 use serde::Serialize;
 use tokio::sync::mpsc::Sender;
@@ -116,30 +117,98 @@ pub(crate) async fn process_elasticache_resources(
     })?;
 
     let elasticache_client = aws_sdk_elasticache::Client::new(config);
-    let clusters = describe_clusters(&elasticache_client, control_plane_limiter).await?;
+    let metrics_client = aws_sdk_cloudwatch::Client::new(config);
+    process_resources(
+        &elasticache_client,
+        &metrics_client,
+        control_plane_limiter,
+        metrics_limiter,
+        region,
+        sender,
+    )
+    .await?;
 
-    write_resources(clusters, config, region, sender, metrics_limiter).await?;
     Ok(())
 }
 
-async fn describe_clusters(
+async fn process_resources(
     elasticache_client: &aws_sdk_elasticache::Client,
-    limiter: Arc<DefaultDirectRateLimiter>,
-) -> Result<Vec<CacheCluster>, CliError> {
+    metrics_client: &aws_sdk_cloudwatch::Client,
+    control_plane_limiter: Arc<DefaultDirectRateLimiter>,
+    metrics_limiter: Arc<DefaultDirectRateLimiter>,
+    region: &str,
+    sender: Sender<Resource>,
+) -> Result<(), CliError> {
     let bar = ProgressBar::new_spinner().with_message("Describing ElastiCache clusters");
     bar.enable_steady_tick(Duration::from_millis(100));
-    let mut elasticache_clusters = Vec::new();
+    bar.set_style(
+        ProgressStyle::with_template("{spinner:.green} {pos:>7} {msg}")
+            .expect("template should be valid")
+            // For more spinners check out the cli-spinners project:
+            // https://github.com/sindresorhus/cli-spinners/blob/master/spinners.json
+            .tick_strings(&[
+                "▹▹▹▹▹",
+                "▸▹▹▹▹",
+                "▹▸▹▹▹",
+                "▹▹▸▹▹",
+                "▹▹▹▸▹",
+                "▹▹▹▹▸",
+                "▪▪▪▪▪",
+            ]),
+    );
     let mut elasticache_stream = elasticache_client
         .describe_cache_clusters()
        .show_cache_node_info(true)
        .into_paginator()
        .send();
 
-    while let Some(result) = rate_limit(Arc::clone(&limiter), || elasticache_stream.next()).await {
+    while let Some(result) = rate_limit(Arc::clone(&control_plane_limiter), || {
+        elasticache_stream.next()
+    })
+    .await
+    {
         match result {
             Ok(result) => {
-                if let Some(clusters) = result.cache_clusters {
-                    elasticache_clusters.extend(clusters);
+                if let Some(aws_clusters) = result.cache_clusters {
+                    let mut chunks = Vec::new();
+                    for chunk in aws_clusters.chunks(10) {
+                        chunks.push(chunk.to_owned());
+                    }
+                    for clusters in chunks {
+                        let futures = FuturesUnordered::new();
+                        for cluster in clusters {
+                            let metrics_client_clone = metrics_client.clone();
+                            let region_clone = region.to_string().clone();
+                            let sender_clone = sender.clone();
+                            let metrics_limiter_clone = Arc::clone(&metrics_limiter);
+                            let bar_clone = bar.clone();
+                            let spawn = tokio::spawn(async move {
+                                write_resource(
+                                    cluster,
+                                    metrics_client_clone,
+                                    region_clone.as_str(),
+                                    sender_clone,
+                                    metrics_limiter_clone,
+                                    bar_clone,
+                                )
+                                .await
+                            });
+                            futures.push(spawn);
+                        }
+                        let all_results = futures::future::join_all(futures).await;
+                        for result in all_results {
+                            match result {
+                                // bubble up any cli errors that we came across
+                                Ok(res) => res?,
+                                Err(_) => {
+                                    println!("failed to process elasticache resources");
+                                    return Err(CliError {
+                                        msg: "failed to wait for all elasticache resources to collect data".to_string(),
+                                    });
+                                }
+                            }
+                        }
+                    }
                 }
             }
             Err(err) => {
@@ -151,98 +220,96 @@ async fn describe_clusters(
     }
     bar.finish();
 
-    Ok(elasticache_clusters)
+    Ok(())
 }
 
-async fn write_resources(
-    clusters: Vec<CacheCluster>,
-    config: &SdkConfig,
+async fn write_resource(
+    cluster: CacheCluster,
+    metrics_client: aws_sdk_cloudwatch::Client,
     region: &str,
     sender: Sender<Resource>,
     metrics_limiter: Arc<DefaultDirectRateLimiter>,
+    bar: ProgressBar,
 ) -> Result<(), CliError> {
-    let metrics_client = aws_sdk_cloudwatch::Client::new(config);
-    let mut resources: Vec<Resource> = Vec::new();
+    let mut resources = Vec::new();
 
-    for cluster in clusters {
-        let cache_cluster_id = cluster.cache_cluster_id.ok_or(CliError {
-            msg: "ElastiCache cluster has no ID".to_string(),
-        })?;
-        let cache_node_type = cluster.cache_node_type.ok_or(CliError {
-            msg: "ElastiCache cluster has no node type".to_string(),
-        })?;
-        let preferred_az = cluster.preferred_availability_zone.ok_or(CliError {
-            msg: "ElastiCache cluster has no preferred availability zone".to_string(),
-        })?;
+    let cache_cluster_id = cluster.cache_cluster_id.ok_or(CliError {
+        msg: "ElastiCache cluster has no ID".to_string(),
+    })?;
+    let cache_node_type = cluster.cache_node_type.ok_or(CliError {
+        msg: "ElastiCache cluster has no node type".to_string(),
+    })?;
+    let preferred_az = cluster.preferred_availability_zone.ok_or(CliError {
+        msg: "ElastiCache cluster has no preferred availability zone".to_string(),
+    })?;
 
-        let engine = cluster.engine.ok_or(CliError {
-            msg: "ElastiCache cluster has no engine type".to_string(),
-        })?;
-        match engine.as_str() {
-            "redis" => {
-                let (cluster_id, cluster_mode_enabled) = cluster
-                    .replication_group_id
-                    .map(|replication_group_id| {
-                        let trimmed_cluster_id = cache_cluster_id.clone();
-                        let trimmed_cluster_id = trimmed_cluster_id
-                            .trim_start_matches(&format!("{}-", replication_group_id));
-                        let parts_len = trimmed_cluster_id.split('-').count();
-                        (replication_group_id, parts_len == 2)
-                    })
-                    .unwrap_or_else(|| (cache_cluster_id.clone(), false));
+    let engine = cluster.engine.ok_or(CliError {
+        msg: "ElastiCache cluster has no engine type".to_string(),
+    })?;
+    match engine.as_str() {
+        "redis" => {
+            let (cluster_id, cluster_mode_enabled) = cluster
+                .replication_group_id
+                .map(|replication_group_id| {
+                    let trimmed_cluster_id = cache_cluster_id.clone();
+                    let trimmed_cluster_id = trimmed_cluster_id
+                        .trim_start_matches(&format!("{}-", replication_group_id));
+                    let parts_len = trimmed_cluster_id.split('-').count();
+                    (replication_group_id, parts_len == 2)
+                })
+                .unwrap_or_else(|| (cache_cluster_id.clone(), false));
 
-                let metadata = ElastiCacheMetadata {
-                    cluster_id,
-                    engine,
-                    cache_node_type,
-                    preferred_az,
-                    cluster_mode_enabled,
-                };
+            let metadata = ElastiCacheMetadata {
+                cluster_id,
+                engine,
+                cache_node_type,
+                preferred_az,
+                cluster_mode_enabled,
+            };
 
-                let resource = Resource::ElastiCache(ElastiCacheResource {
-                    resource_type: ResourceType::ElastiCacheRedisNode,
-                    region: region.to_string(),
-                    id: cache_cluster_id.clone(),
-                    metrics: vec![],
-                    metric_period_seconds: 0,
-                    metadata,
-                });
+            let resource = Resource::ElastiCache(ElastiCacheResource {
+                resource_type: ResourceType::ElastiCacheRedisNode,
+                region: region.to_string(),
+                id: cache_cluster_id.clone(),
+                metrics: vec![],
+                metric_period_seconds: 0,
+                metadata,
+            });
 
-                resources.push(resource);
-            }
-            "memcached" => {
-                let metadata = ElastiCacheMetadata {
-                    cluster_id: cache_cluster_id,
-                    engine,
-                    cache_node_type,
-                    preferred_az,
-                    cluster_mode_enabled: false,
-                };
+            resources.push(resource);
+        }
+        "memcached" => {
+            let metadata = ElastiCacheMetadata {
+                cluster_id: cache_cluster_id,
+                engine,
+                cache_node_type,
+                preferred_az,
+                cluster_mode_enabled: false,
+            };
 
-                if let Some(cache_nodes) = cluster.cache_nodes {
-                    for node in cache_nodes {
-                        let cache_node_id = node.cache_node_id.ok_or(CliError {
-                            msg: "Cache node has no ID".to_string(),
-                        })?;
-                        let resource = Resource::ElastiCache(ElastiCacheResource {
-                            resource_type: ResourceType::ElastiCacheMemcachedNode,
-                            region: region.to_string(),
-                            id: cache_node_id,
-                            metrics: vec![],
-                            metric_period_seconds: 0,
-                            metadata: metadata.clone(),
-                        });
-                        resources.push(resource)
-                    }
+            if let Some(cache_nodes) = cluster.cache_nodes {
+                for node in cache_nodes {
+                    let cache_node_id = node.cache_node_id.ok_or(CliError {
+                        msg: "Cache node has no ID".to_string(),
+                    })?;
+                    let resource = Resource::ElastiCache(ElastiCacheResource {
+                        resource_type: ResourceType::ElastiCacheMemcachedNode,
+                        region: region.to_string(),
+                        id: cache_node_id,
+                        metrics: vec![],
+                        metric_period_seconds: 0,
+                        metadata: metadata.clone(),
+                    });
+                    resources.push(resource)
                 }
             }
-            _ => {
-                return Err(CliError {
-                    msg: format!("Unsupported engine: {}", engine),
-                });
-            }
-        };
-    }
+        }
+        _ => {
+            return Err(CliError {
+                msg: format!("Unsupported engine: {}", engine),
+            });
+        }
+    };
 
     for resource in resources {
         match resource {
@@ -255,6 +322,7 @@ async fn write_resources(
                     .map_err(|err| CliError {
                         msg: format!("Failed to send elasticache resource: {}", err),
                     })?;
+                bar.inc(1);
             }
             _ => {
                 return Err(CliError {
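
The heart of the speedup in elasticache.rs is the fan-out above: each page of DescribeCacheClusters results is split into chunks of ten, every cluster in a chunk gets its own tokio task running write_resource, and the chunk is joined before the next one starts. Below is a minimal, hedged sketch of that pattern rather than the crate's actual code: CliError, process_cluster, and process_in_batches are stand-ins, and a plain Vec of join handles takes the place of the FuturesUnordered used in the diff.

// Sketch of chunked fan-out with tokio + join_all (hypothetical names).
use futures::future::join_all;

#[derive(Debug)]
struct CliError {
    msg: String,
}

// Placeholder for write_resource: pretend per-cluster metrics collection.
async fn process_cluster(cluster_id: String) -> Result<(), CliError> {
    println!("collected metrics for {cluster_id}");
    Ok(())
}

async fn process_in_batches(cluster_ids: Vec<String>) -> Result<(), CliError> {
    // Process ten clusters at a time; each batch is joined before the next
    // batch starts, bounding the number of in-flight tasks.
    for batch in cluster_ids.chunks(10) {
        let handles: Vec<_> = batch
            .iter()
            .cloned()
            .map(|id| tokio::spawn(process_cluster(id)))
            .collect();
        for joined in join_all(handles).await {
            // The outer error means a task panicked or was cancelled; the
            // inner one is a CliError from the work itself, bubbled up as-is.
            joined.map_err(|_| CliError {
                msg: "failed to wait for elasticache task".to_string(),
            })??;
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), CliError> {
    let ids = (0..25).map(|i| format!("cluster-{i}")).collect();
    process_in_batches(ids).await
}

Joining each batch before starting the next bounds the number of concurrent CloudWatch calls, while the Arc-cloned rate limiters handed to each task in the real code keep the overall request rate capped.
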
4 changes: 2 additions & 2 deletions momento/src/commands/cloud_linter/linter_cli.rs
@@ -82,9 +82,9 @@ async fn process_data(
     metric_collection_rate: u32,
 ) -> Result<(), CliError> {
     let retry_config = RetryConfig::adaptive()
-        .with_initial_backoff(Duration::from_secs(1))
+        .with_initial_backoff(Duration::from_millis(250))
         .with_max_attempts(20)
-        .with_max_backoff(Duration::from_secs(15));
+        .with_max_backoff(Duration::from_secs(5));
     let config = aws_config::defaults(BehaviorVersion::latest())
         .region(Region::new(region))
         .retry_config(retry_config)
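
For context on the retry change above, those backoff values feed the shared SdkConfig that every collector client is built from. A small sketch of how they plug into aws_config follows; the load_config wrapper is hypothetical, while the builder calls and the 250 ms / 5 s / 20-attempt values are the ones in the diff.

use std::time::Duration;

use aws_config::retry::RetryConfig;
use aws_config::{BehaviorVersion, Region, SdkConfig};

// Assumed wrapper: builds the shared SDK config with adaptive retries tuned
// for short backoffs, mirroring the values changed in this commit.
async fn load_config(region: &str) -> SdkConfig {
    let retry_config = RetryConfig::adaptive()
        .with_initial_backoff(Duration::from_millis(250))
        .with_max_attempts(20)
        .with_max_backoff(Duration::from_secs(5));
    aws_config::defaults(BehaviorVersion::latest())
        .region(Region::new(region.to_string()))
        .retry_config(retry_config)
        .load()
        .await
}

With the smaller backoffs, a throttled request retries sooner and a single retry wait can no longer park one of the now-concurrent workers for more than about five seconds.
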
1 change: 1 addition & 0 deletions momento/src/commands/cloud_linter/metrics.rs
@@ -185,6 +185,7 @@ async fn query_metrics_for_target(
            });
        }
    };
+    // let result = result?;
    if let Some(mdr_vec) = result.metric_data_results {
        for mdr in mdr_vec {
            let name = mdr.id.ok_or_else(|| CliError {
2 changes: 1 addition & 1 deletion momento/src/commands/cloud_linter/s3.rs
@@ -243,7 +243,7 @@ async fn process_buckets(
     control_plane_limiter: Arc<DefaultDirectRateLimiter>,
 ) -> Result<(), CliError> {
     let process_buckets_bar =
-        ProgressBar::new((buckets.len() * 2) as u64).with_message("Processing S3 Buckets");
+        ProgressBar::new((buckets.len()) as u64).with_message("Processing S3 Buckets");
     process_buckets_bar.set_style(
         ProgressStyle::with_template(" {pos:>7}/{len:7} {msg}").expect("invalid template"),
     );
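
The s3.rs change halves the bar's length so it lines up with how many times the processing loop advances it, presumably once per bucket. A tiny indicatif sketch of that length-matches-increments contract, with made-up bucket names and loop body:

use indicatif::{ProgressBar, ProgressStyle};

fn main() {
    let buckets = vec!["bucket-a", "bucket-b", "bucket-c"];
    // Bar length equals the number of inc(1) calls the loop will make.
    let bar = ProgressBar::new(buckets.len() as u64).with_message("Processing S3 Buckets");
    bar.set_style(
        ProgressStyle::with_template(" {pos:>7}/{len:7} {msg}").expect("invalid template"),
    );
    for _bucket in &buckets {
        // one unit of work per bucket
        bar.inc(1);
    }
    bar.finish();
}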