From 892ceec163fd425227238e9cdabf31ab029fb510 Mon Sep 17 00:00:00 2001 From: sndyuk Date: Sat, 3 Oct 2020 23:34:03 +0900 Subject: [PATCH] Add options for Fluent / Fluency appenders: #50 * "encoder" option becomes optional. If you don't set encoder, the log message outputs pure message. * "messageFieldKeyName" option has been added: Set it if you want to change the default message field key name(= "message"). Fix bug: Messages were lost sometimes when the messages had been waiting for flushing on shutting down an application. Fix bug: Fluent appender: Do not raise an error when a fluentd server is down. --- .gitignore | 1 + README.md | 11 ++++ src/{main => java9}/java/module-info.java | 7 +-- .../logback/more/appenders/AwsAppender.java | 6 +- .../appenders/CloudWatchLogbackAppender.java | 2 +- .../more/appenders/DataFluentAppender.java | 23 ++++++- .../appenders/FluencyLogbackAppender.java | 9 ++- .../more/appenders/FluentdAppenderBase.java | 21 +++---- .../appenders/KinesisStreamAppenderBase.java | 2 +- .../KinesisStreamLogbackAppender.java | 2 +- .../more/appenders/LogbackAppenderTest.java | 62 +++++++------------ src/test/resources/logback-appenders-aws.xml | 3 +- .../resources/logback-appenders-fluentd.xml | 32 +++++++--- src/test/resources/logback-appenders-std.xml | 3 +- src/test/resources/logback.xml | 1 - 15 files changed, 107 insertions(+), 78 deletions(-) rename src/{main => java9}/java/module-info.java (71%) diff --git a/.gitignore b/.gitignore index 9901976..a1392e2 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ server.ec2* AwsCredentials.properties .directory +/fluent diff --git a/README.md b/README.md index 5c77825..27eb13e 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,14 @@ Just add it to your dependency then you can use the slf4j module in your applica ### Latest changes +##### Version 1.8.1 + +* Add options for Fluent / Fluency appenders: + * "encoder" option becomes optional. If you don't set encoder, the log message outputs pure message. + * "messageFieldKeyName" option has been added: Set it if you want to change the default message field key name(= "message"). +* Fix bug: Messages were lost sometimes when the messages had been waiting for flushing on shutting down an application. +* Fix bug: Fluent appender: Do not raise an error when a fluentd server is down. + ##### Version 1.8.0 * Upgrade Fluency version from 2.2.1 to 2.4.1 https://github.com/sndyuk/logback-more-appenders/pull/48 @@ -134,5 +142,8 @@ You can find the sample configuration files here: - [logback.xml](https://github.com/sndyuk/logback-more-appenders/blob/master/src/test/resources/logback.xml) +### + + ### License [Apache License, Version 2.0](LICENSE) diff --git a/src/main/java/module-info.java b/src/java9/java/module-info.java similarity index 71% rename from src/main/java/module-info.java rename to src/java9/java/module-info.java index d18ff6a..0e54327 100644 --- a/src/main/java/module-info.java +++ b/src/java9/java/module-info.java @@ -1,8 +1,7 @@ open module org.slf4j { - requires transitive slf4j.api; - requires transitive logback.core; - requires transitive logback.classic; - + requires static slf4j.api; + requires static logback.core; + requires static logback.classic; requires static logback.access; requires static fluent.logger; requires static fluency.core; diff --git a/src/main/java/ch/qos/logback/more/appenders/AwsAppender.java b/src/main/java/ch/qos/logback/more/appenders/AwsAppender.java index 415fdad..f851338 100644 --- a/src/main/java/ch/qos/logback/more/appenders/AwsAppender.java +++ b/src/main/java/ch/qos/logback/more/appenders/AwsAppender.java @@ -13,7 +13,7 @@ */ package ch.qos.logback.more.appenders; -import ch.qos.logback.core.UnsynchronizedAppenderBase; +import ch.qos.logback.core.AppenderBase; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; @@ -21,7 +21,7 @@ import com.amazonaws.auth.profile.ProfileCredentialsProvider; -public abstract class AwsAppender extends UnsynchronizedAppenderBase { +public abstract class AwsAppender extends AppenderBase { protected AwsConfig config; protected AWSCredentialsProvider credentialsProvider; @@ -30,7 +30,6 @@ public abstract class AwsAppender extends UnsynchronizedAppenderBase { @Override public void start() { try { - super.start(); if (config.getCredentialFilePath() != null && config.getCredentialFilePath().length() > 0) { this.credentials = new PropertiesCredentials(getClass().getClassLoader() @@ -40,6 +39,7 @@ public void start() { } else { this.credentialsProvider = DefaultAWSCredentialsProviderChain.getInstance(); } + super.start(); } catch (Exception e) { addWarn("Could not initialize " + AwsAppender.class.getCanonicalName() + " ( will try to initialize again later ): " + e); diff --git a/src/main/java/ch/qos/logback/more/appenders/CloudWatchLogbackAppender.java b/src/main/java/ch/qos/logback/more/appenders/CloudWatchLogbackAppender.java index 546991a..f8a4ee8 100644 --- a/src/main/java/ch/qos/logback/more/appenders/CloudWatchLogbackAppender.java +++ b/src/main/java/ch/qos/logback/more/appenders/CloudWatchLogbackAppender.java @@ -74,12 +74,12 @@ public void setEncoder(Encoder encoder) { @Override public void start() { - super.start(); if (logGroupName == null || logGroupName.length() == 0 || logStreamName == null) { throw new IllegalArgumentException("logGroupName and logStreamName must be defined."); } this.emitter = new IntervalEmitter(emitInterval, new CloudWatchEventMapper(), new CloudWatchIntervalAppender()); + super.start(); } @Override diff --git a/src/main/java/ch/qos/logback/more/appenders/DataFluentAppender.java b/src/main/java/ch/qos/logback/more/appenders/DataFluentAppender.java index c197cd6..6fa03b8 100644 --- a/src/main/java/ch/qos/logback/more/appenders/DataFluentAppender.java +++ b/src/main/java/ch/qos/logback/more/appenders/DataFluentAppender.java @@ -20,6 +20,7 @@ import ch.qos.logback.core.Layout; import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.core.encoder.LayoutWrappingEncoder; +import org.fluentd.logger.sender.ExponentialDelayReconnector; public class DataFluentAppender extends FluentdAppenderBase { private FluentLogger fluentLogger; @@ -39,6 +40,8 @@ public void setEncoder(Encoder encoder) { this.encoder = encoder; } + public void setMessageFieldKeyName(String messageFieldKeyName) { this.messageFieldKeyName = messageFieldKeyName; } + public void addAdditionalField(Field field) { if (additionalFields == null) { additionalFields = new HashMap(); @@ -51,8 +54,8 @@ public void addAdditionalField(Field field) { @Override public void start() { + fluentLogger = FluentLogger.getLogger(label != null ? tag : null, remoteHost, port, getTimeout(), getBufferCapacity()); super.start(); - fluentLogger = FluentLogger.getLogger(label != null ? tag : null, remoteHost, port); } @@ -85,6 +88,8 @@ protected void append(E event) { private String remoteHost; private int port; private boolean useEventTime; + private Integer timeout; + private Integer bufferCapacity; public String getTag() { return tag; @@ -125,4 +130,20 @@ public boolean isUseEventTime() { public void setUseEventTime(boolean useEventTime) { this.useEventTime = useEventTime; } + + public void setTimeout(Integer timeout) { + this.timeout = timeout; + } + + public int getTimeout() { + return timeout != null ? timeout : 1000; + } + + public void setBufferCapacity(Integer bufferCapacity) { + this.bufferCapacity = bufferCapacity; + } + + public int getBufferCapacity() { + return bufferCapacity != null ? bufferCapacity : 16777216; + } } diff --git a/src/main/java/ch/qos/logback/more/appenders/FluencyLogbackAppender.java b/src/main/java/ch/qos/logback/more/appenders/FluencyLogbackAppender.java index c359f92..ac09abd 100644 --- a/src/main/java/ch/qos/logback/more/appenders/FluencyLogbackAppender.java +++ b/src/main/java/ch/qos/logback/more/appenders/FluencyLogbackAppender.java @@ -52,6 +52,8 @@ public void setEncoder(Encoder encoder) { this.encoder = encoder; } + public void setMessageFieldKeyName(String messageFieldKeyName) { this.messageFieldKeyName = messageFieldKeyName; } + public void addAdditionalField(Field field) { if (additionalFields == null) { additionalFields = new HashMap(); @@ -64,8 +66,6 @@ public void addAdditionalField(Field field) { @Override public void start() { - super.start(); - try { FluencyBuilderForFluentd builder = configureFluency(); if (remoteHost != null && port > 0 && (remoteServers == null || remoteServers.getRemoteServers().size() == 0)) { @@ -73,6 +73,7 @@ public void start() { } else { this.fluency = builder.build(configureServers()); } + super.start(); } catch (Exception e) { throw new RuntimeException(e); } @@ -81,7 +82,6 @@ public void start() { @Override protected void append(E event) { Map data = createData(event); - try { String tag = this.tag == null ? "" : this.tag; if (this.isUseEventTime()) { @@ -111,6 +111,9 @@ public void stop() { super.stop(); } finally { try { + fluency.flush(); + long maxWaitMillis = Math.min((waitUntilBufferFlushed != null ? waitUntilBufferFlushed : 1) + (waitUntilFlusherTerminated != null ? waitUntilFlusherTerminated : 1), 5) * 1000; + Thread.sleep(maxWaitMillis); fluency.close(); } catch (Exception e) { // pass diff --git a/src/main/java/ch/qos/logback/more/appenders/FluentdAppenderBase.java b/src/main/java/ch/qos/logback/more/appenders/FluentdAppenderBase.java index c450e75..1d0f8fa 100644 --- a/src/main/java/ch/qos/logback/more/appenders/FluentdAppenderBase.java +++ b/src/main/java/ch/qos/logback/more/appenders/FluentdAppenderBase.java @@ -13,20 +13,19 @@ */ package ch.qos.logback.more.appenders; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import org.slf4j.Marker; import ch.qos.logback.classic.pattern.CallerDataConverter; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.classic.spi.ThrowableProxyUtil; -import ch.qos.logback.core.UnsynchronizedAppenderBase; -import ch.qos.logback.core.encoder.EchoEncoder; +import ch.qos.logback.core.AppenderBase; import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.more.appenders.marker.MapMarker; +import org.slf4j.Marker; -abstract class FluentdAppenderBase extends UnsynchronizedAppenderBase { +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +abstract class FluentdAppenderBase extends AppenderBase { private static final String DATA_MESSAGE = "message"; private static final String DATA_LOGGER = "logger"; private static final String DATA_THREAD = "thread"; @@ -35,16 +34,16 @@ abstract class FluentdAppenderBase extends UnsynchronizedAppenderBase { private static final String DATA_CALLER = "caller"; private static final String DATA_THROWABLE = "throwable"; - protected Encoder encoder = new EchoEncoder(); + protected Encoder encoder; protected Map additionalFields; protected boolean flattenMapMarker; + protected String messageFieldKeyName = DATA_MESSAGE; protected Map createData(E event) { Map data = new HashMap(); - if (event instanceof ILoggingEvent) { ILoggingEvent loggingEvent = (ILoggingEvent) event; - data.put(DATA_MESSAGE, loggingEvent.getFormattedMessage()); + data.put(messageFieldKeyName, encoder != null ? encoder.encode(event) : loggingEvent.getFormattedMessage()); data.put(DATA_LOGGER, loggingEvent.getLoggerName()); data.put(DATA_THREAD, loggingEvent.getThreadName()); data.put(DATA_LEVEL, loggingEvent.getLevel().levelStr); @@ -77,7 +76,7 @@ protected Map createData(E event) { data.put(entry.getKey(), entry.getValue()); } } else { - data.put(DATA_MESSAGE, encoder.encode(event)); + data.put(messageFieldKeyName, encoder != null ? encoder.encode(event) : event.toString()); } if (additionalFields != null) { diff --git a/src/main/java/ch/qos/logback/more/appenders/KinesisStreamAppenderBase.java b/src/main/java/ch/qos/logback/more/appenders/KinesisStreamAppenderBase.java index 4314733..999116c 100644 --- a/src/main/java/ch/qos/logback/more/appenders/KinesisStreamAppenderBase.java +++ b/src/main/java/ch/qos/logback/more/appenders/KinesisStreamAppenderBase.java @@ -34,10 +34,10 @@ public abstract class KinesisStreamAppenderBase extends AwsAppender { @Override public void start() { - super.start(); if (streamName == null || streamName.length() == 0) { throw new IllegalArgumentException("streamName must be defined."); } + super.start(); } @Override diff --git a/src/main/java/ch/qos/logback/more/appenders/KinesisStreamLogbackAppender.java b/src/main/java/ch/qos/logback/more/appenders/KinesisStreamLogbackAppender.java index 534bc77..25592f2 100644 --- a/src/main/java/ch/qos/logback/more/appenders/KinesisStreamLogbackAppender.java +++ b/src/main/java/ch/qos/logback/more/appenders/KinesisStreamLogbackAppender.java @@ -67,9 +67,9 @@ public void setEncoder(Encoder encoder) { @Override public void start() { - super.start(); this.emitter = new IntervalEmitter(emitInterval, new KinesisEventMapper(), new KinesisIntervalAppender()); + super.start(); } @Override diff --git a/src/test/java/ch/qos/logback/more/appenders/LogbackAppenderTest.java b/src/test/java/ch/qos/logback/more/appenders/LogbackAppenderTest.java index b57bbcc..808d4d3 100644 --- a/src/test/java/ch/qos/logback/more/appenders/LogbackAppenderTest.java +++ b/src/test/java/ch/qos/logback/more/appenders/LogbackAppenderTest.java @@ -15,88 +15,75 @@ */ package ch.qos.logback.more.appenders; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.more.appenders.marker.MapMarker; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.*; + import java.io.IOException; import java.util.HashMap; import java.util.Map; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.more.appenders.marker.MapMarker; public class LogbackAppenderTest { private static final Logger LOG = LoggerFactory.getLogger(LogbackAppenderTest.class); - @Before - public void before() { + @BeforeClass + public static void before() { LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); if (!lc.isStarted()) { lc.start(); } } - @After - public void after() { + @AfterClass + public static void after() throws InterruptedException { LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); lc.stop(); + + Thread.sleep(1500); // Wait a moment because these log is being appended asynchronous... } @Test - public void logSimple() throws InterruptedException { - + public void logSimple() { LOG.debug("Test the logger 1."); LOG.debug("Test the logger 2."); - - Thread.sleep(1000); // Wait a moment because these log is being appended asynchronous... } @Test - public void logMdc() throws InterruptedException { + public void logMdc() { MDC.put("req.requestURI", "/hello/world.js"); LOG.debug("Test the logger 1."); LOG.debug("Test the logger 2."); - - Thread.sleep(1000); // Wait a moment because these log is being appended asynchronous... } @Test - public void logMarker() throws InterruptedException { + public void logMarker() { Marker sendEmailMarker = MarkerFactory.getMarker("SEND_EMAIL"); LOG.debug(sendEmailMarker, "Test the marker 1."); LOG.debug(sendEmailMarker, "Test the marker 2."); - - Thread.sleep(1000); // Wait a moment because these log is being appended asynchronous... } @Test - public void logNestedMarker() throws InterruptedException { + public void logNestedMarker() { Marker notifyMarker = MarkerFactory.getMarker("NOTIFY"); Marker sendEmailMarker = MarkerFactory.getMarker("SEND_EMAIL"); sendEmailMarker.add(notifyMarker); LOG.debug(sendEmailMarker, "Test the nested marker 1."); LOG.debug(sendEmailMarker, "Test the nested marker 2."); - - Thread.sleep(1000); // Wait a moment because these log is being appended asynchronous... } @Test - public void logThrowable() throws InterruptedException { + public void logThrowable() { LOG.info("Without Exception."); LOG.error("Test the checked Exception.", new IOException("Connection something")); LOG.warn("Test the unchecked Exception.", new IllegalStateException("Oh your state")); - - Thread.sleep(1000); // Wait a moment because these log is being appended asynchronous... } @Test - public void logMapMarker() throws InterruptedException { - + public void logMapMarker() { Map map = new HashMap(); map.put("key1", "value1"); map.put("key2", "value2"); @@ -104,12 +91,10 @@ public void logMapMarker() throws InterruptedException { MapMarker mapMarker = new MapMarker("MAP_MARKER", map); LOG.debug(mapMarker, "Test the marker map."); - - Thread.sleep(1000); // Wait a moment because these log is being appended asynchronous... } @Test - public void logNestedMapMarker() throws InterruptedException { + public void logNestedMapMarker() { Marker notifyMarker = MarkerFactory.getMarker("NOTIFY"); Map map = new HashMap(); map.put("key1", "value1"); @@ -118,16 +103,11 @@ public void logNestedMapMarker() throws InterruptedException { MapMarker mapMarker = new MapMarker("MAP_MARKER", map); notifyMarker.add(mapMarker); LOG.debug(notifyMarker, "Test the nested marker map."); - - - Thread.sleep(1000); // Wait a moment because these log is being appended asynchronous... } @Test - public void logDecidedByAppendersMarkerFilter() throws InterruptedException { + public void logDecidedByAppendersMarkerFilter() { Marker alertMarker = MarkerFactory.getMarker("SECURITY_ALERT"); LOG.debug(alertMarker, "Test alert filter."); - - Thread.sleep(1000); // Wait a moment because these log is being appended asynchronous... } } diff --git a/src/test/resources/logback-appenders-aws.xml b/src/test/resources/logback-appenders-aws.xml index 861e215..38d8de5 100644 --- a/src/test/resources/logback-appenders-aws.xml +++ b/src/test/resources/logback-appenders-aws.xml @@ -1,5 +1,4 @@ - @@ -118,7 +117,7 @@ If the worker takes longer than this time it will exit, discarding any remaining items in the queue. 10000 millis --> - 100 + 1000 diff --git a/src/test/resources/logback-appenders-fluentd.xml b/src/test/resources/logback-appenders-fluentd.xml index 4e86d05..27c9ed0 100644 --- a/src/test/resources/logback-appenders-fluentd.xml +++ b/src/test/resources/logback-appenders-fluentd.xml @@ -1,5 +1,4 @@ - - + @@ -11,6 +10,7 @@ debug + localhost 24224 @@ -23,12 +23,22 @@ foo2 bar2 + + + 16777216 + 10000 + false + + + + msg + @@ -40,7 +50,7 @@ If the worker takes longer than this time it will exit, discarding any remaining items in the queue. 10000 millis --> - 10000 + 1000 @@ -50,12 +60,14 @@ debug + + localhost 24224 - + @@ -79,7 +92,7 @@ true - /tmp + 2097152 16777216 268435456 @@ -92,12 +105,17 @@ true false - + false + + + + msg + @@ -109,7 +127,7 @@ If the worker takes longer than this time it will exit, discarding any remaining items in the queue. 10000 millis --> - 10000 + 1000 diff --git a/src/test/resources/logback-appenders-std.xml b/src/test/resources/logback-appenders-std.xml index b6a6a39..cb35dbb 100644 --- a/src/test/resources/logback-appenders-std.xml +++ b/src/test/resources/logback-appenders-std.xml @@ -1,5 +1,4 @@ - @@ -35,7 +34,7 @@ "requestURI": "%X{req.requestURI}", "message": "%msg" }\n]]> - + true diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index 97a1a68..e48dacb 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -1,5 +1,4 @@ -