diff --git a/lib/logstash/inputs/sqs.rb b/lib/logstash/inputs/sqs.rb index ac289db..bb3a036 100644 --- a/lib/logstash/inputs/sqs.rb +++ b/lib/logstash/inputs/sqs.rb @@ -62,7 +62,7 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable include LogStash::PluginMixins::AwsConfig::V2 MAX_TIME_BEFORE_GIVING_UP = 60 - MAX_MESSAGES_TO_FETCH = 10 # Between 1-10 in the AWS-SDK doc + MAX_MESSAGES_TO_FETCH = 1 # Between 1-10 in the AWS-SDK doc SENT_TIMESTAMP = "SentTimestamp" SQS_ATTRIBUTES = [SENT_TIMESTAMP] BACKOFF_SLEEP_TIME = 1 @@ -107,7 +107,7 @@ def setup_queue end def polling_options - { + { :max_number_of_messages => MAX_MESSAGES_TO_FETCH, :attribute_names => SQS_ATTRIBUTES, :wait_time_seconds => @polling_frequency @@ -119,7 +119,7 @@ def decode_event(message) return event end end - + def add_sqs_data(event, message) event.set(@id_field, message.message_id) if @id_field event.set(@md5_field, message.md5_of_body) if @md5_field @@ -139,12 +139,10 @@ def run(output_queue) @logger.debug("Polling SQS queue", :polling_options => polling_options) run_with_backoff do - poller.poll(polling_options) do |messages, stats| + poller.poll(polling_options) do |message, stats| break if stop? - messages.each do |message| - output_queue << handle_message(message) - end + output_queue << handle_message(message) @logger.debug("SQS Stats:", :request_count => stats.request_count, :received_message_count => stats.received_message_count, @@ -169,7 +167,7 @@ def run_with_backoff(max_time = MAX_TIME_BEFORE_GIVING_UP, sleep_time = BACKOFF_ rescue Aws::SQS::Errors::ServiceError => e @logger.warn("Aws::SQS::Errors::ServiceError ... retrying SQS request with exponential backoff", :queue => @queue, :sleep_time => sleep_time, :error => e) sleep(next_sleep) - next_sleep = next_sleep > max_time ? sleep_time : sleep_time * BACKOFF_FACTOR + next_sleep = next_sleep > max_time ? sleep_time : sleep_time * BACKOFF_FACTOR retry end