Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
akumar1214 committed Nov 9, 2024
1 parent 0f465f2 commit 8451f65
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ ElasticGraphGemspecHelper.define_elasticgraph_gem(gemspec_file: __FILE__, catego

spec.add_dependency "aws-sdk-lambda", "~> 1.125"
spec.add_dependency "aws-sdk-sqs", "~> 1.80"
spec.add_dependency "aws-sdk-cloudwatch", "~> 1.10"
spec.add_dependency "aws-sdk-cloudwatch", "~> 1.104"

# aws-sdk-sqs requires an XML library be available. On Ruby < 3 it'll use rexml from the standard library but on Ruby 3.0+
# we have to add an explicit dependency. It supports ox, oga, libxml, nokogiri or rexml, and of those, ox seems to be the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ def initialize(datastore_core:, sqs_client:, lambda_client:, cloudwatch_client:)

MINIMUM_CONCURRENCY = 2

def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maximum_concurrency:, minimum_free_storage:, indexer_function_name:, cluster_name:)
def tune_indexer_concurrency(
queue_urls:,
min_cpu_target:,
max_cpu_target:,
maximum_concurrency:,
required_free_storage_in_mb:,
indexer_function_name:,
cluster_name:
)
queue_attributes = get_queue_attributes(queue_urls)
queue_arns = queue_attributes.fetch(:queue_arns)
num_messages = queue_attributes.fetch(:total_messages)
Expand All @@ -38,7 +46,7 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi

new_target_concurrency =
if num_messages.positive?
free_storage = get_min_free_storage(cluster_name)
lowest_node_free_storage_in_mb = get_lowest_node_free_storage_in_mb(cluster_name)

cpu_utilization = get_max_cpu_utilization
cpu_midpoint = (max_cpu_target + min_cpu_target) / 2.0
Expand All @@ -48,15 +56,19 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi
if current_concurrency.nil?
details_logger.log_unset
nil
elsif free_storage < minimum_free_storage
details_logger.log_pause(free_storage)
elsif lowest_node_free_storage_in_mb < required_free_storage_in_mb
details_logger.log_pause(
lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb,
required_free_storage_in_mb: required_free_storage_in_mb
)
MINIMUM_CONCURRENCY
elsif cpu_utilization < min_cpu_target
increase_factor = (cpu_midpoint / cpu_utilization).clamp(0.0, 1.5)
(current_concurrency * increase_factor).round.tap do |new_concurrency|
details_logger.log_increase(
cpu_utilization: cpu_utilization,
min_free_storage: free_storage,
lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb,
required_free_storage_in_mb: required_free_storage_in_mb,
current_concurrency: current_concurrency,
new_concurrency: new_concurrency
)
Expand All @@ -66,15 +78,17 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi
(current_concurrency - (current_concurrency * decrease_factor)).round.tap do |new_concurrency|
details_logger.log_decrease(
cpu_utilization: cpu_utilization,
min_free_storage: free_storage,
lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb,
required_free_storage_in_mb: required_free_storage_in_mb,
current_concurrency: current_concurrency,
new_concurrency: new_concurrency
)
end
else
details_logger.log_no_change(
cpu_utilization: cpu_utilization,
min_free_storage: free_storage,
lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb,
required_free_storage_in_mb: required_free_storage_in_mb,
current_concurrency: current_concurrency
)
current_concurrency
Expand Down Expand Up @@ -103,7 +117,7 @@ def get_max_cpu_utilization
end.max.to_f
end

