diff --git a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler.rb b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler.rb index ae519377..ab90a8a5 100644 --- a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler.rb +++ b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler.rb @@ -19,12 +19,9 @@ def initialize(datastore_core:, sqs_client:, lambda_client:) @lambda_client = lambda_client end - # AWS requires the value be in this range: - # https://docs.aws.amazon.com/lambda/latest/api/API_ScalingConfig.html#API_ScalingConfig_Contents - MAXIMUM_CONCURRENCY = 1000 MINIMUM_CONCURRENCY = 2 - def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, event_source_mapping_uuids:) + def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maximum_concurrency:, indexer_function_name:) queue_attributes = get_queue_attributes(queue_urls) queue_arns = queue_attributes.fetch(:queue_arns) num_messages = queue_attributes.fetch(:total_messages) @@ -43,7 +40,7 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, even cpu_utilization = get_max_cpu_utilization cpu_midpoint = (max_cpu_target + min_cpu_target) / 2.0 - current_concurrency = get_total_concurrency(event_source_mapping_uuids) + current_concurrency = get_concurrency(indexer_function_name) if current_concurrency.nil? details_logger.log_unset @@ -79,9 +76,10 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, even end if new_target_concurrency && new_target_concurrency != current_concurrency - update_event_source_mapping( - event_source_mapping_uuids: event_source_mapping_uuids, - concurrency: new_target_concurrency + update_concurrency( + indexer_function_name: indexer_function_name, + concurrency: new_target_concurrency, + maximum_concurrency: maximum_concurrency ) end end @@ -116,29 +114,18 @@ def get_queue_attributes(queue_urls) } end - def get_total_concurrency(event_source_mapping_uuids) - maximum_concurrencies = event_source_mapping_uuids.map do |event_source_mapping_uuid| - @lambda_client.get_event_source_mapping( - uuid: event_source_mapping_uuid - ).scaling_config&.maximum_concurrency - end.compact - maximum_concurrencies.empty? ? nil : maximum_concurrencies.sum + def get_concurrency(indexer_function_name) + @lambda_client.get_function_concurrency( + function_name: indexer_function_name + ).reserved_concurrent_executions end - def update_event_source_mapping(concurrency:, event_source_mapping_uuids:) - concurrency_per_queue = concurrency / event_source_mapping_uuids.length - - target_concurrency = - concurrency_per_queue.clamp(MINIMUM_CONCURRENCY, MAXIMUM_CONCURRENCY) - - event_source_mapping_uuids.map do |event_source_mapping_uuid| - @lambda_client.update_event_source_mapping( - uuid: event_source_mapping_uuid, - scaling_config: { - maximum_concurrency: target_concurrency - } - ) - end + def update_concurrency(indexer_function_name:, concurrency:, maximum_concurrency:) + target_concurrency = concurrency.clamp(MINIMUM_CONCURRENCY, maximum_concurrency) + @lambda_client.put_function_concurrency( + function_name: indexer_function_name, + reserved_concurrent_executions: target_concurrency + ) end end end diff --git a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/lambda_function.rb b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/lambda_function.rb index d6164277..f9d550f3 100644 --- a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/lambda_function.rb +++ b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/lambda_function.rb @@ -25,7 +25,8 @@ def handle_request(event:, context:) queue_urls: event.fetch("queue_urls"), min_cpu_target: event.fetch("min_cpu_target"), max_cpu_target: event.fetch("max_cpu_target"), - event_source_mapping_uuids: event.fetch("event_source_mapping_uuids") + maximum_concurrency: event.fetch("maximum_concurrency"), + indexer_function_name: event.fetch("indexer_function_name") ) end end diff --git a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/concurrency_scaler.rbs b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/concurrency_scaler.rbs index 87af4f97..29a25142 100644 --- a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/concurrency_scaler.rbs +++ b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/concurrency_scaler.rbs @@ -7,14 +7,14 @@ module ElasticGraph lambda_client: Aws::Lambda::Client ) -> void - MAXIMUM_CONCURRENCY: ::Integer MINIMUM_CONCURRENCY: ::Integer def tune_indexer_concurrency: ( queue_urls: ::Array[::String], min_cpu_target: ::Integer, max_cpu_target: ::Integer, - event_source_mapping_uuids: ::Array[::String] + maximum_concurrency: ::Integer, + indexer_function_name: ::String ) -> void private @@ -26,11 +26,12 @@ module ElasticGraph def get_max_cpu_utilization: () -> ::Float def get_queue_attributes: (::Array[::String]) -> { total_messages: ::Integer, queue_arns: ::Array[::String] } - def get_total_concurrency: (::Array[::String]) -> ::Integer? + def get_concurrency: (::String) -> ::Integer? - def update_event_source_mapping: ( + def update_concurrency: ( + indexer_function_name: ::String, concurrency: ::Integer, - event_source_mapping_uuids: ::Array[::String] + maximum_concurrency: ::Integer ) -> void end end diff --git a/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler_spec.rb b/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler_spec.rb index 1fab6150..1e949061 100644 --- a/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler_spec.rb +++ b/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler_spec.rb @@ -17,10 +17,11 @@ class IndexerAutoscalerLambda include BuildsIndexerAutoscalerLambda describe "#tune_indexer_concurrency" do - let(:event_source_mapping_uuid) { "uuid123" } + let(:indexer_function_name) { "indexer-lambda" } let(:min_cpu_target) { 70 } let(:max_cpu_target) { 80 } let(:cpu_midpoint) { 75 } + let(:maximum_concurrency) { 1000 } it "1.5x the concurrency when the CPU usage is significantly below the minimum target" do lambda_client = lambda_client_with_concurrency(200) @@ -32,7 +33,7 @@ class IndexerAutoscalerLambda tune_indexer_concurrency(concurrency_scaler) - expect(updated_concurrencies_requested_from(lambda_client)).to eq [300] # 200 * 1.5 + expect(updated_concurrency_requested_from(lambda_client)).to eq [300] # 200 * 1.5 end it "increases concurrency by a factor CPU usage when CPU is slightly below the minimum target" do @@ -46,11 +47,11 @@ class IndexerAutoscalerLambda tune_indexer_concurrency(concurrency_scaler) - expect(updated_concurrencies_requested_from(lambda_client)).to eq [300] # 200 + 50% + expect(updated_concurrency_requested_from(lambda_client)).to eq [300] # 200 + 50% end it "sets concurrency to the max when it cannot be increased anymore when CPU usage is under the limit" do - current_concurrency = ConcurrencyScaler::MAXIMUM_CONCURRENCY - 1 + current_concurrency = maximum_concurrency - 1 lambda_client = lambda_client_with_concurrency(current_concurrency) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(10), @@ -60,7 +61,7 @@ class IndexerAutoscalerLambda tune_indexer_concurrency(concurrency_scaler) - expect(updated_concurrencies_requested_from(lambda_client)).to eq [ConcurrencyScaler::MAXIMUM_CONCURRENCY] + expect(updated_concurrency_requested_from(lambda_client)).to eq [1000] # maximum_concurrency = 1000 end it "decreases concurrency by a factor of the CPU when the CPU usage is over the limit" do @@ -74,11 +75,11 @@ class IndexerAutoscalerLambda tune_indexer_concurrency(concurrency_scaler) - expect(updated_concurrencies_requested_from(lambda_client)).to eq [400] # 500 - 20% + expect(updated_concurrency_requested_from(lambda_client)).to eq [400] # 500 - 20% end - it "sets concurrency to the min when it cannot be decreased anymore when CPU utilization is over the limit" do - current_concurrency = ConcurrencyScaler::MINIMUM_CONCURRENCY + 1 + it "leaves concurrency unchanged when it cannot be decreased anymore when CPU utilization is over the limit" do + current_concurrency = 0 lambda_client = lambda_client_with_concurrency(current_concurrency) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(100), @@ -88,7 +89,7 @@ class IndexerAutoscalerLambda tune_indexer_concurrency(concurrency_scaler) - expect(updated_concurrencies_requested_from(lambda_client)).to eq [ConcurrencyScaler::MINIMUM_CONCURRENCY] + expect(updated_concurrency_requested_from(lambda_client)).to eq [] end it "does not adjust concurrency when the CPU is within the target range" do @@ -103,7 +104,7 @@ class IndexerAutoscalerLambda tune_indexer_concurrency(concurrency_scaler) end - expect(updated_concurrencies_requested_from(lambda_client)).to eq [] + expect(updated_concurrency_requested_from(lambda_client)).to eq [] end it "decreases the concurrency when at least one of the node's CPU is over the limit" do @@ -121,7 +122,7 @@ class IndexerAutoscalerLambda tune_indexer_concurrency(concurrency_scaler) expect(high_cpu_usage).to be > max_cpu_target - expect(updated_concurrencies_requested_from(lambda_client)).to eq [460] # 500 - 8% since 81/75 = 1.08 + expect(updated_concurrency_requested_from(lambda_client)).to eq [460] # 500 - 8% since 81/75 = 1.08 end it "sets concurrency to the min when there are no messages in the queue" do @@ -135,7 +136,7 @@ class IndexerAutoscalerLambda tune_indexer_concurrency(concurrency_scaler) - expect(updated_concurrencies_requested_from(lambda_client)).to eq [ConcurrencyScaler::MINIMUM_CONCURRENCY] + expect(updated_concurrency_requested_from(lambda_client)).to eq [2] # 2 is the minimum end it "leaves concurrency unset if it is currently unset" do @@ -150,51 +151,15 @@ class IndexerAutoscalerLambda tune_indexer_concurrency(concurrency_scaler) - expect(updated_concurrencies_requested_from(lambda_client)).to eq [] - end - - it "supports setting the concurrency on multiple sqs queues" do - current_concurrency = 500 - cpu_usage = 60.0 - lambda_client = lambda_client_with_concurrency(current_concurrency) - - queue_urls = [ - "https://sqs.us-west-2.amazonaws.com/000000000/some-eg-app-queue-name1", - "https://sqs.us-west-2.amazonaws.com/000000000/some-eg-app-queue-name2" - ] - - event_source_mapping_uuids = [ - "event_source_mapping_uuid1", - "event_source_mapping_uuid2" - ] - - concurrency_scaler = build_concurrency_scaler( - datastore_client: datastore_client_with_cpu_usage(cpu_usage), - sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client - ) - - tune_indexer_concurrency( - concurrency_scaler, - queue_urls: queue_urls, - event_source_mapping_uuids: event_source_mapping_uuids - ) - - # Each event source mapping started with a concurrency of 500 (for a total of 1000). - # Adding 25% (since the midpoint of our target range is 25% higher than our usage of 60) - # gives us 1250 total concurrency. Dividing evenly across queues gives us 625 each. - expect(updated_concurrencies_requested_from( - lambda_client, - event_source_mapping_uuids: event_source_mapping_uuids - )).to eq [625, 625] + expect(updated_concurrency_requested_from(lambda_client)).to eq [] end end - def updated_concurrencies_requested_from(lambda_client, event_source_mapping_uuids: [event_source_mapping_uuid]) + def updated_concurrency_requested_from(lambda_client) lambda_client.api_requests.filter_map do |req| - if req.fetch(:operation_name) == :update_event_source_mapping - expect(event_source_mapping_uuids).to include(req.dig(:params, :uuid)) - req.dig(:params, :scaling_config, :maximum_concurrency) + if req.fetch(:operation_name) == :put_function_concurrency + expect(indexer_function_name).to include(req.dig(:params, :function_name)) + req.dig(:params, :reserved_concurrent_executions) end end end @@ -233,21 +198,17 @@ def sqs_client_with_number_of_messages(num_messages) def lambda_client_with_concurrency(concurrency) ::Aws::Lambda::Client.new(stub_responses: true).tap do |lambda_client| - lambda_client.stub_responses(:get_event_source_mapping, { - uuid: event_source_mapping_uuid, - scaling_config: { - maximum_concurrency: concurrency - } + lambda_client.stub_responses(:get_function_concurrency, { + reserved_concurrent_executions: concurrency }) end end - # If the concurrency on the event source mapping is not set, the scaling_config on the Lambda client will be nil. + # If the lambda is using unreserved concurrency, reserved_concurrent_executions on the Lambda client will be nil. def lambda_client_without_concurrency ::Aws::Lambda::Client.new(stub_responses: true).tap do |lambda_client| - lambda_client.stub_responses(:get_event_source_mapping, { - uuid: event_source_mapping_uuid, - scaling_config: nil + lambda_client.stub_responses(:get_function_concurrency, { + reserved_concurrent_executions: nil }) end end @@ -260,16 +221,13 @@ def build_concurrency_scaler(datastore_client:, sqs_client:, lambda_client:) ).concurrency_scaler end - def tune_indexer_concurrency( - concurrency_scaler, - queue_urls: ["https://sqs.us-west-2.amazonaws.com/000000000/some-eg-app-queue-name"], - event_source_mapping_uuids: [event_source_mapping_uuid] - ) + def tune_indexer_concurrency(concurrency_scaler) concurrency_scaler.tune_indexer_concurrency( - queue_urls: queue_urls, + queue_urls: ["https://sqs.us-west-2.amazonaws.com/000000000/some-eg-app-queue-name"], min_cpu_target: min_cpu_target, max_cpu_target: max_cpu_target, - event_source_mapping_uuids: event_source_mapping_uuids + maximum_concurrency: maximum_concurrency, + indexer_function_name: indexer_function_name ) end end diff --git a/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/lambda_function_spec.rb b/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/lambda_function_spec.rb index c32bb5cc..971e911c 100644 --- a/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/lambda_function_spec.rb +++ b/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/lambda_function_spec.rb @@ -36,7 +36,8 @@ "queue_urls" => ["https://sqs.us-west-2.amazonaws.com/000000000/some-eg-app-queue-name"], "min_cpu_target" => 70, "max_cpu_target" => 80, - "event_source_mapping_uuids" => ["12345678-1234-1234-1234-123456789012"] + "maximum_concurrency" => 1000, + "indexer_function_name" => "some-eg-app-indexer" } lambda_function.handle_request(event: event, context: {}) end