Skip to content

Commit

Permalink
max max_messages configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
jlambert121 committed Sep 30, 2016
1 parent 8ed664f commit b297b77
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions lib/logstash/inputs/sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
include LogStash::PluginMixins::AwsConfig::V2

MAX_TIME_BEFORE_GIVING_UP = 60
MAX_MESSAGES_TO_FETCH = 10 # Between 1-10 in the AWS-SDK doc
MAX_MESSAGES_TO_FETCH = 1 # Between 1-10 in the AWS-SDK doc
SENT_TIMESTAMP = "SentTimestamp"
SQS_ATTRIBUTES = [SENT_TIMESTAMP]
BACKOFF_SLEEP_TIME = 1
Expand Down Expand Up @@ -107,7 +107,7 @@ def setup_queue
end

def polling_options
{
{
:max_number_of_messages => MAX_MESSAGES_TO_FETCH,
:attribute_names => SQS_ATTRIBUTES,
:wait_time_seconds => @polling_frequency
Expand All @@ -119,7 +119,7 @@ def decode_event(message)
return event
end
end

def add_sqs_data(event, message)
event.set(@id_field, message.message_id) if @id_field
event.set(@md5_field, message.md5_of_body) if @md5_field
Expand All @@ -139,12 +139,10 @@ def run(output_queue)
@logger.debug("Polling SQS queue", :polling_options => polling_options)

run_with_backoff do
poller.poll(polling_options) do |messages, stats|
poller.poll(polling_options) do |message, stats|
break if stop?

messages.each do |message|
output_queue << handle_message(message)
end
output_queue << handle_message(message)

@logger.debug("SQS Stats:", :request_count => stats.request_count,
:received_message_count => stats.received_message_count,
Expand All @@ -169,7 +167,7 @@ def run_with_backoff(max_time = MAX_TIME_BEFORE_GIVING_UP, sleep_time = BACKOFF_
rescue Aws::SQS::Errors::ServiceError => e
@logger.warn("Aws::SQS::Errors::ServiceError ... retrying SQS request with exponential backoff", :queue => @queue, :sleep_time => sleep_time, :error => e)
sleep(next_sleep)
next_sleep = next_sleep > max_time ? sleep_time : sleep_time * BACKOFF_FACTOR
next_sleep = next_sleep > max_time ? sleep_time : sleep_time * BACKOFF_FACTOR

retry
end
Expand Down

0 comments on commit b297b77

Please sign in to comment.