def get_min_free_storage(cluster_name)
def get_lowest_node_free_storage_in_mb(cluster_name)
metric_response = @cloudwatch_client.get_metric_data({
start_time: ::Time.now - 1200, # past 20 minutes
end_time: ::Time.now,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,39 +30,43 @@ def initialize(
}
end

def log_increase(cpu_utilization:, min_free_storage:, current_concurrency:, new_concurrency:)
def log_increase(cpu_utilization:, lowest_node_free_storage_in_mb:, required_free_storage_in_mb:, current_concurrency:, new_concurrency:)
log_result({
"action" => "increase",
"cpu_utilization" => cpu_utilization,
"min_free_storage" => min_free_storage,
"lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb,
"required_free_storage_in_mb" => required_free_storage_in_mb,
"current_concurrency" => current_concurrency,
"new_concurrency" => new_concurrency
})
end

def log_decrease(cpu_utilization:, min_free_storage:, current_concurrency:, new_concurrency:)
def log_decrease(cpu_utilization:, lowest_node_free_storage_in_mb:, required_free_storage_in_mb:, current_concurrency:, new_concurrency:)
log_result({
"action" => "decrease",
"cpu_utilization" => cpu_utilization,
"min_free_storage" => min_free_storage,
"lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb,
"required_free_storage_in_mb" => required_free_storage_in_mb,
"current_concurrency" => current_concurrency,
"new_concurrency" => new_concurrency
})
end

def log_no_change(cpu_utilization:, min_free_storage:, current_concurrency:)
def log_no_change(cpu_utilization:, lowest_node_free_storage_in_mb:, required_free_storage_in_mb:, current_concurrency:)
log_result({
"action" => "no_change",
"cpu_utilization" => cpu_utilization,
"min_free_storage" => min_free_storage,
"lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb,
"required_free_storage_in_mb" => required_free_storage_in_mb,
"current_concurrency" => current_concurrency
})
end

def log_pause(min_free_storage)
def log_pause(lowest_node_free_storage_in_mb:, required_free_storage_in_mb:)
log_result({
"action" => "pause",
"min_free_storage" => min_free_storage
"lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb,
"required_free_storage_in_mb" => required_free_storage_in_mb
})
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def handle_request(event:, context:)
min_cpu_target: event.fetch("min_cpu_target"),
max_cpu_target: event.fetch("max_cpu_target"),
maximum_concurrency: event.fetch("maximum_concurrency"),
minimum_free_storage: event.fetch("minimum_free_storage"),
required_free_storage_in_mb: event.fetch("required_free_storage_in_mb"),
indexer_function_name: event.fetch("indexer_function_name"),
cluster_name: event.fetch("cluster_name")
)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ module ElasticGraph
min_cpu_target: ::Integer,
max_cpu_target: ::Integer,
maximum_concurrency: ::Integer,
minimum_free_storage: ::Integer,
required_free_storage_in_mb: ::Integer,
indexer_function_name: ::String,
cluster_name: ::String
) -> void
Expand All @@ -29,7 +29,7 @@ module ElasticGraph
@cloudwatch_client: Aws::CloudWatch::Client

def get_max_cpu_utilization: () -> ::Float
def get_min_free_storage: (::String) -> ::Float
def get_lowest_node_free_storage_in_mb: (::String) -> ::Float
def get_queue_attributes: (::Array[::String]) -> { total_messages: ::Integer, queue_arns: ::Array[::String] }
def get_concurrency: (::String) -> ::Integer?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,36 @@ module ElasticGraph
queue_urls: ::Array[::String],
min_cpu_target: ::Integer,
max_cpu_target: ::Integer,
num_messages: ::Integer,
num_messages: ::Integer
) -> void

def log_increase: (
cpu_utilization: ::Float,
min_free_storage: ::Float,
lowest_node_free_storage_in_mb: ::Float,
required_free_storage_in_mb: ::Integer,
current_concurrency: ::Integer,
new_concurrency: ::Integer
) -> void

def log_decrease: (
cpu_utilization: ::Float,
min_free_storage: ::Float,
lowest_node_free_storage_in_mb: ::Float,
required_free_storage_in_mb: ::Integer,
current_concurrency: ::Integer,
new_concurrency: ::Integer
) -> void

def log_no_change: (
cpu_utilization: ::Float,
min_free_storage: ::Float,
lowest_node_free_storage_in_mb: ::Float,
required_free_storage_in_mb: ::Integer,
current_concurrency: ::Integer
) -> void

def log_pause: (::Float) -> void
def log_pause: (
lowest_node_free_storage_in_mb: ::Float,
required_free_storage_in_mb: ::Integer
) -> void

def log_reset: () -> void

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ class IndexerAutoscalerLambda
let(:max_cpu_target) { 80 }
let(:cpu_midpoint) { 75 }
let(:maximum_concurrency) { 1000 }
let(:minimum_free_storage) { 10000 }
let(:required_free_storage_in_mb) { 10000 }

