Skip to content

Commit

Permalink
tune indexer lambda by updating the reserved concurrency on the lambd…
Browse files Browse the repository at this point in the history
…a instead of updating the concurrency on the ESM
  • Loading branch information
akumar1214 committed Nov 6, 2024
1 parent c0c4e55 commit e032ddc
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@ def initialize(datastore_core:, sqs_client:, lambda_client:)
@lambda_client = lambda_client
end

# AWS requires the value be in this range:
# https://docs.aws.amazon.com/lambda/latest/api/API_ScalingConfig.html#API_ScalingConfig_Contents
MAXIMUM_CONCURRENCY = 1000
MINIMUM_CONCURRENCY = 2

def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, event_source_mapping_uuids:)
def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maximum_concurrency:, indexer_function_name:)
queue_attributes = get_queue_attributes(queue_urls)
queue_arns = queue_attributes.fetch(:queue_arns)
num_messages = queue_attributes.fetch(:total_messages)
Expand All @@ -43,7 +40,7 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, even
cpu_utilization = get_max_cpu_utilization
cpu_midpoint = (max_cpu_target + min_cpu_target) / 2.0

current_concurrency = get_total_concurrency(event_source_mapping_uuids)
current_concurrency = get_concurrency(indexer_function_name)

if current_concurrency.nil?
details_logger.log_unset
Expand Down Expand Up @@ -79,9 +76,10 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, even
end

if new_target_concurrency && new_target_concurrency != current_concurrency
update_event_source_mapping(
event_source_mapping_uuids: event_source_mapping_uuids,
concurrency: new_target_concurrency
update_concurrency(
indexer_function_name: indexer_function_name,
concurrency: new_target_concurrency,
maximum_concurrency: maximum_concurrency
)
end
end
Expand Down Expand Up @@ -116,29 +114,18 @@ def get_queue_attributes(queue_urls)
}
end

def get_total_concurrency(event_source_mapping_uuids)
maximum_concurrencies = event_source_mapping_uuids.map do |event_source_mapping_uuid|
@lambda_client.get_event_source_mapping(
uuid: event_source_mapping_uuid
).scaling_config&.maximum_concurrency
end.compact
maximum_concurrencies.empty? ? nil : maximum_concurrencies.sum
def get_concurrency(indexer_function_name)
@lambda_client.get_function_concurrency(
function_name: indexer_function_name
).reserved_concurrent_executions
end

def update_event_source_mapping(concurrency:, event_source_mapping_uuids:)
concurrency_per_queue = concurrency / event_source_mapping_uuids.length

target_concurrency =
concurrency_per_queue.clamp(MINIMUM_CONCURRENCY, MAXIMUM_CONCURRENCY)

event_source_mapping_uuids.map do |event_source_mapping_uuid|
@lambda_client.update_event_source_mapping(
uuid: event_source_mapping_uuid,
scaling_config: {
maximum_concurrency: target_concurrency
}
)
end
def update_concurrency(indexer_function_name:, concurrency:, maximum_concurrency:)
target_concurrency = concurrency.clamp(MINIMUM_CONCURRENCY, maximum_concurrency)
@lambda_client.put_function_concurrency(
function_name: indexer_function_name,
reserved_concurrent_executions: target_concurrency
)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def handle_request(event:, context:)
queue_urls: event.fetch("queue_urls"),
min_cpu_target: event.fetch("min_cpu_target"),
max_cpu_target: event.fetch("max_cpu_target"),
event_source_mapping_uuids: event.fetch("event_source_mapping_uuids")
maximum_concurrency: event.fetch("maximum_concurrency"),
indexer_function_name: event.fetch("indexer_function_name")
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ module ElasticGraph
lambda_client: Aws::Lambda::Client
) -> void

MAXIMUM_CONCURRENCY: ::Integer
MINIMUM_CONCURRENCY: ::Integer

def tune_indexer_concurrency: (
queue_urls: ::Array[::String],
min_cpu_target: ::Integer,
max_cpu_target: ::Integer,
event_source_mapping_uuids: ::Array[::String]
maximum_concurrency: ::Integer,
indexer_function_name: ::String
) -> void

private
Expand All @@ -26,11 +26,12 @@ module ElasticGraph

def get_max_cpu_utilization: () -> ::Float
def get_queue_attributes: (::Array[::String]) -> { total_messages: ::Integer, queue_arns: ::Array[::String] }
def get_total_concurrency: (::Array[::String]) -> ::Integer?
def get_concurrency: (::String) -> ::Integer?

