From aaa10c6063f8c2ca6f7b13c5cbe3375d27a8ec96 Mon Sep 17 00:00:00 2001 From: Jared Nance Date: Thu, 5 Aug 2021 20:47:38 -0500 Subject: [PATCH] Add support for asynchronous flushing (#82) --- .gitignore | 1 + README.md | 26 +- build.gradle | 3 +- buildspecs/buildspec.canary.yml | 2 +- buildspecs/buildspec.release.yml | 2 +- buildspecs/buildspec.yml | 2 +- examples/agent/src/main/java/agent/App.java | 17 +- examples/ecs-firelens/src/main/java/App.java | 16 + .../emf/MetricsLoggerIntegrationTest.java | 31 +- .../amazon/cloudwatchlogs/emf/Constants.java | 23 ++ .../emf/config/Configuration.java | 7 +- .../emf/config/ConfigurationKeys.java | 1 + .../EnvironmentConfigurationProvider.java | 41 ++- .../environment/AgentBasedEnvironment.java | 11 +- .../emf/environment/DefaultEnvironment.java | 4 +- .../emf/environment/EC2Environment.java | 2 +- .../emf/environment/ECSEnvironment.java | 6 +- .../emf/environment/LambdaEnvironment.java | 2 +- .../emf/environment/LocalEnvironment.java | 2 +- .../emf/environment/ResourceFetcher.java | 2 +- .../emf/logger/MetricsLogger.java | 6 + .../cloudwatchlogs/emf/sinks/AgentSink.java | 94 +++++- .../cloudwatchlogs/emf/sinks/ConsoleSink.java | 6 + .../cloudwatchlogs/emf/sinks/ISink.java | 11 + .../cloudwatchlogs/emf/sinks/MultiSink.java | 13 +- .../cloudwatchlogs/emf/sinks/TCPClient.java | 19 +- .../sinks/retry/FibonacciRetryStrategy.java | 32 ++ .../emf/sinks/retry/RetryStrategy.java | 10 + .../emf/config/ConfigurationTest.java | 7 +- .../EnvironmentConfigurationProviderTest.java | 21 +- .../emf/sinks/AgentSinkTest.java | 287 ++++++++++++++++-- .../emf/sinks/MultiSinkTest.java | 68 +++++ .../cloudwatchlogs/emf/sinks/SinkShunt.java | 6 + .../retry/FibonacciRetryStrategyTest.java | 37 +++ 34 files changed, 727 insertions(+), 91 deletions(-) create mode 100644 src/main/java/software/amazon/cloudwatchlogs/emf/sinks/retry/FibonacciRetryStrategy.java create mode 100644 src/main/java/software/amazon/cloudwatchlogs/emf/sinks/retry/RetryStrategy.java create mode 100644 src/test/java/software/amazon/cloudwatchlogs/emf/sinks/MultiSinkTest.java create mode 100644 src/test/java/software/amazon/cloudwatchlogs/emf/sinks/retry/FibonacciRetryStrategyTest.java diff --git a/.gitignore b/.gitignore index 337f47af..0bad7ec3 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ build out .settings .temp +bin diff --git a/README.md b/README.md index 735888cd..16b9937f 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ Generate CloudWatch metrics embedded within structured log events. The embedded metrics will be extracted so that you can visualize and alarm on them for real-time incident detection. This allows you to monitor aggregated values while preserving the detailed log event context that generates them. - [Use Cases](#use-cases) - [Usage](#usage) +- [Graceful Shutdown](#graceful-shutdown) - [API](#api) - [Examples](#examples) - [Development](#development) @@ -25,7 +26,6 @@ Generate CloudWatch metrics embedded within structured log events. The embedded To use a metric logger, you need to manually create and flush the logger. - ```java import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger; import software.amazon.cloudwatchlogs.emf.model.DimensionSet; @@ -44,6 +44,30 @@ class Example { You can find the artifact location and examples of how to include it in your project at [Maven Central](https://search.maven.org/artifact/software.amazon.cloudwatchlogs/aws-embedded-metrics) +## Graceful Shutdown + +**Since:** 2.0.0-beta-1 + +In any environment, other than AWS Lambda, we recommend running an out-of-process agent (the CloudWatch Agent or +FireLens / Fluent-Bit) to collect the EMF events. When using an out-of-process agent, this package will buffer the data +asynchronously in process to handle any transient communication issues with the agent. This means that when the `MetricsLogger` +gets flushed, data may not be safely persisted yet. To gracefully shutdown the environment, you can call shutdown on the +environment's sink. A full example can be found in the [`examples`](examples) directory. + +```java +// create an environment singleton, this should be re-used across loggers +DefaultEnvironment environment = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig()); + +MetricsLogger logger = new MetricsLogger(environment); +logger.setDimensions(DimensionSet.of("Operation", "ProcessRecords")); +logger.putMetric("ExampleMetric", 100, Unit.MILLISECONDS); +logger.putProperty("RequestId", "422b1569-16f6-4a03-b8f0-fe3fd9b100f8"); +logger.flush(); + +// flush the sink, waiting up to 10s before giving up +environment.getSink().shutdown().orTimeout(10_000L, TimeUnit.MILLISECONDS); +``` + ## API ### MetricsLogger diff --git a/build.gradle b/build.gradle index 48de992e..b0d65d66 100644 --- a/build.gradle +++ b/build.gradle @@ -28,7 +28,7 @@ allprojects { targetCompatibility = '1.8' } - version = '1.0.4' + version = '2.0.0-beta-1' } java { @@ -64,6 +64,7 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-annotations:2.11.1' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.11.1' implementation 'org.slf4j:slf4j-api:1.7.30' + implementation 'org.javatuples:javatuples:1.2' // Use JUnit test framework testImplementation 'software.amazon.awssdk:cloudwatch:2.13.54' diff --git a/buildspecs/buildspec.canary.yml b/buildspecs/buildspec.canary.yml index 67f5b41a..35e33c5f 100644 --- a/buildspecs/buildspec.canary.yml +++ b/buildspecs/buildspec.canary.yml @@ -9,7 +9,7 @@ env: phases: install: runtime-versions: - java: corretto8 + java: corretto11 commands: # start docker # https://docs.aws.amazon.com/codebuild/latest/userguide/sample-docker-custom-image.html#sample-docker-custom-image-files diff --git a/buildspecs/buildspec.release.yml b/buildspecs/buildspec.release.yml index 71af4f41..6ac0735d 100644 --- a/buildspecs/buildspec.release.yml +++ b/buildspecs/buildspec.release.yml @@ -11,7 +11,7 @@ env: phases: install: runtime-versions: - java: corretto8 + java: corretto11 build: commands: - ./gradlew publish diff --git a/buildspecs/buildspec.yml b/buildspecs/buildspec.yml index 98124813..31b5b135 100644 --- a/buildspecs/buildspec.yml +++ b/buildspecs/buildspec.yml @@ -9,7 +9,7 @@ env: phases: install: runtime-versions: - java: corretto8 + java: corretto11 commands: # start docker # https://docs.aws.amazon.com/codebuild/latest/userguide/sample-docker-custom-image.html#sample-docker-custom-image-files diff --git a/examples/agent/src/main/java/agent/App.java b/examples/agent/src/main/java/agent/App.java index 355975ee..dd7878c3 100644 --- a/examples/agent/src/main/java/agent/App.java +++ b/examples/agent/src/main/java/agent/App.java @@ -1,14 +1,27 @@ package agent; +import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider; +import software.amazon.cloudwatchlogs.emf.environment.DefaultEnvironment; +import software.amazon.cloudwatchlogs.emf.environment.Environment; import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger; import software.amazon.cloudwatchlogs.emf.model.DimensionSet; import software.amazon.cloudwatchlogs.emf.model.Unit; +import java.util.concurrent.TimeUnit; + public class App { public static void main(String[] args) { - MetricsLogger logger = new MetricsLogger(); - logger.putDimensions(DimensionSet.of("Operation", "Agent")); + DefaultEnvironment environment = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig()); + emitMetric(environment); + emitMetric(environment); + emitMetric(environment); + environment.getSink().shutdown().orTimeout(360_000L, TimeUnit.MILLISECONDS); + } + + private static void emitMetric(Environment environment) { + MetricsLogger logger = new MetricsLogger(environment); + logger.setDimensions(DimensionSet.of("Operation", "Agent")); logger.putMetric("ExampleMetric", 100, Unit.MILLISECONDS); logger.putProperty("RequestId", "422b1569-16f6-4a03-b8f0-fe3fd9b100f8"); logger.flush(); diff --git a/examples/ecs-firelens/src/main/java/App.java b/examples/ecs-firelens/src/main/java/App.java index e5fc97f2..ee72a451 100644 --- a/examples/ecs-firelens/src/main/java/App.java +++ b/examples/ecs-firelens/src/main/java/App.java @@ -17,17 +17,25 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; +import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider; +import software.amazon.cloudwatchlogs.emf.environment.ECSEnvironment; +import software.amazon.cloudwatchlogs.emf.environment.Environment; import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider; import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger; import software.amazon.cloudwatchlogs.emf.model.Unit; +import sun.misc.Signal; import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; public class App { + private static final Environment env = new ECSEnvironment(EnvironmentConfigurationProvider.getConfig()); + public static void main(String[] args) throws Exception { + registerShutdownHook(); int portNumber = 8000; HttpServer server = HttpServer.create(new InetSocketAddress(8000), 0); @@ -37,6 +45,14 @@ public static void main(String[] args) throws Exception { server.start(); } + private static void registerShutdownHook() { + // https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/ + Signal.handle(new Signal("TERM"), sig -> { + env.getSink().shutdown().orTimeout(1_000L, TimeUnit.MILLISECONDS); + System.exit(0); + }); + } + static class SimpleHandler implements HttpHandler { @Override public void handle(HttpExchange he) throws IOException { diff --git a/src/integration-test/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerIntegrationTest.java b/src/integration-test/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerIntegrationTest.java index 7899fa31..1471fec7 100644 --- a/src/integration-test/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerIntegrationTest.java +++ b/src/integration-test/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerIntegrationTest.java @@ -31,7 +31,8 @@ import software.amazon.awssdk.services.cloudwatch.model.Statistic; import software.amazon.cloudwatchlogs.emf.config.Configuration; import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider; -import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider; +import software.amazon.cloudwatchlogs.emf.environment.DefaultEnvironment; +import software.amazon.cloudwatchlogs.emf.environment.Environment; import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger; import software.amazon.cloudwatchlogs.emf.model.DimensionSet; import software.amazon.cloudwatchlogs.emf.model.Unit; @@ -56,56 +57,64 @@ public void setUp() { @Test(timeout = 120_000) public void testSingleFlushOverTCP() throws InterruptedException { + Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig()); String metricName = "TCP-SingleFlush"; int expectedSamples = 1; config.setAgentEndpoint("tcp://127.0.0.1:25888"); - logMetric(metricName); + logMetric(env, metricName); + env.getSink().shutdown().join(); assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples)); } @Test(timeout = 300_000) public void testMultipleFlushesOverTCP() throws InterruptedException { + Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig()); String metricName = "TCP-MultipleFlushes"; int expectedSamples = 3; config.setAgentEndpoint("tcp://127.0.0.1:25888"); - logMetric(metricName); - logMetric(metricName); + logMetric(env, metricName); + logMetric(env, metricName); Thread.sleep(500); - logMetric(metricName); + logMetric(env, metricName); + env.getSink().shutdown().join(); assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples)); } @Test(timeout = 120_000) public void testSingleFlushOverUDP() throws InterruptedException { + Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig()); String metricName = "UDP-SingleFlush"; int expectedSamples = 1; config.setAgentEndpoint("udp://127.0.0.1:25888"); - logMetric(metricName); + logMetric(env, metricName); + env.getSink().shutdown().join(); assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples)); } @Test(timeout = 300_000) public void testMultipleFlushOverUDP() throws InterruptedException { + Environment env = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig()); String metricName = "UDP-MultipleFlush"; int expectedSamples = 3; config.setAgentEndpoint("udp://127.0.0.1:25888"); - logMetric(metricName); - logMetric(metricName); + logMetric(env, metricName); + logMetric(env, metricName); Thread.sleep(500); - logMetric(metricName); + logMetric(env, metricName); + env.getSink().shutdown().join(); assertTrue(retryUntilSucceed(() -> buildRequest(metricName), expectedSamples)); } - private void logMetric(String metricName) { - MetricsLogger logger = new MetricsLogger(new EnvironmentProvider()); + private void logMetric(Environment env, String metricName) { + MetricsLogger logger = new MetricsLogger(env); logger.putDimensions(dimensions); logger.putMetric(metricName, 100, Unit.MILLISECONDS); logger.flush(); diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/Constants.java b/src/main/java/software/amazon/cloudwatchlogs/emf/Constants.java index 64faecf7..c314afcb 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/Constants.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/Constants.java @@ -24,4 +24,27 @@ public class Constants { public static final int MAX_METRICS_PER_EVENT = 100; public static final int MAX_DATAPOINTS_PER_METRIC = 100; + + /** + * The max number of messages to hold in memory in case of transient socket errors. The maximum + * message size is 256 KB meaning the maximum size of this buffer would be 25,600 MB + */ + public static final int DEFAULT_ASYNC_BUFFER_SIZE = 100; + + /** + * How many times to retry an individual message. We eventually give up vs. retrying + * indefinitely in case there is something inherent to the message that is causing the failures. + * Giving up results in data loss, but also helps us reduce the risk of a poison pill blocking + * all process telemetry. + */ + public static final int MAX_ATTEMPTS_PER_MESSAGE = 100; + + /** Starting backoff millis when a transient socket failure is encountered. */ + public static final int MIN_BACKOFF_MILLIS = 50; + + /** Max backoff millis when a transient socket failure is encountered. */ + public static final int MAX_BACKOFF_MILLIS = 2000; + + /** Maximum amount of random jitter to apply to retries */ + public static final int MAX_BACKOFF_JITTER = 20; } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/config/Configuration.java b/src/main/java/software/amazon/cloudwatchlogs/emf/config/Configuration.java index 1584c5d7..a65b0e71 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/config/Configuration.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/config/Configuration.java @@ -18,8 +18,10 @@ import java.util.Optional; import lombok.AllArgsConstructor; +import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import software.amazon.cloudwatchlogs.emf.Constants; import software.amazon.cloudwatchlogs.emf.environment.Environments; import software.amazon.cloudwatchlogs.emf.util.StringUtils; @@ -54,6 +56,9 @@ public class Configuration { */ @Setter Environments environmentOverride; + /** Queue length for asynchronous sinks. */ + @Setter @Getter int asyncBufferSize = Constants.DEFAULT_ASYNC_BUFFER_SIZE; + public Optional getServiceName() { return getStringOptional(serviceName); } @@ -85,6 +90,6 @@ private Optional getStringOptional(String value) { if (StringUtils.isNullOrEmpty(value)) { return Optional.empty(); } - return Optional.ofNullable(value); + return Optional.of(value); } } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/config/ConfigurationKeys.java b/src/main/java/software/amazon/cloudwatchlogs/emf/config/ConfigurationKeys.java index 97e5a50a..e6247497 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/config/ConfigurationKeys.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/config/ConfigurationKeys.java @@ -27,4 +27,5 @@ public class ConfigurationKeys { public static final String LOG_STREAM_NAME = "LOG_STREAM_NAME"; public static final String AGENT_ENDPOINT = "AGENT_ENDPOINT"; public static final String ENVIRONMENT_OVERRIDE = "ENVIRONMENT"; + public static final String ASYNC_BUFFER_SIZE = "ASYNC_BUFFER_SIZE"; } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/config/EnvironmentConfigurationProvider.java b/src/main/java/software/amazon/cloudwatchlogs/emf/config/EnvironmentConfigurationProvider.java index 7f2c4783..910e6e67 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/config/EnvironmentConfigurationProvider.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/config/EnvironmentConfigurationProvider.java @@ -16,6 +16,7 @@ package software.amazon.cloudwatchlogs.emf.config; +import software.amazon.cloudwatchlogs.emf.Constants; import software.amazon.cloudwatchlogs.emf.environment.Environments; import software.amazon.cloudwatchlogs.emf.util.StringUtils; @@ -27,21 +28,21 @@ protected EnvironmentConfigurationProvider() {} public static Configuration getConfig() { if (config == null) { - config = - new Configuration( - getEnvVar(ConfigurationKeys.SERVICE_NAME), - getEnvVar(ConfigurationKeys.SERVICE_TYPE), - getEnvVar(ConfigurationKeys.LOG_GROUP_NAME), - getEnvVar(ConfigurationKeys.LOG_STREAM_NAME), - getEnvVar(ConfigurationKeys.AGENT_ENDPOINT), - getEnvironmentOverride()); + config = createConfig(); } return config; } - private static String getEnvVar(String key) { - String name = String.join("", ConfigurationKeys.ENV_VAR_PREFIX, "_", key); - return getEnv(name); + static Configuration createConfig() { + return new Configuration( + getEnvVar(ConfigurationKeys.SERVICE_NAME), + getEnvVar(ConfigurationKeys.SERVICE_TYPE), + getEnvVar(ConfigurationKeys.LOG_GROUP_NAME), + getEnvVar(ConfigurationKeys.LOG_STREAM_NAME), + getEnvVar(ConfigurationKeys.AGENT_ENDPOINT), + getEnvironmentOverride(), + getIntOrDefault( + ConfigurationKeys.ASYNC_BUFFER_SIZE, Constants.DEFAULT_ASYNC_BUFFER_SIZE)); } private static Environments getEnvironmentOverride() { @@ -57,6 +58,24 @@ private static Environments getEnvironmentOverride() { } } + private static int getIntOrDefault(String key, int defaultValue) { + String value = getEnvVar(key); + if (StringUtils.isNullOrEmpty(value)) { + return defaultValue; + } + + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + return defaultValue; + } + } + + private static String getEnvVar(String key) { + String name = String.join("", ConfigurationKeys.ENV_VAR_PREFIX, "_", key); + return getEnv(name); + } + private static String getEnv(String name) { return SystemWrapper.getenv(name); } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/AgentBasedEnvironment.java b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/AgentBasedEnvironment.java index d642c1c1..f401dd1e 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/AgentBasedEnvironment.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/AgentBasedEnvironment.java @@ -23,10 +23,11 @@ import software.amazon.cloudwatchlogs.emf.sinks.Endpoint; import software.amazon.cloudwatchlogs.emf.sinks.ISink; import software.amazon.cloudwatchlogs.emf.sinks.SocketClientFactory; +import software.amazon.cloudwatchlogs.emf.sinks.retry.FibonacciRetryStrategy; @Slf4j public abstract class AgentBasedEnvironment implements Environment { - private Configuration config; + private final Configuration config; private ISink sink; public AgentBasedEnvironment(Configuration config) { @@ -68,7 +69,13 @@ public ISink getSink() { getLogGroupName(), getLogStreamName(), endpoint, - new SocketClientFactory()); + new SocketClientFactory(), + config.getAsyncBufferSize(), + () -> + new FibonacciRetryStrategy( + Constants.MIN_BACKOFF_MILLIS, + Constants.MAX_BACKOFF_MILLIS, + Constants.MAX_BACKOFF_JITTER)); } return sink; } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/DefaultEnvironment.java b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/DefaultEnvironment.java index e3846ce0..eadb2d1b 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/DefaultEnvironment.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/DefaultEnvironment.java @@ -22,10 +22,10 @@ import software.amazon.cloudwatchlogs.emf.model.MetricsContext; @Slf4j -class DefaultEnvironment extends AgentBasedEnvironment { +public class DefaultEnvironment extends AgentBasedEnvironment { private Configuration config; - DefaultEnvironment(Configuration config) { + public DefaultEnvironment(Configuration config) { super(config); this.config = config; } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/EC2Environment.java b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/EC2Environment.java index 6a0f3252..6c650c1f 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/EC2Environment.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/EC2Environment.java @@ -26,7 +26,7 @@ import software.amazon.cloudwatchlogs.emf.model.MetricsContext; @Slf4j -class EC2Environment extends AgentBasedEnvironment { +public class EC2Environment extends AgentBasedEnvironment { private Configuration config; private EC2Metadata metadata; private ResourceFetcher fetcher; diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/ECSEnvironment.java b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/ECSEnvironment.java index 394fe24f..4256b7d2 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/ECSEnvironment.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/ECSEnvironment.java @@ -32,7 +32,7 @@ import software.amazon.cloudwatchlogs.emf.util.StringUtils; @Slf4j -class ECSEnvironment extends AgentBasedEnvironment { +public class ECSEnvironment extends AgentBasedEnvironment { private Configuration config; private ECSMetadata metadata; private ResourceFetcher fetcher; @@ -43,6 +43,10 @@ class ECSEnvironment extends AgentBasedEnvironment { private static final String FLUENT_HOST = "FLUENT_HOST"; private static final String ENVIRONMENT_TYPE = "AWS::ECS::Container"; + public ECSEnvironment(Configuration config) { + this(config, new ResourceFetcher()); + } + ECSEnvironment(Configuration config, ResourceFetcher fetcher) { super(config); this.config = config; diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/LambdaEnvironment.java b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/LambdaEnvironment.java index 269500fe..3a17ab3f 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/LambdaEnvironment.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/LambdaEnvironment.java @@ -23,7 +23,7 @@ import software.amazon.cloudwatchlogs.emf.sinks.ISink; /** An environment stands for the AWS Lambda environment. */ -class LambdaEnvironment implements Environment { +public class LambdaEnvironment implements Environment { private static final String AWS_EXECUTION_ENV = "AWS_EXECUTION_ENV"; private static final String LAMBDA_FUNCTION_NAME = "AWS_LAMBDA_FUNCTION_NAME"; private static final String LAMBDA_FUNCTION_VERSION = "AWS_LAMBDA_FUNCTION_VERSION"; diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/LocalEnvironment.java b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/LocalEnvironment.java index b0ab7f35..09dc8c50 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/LocalEnvironment.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/LocalEnvironment.java @@ -24,7 +24,7 @@ import software.amazon.cloudwatchlogs.emf.sinks.ISink; @Slf4j -class LocalEnvironment implements Environment { +public class LocalEnvironment implements Environment { private ISink sink; private Configuration config; diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/ResourceFetcher.java b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/ResourceFetcher.java index f1faa813..9cd2534b 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/environment/ResourceFetcher.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/environment/ResourceFetcher.java @@ -29,7 +29,7 @@ import software.amazon.cloudwatchlogs.emf.util.Jackson; @Slf4j -class ResourceFetcher { +public class ResourceFetcher { /** Fetch a json object from a given uri and deserialize it to the specified class: clazz. */ T fetch(URI endpoint, Class clazz) { diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java index e637ed4c..13c5a48b 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java @@ -40,6 +40,12 @@ public MetricsLogger() { this(new EnvironmentProvider()); } + public MetricsLogger(Environment environment) { + context = new MetricsContext(); + environmentFuture = CompletableFuture.completedFuture(environment); + environmentProvider = null; // TODO: should do some refactoring here + } + public MetricsLogger(EnvironmentProvider environmentProvider) { this(environmentProvider, new MetricsContext()); } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/AgentSink.java b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/AgentSink.java index eea4e28e..f061c38d 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/AgentSink.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/AgentSink.java @@ -17,28 +17,61 @@ package software.amazon.cloudwatchlogs.emf.sinks; import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import software.amazon.cloudwatchlogs.emf.Constants; +import software.amazon.cloudwatchlogs.emf.exception.EMFClientException; import software.amazon.cloudwatchlogs.emf.model.MetricsContext; +import software.amazon.cloudwatchlogs.emf.sinks.retry.RetryStrategy; import software.amazon.cloudwatchlogs.emf.util.StringUtils; -/** An sink connecting to CloudWatch Agent. */ +/** An sink connecting to an agent over a socket. */ @Slf4j public class AgentSink implements ISink { private final String logGroupName; private final String logStreamName; private final SocketClient client; + private final ExecutorService executor; + private final Supplier retryStrategyFactory; + private final LinkedBlockingQueue queue; public AgentSink( String logGroupName, String logStreamName, Endpoint endpoint, - SocketClientFactory clientFactory) { + SocketClientFactory clientFactory, + int asyncQueueDepth, + Supplier retryStrategy) { this.logGroupName = logGroupName; this.logStreamName = logStreamName; client = clientFactory.getClient(endpoint); + queue = new LinkedBlockingQueue<>(asyncQueueDepth); + executor = createSingleThreadedExecutor(); + this.retryStrategyFactory = retryStrategy; + } + + private ExecutorService createSingleThreadedExecutor() { + return new ThreadPoolExecutor( + 1, + 1, + 0L, + TimeUnit.MILLISECONDS, + queue, + new ThreadPoolExecutor.DiscardOldestPolicy()); } public void accept(MetricsContext context) { + if (executor.isShutdown()) { + throw new EMFClientException( + "Attempted to write data to a sink that has been previously shutdown."); + } + if (!StringUtils.isNullOrEmpty(logGroupName)) { context.putMetadata("LogGroupName", logGroupName); } @@ -49,10 +82,65 @@ public void accept(MetricsContext context) { try { for (String event : context.serialize()) { - client.sendMessage(event + "\n"); + executor.submit(new Sender(event, client, retryStrategyFactory)); } } catch (JsonProcessingException e) { log.error("Failed to serialize the metrics with the exception: ", e); } } + + @Override + public CompletableFuture shutdown() { + executor.shutdown(); + return CompletableFuture.supplyAsync( + () -> { + try { + while ((!executor.awaitTermination(1000, TimeUnit.MILLISECONDS))) { + // we add 1 because we assume that at least one task is running if the + // queue is blocked + log.debug( + "Waiting for graceful shutdown to complete. {} tasks pending.", + queue.size() + 1); + } + } catch (InterruptedException e) { + log.warn("Thread terminated while awaiting shutdown."); + } + return null; + }); + } + + @AllArgsConstructor + private static class Sender implements Runnable { + private final String event; + private final SocketClient client; + private final Supplier retryStrategyFactory; + + @Override + public void run() { + if (!StringUtils.isNullOrEmpty(event)) { + try { + sendMessageForMaxAttempts(); + } catch (InterruptedException e) { + log.warn("Thread was interrupted while sending EMF event."); + } + } + } + + private void sendMessageForMaxAttempts() throws InterruptedException { + RetryStrategy backoff = null; + + for (int i = 0; i < Constants.MAX_ATTEMPTS_PER_MESSAGE; i++) { + try { + client.sendMessage(event + "\n"); + return; + } catch (Exception e) { + log.debug( + "Failed to write the message to the socket. Backing off and trying again.", + e); + backoff = backoff != null ? backoff : retryStrategyFactory.get(); + Thread.sleep(backoff.next()); + } + } + } + } } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/ConsoleSink.java b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/ConsoleSink.java index 3501a000..f35bbd0d 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/ConsoleSink.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/ConsoleSink.java @@ -17,6 +17,7 @@ package software.amazon.cloudwatchlogs.emf.sinks; import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.concurrent.CompletableFuture; import lombok.Builder; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -41,4 +42,9 @@ public void accept(MetricsContext context) { log.error("Failed to serialize a MetricsContext: ", e); } } + + @Override + public CompletableFuture shutdown() { + return CompletableFuture.completedFuture(null); + } } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/ISink.java b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/ISink.java index dd78fa28..dc1216ec 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/ISink.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/ISink.java @@ -16,6 +16,7 @@ package software.amazon.cloudwatchlogs.emf.sinks; +import java.util.concurrent.CompletableFuture; import software.amazon.cloudwatchlogs.emf.model.MetricsContext; /** Interface for sinking log items to CloudWatch. */ @@ -27,4 +28,14 @@ public interface ISink { * @param context MetricsContext */ void accept(MetricsContext context); + + /** + * Shutdown the sink. The returned {@link CompletableFuture} will be completed when all queued + * events have been flushed. After this is called, no more metrics can be sent through this sink + * and attempting to continue to re-use the sink will result in undefined behavior. + * + * @return a future that completes when the shutdown has completed successfully and all pending + * messages have been sent to the destination. + */ + CompletableFuture shutdown(); } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/MultiSink.java b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/MultiSink.java index b40b6ae4..3d0fa59c 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/MultiSink.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/MultiSink.java @@ -17,6 +17,7 @@ package software.amazon.cloudwatchlogs.emf.sinks; import java.util.List; +import java.util.concurrent.CompletableFuture; import lombok.Builder; import lombok.NonNull; import lombok.Singular; @@ -29,7 +30,7 @@ */ @Builder public class MultiSink implements ISink { - @Singular @NonNull private List sinks; + @Singular @NonNull private final List sinks; @Override public void accept(MetricsContext context) { @@ -37,4 +38,14 @@ public void accept(MetricsContext context) { sink.accept(context); } } + + @Override + public CompletableFuture shutdown() { + @SuppressWarnings("rawtypes") + final CompletableFuture[] list = new CompletableFuture[sinks.size()]; + for (int i = 0; i < sinks.size(); i++) { + list[i] = sinks.get(i).shutdown(); + } + return CompletableFuture.allOf(list); + } } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java index 225a360e..3ae7f43a 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java @@ -31,17 +31,17 @@ public class TCPClient implements SocketClient { private boolean shouldConnect = true; public TCPClient(Endpoint endpoint) { - socket = createSocket(); this.endpoint = endpoint; } private void connect() { try { + socket = createSocket(); socket.connect(new InetSocketAddress(endpoint.getHost(), endpoint.getPort())); shouldConnect = false; } catch (Exception e) { - log.error("Failed to connect to the socket due to the exception: ", e); shouldConnect = true; + throw new RuntimeException("Failed to connect to the socket.", e); } } @@ -51,7 +51,7 @@ protected Socket createSocket() { @Override public synchronized void sendMessage(String message) { - if (socket.isClosed() || shouldConnect) { + if (socket == null || socket.isClosed() || shouldConnect) { connect(); } @@ -59,19 +59,16 @@ public synchronized void sendMessage(String message) { try { os = socket.getOutputStream(); } catch (IOException e) { - log.error("Failed to open output stream: ", e); - connect(); - return; + shouldConnect = true; + throw new RuntimeException( + "Failed to write message to the socket. Failed to open output stream.", e); } try { os.write(message.getBytes()); - } catch (IOException e) { - log.error("Could not send write request due to IOException: ", e); - connect(); } catch (Exception e) { - log.error("Could not send write request due to Exception: ", e); - connect(); + shouldConnect = true; + throw new RuntimeException("Failed to write message to the socket.", e); } } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/retry/FibonacciRetryStrategy.java b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/retry/FibonacciRetryStrategy.java new file mode 100644 index 00000000..d3328163 --- /dev/null +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/retry/FibonacciRetryStrategy.java @@ -0,0 +1,32 @@ +package software.amazon.cloudwatchlogs.emf.sinks.retry; + +import java.util.concurrent.ThreadLocalRandom; +import org.javatuples.Pair; + +/** + * A Fibonacci sequence with an upper bound. Once the upper limit is hit, all subsequent calls to + * `next()` will return the provided limit. + */ +public class FibonacciRetryStrategy implements RetryStrategy { + private final int maxJitter; + private final int upperBound; + + Pair cursor; + + public FibonacciRetryStrategy(int start, int upperBound, int maxJitter) { + cursor = new Pair<>(start, start); + this.upperBound = upperBound; + this.maxJitter = maxJitter; + } + + public int next() { + int nextValue = cursor.getValue0() + cursor.getValue1(); + if (cursor.getValue1() >= upperBound) { + cursor = new Pair<>(upperBound, upperBound); + } else { + cursor = new Pair<>(cursor.getValue1(), nextValue); + } + int jitter = maxJitter > 0 ? ThreadLocalRandom.current().nextInt(maxJitter) : 0; + return cursor.getValue0() + jitter; + } +} diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/retry/RetryStrategy.java b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/retry/RetryStrategy.java new file mode 100644 index 00000000..6f6bb596 --- /dev/null +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/sinks/retry/RetryStrategy.java @@ -0,0 +1,10 @@ +package software.amazon.cloudwatchlogs.emf.sinks.retry; + +public interface RetryStrategy { + /** + * Gets the amount of time to wait in millis before retrying + * + * @return millis to wait before retrying + */ + int next(); +} diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/config/ConfigurationTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/config/ConfigurationTest.java index 05ee4470..1859190a 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/config/ConfigurationTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/config/ConfigurationTest.java @@ -35,7 +35,7 @@ public void setUp() { } @Test - public void testReturnEmptyIfNotSet() { + public void testReturnEmptyOrDefaultIfNotSet() { assertFalse(config.getAgentEndpoint().isPresent()); assertFalse(config.getLogGroupName().isPresent()); assertFalse(config.getLogStreamName().isPresent()); @@ -43,6 +43,7 @@ public void testReturnEmptyIfNotSet() { assertFalse(config.getServiceName().isPresent()); assertEquals(config.getEnvironmentOverride(), Environments.Unknown); + assertEquals(config.getAsyncBufferSize(), 100); } @Test @@ -70,12 +71,15 @@ public void testReturnCorrectValueAfterSet() { String expectedServiceType = faker.letterify("????"); String expectedServiceName = faker.letterify("????"); Environments expectedEnvironment = Environments.Agent; + int expectedAsyncBufferSize = faker.number().randomDigit(); + config.setAgentEndpoint(expectedEndpoint); config.setLogGroupName(expectedLogGroupName); config.setLogStreamName(expectedLogStreamName); config.setServiceType(expectedServiceType); config.setServiceName(expectedServiceName); config.setEnvironmentOverride(expectedEnvironment); + config.setAsyncBufferSize(expectedAsyncBufferSize); assertEquals(config.getAgentEndpoint().get(), expectedEndpoint); assertEquals(config.getLogGroupName().get(), expectedLogGroupName); @@ -83,5 +87,6 @@ public void testReturnCorrectValueAfterSet() { assertEquals(config.getServiceType().get(), expectedServiceType); assertEquals(config.getServiceName().get(), expectedServiceName); assertEquals(config.getEnvironmentOverride(), expectedEnvironment); + assertEquals(config.getAsyncBufferSize(), expectedAsyncBufferSize); } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/config/EnvironmentConfigurationProviderTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/config/EnvironmentConfigurationProviderTest.java index ac3f2b95..28c3b017 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/config/EnvironmentConfigurationProviderTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/config/EnvironmentConfigurationProviderTest.java @@ -18,7 +18,6 @@ import static org.junit.Assert.*; -import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.mockito.PowerMockito; @@ -40,14 +39,30 @@ public void getGetConfig() { putEnv("AWS_EMF_LOG_STREAM_NAME", "TestLogStream"); putEnv("AWS_EMF_AGENT_ENDPOINT", "Endpoint"); putEnv("AWS_EMF_ENVIRONMENT", "Agent"); - Configuration config = EnvironmentConfigurationProvider.getConfig(); + putEnv("AWS_EMF_ASYNC_BUFFER_SIZE", "9999"); + + Configuration config = EnvironmentConfigurationProvider.createConfig(); assertEquals(config.getServiceName().get(), "TestServiceName"); assertEquals(config.getServiceType().get(), "TestServiceType"); assertEquals(config.getLogGroupName().get(), "TestLogGroup"); assertEquals(config.getLogStreamName().get(), "TestLogStream"); assertEquals(config.getAgentEndpoint().get(), "Endpoint"); - Assert.assertEquals(config.getEnvironmentOverride(), Environments.Agent); + assertEquals(config.getEnvironmentOverride(), Environments.Agent); + assertEquals(config.getAsyncBufferSize(), 9999); + } + + @Test + public void invalidEnvironmentValuesFallbackToExpectedDefaults() { + // arrange + PowerMockito.mockStatic(SystemWrapper.class); + + // act + putEnv("AWS_EMF_ASYNC_BUFFER_SIZE", "NaN"); + + // assert + Configuration config = EnvironmentConfigurationProvider.createConfig(); + assertEquals(100, config.getAsyncBufferSize()); } private void putEnv(String key, String value) { diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/AgentSinkTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/AgentSinkTest.java index 51ba7868..54efb24a 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/AgentSinkTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/AgentSinkTest.java @@ -17,6 +17,7 @@ package software.amazon.cloudwatchlogs.emf.sinks; import static junit.framework.TestCase.*; +import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -24,44 +25,22 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.ArrayList; import java.util.Map; -import org.junit.Before; +import java.util.concurrent.locks.ReentrantLock; import org.junit.Test; +import software.amazon.cloudwatchlogs.emf.Constants; +import software.amazon.cloudwatchlogs.emf.exception.EMFClientException; import software.amazon.cloudwatchlogs.emf.model.MetricsContext; +import software.amazon.cloudwatchlogs.emf.sinks.retry.RetryStrategy; @SuppressWarnings("unchecked") public class AgentSinkTest { - private SocketClientFactory factory; - private TestClient client; - - class TestClient implements SocketClient { - - private String message; - - @Override - public void sendMessage(String message) { - this.message = message; - } - - public String getMessage() { - return this.message; - } - - @Override - public void close() {} - } - - @Before - public void setUp() { - factory = mock(SocketClientFactory.class); - - client = new TestClient(); - when(factory.getClient(any())).thenReturn(client); - } - @Test public void testAccept() throws JsonProcessingException { + // arrange + Fixture fixture = new Fixture(); String prop = "TestProp"; String propValue = "TestPropValue"; String logGroupName = "TestLogGroup"; @@ -73,14 +52,24 @@ public void testAccept() throws JsonProcessingException { mc.putMetric("Time", 10); AgentSink sink = - new AgentSink(logGroupName, logStreamName, Endpoint.DEFAULT_TCP_ENDPOINT, factory); + new AgentSink( + logGroupName, + logStreamName, + Endpoint.DEFAULT_TCP_ENDPOINT, + fixture.factory, + 1, + InstantRetryStrategy::new); + // act sink.accept(mc); + sink.shutdown().join(); + // assert ObjectMapper objectMapper = new ObjectMapper(); Map emf_map = objectMapper.readValue( - client.getMessage(), new TypeReference>() {}); + fixture.client.getMessages().get(0), + new TypeReference>() {}); Map metadata = (Map) emf_map.get("_aws"); assertEquals(emf_map.get(prop), propValue); @@ -91,19 +80,251 @@ public void testAccept() throws JsonProcessingException { @Test public void testEmptyLogGroupName() throws JsonProcessingException { + // arrange + Fixture fixture = new Fixture(); String logGroupName = ""; - AgentSink sink = new AgentSink(logGroupName, null, Endpoint.DEFAULT_TCP_ENDPOINT, factory); + AgentSink sink = + new AgentSink( + logGroupName, + null, + Endpoint.DEFAULT_TCP_ENDPOINT, + fixture.factory, + 1, + InstantRetryStrategy::new); MetricsContext mc = new MetricsContext(); mc.putMetric("Time", 10); + // act sink.accept(mc); + sink.shutdown().join(); + + // assert ObjectMapper objectMapper = new ObjectMapper(); Map emf_map = objectMapper.readValue( - client.getMessage(), new TypeReference>() {}); + fixture.client.getMessages().get(0), new TypeReference<>() {}); Map metadata = (Map) emf_map.get("_aws"); assertFalse(metadata.containsKey("LogGroupName")); assertFalse(metadata.containsKey("LogStreamName")); } + + @Test + public void testFailuresAreRetried() { + // arrange + Fixture fixture = new Fixture(); + fixture.client.messagesToFail = Constants.MAX_ATTEMPTS_PER_MESSAGE - 1; + AgentSink sink = + new AgentSink( + "", + null, + Endpoint.DEFAULT_TCP_ENDPOINT, + fixture.factory, + 1, + InstantRetryStrategy::new); + + MetricsContext mc = new MetricsContext(); + mc.putMetric("Time", 10); + + // act + sink.accept(mc); + sink.shutdown().join(); + + // assert + assertEquals(Constants.MAX_ATTEMPTS_PER_MESSAGE - 1, fixture.client.messagesFailed); + assertEquals(1, fixture.client.messagesSent); + } + + @Test + public void testFailuresAreRetriedWithMaximumLimit() { + // arrange + Fixture fixture = new Fixture(); + fixture.client.messagesToFail = Constants.MAX_ATTEMPTS_PER_MESSAGE + 1; + AgentSink sink = + new AgentSink( + "", + null, + Endpoint.DEFAULT_TCP_ENDPOINT, + fixture.factory, + 1, + InstantRetryStrategy::new); + + MetricsContext mc = new MetricsContext(); + mc.putMetric("Time", 10); + + // act + sink.accept(mc); + sink.shutdown().join(); + + // assert + assertEquals(Constants.MAX_ATTEMPTS_PER_MESSAGE, fixture.client.messagesFailed); + assertEquals(0, fixture.client.messagesSent); + } + + @Test + public void failedMessagesAreQueued() { + // arrange + Fixture fixture = new Fixture(); + fixture.client.messagesToFail = Constants.MAX_ATTEMPTS_PER_MESSAGE * 2; + AgentSink sink = + new AgentSink( + "", + null, + Endpoint.DEFAULT_TCP_ENDPOINT, + fixture.factory, + 1, + InstantRetryStrategy::new); + + MetricsContext mc = new MetricsContext(); + mc.putMetric("Time", 10); + + // act + sink.accept(mc); + sink.accept(mc); + + sink.shutdown().join(); + + // assert + assertEquals(Constants.MAX_ATTEMPTS_PER_MESSAGE * 2, fixture.client.messagesFailed); + assertEquals(0, fixture.client.messagesSent); + } + + @Test + public void queuedMessagesAreBounded() { + // arrange + Fixture fixture = new Fixture(); + fixture.client.messagesToFail = Constants.MAX_ATTEMPTS_PER_MESSAGE * 3; + AgentSink sink = + new AgentSink( + "", + null, + Endpoint.DEFAULT_TCP_ENDPOINT, + fixture.factory, + 2, + InstantRetryStrategy::new); + + MetricsContext mc = new MetricsContext(); + mc.putMetric("Time", 10); + + // act + sink.accept(mc); + sink.accept(mc); + + sink.shutdown().join(); + + // assert + assertEquals(Constants.MAX_ATTEMPTS_PER_MESSAGE * 2, fixture.client.messagesFailed); + assertEquals(0, fixture.client.messagesSent); + } + + @Test + public void oldestMessagesAreDropped() { + // arrange + Fixture fixture = new Fixture(); + AgentSink sink = + new AgentSink( + "", + null, + Endpoint.DEFAULT_TCP_ENDPOINT, + fixture.factory, + 1, + InstantRetryStrategy::new); + + // prevent any message from being sent by the client yet + fixture.client.lock.lock(); + + // two different payloads + // we'll fill the queue with the first one and then insert the second + MetricsContext send = new MetricsContext(); + send.putMetric("SEND", 10); + + MetricsContext shouldDrop = new MetricsContext(); + shouldDrop.putMetric("DROP", 10); + + // act + sink.accept(send); + sink.accept( + shouldDrop); // this goes in second because the first message will be pulled off the + // queue immediately + sink.accept(send); // this one should overwrite the previous message + fixture.client.lock.unlock(); + sink.shutdown().join(); + + // assert + assertEquals(0, fixture.client.messagesFailed); + assertEquals(2, fixture.client.messagesSent); + fixture.client.messages.forEach(message -> assertFalse(message.contains("DONT_SEND"))); + } + + @Test + public void cannotEnqueueDataAfterShuttingDownSink() { + // arrange + Fixture fixture = new Fixture(); + AgentSink sink = + new AgentSink( + "", + null, + Endpoint.DEFAULT_TCP_ENDPOINT, + fixture.factory, + 1, + InstantRetryStrategy::new); + + // act + sink.shutdown(); + + // assert + assertThrows(EMFClientException.class, () -> sink.accept(new MetricsContext())); + } + + class Fixture { + SocketClientFactory factory; + TestClient client; + + Fixture() { + factory = mock(SocketClientFactory.class); + client = new TestClient(); + when(factory.getClient(any())).thenReturn(client); + } + } + + class TestClient implements SocketClient { + + private final ArrayList messages = new ArrayList<>(); + + // use this lock to control concurrency and block writing / retires + // to the socket + private final ReentrantLock lock = new ReentrantLock(); + + private int messagesSent = 0; + private int messagesFailed = 0; + private int messagesToFail = 0; + + @Override + public void sendMessage(String message) { + if (messagesToFail > messagesFailed) { + messagesFailed++; + throw new RuntimeException("Failed to send message"); + } else { + messagesSent++; + lock.lock(); + this.messages.add(message); + lock.unlock(); + } + } + + public ArrayList getMessages() { + return this.messages; + } + + @Override + public void close() {} + } + + class InstantRetryStrategy implements RetryStrategy { + + @Override + public int next() { + return 0; + } + } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/MultiSinkTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/MultiSinkTest.java new file mode 100644 index 00000000..103fd346 --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/MultiSinkTest.java @@ -0,0 +1,68 @@ +package software.amazon.cloudwatchlogs.emf.sinks; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CompletableFuture; +import lombok.Getter; +import org.junit.Test; +import software.amazon.cloudwatchlogs.emf.model.MetricsContext; + +public class MultiSinkTest { + @Test + public void shutdownClosesAllComponentSinks() { + // arrange + TestSink sink1 = new TestSink(); + TestSink sink2 = new TestSink(); + MultiSink multiSink = MultiSink.builder().sink(sink1).sink(sink2).build(); + + // act + CompletableFuture future = multiSink.shutdown(); + + // assert + assertTrue(future.isDone()); + assertEquals(1, sink1.getShutdowns()); + assertEquals(1, sink2.getShutdowns()); + } + + @Test + public void shutdownCompletesExceptionallyIfComponentSinkCompletesExceptionally() { + // arrange + CompletableFuture failedResult = + CompletableFuture.failedFuture(new RuntimeException()); + TestSink sink1 = new TestSink(); + TestSink sink2 = new TestSink(failedResult); + MultiSink multiSink = MultiSink.builder().sink(sink1).sink(sink2).build(); + + // act + CompletableFuture future = multiSink.shutdown(); + + // assert + assertTrue(future.isDone()); + assertTrue(future.isCompletedExceptionally()); + assertEquals(1, sink1.getShutdowns()); + assertEquals(1, sink2.getShutdowns()); + } + + private static class TestSink implements ISink { + private final CompletableFuture shutdownResult; + @Getter int shutdowns = 0; + + TestSink() { + this.shutdownResult = CompletableFuture.completedFuture(null); + } + + TestSink(CompletableFuture shutdownResult) { + this.shutdownResult = shutdownResult; + } + + @Override + public void accept(MetricsContext context) {} + + @Override + public CompletableFuture shutdown() { + shutdowns += 1; + return shutdownResult; + } + } +} diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/SinkShunt.java b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/SinkShunt.java index 9b0bf18d..6f6be098 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/SinkShunt.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/SinkShunt.java @@ -17,6 +17,7 @@ package software.amazon.cloudwatchlogs.emf.sinks; import java.util.List; +import java.util.concurrent.CompletableFuture; import software.amazon.cloudwatchlogs.emf.model.MetricsContext; public class SinkShunt implements ISink { @@ -35,6 +36,11 @@ public void accept(MetricsContext context) { } } + @Override + public CompletableFuture shutdown() { + return CompletableFuture.completedFuture(null); + } + public MetricsContext getContext() { return context; } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/retry/FibonacciRetryStrategyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/retry/FibonacciRetryStrategyTest.java new file mode 100644 index 00000000..342497af --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/retry/FibonacciRetryStrategyTest.java @@ -0,0 +1,37 @@ +package software.amazon.cloudwatchlogs.emf.sinks.retry; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; +import software.amazon.cloudwatchlogs.emf.Constants; + +public class FibonacciRetryStrategyTest { + @Test + public void testDefaultRetryMatchesExpectedSequence() { + // arrange + int start = Constants.MIN_BACKOFF_MILLIS; + int max = Constants.MAX_BACKOFF_MILLIS; + FibonacciRetryStrategy strategy = new FibonacciRetryStrategy(start, max, 0); + + List expectedSequence = + Arrays.asList(50, 100, 150, 250, 400, 650, 1050, 1700, 2000); + List actualSequence = new ArrayList<>(); + + // act + int last = start; + while (last < max) { + last = strategy.next(); + actualSequence.add(last); + } + + // assert + assertEquals(expectedSequence, actualSequence); + + // verify subsequent calls don't exceed max bounds + assertEquals(Constants.MAX_BACKOFF_MILLIS, strategy.next()); + assertEquals(Constants.MAX_BACKOFF_MILLIS, strategy.next()); + } +}