Performance optimizations
* Explicitly maintain buffer_length, as length(buffer) is expensive for large lists
* Add new messages to the head of the list instead of the tail (both changes are sketched below)
pmenhart committed Oct 18, 2018
1 parent 9302821 commit 1d949ce
Showing 1 changed file with 14 additions and 12 deletions.
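As a quick illustration of both points, here is a minimal sketch in Elixir. The BufferSketch module and its fields are made up for this note and are not part of the commit: an explicit counter avoids the O(n) traversal that length/1 performs on a linked list, and prepending with [item | list] is O(1), whereas List.insert_at(list, -1, item) rebuilds the whole list on every call.

# Illustrative sketch only; not code from lib/cloud_watch.ex.
defmodule BufferSketch do
  # Mirror of the idea in the diff: keep the list plus an explicit length counter.
  defstruct items: [], length: 0

  # New approach: O(1) prepend, O(1) counter bump.
  def add(%__MODULE__{items: items, length: len} = buf, item) do
    %{buf | items: [item | items], length: len + 1}
  end

  # Old approach: appending copies the whole list, and length/1 walks it again.
  def add_slow(%__MODULE__{items: items} = buf, item) do
    items = List.insert_at(items, -1, item)
    %{buf | items: items, length: length(items)}
  end

  # Chronological order is only restored when the batch is actually sent.
  def to_batch(%__MODULE__{items: items}), do: Enum.reverse(items)
end

# Usage:
#   buf = Enum.reduce(1..5, %BufferSketch{}, &BufferSketch.add(&2, &1))
#   buf.length                  #=> 5
#   BufferSketch.to_batch(buf)  #=> [1, 2, 3, 4, 5]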
lib/cloud_watch.ex (14 additions, 12 deletions)
@@ -5,6 +5,7 @@ defmodule CloudWatch do
   @default_level :info
   @default_max_buffer_size 10_485
   @default_max_timeout 60_000
+  @max_buffer_size 10_000
 
   alias CloudWatch.InputLogEvent
   alias CloudWatch.AwsProxy
@@ -72,6 +73,7 @@ defmodule CloudWatch do
     client = AwsProxy.client(access_key_id, secret_access_key, region, endpoint)
     %{
       buffer: [],
+      buffer_length: 0, # for a large list, this is less expensive than length(buffer)
       buffer_size: 0,
       client: client,
       format: format,
@@ -87,21 +89,22 @@ defmodule CloudWatch do
   end
 
   defp purge_buffer(state) do
-    %{state | buffer: [], buffer_size: 0}
+    %{state | buffer: [], buffer_length: 0, buffer_size: 0}
   end
 
-  defp add_message(%{buffer: buffer, buffer_size: buffer_size} = state, level, msg, ts, md) do
+  defp add_message(%{buffer: buffer, buffer_length: buffer_length, buffer_size: buffer_size} = state, level, msg, ts, md) do
     message = state.format
     |> Logger.Formatter.format(level, msg, ts, md)
     |> IO.chardata_to_string
-    buffer = List.insert_at(buffer, -1, %InputLogEvent{message: message, timestamp: ts})
-    %{state | buffer: buffer, buffer_size: buffer_size + byte_size(message) + 26}
+    #buffer = List.insert_at(buffer, -1, %InputLogEvent{message: message, timestamp: ts}) # TODO: performance impact of adding at the end???
+    buffer = [%InputLogEvent{message: message, timestamp: ts} | buffer] # buffer order is not relevant, we'll reverse or sort later if needed
+    %{state | buffer: buffer, buffer_length: buffer_length + 1, buffer_size: buffer_size + byte_size(message) + 26}
   end
 
   defp flush(_state, _opts \\ [force: false])
 
-  defp flush(%{buffer: buffer, buffer_size: buffer_size, max_buffer_size: max_buffer_size} = state, [force: false])
-    when buffer_size < max_buffer_size and length(buffer) < 10_000 do
+  defp flush(%{buffer_length: buffer_length, buffer_size: buffer_size, max_buffer_size: max_buffer_size} = state, [force: false])
+    when buffer_size < max_buffer_size and buffer_length < @max_buffer_size do
     {:ok, state}
   end

@@ -114,14 +117,14 @@ defmodule CloudWatch do
     do_flush(state, opts, log_group_name, log_stream_name)
   end
 
-  defp do_flush(state, opts, log_group_name, log_stream_name) do
-    events = %{logEvents: Enum.sort_by(state.buffer, &(&1.timestamp)),
+  defp do_flush(%{buffer: buffer, buffer_length: buffer_length} = state, opts, log_group_name, log_stream_name) do
+    # TODO: worst case for quicksort is when the collection is (almost) sorted or reverse-sorted. That is likely the case here!
+    events = %{logEvents: Enum.sort_by(buffer, &(&1.timestamp)),
       logGroupName: log_group_name, logStreamName: log_stream_name, sequenceToken: state.sequence_token}
     case AwsProxy.put_log_events(state.client, events) do
       {:ok, %{"nextSequenceToken" => next_sequence_token}, _} ->
-        msg_count = length(state.buffer)
         {:ok, state |> purge_buffer() |> Map.put(:sequence_token, next_sequence_token)
-          |> add_internal_info("CloudWatch Log flushed buffer (#{inspect msg_count} messages)")}
+          |> add_internal_info("CloudWatch Log flushed buffer (#{inspect buffer_length} messages)")}
       {:error, {"DataAlreadyAcceptedException", "The given batch of log events has already been accepted. The next batch can be sent with sequenceToken: " <> next_sequence_token}} ->
         state
         |> Map.put(:sequence_token, next_sequence_token)
@@ -147,12 +150,11 @@ defmodule CloudWatch do
         # AWS limit is 5 requests per second per log stream. We are supposed to re-try after a delay
         if state.purge_buffer_if_throttled do
           # Safe option: delay the transfer by removing all messages from the buffer (some messages will be lost!).
-          lost_msg_count = length(state.buffer)
           {
             :ok,
             state
             |> purge_buffer()
-            |> add_internal_error("CloudWatch Log ThrottlingException: #{inspect lost_msg_count} messages were lost!}")
+            |> add_internal_error("CloudWatch Log ThrottlingException: #{inspect buffer_length} messages were lost!}")
           }
         else
           # Sleeping here is a quick and dirty hack with possible unwanted consequences
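
The in-code comments above note that the buffer is now built newest-first and that order is fixed up at flush time. A tiny sketch of that fix-up (illustrative values, not from the repository): Enum.sort_by/2 on the timestamp restores chronological order, and when timestamps are known to be non-decreasing at insertion time, a single Enum.reverse/1 pass gives the same result without sorting.

# Illustrative only; shows the shape of the flush-time reordering, not the real structs.
buffer = [
  %{message: "third",  timestamp: 3},
  %{message: "second", timestamp: 2},
  %{message: "first",  timestamp: 1}
]  # newest-first, as produced by prepending in add_message/5

Enum.sort_by(buffer, & &1.timestamp)
#=> events ordered "first", "second", "third", i.e. ascending timestamps 1, 2, 3

# If timestamps never decrease between calls, reversing is an O(n) alternative:
Enum.reverse(buffer)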