def update_event_source_mapping: (
def update_concurrency: (
indexer_function_name: ::String,
concurrency: ::Integer,
event_source_mapping_uuids: ::Array[::String]
maximum_concurrency: ::Integer
) -> void
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ class IndexerAutoscalerLambda
include BuildsIndexerAutoscalerLambda

describe "#tune_indexer_concurrency" do
let(:event_source_mapping_uuid) { "uuid123" }
let(:indexer_function_name) { "indexer-lambda" }
let(:min_cpu_target) { 70 }
let(:max_cpu_target) { 80 }
let(:cpu_midpoint) { 75 }
let(:maximum_concurrency) { 1000 }

it "1.5x the concurrency when the CPU usage is significantly below the minimum target" do
lambda_client = lambda_client_with_concurrency(200)
Expand All @@ -32,7 +33,7 @@ class IndexerAutoscalerLambda

tune_indexer_concurrency(concurrency_scaler)

expect(updated_concurrencies_requested_from(lambda_client)).to eq [300] # 200 * 1.5
expect(updated_concurrency_requested_from(lambda_client)).to eq [300] # 200 * 1.5
end

it "increases concurrency by a factor CPU usage when CPU is slightly below the minimum target" do
Expand All @@ -46,11 +47,11 @@ class IndexerAutoscalerLambda

tune_indexer_concurrency(concurrency_scaler)

expect(updated_concurrencies_requested_from(lambda_client)).to eq [300] # 200 + 50%
expect(updated_concurrency_requested_from(lambda_client)).to eq [300] # 200 + 50%
end

it "sets concurrency to the max when it cannot be increased anymore when CPU usage is under the limit" do
current_concurrency = ConcurrencyScaler::MAXIMUM_CONCURRENCY - 1
current_concurrency = maximum_concurrency - 1
lambda_client = lambda_client_with_concurrency(current_concurrency)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(10),
Expand All @@ -60,7 +61,7 @@ class IndexerAutoscalerLambda

tune_indexer_concurrency(concurrency_scaler)

expect(updated_concurrencies_requested_from(lambda_client)).to eq [ConcurrencyScaler::MAXIMUM_CONCURRENCY]
expect(updated_concurrency_requested_from(lambda_client)).to eq [1000] # maximum_concurrency = 1000
end

it "decreases concurrency by a factor of the CPU when the CPU usage is over the limit" do
Expand All @@ -74,11 +75,11 @@ class IndexerAutoscalerLambda

tune_indexer_concurrency(concurrency_scaler)

expect(updated_concurrencies_requested_from(lambda_client)).to eq [400] # 500 - 20%
expect(updated_concurrency_requested_from(lambda_client)).to eq [400] # 500 - 20%
end

it "sets concurrency to the min when it cannot be decreased anymore when CPU utilization is over the limit" do
current_concurrency = ConcurrencyScaler::MINIMUM_CONCURRENCY + 1
it "leaves concurrency unchanged when it cannot be decreased anymore when CPU utilization is over the limit" do
current_concurrency = 0
lambda_client = lambda_client_with_concurrency(current_concurrency)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(100),
Expand All @@ -88,7 +89,7 @@ class IndexerAutoscalerLambda

tune_indexer_concurrency(concurrency_scaler)

expect(updated_concurrencies_requested_from(lambda_client)).to eq [ConcurrencyScaler::MINIMUM_CONCURRENCY]
expect(updated_concurrency_requested_from(lambda_client)).to eq []
end

it "does not adjust concurrency when the CPU is within the target range" do
Expand All @@ -103,7 +104,7 @@ class IndexerAutoscalerLambda
tune_indexer_concurrency(concurrency_scaler)
end

expect(updated_concurrencies_requested_from(lambda_client)).to eq []
expect(updated_concurrency_requested_from(lambda_client)).to eq []
end

it "decreases the concurrency when at least one of the node's CPU is over the limit" do
Expand All @@ -121,7 +122,7 @@ class IndexerAutoscalerLambda
tune_indexer_concurrency(concurrency_scaler)

expect(high_cpu_usage).to be > max_cpu_target
expect(updated_concurrencies_requested_from(lambda_client)).to eq [460] # 500 - 8% since 81/75 = 1.08
expect(updated_concurrency_requested_from(lambda_client)).to eq [460] # 500 - 8% since 81/75 = 1.08
end

it "sets concurrency to the min when there are no messages in the queue" do
Expand All @@ -135,7 +136,7 @@ class IndexerAutoscalerLambda

tune_indexer_concurrency(concurrency_scaler)

expect(updated_concurrencies_requested_from(lambda_client)).to eq [ConcurrencyScaler::MINIMUM_CONCURRENCY]
expect(updated_concurrency_requested_from(lambda_client)).to eq [2] # 2 is the minimum
end

it "leaves concurrency unset if it is currently unset" do
Expand All @@ -150,51 +151,15 @@ class IndexerAutoscalerLambda

tune_indexer_concurrency(concurrency_scaler)

expect(updated_concurrencies_requested_from(lambda_client)).to eq []
end

it "supports setting the concurrency on multiple sqs queues" do
current_concurrency = 500
cpu_usage = 60.0
lambda_client = lambda_client_with_concurrency(current_concurrency)

queue_urls = [
"https://sqs.us-west-2.amazonaws.com/000000000/some-eg-app-queue-name1",
"https://sqs.us-west-2.amazonaws.com/000000000/some-eg-app-queue-name2"
]

event_source_mapping_uuids = [
"event_source_mapping_uuid1",
"event_source_mapping_uuid2"
]

concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(cpu_usage),
sqs_client: sqs_client_with_number_of_messages(1),
lambda_client: lambda_client
)

