Skip to content

Commit

Permalink
Allow STDOUT overrides for AgentBasedEnvironments (#147)
Browse files Browse the repository at this point in the history
* Allow for STDOUT overrides when using an AgentBasedEnvironment

* replace double space indents with 4 space indents

* read WRITE_TO_STDOUT env var to the Configuration object

* add AWS_EMF_ prefix to the stdout docs

* add blump in README for how the new environment variable will work

* run spotlessApply

---------

Co-authored-by: Paul Schellenberg <[email protected]>
  • Loading branch information
PaulJSchellenberg and Paul Schellenberg authored Jun 13, 2024
1 parent 6b97f76 commit 2713006
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 20 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,24 @@ config.setAgentEndpoint("udp://127.0.0.1:1000");
AWS_EMF_AGENT_ENDPOINT="udp://127.0.0.1:1000"
```

**WriteToStdout**: For agent-based platforms, setting this configuration to `true` will make the `MetricsLogger` write to `stdout` rather than sending them to the agent. The default value for this configuration is `false`. This configuration has no effect for non-agent-based platforms.

If an `EnvironmentOverride` is provided, this configuration will apply to the overriden environment if the environment is an agent-based platform

Example:

```java
// in process
import software.amazon.cloudwatchlogs.emf.config.Configuration;
import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider;

Configuration config = EnvironmentConfigurationProvider.getConfig();
config.setShouldWriteToStdout(true);

// environment
AWS_EMF_WRITE_TO_STDOUT="true"
```

## Thread-safety

### Internal Synchronization
Expand Down
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ With Docker images, using the `awslogs` log driver will send your container logs
## ECS and Fargate

With ECS and Fargate, you can use the `awsfirelens` (recommended) or `awslogs` log driver to have your logs sent to CloudWatch Logs on your behalf. After configuring the options for your preferred log driver, you may write your EMF logs to STDOUT and they will be processed.
To write your EMF logs to STDOUT, set the environment variable `AWS_EMF_WRITE_TO_STDOUT=true`

[`awsfirelens` documentation](https://github.com/aws/amazon-cloudwatch-logs-for-fluent-bit)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class Configuration {
/** Queue length for asynchronous sinks. */
@Setter @Getter int asyncBufferSize = Constants.DEFAULT_ASYNC_BUFFER_SIZE;

@Setter private boolean shouldWriteToStdout;

public Optional<String> getServiceName() {
return getStringOptional(serviceName);
}
Expand Down Expand Up @@ -92,4 +94,8 @@ private Optional<String> getStringOptional(String value) {
}
return Optional.of(value);
}

public boolean shouldWriteToStdout() {
return shouldWriteToStdout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ public class ConfigurationKeys {
public static final String AGENT_ENDPOINT = "AGENT_ENDPOINT";
public static final String ENVIRONMENT_OVERRIDE = "ENVIRONMENT";
public static final String ASYNC_BUFFER_SIZE = "ASYNC_BUFFER_SIZE";
public static final String WRITE_TO_STDOUT = "WRITE_TO_STDOUT";
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ static Configuration createConfig() {
getEnvVar(ConfigurationKeys.AGENT_ENDPOINT),
getEnvironmentOverride(),
getIntOrDefault(
ConfigurationKeys.ASYNC_BUFFER_SIZE, Constants.DEFAULT_ASYNC_BUFFER_SIZE));
ConfigurationKeys.ASYNC_BUFFER_SIZE, Constants.DEFAULT_ASYNC_BUFFER_SIZE),
Boolean.parseBoolean(getEnvVar(ConfigurationKeys.WRITE_TO_STDOUT)));
}

private static Environments getEnvironmentOverride() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import software.amazon.cloudwatchlogs.emf.Constants;
import software.amazon.cloudwatchlogs.emf.config.Configuration;
import software.amazon.cloudwatchlogs.emf.sinks.AgentSink;
import software.amazon.cloudwatchlogs.emf.sinks.ConsoleSink;
import software.amazon.cloudwatchlogs.emf.sinks.Endpoint;
import software.amazon.cloudwatchlogs.emf.sinks.ISink;
import software.amazon.cloudwatchlogs.emf.sinks.SocketClientFactory;
Expand Down Expand Up @@ -67,27 +68,31 @@ public String getLogStreamName() {
@Override
public ISink getSink() {
if (sink == null) {
Endpoint endpoint;
if (config.getAgentEndpoint().isPresent()) {
endpoint = Endpoint.fromURL(config.getAgentEndpoint().get());
if (config.shouldWriteToStdout()) {
sink = new ConsoleSink();
} else {
log.info(
"Endpoint is not defined. Using default: {}",
Endpoint.DEFAULT_TCP_ENDPOINT);
endpoint = Endpoint.DEFAULT_TCP_ENDPOINT;
Endpoint endpoint;
if (config.getAgentEndpoint().isPresent()) {
endpoint = Endpoint.fromURL(config.getAgentEndpoint().get());
} else {
log.info(
"Endpoint is not defined. Using default: {}",
Endpoint.DEFAULT_TCP_ENDPOINT);
endpoint = Endpoint.DEFAULT_TCP_ENDPOINT;
}
sink =
new AgentSink(
getLogGroupName(),
getLogStreamName(),
endpoint,
new SocketClientFactory(),
config.getAsyncBufferSize(),
() ->
new FibonacciRetryStrategy(
Constants.MIN_BACKOFF_MILLIS,
Constants.MAX_BACKOFF_MILLIS,
Constants.MAX_BACKOFF_JITTER));
}
sink =
new AgentSink(
getLogGroupName(),
getLogStreamName(),
endpoint,
new SocketClientFactory(),
config.getAsyncBufferSize(),
() ->
new FibonacciRetryStrategy(
Constants.MIN_BACKOFF_MILLIS,
Constants.MAX_BACKOFF_MILLIS,
Constants.MAX_BACKOFF_JITTER));
}
return sink;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public void getGetConfig() {
putEnv("AWS_EMF_AGENT_ENDPOINT", "Endpoint");
putEnv("AWS_EMF_ENVIRONMENT", "Agent");
putEnv("AWS_EMF_ASYNC_BUFFER_SIZE", "9999");
putEnv("AWS_EMF_WRITE_TO_STDOUT", "true");

Configuration config = EnvironmentConfigurationProvider.createConfig();

Expand All @@ -50,6 +51,7 @@ public void getGetConfig() {
assertEquals("Endpoint", config.getAgentEndpoint().get());
assertEquals(Environments.Agent, config.getEnvironmentOverride());
assertEquals(9999, config.getAsyncBufferSize());
assertTrue(config.shouldWriteToStdout());
}

@Test
Expand All @@ -59,10 +61,20 @@ public void invalidEnvironmentValuesFallbackToExpectedDefaults() {

// act
putEnv("AWS_EMF_ASYNC_BUFFER_SIZE", "NaN");
putEnv("AWS_EMF_WRITE_TO_STDOUT", "notABool");

// assert
Configuration config = EnvironmentConfigurationProvider.createConfig();
assertEquals(100, config.getAsyncBufferSize());
assertFalse(config.shouldWriteToStdout());
}

@Test
public void emptyEnvironmentValuesFallbackToExpectedDefaults() {
// assert
Configuration config = EnvironmentConfigurationProvider.createConfig();
assertEquals(100, config.getAsyncBufferSize());
assertFalse(config.shouldWriteToStdout());
}

private void putEnv(String key, String value) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package software.amazon.cloudwatchlogs.emf.environment;

import static org.junit.Assert.assertEquals;
import static org.powermock.api.mockito.PowerMockito.mock;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import software.amazon.cloudwatchlogs.emf.config.Configuration;
import software.amazon.cloudwatchlogs.emf.config.SystemWrapper;
import software.amazon.cloudwatchlogs.emf.model.MetricsContext;
import software.amazon.cloudwatchlogs.emf.sinks.AgentSink;
import software.amazon.cloudwatchlogs.emf.sinks.ConsoleSink;
import software.amazon.cloudwatchlogs.emf.sinks.Endpoint;
import software.amazon.cloudwatchlogs.emf.sinks.ISink;

@RunWith(PowerMockRunner.class)
@PrepareForTest({SystemWrapper.class, AgentBasedEnvironment.class})
public class AgentBasedEnvironmentTest {
public static class AgentBasedEnvironmentTestImplementation extends AgentBasedEnvironment {
protected AgentBasedEnvironmentTestImplementation(Configuration config) {
super(config);
}

@Override
public boolean probe() {
return false;
}

@Override
public String getType() {
return null;
}

@Override
public void configureContext(MetricsContext context) {}
}

private Configuration configuration;

@Before
public void setup() {
this.configuration = new Configuration();
}

@Test
public void testGetSinkWithDefaultEndpoint() throws Exception {
AgentSink mockedSink = mock(AgentSink.class);
PowerMockito.whenNew(AgentSink.class)
.withAnyArguments()
.then(
invocation -> {
Endpoint endpoint = invocation.getArgument(2);
assertEquals(Endpoint.DEFAULT_TCP_ENDPOINT, endpoint);
return mockedSink;
});

AgentBasedEnvironment env = new AgentBasedEnvironmentTestImplementation(configuration);
ISink sink = env.getSink();

assertEquals(mockedSink, sink);
}

@Test
public void testGetSinkWithConfiguredEndpoint() throws Exception {
String endpointUrl = "http://configured-endpoint:1234";
configuration.setAgentEndpoint(endpointUrl);
AgentSink mockedSink = mock(AgentSink.class);
PowerMockito.whenNew(AgentSink.class)
.withAnyArguments()
.then(
invocation -> {
Endpoint endpoint = invocation.getArgument(2);
assertEquals(Endpoint.fromURL(endpointUrl), endpoint);
return mockedSink;
});

AgentBasedEnvironment env = new AgentBasedEnvironmentTestImplementation(configuration);
ISink sink = env.getSink();

assertEquals(mockedSink, sink);
}

@Test
public void testGetSinkOverrideToStdOut() {
configuration.setShouldWriteToStdout(true);

AgentBasedEnvironment env = new AgentBasedEnvironmentTestImplementation(configuration);
ISink sink = env.getSink();

assertEquals(ConsoleSink.class, sink.getClass());
}

@Test
public void testGetSinkOverrideToStdOutFailFastOnImproperOverride() throws Exception {
configuration.setShouldWriteToStdout(false);

testGetSinkWithDefaultEndpoint();
}
}

0 comments on commit 2713006

Please sign in to comment.