it "1.5x the concurrency when the CPU usage is significantly below the minimum target" do
lambda_client = lambda_client_with_concurrency(200)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(10.0),
sqs_client: sqs_client_with_number_of_messages(1),
Expand All @@ -43,7 +43,7 @@ class IndexerAutoscalerLambda
it "increases concurrency by a factor CPU usage when CPU is slightly below the minimum target" do
# CPU is at 50% and our target range is 70-80. 75 / 50 = 1.5, so increase it by 50%.
lambda_client = lambda_client_with_concurrency(200)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(50.0),
sqs_client: sqs_client_with_number_of_messages(1),
Expand All @@ -59,7 +59,7 @@ class IndexerAutoscalerLambda
it "sets concurrency to the max when it cannot be increased anymore when CPU usage is under the limit" do
current_concurrency = maximum_concurrency - 1
lambda_client = lambda_client_with_concurrency(current_concurrency)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(10),
sqs_client: sqs_client_with_number_of_messages(1),
Expand All @@ -75,7 +75,7 @@ class IndexerAutoscalerLambda
it "decreases concurrency by a factor of the CPU when the CPU usage is over the limit" do
# CPU is at 90% and our target range is 70-80. 90 / 75 = 1.2, so decrease it by 20%.
lambda_client = lambda_client_with_concurrency(500)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(90.0),
sqs_client: sqs_client_with_number_of_messages(1),
Expand All @@ -91,7 +91,7 @@ class IndexerAutoscalerLambda
it "leaves concurrency unchanged when it cannot be decreased anymore when CPU utilization is over the limit" do
current_concurrency = 0
lambda_client = lambda_client_with_concurrency(current_concurrency)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(100),
sqs_client: sqs_client_with_number_of_messages(1),
Expand All @@ -106,7 +106,7 @@ class IndexerAutoscalerLambda

it "does not adjust concurrency when the CPU is within the target range" do
lambda_client = lambda_client_with_concurrency(500)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1)
[min_cpu_target, cpu_midpoint, max_cpu_target].each do |cpu_usage|
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(cpu_usage),
Expand All @@ -127,7 +127,7 @@ class IndexerAutoscalerLambda
expect(high_cpu_usage).to be > max_cpu_target

lambda_client = lambda_client_with_concurrency(current_concurrency)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(min_cpu_target, high_cpu_usage),
sqs_client: sqs_client_with_number_of_messages(1),
Expand All @@ -141,9 +141,9 @@ class IndexerAutoscalerLambda
expect(updated_concurrency_requested_from(lambda_client)).to eq [460] # 500 - 8% since 81/75 = 1.08
end

it "resets the concurrency when free storage space drops below the minimum regardless of cpu" do
it "pauses the concurrency when free storage space drops below the threshold regardless of cpu" do
lambda_client = lambda_client_with_concurrency(500)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage - 1)
cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb - 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(min_cpu_target - 1),
sqs_client: sqs_client_with_number_of_messages(1),
Expand All @@ -159,7 +159,7 @@ class IndexerAutoscalerLambda
it "sets concurrency to the min when there are no messages in the queue" do
current_concurrency = 500
lambda_client = lambda_client_with_concurrency(current_concurrency)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(min_cpu_target - 1),
sqs_client: sqs_client_with_number_of_messages(0),
Expand All @@ -174,7 +174,7 @@ class IndexerAutoscalerLambda

it "leaves concurrency unset if it is currently unset" do
lambda_client = lambda_client_without_concurrency
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1)

# CPU is at 50% and our target range is 70-80.
concurrency_scaler = build_concurrency_scaler(
Expand Down Expand Up @@ -277,7 +277,7 @@ def tune_indexer_concurrency(concurrency_scaler)
min_cpu_target: min_cpu_target,
max_cpu_target: max_cpu_target,
maximum_concurrency: maximum_concurrency,
minimum_free_storage: minimum_free_storage,
required_free_storage_in_mb: required_free_storage_in_mb,
indexer_function_name: indexer_function_name,
cluster_name: "some-eg-cluster"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"min_cpu_target" => 70,
"max_cpu_target" => 80,
"maximum_concurrency" => 1000,
"minimum_free_storage" => 100,
"required_free_storage_in_mb" => 100,
"cluster_name" => "some-eg-cluster",
"indexer_function_name" => "some-eg-app-indexer"
}
Expand Down
2 changes: 2 additions & 0 deletions rbs_collection.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ gems:
ignore: true

# Use `ignore: false` to tell rbs collection to pull the RBS signatures from these gems.
- name: aws-sdk-cloudwatch
ignore: false
- name: aws-sdk-lambda
ignore: false
- name: aws-sdk-sqs
Expand Down

0 comments on commit 8451f65

Please sign in to comment.