tune_indexer_concurrency(
concurrency_scaler,
queue_urls: queue_urls,
event_source_mapping_uuids: event_source_mapping_uuids
)

# Each event source mapping started with a concurrency of 500 (for a total of 1000).
# Adding 25% (since the midpoint of our target range is 25% higher than our usage of 60)
# gives us 1250 total concurrency. Dividing evenly across queues gives us 625 each.
expect(updated_concurrencies_requested_from(
lambda_client,
event_source_mapping_uuids: event_source_mapping_uuids
)).to eq [625, 625]
expect(updated_concurrency_requested_from(lambda_client)).to eq []
end
end

def updated_concurrencies_requested_from(lambda_client, event_source_mapping_uuids: [event_source_mapping_uuid])
def updated_concurrency_requested_from(lambda_client)
lambda_client.api_requests.filter_map do |req|
if req.fetch(:operation_name) == :update_event_source_mapping
expect(event_source_mapping_uuids).to include(req.dig(:params, :uuid))
req.dig(:params, :scaling_config, :maximum_concurrency)
if req.fetch(:operation_name) == :put_function_concurrency
expect(indexer_function_name).to include(req.dig(:params, :function_name))
req.dig(:params, :reserved_concurrent_executions)
end
end
end
Expand Down Expand Up @@ -233,21 +198,17 @@ def sqs_client_with_number_of_messages(num_messages)

def lambda_client_with_concurrency(concurrency)
::Aws::Lambda::Client.new(stub_responses: true).tap do |lambda_client|
lambda_client.stub_responses(:get_event_source_mapping, {
uuid: event_source_mapping_uuid,
scaling_config: {
maximum_concurrency: concurrency
}
lambda_client.stub_responses(:get_function_concurrency, {
reserved_concurrent_executions: concurrency
})
end
end

# If the concurrency on the event source mapping is not set, the scaling_config on the Lambda client will be nil.
# If the lambda is using unreserved concurrency, reserved_concurrent_executions on the Lambda client will be nil.
def lambda_client_without_concurrency
::Aws::Lambda::Client.new(stub_responses: true).tap do |lambda_client|
lambda_client.stub_responses(:get_event_source_mapping, {
uuid: event_source_mapping_uuid,
scaling_config: nil
lambda_client.stub_responses(:get_function_concurrency, {
reserved_concurrent_executions: nil
})
end
end
Expand All @@ -260,16 +221,13 @@ def build_concurrency_scaler(datastore_client:, sqs_client:, lambda_client:)
).concurrency_scaler
end

def tune_indexer_concurrency(
concurrency_scaler,
queue_urls: ["https://sqs.us-west-2.amazonaws.com/000000000/some-eg-app-queue-name"],
event_source_mapping_uuids: [event_source_mapping_uuid]
)
def tune_indexer_concurrency(concurrency_scaler)
concurrency_scaler.tune_indexer_concurrency(
queue_urls: queue_urls,
queue_urls: ["https://sqs.us-west-2.amazonaws.com/000000000/some-eg-app-queue-name"],
min_cpu_target: min_cpu_target,
max_cpu_target: max_cpu_target,
event_source_mapping_uuids: event_source_mapping_uuids
maximum_concurrency: maximum_concurrency,
indexer_function_name: indexer_function_name
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
"queue_urls" => ["https://sqs.us-west-2.amazonaws.com/000000000/some-eg-app-queue-name"],
"min_cpu_target" => 70,
"max_cpu_target" => 80,
"event_source_mapping_uuids" => ["12345678-1234-1234-1234-123456789012"]
"maximum_concurrency" => 1000,
"indexer_function_name" => "some-eg-app-indexer"
}
lambda_function.handle_request(event: event, context: {})
end
Expand Down

0 comments on commit e032ddc

Please sign in to comment.