From 237aa98dd4017f7a6271f73a8b5fe7d3952251de Mon Sep 17 00:00:00 2001
From: Xinyu Bao <71293855+Stephen-Bao@users.noreply.github.com>
Date: Wed, 24 Aug 2022 23:21:20 -0400
Subject: [PATCH] Thread safety enhancements (#111)

* Introduced changes to make the library thread-safe.
* Added thread-safety tests for the library.
* Added jmh benchmarking for ReadWriteLock & StampedLock
* Adjusted the code format
* Made some changes to test cases to introduce more concurrency
* Finished concurrency test case revision
* Changed benchmark to use fixed batch size and updated the results
* Added benchmark section in README
* Added documentation on synchronization policy
* Adjusted the code format
* retry integ test
* code format
* made a minor change
* rerun integ test
* Added thread-safety measures to newly merged features
* Moving literals to constants

Co-authored-by: Stephen-Bao
---
 README.md                                     |  26 +
 build.gradle                                  |   7 +-
 gradle/wrapper/gradle-wrapper.properties      |   2 +-
 .../emf/MetricsLoggerBenchmark.java           | 349 +++++++++++++
 .../emf/logger/MetricsLogger.java             |  88 +++-
 .../cloudwatchlogs/emf/model/Metadata.java    |   4 +-
 .../emf/model/MetricDirective.java            |  27 +-
 .../cloudwatchlogs/emf/model/RootNode.java    |   3 +-
 .../logger/MetricsLoggerThreadSafetyTest.java | 472 ++++++++++++++++++
 .../MetricDirectiveThreadSafetyTest.java      | 101 ++++
 .../emf/model/MetricsContextTest.java         |  10 +
 .../model/MetricsContextThreadSafetyTest.java |  55 ++
 .../emf/sinks/GroupedSinkShunt.java           |  57 +++
 13 files changed, 1163 insertions(+), 38 deletions(-)
 create mode 100644 src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java
 create mode 100644 src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java
 create mode 100644 src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java
 create mode 100644 src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java
 create mode 100644 src/test/java/software/amazon/cloudwatchlogs/emf/sinks/GroupedSinkShunt.java

diff --git a/README.md b/README.md
index fee65ca5..596c59fc 100644
--- a/README.md
+++ b/README.md
@@ -346,6 +346,24 @@ config.setAgentEndpoint("udp://127.0.0.1:1000");
 AWS_EMF_AGENT_ENDPOINT="udp://127.0.0.1:1000"
 ```
 
+## Thread-safety
+
+### Internal Synchronization
+
+The MetricsLogger class is thread-safe. Specifically, the general multi-threaded use cases for this library are:
+
+1. Collect some metrics or metadata on a single MetricsLogger; pass the logger into one or more async contexts where new metrics or metadata can be added concurrently; join the async contexts (e.g. `Future.get()`) and flush the metrics.
+2. Collect some metrics or metadata on a single MetricsLogger; pass the logger into an async context; flush concurrently from the async context.
+
+Thread-safety for the first use case is achieved by using concurrent internal data structures and atomic operations on the underlying models, which ensures that access to shared mutable state is always synchronized.
+
+Thread-safety for the second use case is achieved with a ReentrantReadWriteLock. This lock creates an internal synchronization context for the `flush()` method in multi-threaded situations: `flush()` acquires the write lock, while the other methods that share mutable data with `flush()` acquire the read lock. This guarantees that `flush()` always executes exclusively, while the other methods can execute concurrently.
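+
+For instance, the first use case can look like the following minimal sketch (the class name, metric and property names, and worker count here are illustrative only and are not part of the library):
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
+import software.amazon.cloudwatchlogs.emf.model.DimensionSet;
+import software.amazon.cloudwatchlogs.emf.model.Unit;
+
+public class SharedLoggerExample {
+    public static void main(String[] args) {
+        MetricsLogger logger = new MetricsLogger();
+        logger.putDimensions(DimensionSet.of("Service", "Aggregator"));
+
+        // Each async context adds metrics and properties to the same shared logger.
+        List<CompletableFuture<Void>> tasks = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            final int workerId = i;
+            tasks.add(
+                    CompletableFuture.runAsync(
+                            () -> {
+                                logger.putMetric("Latency-" + workerId, 100, Unit.MILLISECONDS);
+                                logger.putProperty("WorkerId-" + workerId, workerId);
+                            }));
+        }
+
+        // Join the async contexts, then flush once from the calling thread.
+        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
+        logger.flush();
+    }
+}
+```
+
+Because `flush()` takes the write lock, it also executes exclusively when called from inside an async context while other threads are still adding metrics (the second use case).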
+
+### Use Cases that are Not Covered
+
+Even with all of the internal synchronization measures, there are still certain multi-threaded use cases that this library does not cover, and these might require external synchronization or other protective measures.
+This is because the execution order of API calls is not deterministic across async contexts. For example, if a user needs to associate a given set of properties with a metric in each thread, the result is not guaranteed, since the order in which `putProperty()` executes across threads is not deterministic. In such cases, we recommend using a separate MetricsLogger instance per thread, so that no resources are shared and no thread-safety issues can arise. Note that this can often be simplified by using a ThreadLocal variable.
+
 ## Examples
 
 Check out the [examples](https://github.com/awslabs/aws-embedded-metrics-java/tree/master/examples) directory to get started.
@@ -392,6 +410,14 @@ To auto fix code style, run
 ./gradlew :spotlessApply
 ```
 
+### Benchmark
+
+We use [JMH](https://github.com/openjdk/jmh) as our framework for concurrency performance benchmarking. Benchmarks can be run with:
+```
+./gradlew jmh
+```
+To run a single benchmark, consider using a JMH plugin, for example the [JMH plugin for IntelliJ IDEA](https://github.com/artyushov/idea-jmh-plugin).
+
 ## License
 
 This project is licensed under the Apache-2.0 License.
diff --git a/build.gradle b/build.gradle
index db0a2026..b21f21f3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -18,6 +18,7 @@ plugins {
     id 'com.diffplug.spotless' version '5.8.2'
     id 'maven-publish'
     id 'signing'
+    id "me.champeau.jmh" version "0.6.6"
 }
 
 group "software.amazon.cloudwatchlogs"
@@ -78,6 +79,10 @@ dependencies {
     testImplementation 'software.amazon.awssdk:cloudwatch:2.13.54'
     testCompileOnly 'org.projectlombok:lombok:1.18.12'
     testAnnotationProcessor 'org.projectlombok:lombok:1.18.12'
+
+    implementation 'org.openjdk.jmh:jmh-core:1.29'
+    implementation 'org.openjdk.jmh:jmh-generator-annprocess:1.29'
+    jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:1.29'
 }
 
 spotless {
@@ -124,7 +129,7 @@ tasks.withType(JavaCompile) {
 }
 
 tasks.named('wrapper') {
-    gradleVersion = '6.5.1'
+    gradleVersion = '7.4.2'
     distributionType = Wrapper.DistributionType.ALL
 }
 
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index ac33e994..92f06b50 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.5.1-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-all.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
diff --git a/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java b/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java
new file mode 100644
index 00000000..7b273a6c
--- /dev/null
+++ b/src/jmh/java/software/amazon/cloudwatchlogs/emf/MetricsLoggerBenchmark.java
@@ -0,0 +1,349 @@
+package software.amazon.cloudwatchlogs.emf;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.openjdk.jmh.annotations.*;
+import software.amazon.cloudwatchlogs.emf.environment.Environment;
+import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider;
+import
software.amazon.cloudwatchlogs.emf.logger.MetricsLogger; +import software.amazon.cloudwatchlogs.emf.model.DimensionSet; +import software.amazon.cloudwatchlogs.emf.sinks.SinkShunt; + +@State(Scope.Benchmark) +@BenchmarkMode(Mode.AverageTime) +@Warmup(iterations = 3, time = 5) +@Measurement(iterations = 3, time = 5) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(value = 1) +public class MetricsLoggerBenchmark { + private MetricsLogger logger; + private EnvironmentProvider envProvider; + private SinkShunt sink; + private Environment environment; + + @Setup + public void setUp() { + envProvider = mock(EnvironmentProvider.class); + environment = mock(Environment.class); + sink = new SinkShunt(); + + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(sink); + logger = new MetricsLogger(envProvider); + } + + /** + * Publishing 10000 metrics with single thread. no lock: 0.844 ms/op; RW lock: 0.896 ms/op; S + * lock: 0.884 ms/op + */ + @Benchmark + public void measurePutMetric() { + logger = new MetricsLogger(envProvider); // 0.024 ms/op + + // should make this op dominate running time + for (int i = 0; i < 10000; i++) { + logger.putMetric("Metric-" + i, i); + } + } + + /** Flush with single thread. no lock: 0.148 ms/op; RW lock: 0.148 ms/op; S lock: 0.147 ms/op */ + @Benchmark + public void measureFlush() { + logger = new MetricsLogger(envProvider); + + logger.flush(); + + sink.shutdown(); + } + + /** + * Invoke all methods 100 times with single thread. no lock: 6.946 ms/op; RW lock: 6.988 ms/op; + * S lock: 6.823 ms/op + */ + @Benchmark + public void measureAllMethods() { + logger = new MetricsLogger(envProvider); + + for (int j = 0; j < 100; j++) { + logger.putMetadata("MetaData-" + j, j); + logger.putProperty("Property-" + j, j); + logger.putDimensions(DimensionSet.of("Dim-" + j, String.valueOf(j))); + logger.putMetric("Metric-" + j, j); + logger.flush(); + } + + sink.shutdown(); + } + + /** + * Each thread publishes 1000 metrics, 10 threads in total. no lock: 0.949 ms/op; RW lock: 3.823 + * ms/op; S lock: 3.078 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measurePutMetricWith10Threads() throws InterruptedException { + measurePutMetricWithNThreads(10); + } + + /** + * Each thread publishes 1000 metrics, 20 threads in total. no lock: 1.860 ms/op; RW lock: 9.806 + * ms/op; S lock: 7.929 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measurePutMetricWith20Threads() throws InterruptedException { + measurePutMetricWithNThreads(20); + } + + /** + * Each thread publishes 1000 metrics, 50 threads in total. no lock: 6.548 ms/op; RW lock: + * 28.754 ms/op; S lock: 24.700 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measurePutMetricWith50Threads() throws InterruptedException { + measurePutMetricWithNThreads(50); + } + + /** + * Each thread publishes 1000 metrics, 200 threads in total. no lock: 37.662 ms/op; RW lock: + * 135.824 ms/op; S lock: 114.467 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measurePutMetricWith200Threads() throws InterruptedException { + measurePutMetricWithNThreads(200); + } + + /** + * Each thread publishes 1000 metrics, 500 threads in total. 
no lock: 90.148 ms/op; RW lock: + * 345.197 ms/op; S lock: 287.908 ms/op + * + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 10) + @Measurement(time = 10) + public void measurePutMetricWith500Threads() throws InterruptedException { + measurePutMetricWithNThreads(500); + } + + /** + * Each thread flushes 100 times, 10 threads in total. no lock: 12.900 ms/op; RW lock: 25.015 + * ms/op; S lock: 24.778 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measureFlushWith10Threads() throws InterruptedException { + measureFlushWithNThreads(10); + } + + /** + * Each thread flushes 100 times, 20 threads in total. no lock: 20.824 ms/op; RW lock: 47.123 + * ms/op; S lock: 48.511 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measureFlushWith20Threads() throws InterruptedException { + measureFlushWithNThreads(20); + } + + /** + * Each thread flushes 100 times, 50 threads in total. no lock: 77.463 ms/op; RW lock: 121.857 + * ms/op; S lock: 125.212 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measureFlushWith50Threads() throws InterruptedException { + measureFlushWithNThreads(50); + } + + /** + * Each thread flushes 100 times, 200 threads in total. no lock: 390.252 ms/op; RW lock: 474.439 + * ms/op; S lock: 488.809 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measureFlushWith200Threads() throws InterruptedException { + measureFlushWithNThreads(200); + } + + /** + * Each thread flushes 100 times, 500 threads in total. no lock: 300.280 ms/op; RW lock: + * 1161.098 ms/op; S lock: 1247.972 ms/op + * + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 10) + @Measurement(time = 10) + public void measureFlushWith500Threads() throws InterruptedException { + measureFlushWithNThreads(500); + } + + /** + * Each thread executes all methods 100 times, 10 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 7.215 ms/op; RW lock: + * 32.159; S lock: 34.226 + * + * @throws InterruptedException + */ + @Benchmark + public void measureAllMethodsWith10Threads() throws InterruptedException { + measureAllMethodsWithNThreads(10); + } + + /** + * Each thread executes all methods 100 times, 20 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 11.833 ms/op; RW lock: + * 60.510 ms/op; S lock: 75.125 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measureAllMethodsWith20Threads() throws InterruptedException { + measureAllMethodsWithNThreads(20); + } + + /** + * Each thread executes all methods 100 times, 50 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 36.051 ms/op; RW lock: + * 150.022 ms/op; S lock: 244.934 ms/op + * + * @throws InterruptedException + */ + @Benchmark + public void measureAllMethodsWith50Threads() throws InterruptedException { + measureAllMethodsWithNThreads(50); + } + + /** + * Each thread executes all methods 100 times, 200 threads in total. 
no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 108.775 ms/op; RW lock: + * 629.826 ms/op; S lock: 1220.959 ms/op + * + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 10) + @Measurement(time = 10) + public void measureAllMethodsWith200Threads() throws InterruptedException { + measureAllMethodsWithNThreads(200); + } + + /** + * Each thread executes all methods 100 times, 500 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 335.183 ms/op; RW lock: + * 1741.003 ms/op; S lock: 4192.327 ms/op + * + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 10) + @Measurement(time = 10) + public void measureAllMethodsWith500Threads() throws InterruptedException { + measureAllMethodsWithNThreads(500); + } + + /** + * Each thread executes all methods 100 times, 1000 threads in total. no lock (need to sync + * getAllDimensions() & getAllDimensionKeys() in MetricsDirective): 575.339 ms/op; RW lock: + * 3230.403 ms/op; S lock: 13519.459 ms/op + * + * @throws InterruptedException + */ + @Benchmark + @Warmup(time = 20) + @Measurement(time = 20) + public void measureAllMethodsWith1000Threads() throws InterruptedException { + measureAllMethodsWithNThreads(1000); + } + + private void measurePutMetricWithNThreads(int n) throws InterruptedException { + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[n]; + int batchSize = 1000; + + for (int i = 0; i < n; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + for (int j = batchSize * id; j < batchSize * id + batchSize; j++) { + logger.putMetric("Metric-" + j, j); + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + } + + private void measureFlushWithNThreads(int n) throws InterruptedException { + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[n]; + int batchSize = 100; + + for (int i = 0; i < n; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + for (int j = batchSize * id; j < batchSize * id + batchSize; j++) { + logger.flush(); + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + sink.shutdown(); + } + + private void measureAllMethodsWithNThreads(int n) throws InterruptedException { + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[n]; + int batchSize = 100; + + for (int i = 0; i < n; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + for (int j = batchSize * id; j < batchSize * id + batchSize; j++) { + logger.putMetric("Metric-" + j, j); + logger.putProperty("Property-" + j, j); + logger.putMetadata("MetaData-" + j, j); + logger.setDimensions( + DimensionSet.of("Dim-" + j, String.valueOf(j))); + + logger.flush(); + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + sink.shutdown(); + } +} diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java index 459014b1..8f9d6773 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLogger.java @@ -18,6 +18,8 @@ import java.time.Instant; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -37,6 
+39,13 @@ public class MetricsLogger { private MetricsContext context; private CompletableFuture environmentFuture; private EnvironmentProvider environmentProvider; + /** + * This lock is used to create an internal sync context for flush() method in multi-threaded + * situations. Flush() acquires write lock, other methods (accessing mutable shared data with + * flush()) acquires read lock. This makes sure flush() is executed exclusively, while other + * methods can be executed concurrently. + */ + private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); @Getter @Setter private boolean flushPreserveDimensions = true; @@ -72,13 +81,18 @@ public void flush() { environment = environmentProvider.getDefaultEnvironment(); } - ISink sink = environment.getSink(); - configureContextForEnvironment(context, environment); - sink.accept(context); - context = - flushPreserveDimensions - ? context.createCopyWithContext() - : context.createCopyWithContextWithoutDimensions(); + rwl.writeLock().lock(); + try { + ISink sink = environment.getSink(); + configureContextForEnvironment(context, environment); + sink.accept(context); + context = + flushPreserveDimensions + ? context.createCopyWithContext() + : context.createCopyWithContextWithoutDimensions(); + } finally { + rwl.writeLock().unlock(); + } } /** @@ -91,8 +105,11 @@ public void flush() { * @return the current logger */ public MetricsLogger putProperty(String key, Object value) { - this.context.putProperty(key, value); - return this; + return applyReadLock( + () -> { + this.context.putProperty(key, value); + return this; + }); } /** @@ -107,8 +124,11 @@ public MetricsLogger putProperty(String key, Object value) { * @return the current logger */ public MetricsLogger putDimensions(DimensionSet dimensions) { - context.putDimension(dimensions); - return this; + return applyReadLock( + () -> { + context.putDimension(dimensions); + return this; + }); } /** @@ -121,8 +141,11 @@ public MetricsLogger putDimensions(DimensionSet dimensions) { * @return the current logger */ public MetricsLogger setDimensions(DimensionSet... dimensionSets) { - context.setDimensions(dimensionSets); - return this; + return applyReadLock( + () -> { + context.setDimensions(dimensionSets); + return this; + }); } /** @@ -134,8 +157,11 @@ public MetricsLogger setDimensions(DimensionSet... dimensionSets) { * @return the current logger */ public MetricsLogger setDimensions(boolean useDefault, DimensionSet... dimensionSets) { - context.setDimensions(useDefault, dimensionSets); - return this; + return applyReadLock( + () -> { + context.setDimensions(useDefault, dimensionSets); + return this; + }); } /** @@ -146,8 +172,11 @@ public MetricsLogger setDimensions(boolean useDefault, DimensionSet... 
dimension * @return the current logger */ public MetricsLogger resetDimensions(boolean useDefault) { - context.resetDimensions(useDefault); - return this; + return applyReadLock( + () -> { + context.resetDimensions(useDefault); + return this; + }); } /** @@ -161,8 +190,11 @@ public MetricsLogger resetDimensions(boolean useDefault) { * @return the current logger */ public MetricsLogger putMetric(String key, double value, Unit unit) { - this.context.putMetric(key, value, unit); - return this; + return applyReadLock( + () -> { + this.context.putMetric(key, value, unit); + return this; + }); } /** @@ -175,7 +207,7 @@ public MetricsLogger putMetric(String key, double value, Unit unit) { * @return the current logger */ public MetricsLogger putMetric(String key, double value) { - this.context.putMetric(key, value, Unit.NONE); + this.putMetric(key, value, Unit.NONE); return this; } @@ -190,8 +222,11 @@ public MetricsLogger putMetric(String key, double value) { * @return the current logger */ public MetricsLogger putMetadata(String key, Object value) { - this.context.putMetadata(key, value); - return this; + return applyReadLock( + () -> { + this.context.putMetadata(key, value); + return this; + }); } /** @@ -227,4 +262,13 @@ private void configureContextForEnvironment(MetricsContext context, Environment context.setDefaultDimensions(defaultDimension); environment.configureContext(context); } + + private MetricsLogger applyReadLock(Supplier any) { + rwl.readLock().lock(); + try { + return any.get(); + } finally { + rwl.readLock().unlock(); + } + } } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/model/Metadata.java b/src/main/java/software/amazon/cloudwatchlogs/emf/model/Metadata.java index 638850e5..3ebe48ae 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/model/Metadata.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/model/Metadata.java @@ -23,9 +23,9 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import java.time.Instant; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; @@ -56,7 +56,7 @@ class Metadata { Metadata() { cloudWatchMetrics = new ArrayList<>(); timestamp = Instant.now(); - customFields = new HashMap<>(); + customFields = new ConcurrentHashMap<>(); } /** diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricDirective.java b/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricDirective.java index ef49d8d9..e2d8e6d1 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricDirective.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricDirective.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import lombok.*; @@ -45,8 +46,8 @@ class MetricDirective { MetricDirective() { namespace = "aws-embedded-metrics"; - metrics = new HashMap<>(); - dimensions = new ArrayList<>(); + metrics = new ConcurrentHashMap<>(); + dimensions = Collections.synchronizedList(new ArrayList<>()); defaultDimensions = new DimensionSet(); shouldUseDefaultDimension = true; } @@ -59,7 +60,7 @@ class MetricDirective { void putDimensionSet(DimensionSet dimensionSet) { // Duplicate dimensions sets are removed before being added to the end of the 
collection. // This ensures only latest dimension value is used as a target member on the root EMF node. - // This operation is O(n^2), but acceptable given sets are capped at 10 dimensions + // This operation is O(n^2), but acceptable given sets are capped at 30 dimensions dimensions.removeIf(dim -> dim.getDimensionKeys().equals(dimensionSet.getDimensionKeys())); dimensions.add(dimensionSet); } @@ -69,11 +70,15 @@ void putMetric(String key, double value) { } void putMetric(String key, double value, Unit unit) { - if (metrics.containsKey(key)) { - metrics.get(key).addValue(value); - } else { - metrics.put(key, new MetricDefinition(key, unit, value)); - } + metrics.compute( + key, + (k, v) -> { + if (v == null) return new MetricDefinition(key, unit, value); + else { + v.addValue(value); + return v; + } + }); } @JsonProperty("Metrics") @@ -95,7 +100,7 @@ List> getAllDimensionKeys() { */ void setDimensions(List dimensionSets) { shouldUseDefaultDimension = false; - dimensions = new ArrayList<>(dimensionSets); + dimensions = Collections.synchronizedList(new ArrayList<>(dimensionSets)); } /** @@ -106,7 +111,7 @@ void setDimensions(List dimensionSets) { */ void setDimensions(boolean useDefault, List dimensionSets) { shouldUseDefaultDimension = useDefault; - dimensions = new ArrayList<>(dimensionSets); + dimensions = Collections.synchronizedList(new ArrayList<>(dimensionSets)); } /** @@ -116,7 +121,7 @@ void setDimensions(boolean useDefault, List dimensionSets) { */ void resetDimensions(boolean useDefault) { shouldUseDefaultDimension = useDefault; - dimensions = new ArrayList<>(); + dimensions = Collections.synchronizedList(new ArrayList<>()); } /** diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/model/RootNode.java b/src/main/java/software/amazon/cloudwatchlogs/emf/model/RootNode.java index 50d74e70..fb790322 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/model/RootNode.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/model/RootNode.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.With; @@ -45,7 +46,7 @@ class RootNode { RootNode() { aws = new Metadata(); - properties = new HashMap<>(); + properties = new ConcurrentHashMap<>(); objectMapper.setFilterProvider(filterProvider); } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java new file mode 100644 index 00000000..f5afc128 --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/logger/MetricsLoggerThreadSafetyTest.java @@ -0,0 +1,472 @@ +package software.amazon.cloudwatchlogs.emf.logger; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.json.JsonMapper; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NonNull; +import org.junit.After; 
+import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import software.amazon.cloudwatchlogs.emf.environment.Environment; +import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider; +import software.amazon.cloudwatchlogs.emf.model.DimensionSet; +import software.amazon.cloudwatchlogs.emf.model.MetricsContext; +import software.amazon.cloudwatchlogs.emf.model.Unit; +import software.amazon.cloudwatchlogs.emf.serializers.UnitDeserializer; +import software.amazon.cloudwatchlogs.emf.serializers.UnitSerializer; +import software.amazon.cloudwatchlogs.emf.sinks.GroupedSinkShunt; +import software.amazon.cloudwatchlogs.emf.sinks.SinkShunt; + +public class MetricsLoggerThreadSafetyTest { + private volatile MetricsLogger logger; + private EnvironmentProvider envProvider; + private SinkShunt sink; + private Environment environment; + private volatile Throwable throwable = null; + + @Before + public void setUp() { + envProvider = mock(EnvironmentProvider.class); + environment = mock(Environment.class); + sink = new SinkShunt(); + + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(sink); + logger = new MetricsLogger(envProvider); + } + + @Test + public void testConcurrentPutProperty() throws InterruptedException { + final int N_THREAD = 100; + final int N_PUT_PROPERTY = 1000; + + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[N_THREAD]; + long targetTimestampToRun = System.currentTimeMillis() + 500; + + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < N_PUT_PROPERTY; j++) { + int propertyId = N_PUT_PROPERTY * id + j; + logger.putProperty( + "Property-" + propertyId, + String.valueOf(propertyId)); + } + } catch (Throwable e) { + throwable = e; // ensure no exceptions are thrown + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + logger.flush(); + for (int i = 0; i < N_THREAD * N_PUT_PROPERTY; i++) { + Assert.assertEquals(sink.getContext().getProperty("Property-" + i), String.valueOf(i)); + } + } + + @Test + public void testConcurrentPutDimension() throws InterruptedException { + final int N_THREAD = 100; + final int N_PUT_DIMENSIONS = 100; + + logger = new MetricsLogger(envProvider); + // disable default dimensions + logger.resetDimensions(false); + + Thread[] threads = new Thread[N_THREAD]; + long targetTimestampToRun = System.currentTimeMillis() + 500; + + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < N_PUT_DIMENSIONS; j++) { + int dimensionId = N_PUT_DIMENSIONS * id + j; + logger.putDimensions( + DimensionSet.of( + String.valueOf(dimensionId), + String.valueOf(dimensionId))); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + logger.flush(); + + List dimensions = sink.getContext().getDimensions(); + // check size + Assert.assertEquals(sink.getContext().getDimensions().size(), N_THREAD * N_PUT_DIMENSIONS); + for (DimensionSet dim : dimensions) { + Assert.assertEquals( + dim.getDimensionKeys().size(), 1); // default dimensions are disabled + } + // check content + Collections.sort( + dimensions, + Comparator.comparingInt( + dim -> 
Integer.parseInt(dim.getDimensionKeys().iterator().next()))); + for (int i = 0; i < N_THREAD * N_PUT_DIMENSIONS; i++) { + Assert.assertEquals( + dimensions.get(i).getDimensionValue(String.valueOf(i)), String.valueOf(i)); + } + } + + @Test + public void testConcurrentPutDimensionAfterSetDimension() throws InterruptedException { + final int N_THREAD = 100; + final int N_PUT_DIMENSIONS = 100; + + logger = new MetricsLogger(envProvider); + logger.setDimensions(DimensionSet.of("0", "0")); + long targetTimestampToRun = System.currentTimeMillis() + 500; + + Thread[] threads = new Thread[N_THREAD]; + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < N_PUT_DIMENSIONS; j++) { + int dimensionId = N_PUT_DIMENSIONS * id + j + 1; + logger.putDimensions( + DimensionSet.of( + String.valueOf(dimensionId), + String.valueOf(dimensionId))); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + logger.flush(); + + List dimensions = sink.getContext().getDimensions(); + // check size + Assert.assertEquals( + sink.getContext().getDimensions().size(), N_THREAD * N_PUT_DIMENSIONS + 1); + for (DimensionSet dim : dimensions) { + Assert.assertEquals( + dim.getDimensionKeys().size(), 1); // there are no default dimensions after set + } + // check content + Collections.sort( + dimensions, + Comparator.comparingInt( + dim -> Integer.parseInt(dim.getDimensionKeys().iterator().next()))); + for (int i = 0; i < N_THREAD * N_PUT_DIMENSIONS + 1; i++) { + Assert.assertEquals( + dimensions.get(i).getDimensionValue(String.valueOf(i)), String.valueOf(i)); + } + } + + @Test + public void testConcurrentFlush() throws InterruptedException, JsonProcessingException { + final int N_THREAD = 300; + + GroupedSinkShunt groupedSink = new GroupedSinkShunt(); + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(groupedSink); + + logger = new MetricsLogger(envProvider); + Thread[] threads = new Thread[N_THREAD]; + long targetTimestampToRun = System.currentTimeMillis() + 1000; + + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + // try to putMetric() and flush() at the same time + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + logger.putMetric("Metric-" + id, id); + logger.flush(); + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + ArrayList allMetrics = new ArrayList<>(); + for (List events : groupedSink.getLogEventList()) { + ArrayList metrics = parseAllMetrics(events); + allMetrics.addAll(metrics); + } + + assertEquals(allMetrics.size(), N_THREAD); + for (MetricDefinitionCopy metric : allMetrics) { + assertEquals(metric.getValues().size(), 1); + } + Collections.sort(allMetrics, Comparator.comparingDouble(m -> m.getValues().get(0))); + for (int i = 0; i < N_THREAD; i++) { + assertEquals(allMetrics.get(i).getName(), "Metric-" + i); + assertEquals(allMetrics.get(i).getValues().get(0), i, 1e-5); + } + } + + @Test + public void testConcurrentFlushAndPutMetric() + throws InterruptedException, JsonProcessingException { + final int N_THREAD = 500; + final int N_PUT_METRIC = 1000; + + GroupedSinkShunt groupedSink = new GroupedSinkShunt(); + when(envProvider.resolveEnvironment()) + 
.thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(groupedSink); + + logger = new MetricsLogger(envProvider); + Random rand = new Random(); + + Thread[] threads = new Thread[N_THREAD]; + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + int randTime = rand.nextInt(1000); + threads[i] = + new Thread( + () -> { + try { + // half threads do putMetric(), half do flush() + // sleep to introduce more chaos in thread ordering + Thread.sleep(randTime); + if (id % 2 == 0) { + for (int j = id * N_PUT_METRIC / 2; + j < id * N_PUT_METRIC / 2 + N_PUT_METRIC; + j++) { + logger.putMetric("Metric-" + j, j); + } + } else { + logger.flush(); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + logger.flush(); + + ArrayList allMetrics = new ArrayList<>(); + for (List events : groupedSink.getLogEventList()) { + ArrayList metrics = parseAllMetrics(events); + allMetrics.addAll(metrics); + } + + assertEquals(allMetrics.size(), N_THREAD * N_PUT_METRIC / 2); + for (MetricDefinitionCopy metric : allMetrics) { + assertEquals(metric.getValues().size(), 1); + } + Collections.sort(allMetrics, Comparator.comparingDouble(m -> m.getValues().get(0))); + for (int i = 0; i < N_THREAD * N_PUT_METRIC / 2; i++) { + assertEquals(allMetrics.get(i).getName(), "Metric-" + i); + assertEquals(allMetrics.get(i).getValues().get(0), i, 1e-5); + } + } + + @Test + public void testConcurrentFlushAndMethodsOtherThanPutMetric() throws InterruptedException { + final int N_THREAD = 600; + final int N_PUT_DIMENSIONS = 100; + final int N_PUT_PROPERTY = 100; + + GroupedSinkShunt groupedSink = new GroupedSinkShunt(); + when(envProvider.resolveEnvironment()) + .thenReturn(CompletableFuture.completedFuture(environment)); + when(environment.getSink()).thenReturn(groupedSink); + + logger = new MetricsLogger(envProvider); + logger.resetDimensions(false); + Random rand = new Random(); + + Thread[] threads = new Thread[N_THREAD]; + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + int randTime = rand.nextInt(1000); + threads[i] = + new Thread( + () -> { + try { + Thread.sleep(randTime); + if (id < N_THREAD / 3) { + for (int j = id * N_PUT_DIMENSIONS; + j < id * N_PUT_DIMENSIONS + N_PUT_DIMENSIONS; + j++) { + logger.putDimensions( + DimensionSet.of( + String.valueOf(j), String.valueOf(j))); + } + } else if (id < N_THREAD / 3 * 2) { + for (int k = id * N_PUT_PROPERTY; + k < id * N_PUT_PROPERTY + N_PUT_PROPERTY; + k++) { + logger.putProperty("Property-" + k, k); + } + } else { + logger.flush(); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + logger.flush(); + + int contextNum = groupedSink.getContexts().size(); + MetricsContext finalContext = groupedSink.getContexts().get(contextNum - 1); + List dimensions = finalContext.getDimensions(); + + // check dimension size + assertEquals(dimensions.size(), N_THREAD * N_PUT_DIMENSIONS / 3); + for (DimensionSet dim : dimensions) { + Assert.assertEquals(dim.getDimensionKeys().size(), 1); // there are 3 default dimensions + } + // check dimension content + Collections.sort( + dimensions, + Comparator.comparingInt( + dim -> Integer.parseInt(dim.getDimensionKeys().iterator().next()))); + for (int i = 0; i < N_THREAD * N_PUT_DIMENSIONS / 3; i++) { + Assert.assertEquals( + dimensions.get(i).getDimensionValue(String.valueOf(i)), String.valueOf(i)); + } + + // check property + int propertyCnt = 0; 
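+        // Properties may be spread across several flushed contexts, so count them across all contexts captured by the sink.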
+ for (MetricsContext mc : groupedSink.getContexts()) { + for (int i = N_THREAD * N_PUT_PROPERTY / 3; + i < N_THREAD * N_PUT_PROPERTY / 3 * 2; + i++) { + propertyCnt += mc.getProperty("Property-" + i) == null ? 0 : 1; + } + } + assertEquals(propertyCnt, N_THREAD * N_PUT_PROPERTY / 3); + } + + @After + public void tearDown() throws Throwable { + if (throwable != null) throw throwable; + throwable = null; // reset throwable to prevent repeat throwing + } + + private Map parseRootNode(String event) throws JsonProcessingException { + return new JsonMapper().readValue(event, new TypeReference>() {}); + } + + @SuppressWarnings("unchecked") + // can parse all metrics even if metric number exceeds MAX_METRICS_PER_EVENT + private ArrayList parseAllMetrics(List events) + throws JsonProcessingException { + ArrayList metricDefinitions = new ArrayList<>(); + for (String event : events) { + Map rootNode = parseRootNode(event); + Map metadata = (Map) rootNode.get("_aws"); + + if (metadata == null) { + continue; + } + + ArrayList> metricDirectives = + (ArrayList>) metadata.get("CloudWatchMetrics"); + ArrayList> metrics = + (ArrayList>) metricDirectives.get(0).get("Metrics"); + + for (Map metric : metrics) { + String name = metric.get("Name"); + Unit unit = Unit.fromValue(metric.get("Unit")); + Object value = rootNode.get(name); + if (value instanceof ArrayList) { + metricDefinitions.add(new MetricDefinitionCopy(name, unit, (ArrayList) value)); + } else { + metricDefinitions.add(new MetricDefinitionCopy(name, unit, (double) value)); + } + } + } + + return metricDefinitions; + } + + @AllArgsConstructor + private static class MetricDefinitionCopy { + @NonNull + @Getter + @JsonProperty("Name") + private String name; + + @Getter + @JsonProperty("Unit") + @JsonSerialize(using = UnitSerializer.class) + @JsonDeserialize(using = UnitDeserializer.class) + private Unit unit; + + @JsonIgnore @NonNull @Getter private List values; + + MetricDefinitionCopy(String name) { + this(name, Unit.NONE, new ArrayList<>()); + } + + MetricDefinitionCopy(String name, double value) { + this(name, Unit.NONE, value); + } + + MetricDefinitionCopy(String name, Unit unit, double value) { + this(name, unit, new ArrayList<>(Arrays.asList(value))); + } + } +} diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java new file mode 100644 index 00000000..7eb1f873 --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricDirectiveThreadSafetyTest.java @@ -0,0 +1,101 @@ +package software.amazon.cloudwatchlogs.emf.model; + +import static org.junit.Assert.assertEquals; + +import java.util.Collections; +import org.junit.After; +import org.junit.Test; + +public class MetricDirectiveThreadSafetyTest { + private volatile Throwable throwable = null; + + @Test + public void testConcurrentPutMetricWithDifferentKey() throws InterruptedException { + final int N_THREAD = 100; + final int N_PUT_METRIC = 1000; + + MetricDirective metricDirective = new MetricDirective(); + Thread[] threads = new Thread[N_THREAD]; + long targetTimestampToRun = + System.currentTimeMillis() + + 500; // all threads should target running on this timestamp + + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + Thread.sleep( + targetTimestampToRun + - System.currentTimeMillis()); // try to make + // all threads + // run at same + // time + for (int j = 0; j < 
N_PUT_METRIC; j++) { + int metricId = N_PUT_METRIC * id + j; + metricDirective.putMetric("Metric-" + metricId, metricId); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertEquals(metricDirective.getAllMetrics().size(), N_THREAD * N_PUT_METRIC); + for (int i = 0; i < N_THREAD * N_PUT_METRIC; i++) { + assertEquals( + metricDirective.getMetrics().get("Metric-" + i).getValues().get(0), i, 1e-5); + } + } + + @Test + public void testConcurrentPutMetricWithSameKey() throws InterruptedException { + final int N_THREAD = 100; + final int N_PUT_METRIC = 1000; + + MetricDirective metricDirective = new MetricDirective(); + Thread[] threads = new Thread[N_THREAD]; + long targetTimestampToRun = System.currentTimeMillis() + 500; + + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < N_PUT_METRIC; j++) { + int metricId = N_PUT_METRIC * id + j; + metricDirective.putMetric("Metric", metricId); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertEquals(metricDirective.getAllMetrics().size(), 1); + MetricDefinition md = metricDirective.getAllMetrics().toArray(new MetricDefinition[0])[0]; + Collections.sort(md.getValues()); + for (int i = 0; i < N_THREAD * N_PUT_METRIC; i++) { + assertEquals(md.getValues().get(i), i, 1e-5); + } + } + + @After + public void tearDown() throws Throwable { + if (throwable != null) throw throwable; + throwable = null; + } +} diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java index 2b832c02..b1913c9a 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java @@ -167,6 +167,16 @@ public void testSetTimestamp() throws JsonProcessingException { assertEquals(now.toEpochMilli(), metadata.get("Timestamp")); } + @Test + public void testPutMetadata() { + MetricsContext mc = new MetricsContext(); + mc.putMetadata("Metadata", "MetadataValue"); + + Map customFields = mc.getRootNode().getAws().getCustomMetadata(); + assertEquals(customFields.size(), 1); + assertEquals(customFields.get("Metadata"), "MetadataValue"); + } + @SuppressWarnings("unchecked") private ArrayList parseMetrics(String event) throws JsonProcessingException { Map rootNode = parseRootNode(event); diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java new file mode 100644 index 00000000..6ea33883 --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextThreadSafetyTest.java @@ -0,0 +1,55 @@ +package software.amazon.cloudwatchlogs.emf.model; + +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import org.junit.After; +import org.junit.Test; + +public class MetricsContextThreadSafetyTest { + private volatile Throwable throwable = null; + + @Test + public void testConcurrentPutMetaData() throws InterruptedException { + final int N_THREAD = 100; + final int N_PUT_METADATA = 1000; + + MetricsContext mc = new MetricsContext(); + Thread[] threads = new Thread[N_THREAD]; + long 
targetTimestampToRun = System.currentTimeMillis() + 500; + + for (int i = 0; i < N_THREAD; i++) { + final int id = i; + threads[i] = + new Thread( + () -> { + try { + Thread.sleep(targetTimestampToRun - System.currentTimeMillis()); + for (int j = 0; j < N_PUT_METADATA; j++) { + int metaDataId = N_PUT_METADATA * id + j; + mc.putMetadata("MetaData-" + metaDataId, metaDataId); + } + } catch (Throwable e) { + throwable = e; + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + Map metaData = mc.getRootNode().getAws().getCustomMetadata(); + assertEquals(metaData.size(), N_THREAD * N_PUT_METADATA); + for (int i = 0; i < N_THREAD * N_PUT_METADATA; i++) { + assertEquals(metaData.get("MetaData-" + i), i); + } + } + + @After + public void tearDown() throws Throwable { + if (throwable != null) throw throwable; + throwable = null; + } +} diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/GroupedSinkShunt.java b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/GroupedSinkShunt.java new file mode 100644 index 00000000..d39776e3 --- /dev/null +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/sinks/GroupedSinkShunt.java @@ -0,0 +1,57 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.cloudwatchlogs.emf.sinks; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import software.amazon.cloudwatchlogs.emf.model.MetricsContext; + +/** + * A mocked sink which can preserve all flushed log events. Useful for testing the result of + * concurrent flushing. + */ +public class GroupedSinkShunt implements ISink { + + private List contexts = new ArrayList<>(); + + private List> logEventList = new ArrayList<>(); + + @Override + public void accept(MetricsContext context) { + this.contexts.add(context); + try { + List logEvent = context.serialize(); + logEventList.add(logEvent); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public CompletableFuture shutdown() { + return CompletableFuture.completedFuture(null); + } + + public List getContexts() { + return contexts; + } + + public List> getLogEventList() { + return this.logEventList; + } +}