Skip to content

Commit

Permalink
chore: stop processing directory buckets and handle bucket redirects
Browse files Browse the repository at this point in the history
  • Loading branch information
pgautier404 committed May 22, 2024
1 parent 5c03423 commit a5052e9
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 51 deletions.
8 changes: 7 additions & 1 deletion momento/src/commands/cloud_linter/linter_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,13 @@ async fn process_data(region: String, sender: Sender<Resource>) -> Result<(), Cl
Quota::per_second(core::num::NonZeroU32::new(20).expect("should create non-zero quota"));
let metrics_limiter = Arc::new(RateLimiter::direct(metrics_quota));

process_s3_resources(&config, Arc::clone(&metrics_limiter), sender.clone()).await?;
process_s3_resources(
&config,
Arc::clone(&metrics_limiter),
Arc::clone(&control_plane_limiter),
sender.clone(),
)
.await?;

process_ddb_resources(
&config,
Expand Down
103 changes: 53 additions & 50 deletions momento/src/commands/cloud_linter/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::commands::cloud_linter::metrics::{
AppendMetrics, Metric, MetricTarget, ResourceWithMetrics,
};
use crate::commands::cloud_linter::resource::{Resource, ResourceType, S3Resource};
use crate::commands::cloud_linter::utils::rate_limit;
use crate::error::CliError;
use aws_config::SdkConfig;
use aws_sdk_s3::types::MetricsConfiguration;
Expand All @@ -11,6 +12,7 @@ use phf::{phf_map, Map};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
use aws_sdk_s3::error::ProvideErrorMetadata;
use tokio::sync::mpsc::Sender;

const S3_METRICS_STANDARD_STORAGE_TYPES: Map<&'static str, &'static [&'static str]> = phf_map! {
Expand Down Expand Up @@ -64,8 +66,6 @@ const S3_METRICS_REQUEST: Map<&'static str, &'static [&'static str]> = phf_map!

#[derive(Serialize, Clone, Debug)]
pub(crate) struct S3Metadata {
#[serde(rename = "bucketType")]
bucket_type: String,
#[serde(rename = "requestMetricsFilter")]
request_metrics_filter: String,
}
Expand Down Expand Up @@ -126,6 +126,7 @@ impl ResourceWithMetrics for S3Resource {
pub(crate) async fn process_s3_resources(
config: &SdkConfig,
metrics_limiter: Arc<DefaultDirectRateLimiter>,
control_plane_limiter: Arc<DefaultDirectRateLimiter>,
sender: Sender<Resource>,
) -> Result<(), CliError> {
let region = config.region().map(|r| r.as_ref()).ok_or(CliError {
Expand All @@ -135,35 +136,22 @@ pub(crate) async fn process_s3_resources(
let metrics_client = aws_sdk_cloudwatch::Client::new(config);

let list_buckets_bar =
ProgressBar::new_spinner().with_message("Listing S3 General Purpose Buckets");
ProgressBar::new_spinner().with_message("Listing S3 Buckets");
list_buckets_bar.enable_steady_tick(std::time::Duration::from_millis(100));
let bucket_names = list_buckets(&s3client).await?;
let bucket_names = list_buckets(&s3client).await.unwrap_or_else(|err| {
eprint!("{}", err);
vec![]
});
list_buckets_bar.finish();

process_buckets(
s3client.clone(),
bucket_names,
"general_purpose",
region,
sender.clone(),
&metrics_client,
&metrics_limiter,
)
.await?;

let list_buckets_bar = ProgressBar::new_spinner().with_message("Listing S3 Directory Buckets");
list_buckets_bar.enable_steady_tick(std::time::Duration::from_millis(100));
let bucket_names = list_directory_buckets(&s3client).await?;
list_buckets_bar.finish();

process_buckets(
s3client.clone(),
bucket_names,
"directory",
region,
sender,
&metrics_client,
&metrics_limiter,
metrics_limiter.clone(),
control_plane_limiter.clone(),
)
.await?;

Expand All @@ -173,16 +161,20 @@ pub(crate) async fn process_s3_resources(
async fn list_bucket_metrics_configs(
s3client: aws_sdk_s3::Client,
bucket: String,
control_plane_limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<Vec<MetricsConfiguration>, CliError> {
let mut all_configs: Vec<MetricsConfiguration> = Vec::new();
let mut continuation_token: Option<String> = None;
loop {
let configs = s3client
.list_bucket_metrics_configurations()
.bucket(&bucket)
.continuation_token(continuation_token.unwrap_or_default())
.send()
.await;
let configs = rate_limit(Arc::clone(&control_plane_limiter), || async {
s3client
.list_bucket_metrics_configurations()
.bucket(&bucket)
.continuation_token(continuation_token.unwrap_or_default())
.send()
.await
})
.await;
match configs {
Ok(configs) => {
if configs.metrics_configuration_list.is_none() {
Expand All @@ -198,6 +190,13 @@ async fn list_bucket_metrics_configs(
}
}
Err(err) => {
if err.code() == Some("PermanentRedirect") {
// https://github.com/awslabs/aws-sdk-rust/issues/183
// There may be some extra processing we can do to follow the redirect we're getting
// here, but for now we'll just print an error.
eprintln!("Skipping redirected bucket {}", bucket);
break;
}
return Err(CliError {
msg: format!("Failed to get bucket metrics configuration: {}", err),
});
Expand All @@ -210,8 +209,14 @@ async fn list_bucket_metrics_configs(
async fn try_get_bucket_metrics_filter(
s3client: aws_sdk_s3::Client,
bucket: String,
control_plane_limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<String, CliError> {
let bucket_metrics = list_bucket_metrics_configs(s3client.clone(), bucket.clone()).await;
let bucket_metrics = list_bucket_metrics_configs(
s3client.clone(),
bucket.clone(),
Arc::clone(&control_plane_limiter),
)
.await;
match bucket_metrics {
Ok(bucket_metrics) => {
for config in bucket_metrics {
Expand All @@ -223,21 +228,22 @@ async fn try_get_bucket_metrics_filter(
}
Err(err) => {
return Err(CliError {
msg: format!("Failed to get bucket metrics configuration: {}", err),
msg: format!("{}", err),
});
}
}
Ok("".to_string())
}

#[allow(clippy::too_many_arguments)]
async fn process_buckets(
s3client: aws_sdk_s3::Client,
buckets: Vec<String>,
bucket_type: &str,
region: &str,
sender: Sender<Resource>,
metrics_client: &aws_sdk_cloudwatch::Client,
metrics_limiter: &Arc<DefaultDirectRateLimiter>,
metrics_limiter: Arc<DefaultDirectRateLimiter>,
control_plane_limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<(), CliError> {
let mut resources: Vec<Resource> = Vec::new();

Expand All @@ -246,15 +252,22 @@ async fn process_buckets(
process_buckets_bar
.set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template"));
for bucket in buckets {
let mut all_objects_filter: String = "".to_string();
if bucket_type == "general_purpose" {
let filter_id = try_get_bucket_metrics_filter(s3client.clone(), bucket.clone()).await?;
all_objects_filter = filter_id;
}
let filter_id = try_get_bucket_metrics_filter(
s3client.clone(),
bucket.clone(),
Arc::clone(&control_plane_limiter),
)
.await;
let filter_id = match filter_id {
Ok(filter_id) => filter_id,
Err(err) => {
eprint!("{}", err);
continue;
}
};

let metadata = S3Metadata {
bucket_type: bucket_type.to_string(),
request_metrics_filter: all_objects_filter,
request_metrics_filter: filter_id,
};

let s3_resource = S3Resource {
Expand All @@ -273,7 +286,7 @@ async fn process_buckets(
match resource {
Resource::S3(mut my_resource) => {
my_resource
.append_metrics(metrics_client, Arc::clone(metrics_limiter))
.append_metrics(metrics_client, Arc::clone(&metrics_limiter))
.await?;
sender
.send(Resource::S3(my_resource))
Expand Down Expand Up @@ -303,13 +316,3 @@ async fn list_buckets(s3_client: &aws_sdk_s3::Client) -> Result<Vec<String>, Cli
}
Ok(bucket_names)
}

async fn list_directory_buckets(s3_client: &aws_sdk_s3::Client) -> Result<Vec<String>, CliError> {
let mut bucket_names = Vec::new();
let resp = s3_client.list_directory_buckets().send().await?;
let buckets = resp.buckets();
for bucket in buckets {
bucket_names.push(bucket.name().unwrap_or_default().to_string());
}
Ok(bucket_names)
}

0 comments on commit a5052e9

Please sign in to comment.