diff --git a/pom.xml b/pom.xml
index b2ca795..5a8823f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,6 +127,13 @@
true
+
+ com.amazonaws
+ aws-java-sdk-logs
+ ${aws.version}
+ true
+
+
junit
junit
diff --git a/src/main/java/ch/qos/logback/more/appenders/AwsAppender.java b/src/main/java/ch/qos/logback/more/appenders/AwsAppender.java
new file mode 100644
index 0000000..7f2c48d
--- /dev/null
+++ b/src/main/java/ch/qos/logback/more/appenders/AwsAppender.java
@@ -0,0 +1,51 @@
+package ch.qos.logback.more.appenders;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.PropertiesCredentials;
+import ch.qos.logback.core.UnsynchronizedAppenderBase;
+
+public abstract class AwsAppender extends UnsynchronizedAppenderBase {
+
+ protected AwsConfig config;
+ protected AWSCredentials credentials;
+
+ @Override
+ public void start() {
+ try {
+ super.start();
+ if (config.getCredentialFilePath() != null
+ && config.getCredentialFilePath().length() > 0) {
+ this.credentials = new PropertiesCredentials(getClass().getClassLoader()
+ .getResourceAsStream(config.getCredentialFilePath()));
+ } else {
+ this.credentials =
+ DefaultAWSCredentialsProviderChain.getInstance().getCredentials();
+ }
+ } catch (Exception e) {
+ addWarn("Could not initialize " + AwsAppender.class.getCanonicalName()
+ + " ( will try to initialize again later ): " + e);
+ }
+ }
+
+ public static class AwsConfig {
+ private String credentialFilePath;
+ private String region;
+
+ public void setCredentialFilePath(String credentialFilePath) {
+ this.credentialFilePath = credentialFilePath;
+ }
+
+ public String getCredentialFilePath() {
+ return credentialFilePath;
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public void setRegion(String region) {
+ this.region = region;
+ }
+ }
+}
diff --git a/src/main/java/ch/qos/logback/more/appenders/CloudWatchLogbackAppender.java b/src/main/java/ch/qos/logback/more/appenders/CloudWatchLogbackAppender.java
new file mode 100644
index 0000000..eaadf8a
--- /dev/null
+++ b/src/main/java/ch/qos/logback/more/appenders/CloudWatchLogbackAppender.java
@@ -0,0 +1,215 @@
+package ch.qos.logback.more.appenders;
+
+import java.util.List;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.services.logs.AWSLogs;
+import com.amazonaws.services.logs.AWSLogsClientBuilder;
+import com.amazonaws.services.logs.model.CreateLogGroupRequest;
+import com.amazonaws.services.logs.model.CreateLogStreamRequest;
+import com.amazonaws.services.logs.model.DescribeLogGroupsRequest;
+import com.amazonaws.services.logs.model.DescribeLogGroupsResult;
+import com.amazonaws.services.logs.model.DescribeLogStreamsRequest;
+import com.amazonaws.services.logs.model.DescribeLogStreamsResult;
+import com.amazonaws.services.logs.model.InputLogEvent;
+import com.amazonaws.services.logs.model.LogGroup;
+import com.amazonaws.services.logs.model.LogStream;
+import com.amazonaws.services.logs.model.PutLogEventsRequest;
+import com.amazonaws.services.logs.model.PutLogEventsResult;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.encoder.EchoEncoder;
+import ch.qos.logback.core.encoder.Encoder;
+import ch.qos.logback.more.appenders.IntervalEmitter.EventMapper;
+import ch.qos.logback.more.appenders.IntervalEmitter.IntervalAppender;
+
+public class CloudWatchLogbackAppender extends AwsAppender {
+
+ private IntervalEmitter emitter;
+ private AWSLogs awsLogs;
+ private String logGroupName;
+ private StreamName logStreamName;
+ private boolean createLogDestination;
+ private long emitInterval = 10000;
+ private Encoder encoder = new EchoEncoder();
+
+ public void setAwsConfig(AwsConfig config) {
+ this.config = config;
+ }
+
+ public void setLogGroupName(String logGroupName) {
+ this.logGroupName = logGroupName;
+ }
+
+ public void setLogStreamName(String logStreamName) {
+ this.logStreamName = new StaticStreamName(logStreamName);
+ }
+
+ public void setLogStreamRolling(StreamName streamName) {
+ this.logStreamName = streamName;
+ }
+
+ public void setCreateLogDestination(boolean createLogDestination) {
+ this.createLogDestination = createLogDestination;
+ }
+
+ public void setEmitInterval(long emitInterval) {
+ this.emitInterval = emitInterval;
+ }
+
+ public void setEncoder(Encoder encoder) {
+ this.encoder = encoder;
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ if (logGroupName == null || logGroupName.length() == 0 || logStreamName == null) {
+ throw new IllegalArgumentException("logGroupName and logStreamName must be defined.");
+ }
+ this.awsLogs = AWSLogsClientBuilder.standard()
+ .withCredentials(new AWSStaticCredentialsProvider(credentials))
+ .withRegion(config.getRegion()).build();
+ this.emitter = new IntervalEmitter(emitInterval,
+ new CloudWatchEventMapper(), new CloudWatchIntervalAppender());
+ }
+
+ @Override
+ public void stop() {
+ try {
+ emitter.emit();
+ } catch (Exception e) {
+ // Ignore
+ }
+ try {
+ super.stop();
+ } finally {
+ try {
+ awsLogs.shutdown();
+ } catch (Exception e) {
+ // pass
+ }
+ }
+ }
+
+ @Override
+ protected void append(E eventObject) {
+ emitter.append(eventObject);
+ }
+
+ private void ensureLogGroup() {
+ DescribeLogGroupsRequest request =
+ new DescribeLogGroupsRequest().withLogGroupNamePrefix(logGroupName);
+ DescribeLogGroupsResult result = awsLogs.describeLogGroups(request);
+ for (LogGroup group : result.getLogGroups()) {
+ if (logGroupName.equals(group.getLogGroupName())) {
+ return;
+ }
+ }
+ if (createLogDestination) {
+ awsLogs.createLogGroup(new CreateLogGroupRequest(logGroupName));
+ } else {
+ throw new IllegalStateException(
+ "The specified log group does not exist: " + logGroupName);
+ }
+ }
+
+ private String ensureLogStream(String name) {
+ DescribeLogStreamsRequest request = new DescribeLogStreamsRequest()
+ .withLogGroupName(logGroupName).withLogStreamNamePrefix(name);
+ DescribeLogStreamsResult result = awsLogs.describeLogStreams(request);
+ for (LogStream stream : result.getLogStreams()) {
+ if (name.equals(stream.getLogStreamName())) {
+ return stream.getUploadSequenceToken();
+ }
+ }
+ if (createLogDestination) {
+ awsLogs.createLogStream(new CreateLogStreamRequest(logGroupName, name));
+ return null;
+ } else {
+ throw new IllegalStateException(
+ "The specified log stream does not exist: " + logStreamName);
+ }
+ }
+
+ private final class CloudWatchEventMapper implements EventMapper {
+
+ @Override
+ public InputLogEvent map(E event) {
+ InputLogEvent logEvent = new InputLogEvent();
+ if (event instanceof ILoggingEvent) {
+ ILoggingEvent loggingEvent = (ILoggingEvent) event;
+ logEvent.setTimestamp(loggingEvent.getTimeStamp());
+ } else {
+ logEvent.setTimestamp(System.currentTimeMillis());
+ }
+ logEvent.setMessage(new String(encoder.encode(event)));
+ return logEvent;
+ }
+ }
+
+ private final class CloudWatchIntervalAppender implements IntervalAppender {
+ private String sequenceToken;
+ private boolean initialized = false;
+ private boolean switchingStream = false;
+ private String currentStreamName;
+
+ @Override
+ public boolean append(List events) {
+ if (!initialized) {
+ synchronized (this) {
+ if (!initialized) {
+ ensureLogGroup();
+ initialized = true;
+ }
+ }
+ return false;
+ }
+ if (switchingStream) {
+ return false;
+ }
+ String streamName = logStreamName.get(events);
+ if (!streamName.equals(currentStreamName)) {
+ switchingStream = true;
+ synchronized (this) {
+ if (switchingStream) {
+ sequenceToken = ensureLogStream(streamName);
+ currentStreamName = streamName;
+ switchingStream = false;
+ } else {
+ return false;
+ }
+ }
+ }
+ try {
+ PutLogEventsRequest request =
+ new PutLogEventsRequest(logGroupName, streamName, events);
+ if (sequenceToken != null) {
+ request.withSequenceToken(sequenceToken);
+ }
+ PutLogEventsResult result = awsLogs.putLogEvents(request);
+ sequenceToken = result.getNextSequenceToken();
+ return true;
+ } catch (RuntimeException e) {
+ sequenceToken = null;
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ }
+
+ protected interface StreamName {
+ String get(List events);
+ }
+
+ public static class StaticStreamName implements StreamName {
+ private String name;
+
+ public StaticStreamName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String get(List events) {
+ return name;
+ }
+ }
+}
diff --git a/src/main/java/ch/qos/logback/more/appenders/CountBasedStreamName.java b/src/main/java/ch/qos/logback/more/appenders/CountBasedStreamName.java
new file mode 100644
index 0000000..0ca72b4
--- /dev/null
+++ b/src/main/java/ch/qos/logback/more/appenders/CountBasedStreamName.java
@@ -0,0 +1,36 @@
+package ch.qos.logback.more.appenders;
+
+import java.util.List;
+import java.util.UUID;
+import com.amazonaws.services.logs.model.InputLogEvent;
+import ch.qos.logback.more.appenders.CloudWatchLogbackAppender.StreamName;
+
+public class CountBasedStreamName implements StreamName {
+ private long count = 0;
+ private long limit = 1000;
+ private String baseName = "";
+ private String currentName;
+
+ public void setBaseName(String baseName) {
+ this.baseName = baseName;
+ }
+
+ public void setLimit(long limit) {
+ this.limit = limit;
+ this.count = limit + 1;
+ }
+
+ @Override
+ public String get(List events) {
+ count += events.size();
+ if (count > limit) {
+ synchronized (this) {
+ if (count > limit) {
+ currentName = baseName + UUID.randomUUID();
+ count = events.size();
+ }
+ }
+ }
+ return currentName;
+ }
+}
diff --git a/src/main/java/ch/qos/logback/more/appenders/DaemonAppender.java b/src/main/java/ch/qos/logback/more/appenders/DaemonAppender.java
deleted file mode 100644
index 310e794..0000000
--- a/src/main/java/ch/qos/logback/more/appenders/DaemonAppender.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Copyright (c) 2012 sndyuk
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ch.qos.logback.more.appenders;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Use UnsynchronizedAppenderBase instead.
- */
-@Deprecated
-public abstract class DaemonAppender implements Runnable {
- private static ExecutorService THREAD_POOL = Executors.newCachedThreadPool();
-
- private static final Logger LOG = LoggerFactory.getLogger(DaemonAppender.class);
-
- private AtomicBoolean start = new AtomicBoolean(false);
- private final BlockingQueue queue;
-
- DaemonAppender(int maxQueueSize) {
- this.queue = new LinkedBlockingQueue(maxQueueSize);
- }
-
- protected void execute() {
- if (THREAD_POOL.isShutdown()) {
- THREAD_POOL = Executors.newCachedThreadPool();
- }
- THREAD_POOL.execute(this);
- }
-
- void log(E eventObject) {
- if (!queue.offer(eventObject)) {
- LOG.warn("Message queue is full. Ignored the message:" + System.lineSeparator() + eventObject.toString());
- } else if (start.compareAndSet(false, true)) {
- execute();
- }
- }
-
- @Override
- public void run() {
-
- try {
- for (;;) {
- append(queue.take());
- }
- } catch (InterruptedException e) {
- // ignore the error and rerun.
- run();
- } catch (Exception e) {
- close();
- }
- }
-
- abstract protected void append(E rawData);
-
- protected void close() {
- synchronized (THREAD_POOL) {
- if (!THREAD_POOL.isShutdown()) {
- shutdownAndAwaitTermination(THREAD_POOL);
- }
- }
- }
-
- private static void shutdownAndAwaitTermination(ExecutorService pool) {
- pool.shutdown();
- try {
- if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
- pool.shutdownNow();
- if (!pool.awaitTermination(60, TimeUnit.SECONDS))
- System.err.println("Pool did not terminate");
- }
- } catch (InterruptedException ie) {
- pool.shutdownNow();
- Thread.currentThread().interrupt();
- }
- }
-}
diff --git a/src/main/java/ch/qos/logback/more/appenders/DynamoDBLogbackAppender.java b/src/main/java/ch/qos/logback/more/appenders/DynamoDBLogbackAppender.java
index 860e199..e7dd802 100644
--- a/src/main/java/ch/qos/logback/more/appenders/DynamoDBLogbackAppender.java
+++ b/src/main/java/ch/qos/logback/more/appenders/DynamoDBLogbackAppender.java
@@ -13,59 +13,40 @@
*/
package ch.qos.logback.more.appenders;
-import static ch.qos.logback.core.CoreConstants.CODES_URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.PropertiesCredentials;
-import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
import com.amazonaws.services.dynamodbv2.model.QueryRequest;
import com.amazonaws.services.dynamodbv2.model.QueryResult;
-import ch.qos.logback.core.Layout;
-import ch.qos.logback.core.UnsynchronizedAppenderBase;
import ch.qos.logback.core.encoder.EchoEncoder;
import ch.qos.logback.core.encoder.Encoder;
-import ch.qos.logback.core.encoder.LayoutWrappingEncoder;
-public class DynamoDBLogbackAppender extends UnsynchronizedAppenderBase {
+public class DynamoDBLogbackAppender extends AwsAppender {
private Encoder encoder = new EchoEncoder();
- @Deprecated
- public void setLayout(Layout layout) {
- addWarn("This appender no longer admits a layout as a sub-component, set an encoder instead.");
- addWarn("To ensure compatibility, wrapping your layout in LayoutWrappingEncoder.");
- addWarn("See also " + CODES_URL + "#layoutInsteadOfEncoder for details");
- LayoutWrappingEncoder lwe = new LayoutWrappingEncoder();
- lwe.setLayout(layout);
- lwe.setContext(context);
- this.encoder = lwe;
- }
-
public void setEncoder(Encoder encoder) {
this.encoder = encoder;
}
+ public void setAwsConfig(AwsConfig config) {
+ this.config = config;
+ }
+
@Override
public void start() {
try {
super.start();
- PropertiesCredentials credentials = new PropertiesCredentials(
- getClass().getClassLoader().getResourceAsStream(dynamodbCredentialFilePath));
this.dynamoClient = AmazonDynamoDBClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(credentials))
- .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
- dynamodbEndpoint, dynamodbRegion))
- .build();
- this.id = getLastId(outputTableName, instanceName, dynamoClient);
+ .withRegion(config.getRegion()).build();
} catch (Exception e) {
- addWarn("Could not initialize " + DynamoDBLogbackAppender.class.getCanonicalName()
- + " ( will try to initialize again later ): " + e);
+ addError("Could not initialize " + DynamoDBLogbackAppender.class.getCanonicalName(), e);
}
}
@@ -87,8 +68,8 @@ protected void append(E event) {
if (this.id == -1) {
this.id = getLastId(outputTableName, instanceName, dynamoClient);
if (this.id == -1) {
- addWarn("Could not initialize " + DynamoDBLogbackAppender.class.getCanonicalName()
- + " ( will try to initialize again later ): ");
+ addError(
+ "Could not initialize " + DynamoDBLogbackAppender.class.getCanonicalName());
return;
}
}
@@ -97,7 +78,8 @@ protected void append(E event) {
data.put("id", new AttributeValue().withN(String.valueOf(++id)));
data.put("msg", new AttributeValue().withS(new String(encoder.encode(event))));
- PutItemRequest itemRequest = new PutItemRequest().withTableName(outputTableName).withItem(data);
+ PutItemRequest itemRequest =
+ new PutItemRequest().withTableName(outputTableName).withItem(data);
dynamoClient.putItem(itemRequest);
}
@@ -117,26 +99,11 @@ private static long getLastId(String tableName, String instanceName,
}
}
- private String dynamodbCredentialFilePath;
- private String dynamodbEndpoint;
- private String dynamodbRegion;
private String outputTableName;
private String instanceName;
private long id = -1;
private AmazonDynamoDB dynamoClient;
- public void setDynamodbCredentialFilePath(String dynamodbCredentialFilePath) {
- this.dynamodbCredentialFilePath = dynamodbCredentialFilePath;
- }
-
- public void setDynamodbEndpoint(String dynamodbEndpoint) {
- this.dynamodbEndpoint = dynamodbEndpoint;
- }
-
- public void setDynamodbRegion(String dynamodbRegion) {
- this.dynamodbRegion = dynamodbRegion;
- }
-
public void setOutputTableName(String outputTableName) {
this.outputTableName = outputTableName;
}
diff --git a/src/main/java/ch/qos/logback/more/appenders/IntervalEmitter.java b/src/main/java/ch/qos/logback/more/appenders/IntervalEmitter.java
new file mode 100644
index 0000000..8391c7f
--- /dev/null
+++ b/src/main/java/ch/qos/logback/more/appenders/IntervalEmitter.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright (c) 2012 sndyuk
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package ch.qos.logback.more.appenders;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class IntervalEmitter {
+ private long lastEmit = -1;
+ private long maxInterval;
+ private final List events;
+ private final EventMapper eventMapper;
+ private final IntervalAppender appender;
+
+ IntervalEmitter(long maxInterval, EventMapper eventMapper, IntervalAppender appender) {
+ this.events = new ArrayList();
+ this.maxInterval = maxInterval;
+ this.eventMapper = eventMapper;
+ this.appender = appender;
+ }
+
+ void append(E event) {
+ events.add(eventMapper.map(event));
+
+ long now = System.currentTimeMillis();
+ if (now > lastEmit + maxInterval) {
+ emit();
+ lastEmit = now;
+ }
+ }
+
+ public void emit() {
+ if (appender.append(events)) {
+ events.clear();
+ }
+ }
+
+ public interface EventMapper {
+ R map(E event);
+ }
+
+ public interface IntervalAppender {
+ boolean append(List events);
+ }
+}
diff --git a/src/test/java/ch/qos/logback/more/appenders/LogbackAppenderTest.java b/src/test/java/ch/qos/logback/more/appenders/LogbackAppenderTest.java
index 7ba3b53..92d3578 100644
--- a/src/test/java/ch/qos/logback/more/appenders/LogbackAppenderTest.java
+++ b/src/test/java/ch/qos/logback/more/appenders/LogbackAppenderTest.java
@@ -18,18 +18,35 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
+import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.more.appenders.marker.MapMarker;
public class LogbackAppenderTest {
private static final Logger LOG = LoggerFactory.getLogger(LogbackAppenderTest.class);
+ @Before
+ public void before() {
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ if (!lc.isStarted()) {
+ lc.start();
+ }
+ }
+
+ @After
+ public void after() {
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ lc.stop();
+ }
+
@Test
public void logSimple() throws InterruptedException {
@@ -100,7 +117,10 @@ public void logNestedMapMarker() throws InterruptedException {
MapMarker mapMarker = new MapMarker("MAP_MARKER", map);
notifyMarker.add(mapMarker);
- LOG.debug(notifyMarker, "Test the nested marker map.");
+ for (int i = 0; i < 100; i++) {
+ LOG.debug(notifyMarker, "Test the nested marker map." + i);
+ }
+
Thread.sleep(1000); // Wait a moment because these log is being appended asynchronous...
}
diff --git a/src/test/resources/logback-appenders.xml b/src/test/resources/logback-appenders.xml
index be888c1..aba3160 100644
--- a/src/test/resources/logback-appenders.xml
+++ b/src/test/resources/logback-appenders.xml
@@ -2,14 +2,56 @@
+
+
+
+
+ AwsCredentials.properties
+ ap-northeast-1
+
+ test-log
+
+
+
+ 100000
+
+
+
+ true
+
+ 100
+
+
+
+
+
+
+
+
+ 999
+
+ true
+
+ 100
+
+
+
+
-
- AwsCredentials.properties
-
- dynamodb.ap-northeast-1.amazonaws.com
- ap-northeast-1
+
+
+ AwsCredentials.properties
+ ap-northeast-1
+
+
AppLog
- 10000
+ 1000
+
@@ -74,6 +117,7 @@
+
debug
@@ -138,6 +182,7 @@
+
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml
index 4fbc5bc..7a3b4b7 100644
--- a/src/test/resources/logback.xml
+++ b/src/test/resources/logback.xml
@@ -5,6 +5,7 @@
+