Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure using instance metadata #1

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Add the backend to `config.exs`:
utc_log: true
```

Configure the following example to suit your needs:
The following is a full example with the default values:

```elixir
config :logger, CloudWatch,
Expand All @@ -46,10 +46,22 @@ Configure the following example to suit your needs:
max_timeout: 60_000
```

The `endpoint` may be omitted from the configuration and will default to
`amazonaws.com`. The `max_buffer_size` controls when `cloud_watch` will flush
the buffer in bytes. You may specify anything up to a maximum of 1,048,576
bytes. If omitted, it will default to 10,485 bytes.
CloudWatch flushes the buffer when it has collected `max_buffer_size` bytes of
messages or `max_timeout` milliseconds have elapsed. `max_buffer_size` can be
anything up to a maximum of 1,048,576 bytes. If omitted, it will default to
10,485 bytes.

CloudWatch supports getting AWS credentials and other defaults from
[EC2 instance metadata](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html).

In that case, a minimal configuration is:

```elixir
config :logger, CloudWatch,
log_group_name: "api"
```

`log_stream_name` defaults to the instance id.

## Alternative AWS client library: ExAws

Expand Down
221 changes: 170 additions & 51 deletions lib/cloud_watch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,32 @@ defmodule CloudWatch do
alias CloudWatch.InputLogEvent
alias CloudWatch.AwsProxy

def init(_) do
state = configure(Application.get_env(:logger, CloudWatch, []))
@spec init(module()) :: {:ok, term()} | {:error, term()}
def init(__MODULE__) do
  # Because this is a plugin to Logger, we can't rely on application
  # dependencies to be started before this is called, so start the HTTP
  # client used for AWS calls and instance metadata explicitly.
  {:ok, _} = Application.ensure_all_started(:hackney)

  env = Application.get_env(:logger, __MODULE__, [])
  state = configure(env)

  # If AWS keys are not defined statically, get them from the instance
  # metadata. This may fail while the instance is starting up, so retry
  # quickly (200 ms) until a client exists; afterwards refresh every
  # 5 minutes (300_000 ms), as the metadata credentials expire periodically.
  if is_nil(state.access_key_id) do
    delay = if state.client, do: 300_000, else: 200
    Process.send_after(self(), :refresh_creds, delay)
  end

  # Schedule the first periodic buffer flush.
  Process.send_after(self(), :flush, state.max_timeout)
  {:ok, state}
end

def handle_call({:configure, opts}, _state) do
  # Persist the new options in the application environment so a restarted
  # backend picks them up, then rebuild the backend state from them.
  Application.put_env(:logger, __MODULE__, opts)
  {:ok, :ok, configure(opts)}
end

Expand All @@ -25,21 +44,23 @@ defmodule CloudWatch do

def handle_event({level, _gl, {Logger, msg, ts, md}}, state) do
  # Format the message, append it to the buffer, and flush if the buffer
  # limits are exceeded. Messages below the configured level are dropped.
  case Logger.compare_levels(level, state.level) do
    :lt ->
      {:ok, state}

    _ ->
      %{buffer: buffer, buffer_size: buffer_size} = state

      message =
        state.format
        |> Logger.Formatter.format(level, msg, ts, md)
        |> IO.chardata_to_string()

      buffer = List.insert_at(buffer, -1, %InputLogEvent{message: message, timestamp: ts})

      # + 26: CloudWatch counts 26 bytes of per-event overhead towards the
      # batch size limit, in addition to the message bytes.
      flush(%{state | buffer: buffer, buffer_size: buffer_size + byte_size(message) + 26})
  end
end

def handle_event(:flush, state) do
  # Logger's :flush event discards the buffered messages; delivery to
  # CloudWatch happens via the periodic :flush info message instead.
  {:ok, %{state | buffer: [], buffer_size: 0}}
end

def handle_info(:flush, state) do
Expand All @@ -48,6 +69,16 @@ defmodule CloudWatch do
{:ok, flushed_state}
end

def handle_info(:refresh_creds, state) do
  # Rebuild the AWS client from instance metadata. Retry quickly (200 ms)
  # while no client could be built; once one exists, refresh every
  # 300_000 ms since metadata credentials expire periodically.
  refreshed = configure_aws(state)
  interval = if refreshed.client, do: 300_000, else: 200
  Process.send_after(self(), :refresh_creds, interval)
  {:ok, refreshed}
end

# Ignore any other messages sent to the backend.
def handle_info(_message, state), do: {:ok, state}
Expand All @@ -60,60 +91,148 @@ defmodule CloudWatch do
:ok
end

@spec configure(Keyword.t()) :: map()
def configure(opts) do
  # Build the backend state from the supplied options, falling back to the
  # module defaults where a value is missing. The AWS client is filled in
  # below, either from static keys or from instance metadata.
  state = %{
    access_key_id: opts[:access_key_id],
    secret_access_key: opts[:secret_access_key],
    region: opts[:region],
    endpoint: opts[:endpoint],
    client: nil,
    buffer: [],
    buffer_size: 0,
    level: opts[:level] || @default_level,
    format: Logger.Formatter.compile(opts[:format] || @default_format),
    log_group_name: opts[:log_group_name],
    log_stream_name: opts[:log_stream_name],
    max_buffer_size: opts[:max_buffer_size] || @default_max_buffer_size,
    max_timeout: opts[:max_timeout] || @default_max_timeout,
    sequence_token: nil,
    flushed_at: nil
  }

  if state.access_key_id do
    # Static AWS config: build the client directly from the options.
    # NOTE(review): endpoint may be nil here, unlike the metadata path which
    # falls back to @default_endpoint — confirm AwsProxy.client/4 accepts nil.
    %{state | client: AwsProxy.client(state.access_key_id, state.secret_access_key, state.region, state.endpoint)}
  else
    # No static keys: derive credentials and defaults from instance metadata.
    configure_aws(state)
  end
end

defp flush(_state, _opts \\ [force: false])
# Builds an AWS client (and related defaults) from EC2 instance metadata or
# ECS task credentials when no static keys were configured. Returns the
# state unchanged when no credentials source responds, so the caller can
# retry later via the :refresh_creds timer.
def configure_aws(state) do
  case System.get_env("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") do
    nil ->
      # EC2 path: list the IAM role attached to this instance.
      # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
      case get_metadata("http://169.254.169.254/latest/meta-data/iam/security-credentials/") do
        {:ok, ""} ->
          # Metadata service answered but listed no role name; no
          # credentials can be fetched yet.
          state
        {:ok, role} ->
          # Assertive matches: a fetch/decode failure here crashes the
          # backend rather than silently configuring a broken client.
          {:ok, json} = get_metadata("http://169.254.169.254/latest/meta-data/iam/security-credentials/" <> role)
          {:ok, creds} = Poison.decode(json)
          access_key_id = Map.get(creds, "AccessKeyId")
          secret_access_key = Map.get(creds, "SecretAccessKey")
          # Explicitly configured values win; otherwise fall back to metadata.
          region = state.region || metadata_region()
          endpoint = state.endpoint || metadata_endpoint() || @default_endpoint
          client = AwsProxy.client(access_key_id, secret_access_key, region, endpoint)

          # Default the log stream name to this instance's id.
          log_stream_name = state.log_stream_name || metadata_instance_id()

          %{state | client: client, log_stream_name: log_stream_name}
        _ ->
          # Metadata service unreachable (e.g. instance still booting).
          state
      end
    uri ->
      # ECS path: fetch task-role credentials from the task metadata endpoint.
      # https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html
      # This is untested
      case get_metadata("http://169.254.170.2" <> uri) do
        {:ok, json} ->
          {:ok, creds} = Poison.decode(json)
          access_key_id = Map.get(creds, "AccessKeyId")
          secret_access_key = Map.get(creds, "SecretAccessKey")
          region = state.region
          endpoint = state.endpoint || @default_endpoint
          client = AwsProxy.client(access_key_id, secret_access_key, region, endpoint)
          %{state | client: client}
        _ ->
          state
      end
  end
end

defp flush(state, opts \\ [force: false])

# Buffer is below both the configured byte limit and the 10,000-event
# PutLogEvents limit, and the flush is not forced: keep accumulating.
defp flush(%{buffer: buffer, buffer_size: buffer_size, max_buffer_size: max_buffer_size} = state, [force: false])
    when buffer_size < max_buffer_size and length(buffer) < 10_000 do
  {:ok, state}
end

# Nothing buffered: nothing to send.
defp flush(%{buffer: []} = state, _opts), do: {:ok, state}

# Client not configured yet (still waiting on instance metadata).
defp flush(%{client: nil} = state, _opts), do: {:ok, state}

# Sends the buffered events to CloudWatch, creating the log group/stream and
# recovering the sequence token as needed, then clears the buffer.
defp flush(state, opts) do
  events = %{
    logEvents: Enum.sort_by(state.buffer, & &1.timestamp),
    logGroupName: state.log_group_name,
    logStreamName: state.log_stream_name,
    sequenceToken: state.sequence_token
  }

  case AwsProxy.put_log_events(state.client, events) do
    {:ok, %{"nextSequenceToken" => next_sequence_token}, _} ->
      {:ok, %{state | buffer: [], buffer_size: 0, sequence_token: next_sequence_token}}

    # CloudWatch embeds the expected sequence token in these error messages;
    # extract it via binary-prefix matching and retry.
    {:error, {"DataAlreadyAcceptedException",
        "The given batch of log events has already been accepted. The next batch can be sent with sequenceToken: " <> next_sequence_token}} ->
      flush(%{state | sequence_token: next_sequence_token}, opts)

    {:error, {"InvalidSequenceTokenException",
        "The given sequenceToken is invalid. The next expected sequenceToken is: " <> next_sequence_token}} ->
      flush(%{state | sequence_token: next_sequence_token}, opts)

    {:error, {"ResourceNotFoundException", "The specified log group does not exist."}} ->
      {:ok, _, _} = AwsProxy.create_log_group(state.client, %{logGroupName: state.log_group_name})
      {:ok, _, _} = AwsProxy.create_log_stream(state.client, %{logGroupName: state.log_group_name,
        logStreamName: state.log_stream_name})
      flush(state, opts)

    {:error, {"ResourceNotFoundException", "The specified log stream does not exist."}} ->
      {:ok, _, _} = AwsProxy.create_log_stream(state.client, %{logGroupName: state.log_group_name,
        logStreamName: state.log_stream_name})
      flush(state, opts)

    # Transient network failures: retry immediately.
    {:error, %HTTPoison.Error{id: nil, reason: reason}} when reason in [:closed, :connect_timeout, :timeout] ->
      flush(state, opts)
  end
end

# GETs a metadata URL. On HTTP 200 returns :hackney.body/1's result
# ({:ok, body} on success); on any other status or transport error, nil.
def get_metadata(url) do
  with {:ok, 200, _resp_headers, client_ref} <- :hackney.request(:get, url, [], "", []) do
    :hackney.body(client_ref)
  else
    _other -> nil
  end
end

# GETs a metadata URL and returns the raw body on HTTP 200.
# NOTE(review): despite the `!`, this returns nil (rather than raising) when
# the request fails — callers rely on that via `||` fallbacks.
def get_metadata!(url) do
  with {:ok, 200, _resp_headers, client_ref} <- :hackney.request(:get, url, [], "", []) do
    {:ok, body} = :hackney.body(client_ref)
    body
  else
    _other -> nil
  end
end

# AWS service domain from instance metadata, or nil if unavailable.
def metadata_endpoint, do: get_metadata!("http://169.254.169.254/latest/meta-data/services/domain")

# This instance's id from instance metadata, or nil if unavailable.
def metadata_instance_id, do: get_metadata!("http://169.254.169.254/latest/meta-data/instance-id")

# Derives the region from the instance's availability zone (e.g.
# "us-east-1a" -> "us-east-1"), or nil if metadata is unavailable.
def metadata_region do
  url = "http://169.254.169.254/latest/meta-data/placement/availability-zone"

  case get_metadata!(url) do
    nil ->
      nil

    az ->
      # Drop the trailing AZ letter to obtain the region name.
      String.slice(az, Range.new(0, -2))
  end
end

end
2 changes: 1 addition & 1 deletion lib/cloud_watch/aws_proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule CloudWatch.AwsProxy do
cond do
Code.ensure_loaded?(AWS) ->
# AWS CloudWatch Logs implemented using aws-elixir
# See https://github.com/jkakar/aws-elixir
# https://github.com/aws-beam/aws-elixir
#
# AWS credentials are configured in CloudWatch
def client(access_key_id, secret_access_key, region, endpoint) do
Expand Down
1 change: 1 addition & 0 deletions lib/cloud_watch/input_log_event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule CloudWatch.InputLogEvent do
|> Kernel.-(@epoch)
|> Kernel.*(1000)
|> Kernel.+(milliseconds)

%{message: message, timestamp: timestamp}
|> Poison.Encoder.encode(options)
|> IO.chardata_to_string
Expand Down
9 changes: 6 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule CloudWatch.Mixfile do
#
# Type "mix help compile.app" for more information
def application do
  # :logger ships with Elixir, so it belongs in :extra_applications;
  # listing it under :applications would suppress Mix's inferred deps.
  [extra_applications: [:logger]]
end

# This makes sure your factory and any other modules in test/support are compiled
Expand All @@ -35,8 +35,11 @@ defmodule CloudWatch.Mixfile do
#
# Type "mix help deps" for more examples and options
defp deps do
[{:aws, "~> 0.5.0", optional: true},
{:httpoison, "~> 0.11.1"},
[
{:aws, "~> 0.5.0", optional: true},
{:httpoison, "~> 0.11"},
{:poison, "~> 3.1"},
{:hackney, "~> 1.8"},
{:credo, "~> 0.4.13", only: :dev},
{:mock, "~> 0.2.0", only: :test},
{:ex_doc, ">= 0.0.0", only: :dev}]
Expand Down
Loading