Skip to content

Commit

Permalink
Add support for asynchronous flushing (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredcnance authored Aug 6, 2021
1 parent ab1f24a commit aaa10c6
Show file tree
Hide file tree
Showing 34 changed files with 727 additions and 91 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ build
out
.settings
.temp
bin
26 changes: 25 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Generate CloudWatch metrics embedded within structured log events. The embedded metrics will be extracted so that you can visualize and alarm on them for real-time incident detection. This allows you to monitor aggregated values while preserving the detailed log event context that generates them.
- [Use Cases](#use-cases)
- [Usage](#usage)
- [Graceful Shutdown](#graceful-shutdown)
- [API](#api)
- [Examples](#examples)
- [Development](#development)
Expand All @@ -25,7 +26,6 @@ Generate CloudWatch metrics embedded within structured log events. The embedded

To use a metric logger, you need to manually create and flush the logger.


```java
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
import software.amazon.cloudwatchlogs.emf.model.DimensionSet;
Expand All @@ -44,6 +44,30 @@ class Example {

You can find the artifact location and examples of how to include it in your project at [Maven Central](https://search.maven.org/artifact/software.amazon.cloudwatchlogs/aws-embedded-metrics)

## Graceful Shutdown

**Since:** 2.0.0-beta-1

In any environment, other than AWS Lambda, we recommend running an out-of-process agent (the CloudWatch Agent or
FireLens / Fluent-Bit) to collect the EMF events. When using an out-of-process agent, this package will buffer the data
asynchronously in process to handle any transient communication issues with the agent. This means that when the `MetricsLogger`
gets flushed, data may not be safely persisted yet. To gracefully shutdown the environment, you can call shutdown on the
environment's sink. A full example can be found in the [`examples`](examples) directory.

```java
// create an environment singleton, this should be re-used across loggers
DefaultEnvironment environment = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());

MetricsLogger logger = new MetricsLogger(environment);
logger.setDimensions(DimensionSet.of("Operation", "ProcessRecords"));
logger.putMetric("ExampleMetric", 100, Unit.MILLISECONDS);
logger.putProperty("RequestId", "422b1569-16f6-4a03-b8f0-fe3fd9b100f8");
logger.flush();

// flush the sink, waiting up to 10s before giving up
environment.getSink().shutdown().orTimeout(10_000L, TimeUnit.MILLISECONDS);
```

## API

### MetricsLogger
Expand Down
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ allprojects {
targetCompatibility = '1.8'
}

version = '1.0.4'
version = '2.0.0-beta-1'
}

java {
Expand Down Expand Up @@ -64,6 +64,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.11.1'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.11.1'
implementation 'org.slf4j:slf4j-api:1.7.30'
implementation 'org.javatuples:javatuples:1.2'

// Use JUnit test framework
testImplementation 'software.amazon.awssdk:cloudwatch:2.13.54'
Expand Down
2 changes: 1 addition & 1 deletion buildspecs/buildspec.canary.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ env:
phases:
install:
runtime-versions:
java: corretto8
java: corretto11
commands:
# start docker
# https://docs.aws.amazon.com/codebuild/latest/userguide/sample-docker-custom-image.html#sample-docker-custom-image-files
Expand Down
2 changes: 1 addition & 1 deletion buildspecs/buildspec.release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ env:
phases:
install:
runtime-versions:
java: corretto8
java: corretto11
build:
commands:
- ./gradlew publish
2 changes: 1 addition & 1 deletion buildspecs/buildspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ env:
phases:
install:
runtime-versions:
java: corretto8
java: corretto11
commands:
# start docker
# https://docs.aws.amazon.com/codebuild/latest/userguide/sample-docker-custom-image.html#sample-docker-custom-image-files
Expand Down
17 changes: 15 additions & 2 deletions examples/agent/src/main/java/agent/App.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
package agent;

import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider;
import software.amazon.cloudwatchlogs.emf.environment.DefaultEnvironment;
import software.amazon.cloudwatchlogs.emf.environment.Environment;
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
import software.amazon.cloudwatchlogs.emf.model.DimensionSet;
import software.amazon.cloudwatchlogs.emf.model.Unit;

import java.util.concurrent.TimeUnit;

public class App {

public static void main(String[] args) {
MetricsLogger logger = new MetricsLogger();
logger.putDimensions(DimensionSet.of("Operation", "Agent"));
DefaultEnvironment environment = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
emitMetric(environment);
emitMetric(environment);
emitMetric(environment);
environment.getSink().shutdown().orTimeout(360_000L, TimeUnit.MILLISECONDS);
}

private static void emitMetric(Environment environment) {
MetricsLogger logger = new MetricsLogger(environment);
logger.setDimensions(DimensionSet.of("Operation", "Agent"));
logger.putMetric("ExampleMetric", 100, Unit.MILLISECONDS);
logger.putProperty("RequestId", "422b1569-16f6-4a03-b8f0-fe3fd9b100f8");
logger.flush();
Expand Down
16 changes: 16 additions & 0 deletions examples/ecs-firelens/src/main/java/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,25 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider;
import software.amazon.cloudwatchlogs.emf.environment.ECSEnvironment;
import software.amazon.cloudwatchlogs.emf.environment.Environment;
import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider;
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
import software.amazon.cloudwatchlogs.emf.model.Unit;
import sun.misc.Signal;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public class App {

private static final Environment env = new ECSEnvironment(EnvironmentConfigurationProvider.getConfig());

public static void main(String[] args) throws Exception {
registerShutdownHook();

int portNumber = 8000;
HttpServer server = HttpServer.create(new InetSocketAddress(8000), 0);
Expand All @@ -37,6 +45,14 @@ public static void main(String[] args) throws Exception {
server.start();
}

private static void registerShutdownHook() {
// https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/
Signal.handle(new Signal("TERM"), sig -> {
env.getSink().shutdown().orTimeout(1_000L, TimeUnit.MILLISECONDS);
System.exit(0);
});
}

static class SimpleHandler implements HttpHandler {
@Override
public void handle(HttpExchange he) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
import software.amazon.awssdk.services.cloudwatch.model.Statistic;
import software.amazon.cloudwatchlogs.emf.config.Configuration;
import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider;
import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider;
import software.amazon.cloudwatchlogs.emf.environment.DefaultEnvironment;
import software.amazon.cloudwatchlogs.emf.environment.Environment;
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
import software.amazon.cloudwatchlogs.emf.model.DimensionSet;
import software.amazon.cloudwatchlogs.emf.model.Unit;
Expand All @@ -56,56 +57,64 @@ public void setUp() {

@Test(timeout = 120_000)
public void testSingleFlushOverTCP() throws InterruptedException {
Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
String metricName = "TCP-SingleFlush";
int expectedSamples = 1;
config.setAgentEndpoint("tcp://127.0.0.1:25888");

logMetric(metricName);
logMetric(env, metricName);
env.getSink().shutdown().join();

assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples));
}

@Test(timeout = 300_000)
public void testMultipleFlushesOverTCP() throws InterruptedException {
Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
String metricName = "TCP-MultipleFlushes";
int expectedSamples = 3;
config.setAgentEndpoint("tcp://127.0.0.1:25888");

logMetric(metricName);
logMetric(metricName);
logMetric(env, metricName);
logMetric(env, metricName);
Thread.sleep(500);
logMetric(metricName);
logMetric(env, metricName);
env.getSink().shutdown().join();

assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples));
}

@Test(timeout = 120_000)
public void testSingleFlushOverUDP() throws InterruptedException {
Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
String metricName = "UDP-SingleFlush";
int expectedSamples = 1;
config.setAgentEndpoint("udp://127.0.0.1:25888");

logMetric(metricName);
logMetric(env, metricName);
env.getSink().shutdown().join();

assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples));
}

@Test(timeout = 300_000)
public void testMultipleFlushOverUDP() throws InterruptedException {
Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());
String metricName = "UDP-MultipleFlush";
int expectedSamples = 3;
config.setAgentEndpoint("udp://127.0.0.1:25888");

logMetric(metricName);
logMetric(metricName);
logMetric(env, metricName);
logMetric(env, metricName);
Thread.sleep(500);
logMetric(metricName);
logMetric(env, metricName);
env.getSink().shutdown().join();

assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples));
}

private void logMetric(String metricName) {
MetricsLogger logger = new MetricsLogger(new EnvironmentProvider());
private void logMetric(Environment env, String metricName) {
MetricsLogger logger = new MetricsLogger(env);
logger.putDimensions(dimensions);
logger.putMetric(metricName, 100, Unit.MILLISECONDS);
logger.flush();
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/software/amazon/cloudwatchlogs/emf/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,27 @@ public class Constants {
public static final int MAX_METRICS_PER_EVENT = 100;

public static final int MAX_DATAPOINTS_PER_METRIC = 100;

/**
* The max number of messages to hold in memory in case of transient socket errors. The maximum
* message size is 256 KB meaning the maximum size of this buffer would be 25,600 MB
*/
public static final int DEFAULT_ASYNC_BUFFER_SIZE = 100;

/**
* How many times to retry an individual message. We eventually give up vs. retrying
* indefinitely in case there is something inherent to the message that is causing the failures.
* Giving up results in data loss, but also helps us reduce the risk of a poison pill blocking
* all process telemetry.
*/
public static final int MAX_ATTEMPTS_PER_MESSAGE = 100;

/** Starting backoff millis when a transient socket failure is encountered. */
public static final int MIN_BACKOFF_MILLIS = 50;

/** Max backoff millis when a transient socket failure is encountered. */
public static final int MAX_BACKOFF_MILLIS = 2000;

/** Maximum amount of random jitter to apply to retries */
public static final int MAX_BACKOFF_JITTER = 20;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import java.util.Optional;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import software.amazon.cloudwatchlogs.emf.Constants;
import software.amazon.cloudwatchlogs.emf.environment.Environments;
import software.amazon.cloudwatchlogs.emf.util.StringUtils;

Expand Down Expand Up @@ -54,6 +56,9 @@ public class Configuration {
*/
@Setter Environments environmentOverride;

/** Queue length for asynchronous sinks. */
@Setter @Getter int asyncBufferSize = Constants.DEFAULT_ASYNC_BUFFER_SIZE;

public Optional<String> getServiceName() {
return getStringOptional(serviceName);
}
Expand Down Expand Up @@ -85,6 +90,6 @@ private Optional<String> getStringOptional(String value) {
if (StringUtils.isNullOrEmpty(value)) {
return Optional.empty();
}
return Optional.ofNullable(value);
return Optional.of(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ public class ConfigurationKeys {
public static final String LOG_STREAM_NAME = "LOG_STREAM_NAME";
public static final String AGENT_ENDPOINT = "AGENT_ENDPOINT";
public static final String ENVIRONMENT_OVERRIDE = "ENVIRONMENT";
public static final String ASYNC_BUFFER_SIZE = "ASYNC_BUFFER_SIZE";
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package software.amazon.cloudwatchlogs.emf.config;

import software.amazon.cloudwatchlogs.emf.Constants;
import software.amazon.cloudwatchlogs.emf.environment.Environments;
import software.amazon.cloudwatchlogs.emf.util.StringUtils;

Expand All @@ -27,21 +28,21 @@ protected EnvironmentConfigurationProvider() {}

public static Configuration getConfig() {
if (config == null) {
config =
new Configuration(
getEnvVar(ConfigurationKeys.SERVICE_NAME),
getEnvVar(ConfigurationKeys.SERVICE_TYPE),
getEnvVar(ConfigurationKeys.LOG_GROUP_NAME),
getEnvVar(ConfigurationKeys.LOG_STREAM_NAME),
getEnvVar(ConfigurationKeys.AGENT_ENDPOINT),
getEnvironmentOverride());
config = createConfig();
}
return config;
}

private static String getEnvVar(String key) {
String name = String.join("", ConfigurationKeys.ENV_VAR_PREFIX, "_", key);
return getEnv(name);
static Configuration createConfig() {
return new Configuration(
getEnvVar(ConfigurationKeys.SERVICE_NAME),
getEnvVar(ConfigurationKeys.SERVICE_TYPE),
getEnvVar(ConfigurationKeys.LOG_GROUP_NAME),
getEnvVar(ConfigurationKeys.LOG_STREAM_NAME),
getEnvVar(ConfigurationKeys.AGENT_ENDPOINT),
getEnvironmentOverride(),
getIntOrDefault(
ConfigurationKeys.ASYNC_BUFFER_SIZE, Constants.DEFAULT_ASYNC_BUFFER_SIZE));
}

private static Environments getEnvironmentOverride() {
Expand All @@ -57,6 +58,24 @@ private static Environments getEnvironmentOverride() {
}
}

private static int getIntOrDefault(String key, int defaultValue) {
String value = getEnvVar(key);
if (StringUtils.isNullOrEmpty(value)) {
return defaultValue;
}

try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
return defaultValue;
}
}

private static String getEnvVar(String key) {
String name = String.join("", ConfigurationKeys.ENV_VAR_PREFIX, "_", key);
return getEnv(name);
}

private static String getEnv(String name) {
return SystemWrapper.getenv(name);
}
Expand Down
Loading

0 comments on commit aaa10c6

Please sign in to comment.