Skip to content

Commit

Permalink
Add options for Fluent / Fluency appenders: #50
Browse files Browse the repository at this point in the history
  * "encoder" option becomes optional. If you don't set encoder, the log message outputs pure message.
  * "messageFieldKeyName" option has been added: Set it if you want to change the default message field key name(= "message").
Fix bug: Messages were lost sometimes when the messages had been waiting for flushing on shutting down an application.
Fix bug: Fluent appender: Do not raise an error when a fluentd server is down.
  • Loading branch information
sndyuk committed Oct 3, 2020
1 parent a3ac554 commit 892ceec
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 78 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ server.ec2*
AwsCredentials.properties
.directory

/fluent
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ Just add it to your dependency then you can use the slf4j module in your applica

### Latest changes

##### Version 1.8.1

* Add options for Fluent / Fluency appenders:
* "encoder" option becomes optional. If you don't set encoder, the log message outputs pure message.
* "messageFieldKeyName" option has been added: Set it if you want to change the default message field key name(= "message").
* Fix bug: Messages were lost sometimes when the messages had been waiting for flushing on shutting down an application.
* Fix bug: Fluent appender: Do not raise an error when a fluentd server is down.

##### Version 1.8.0

* Upgrade Fluency version from 2.2.1 to 2.4.1 https://github.com/sndyuk/logback-more-appenders/pull/48
Expand Down Expand Up @@ -134,5 +142,8 @@ You can find the sample configuration files here:

- [logback.xml](https://github.com/sndyuk/logback-more-appenders/blob/master/src/test/resources/logback.xml)

###


### License
[Apache License, Version 2.0](LICENSE)
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
open module org.slf4j {
requires transitive slf4j.api;
requires transitive logback.core;
requires transitive logback.classic;

requires static slf4j.api;
requires static logback.core;
requires static logback.classic;
requires static logback.access;
requires static fluent.logger;
requires static fluency.core;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/ch/qos/logback/more/appenders/AwsAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
*/
package ch.qos.logback.more.appenders;

import ch.qos.logback.core.UnsynchronizedAppenderBase;
import ch.qos.logback.core.AppenderBase;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;


public abstract class AwsAppender<E> extends UnsynchronizedAppenderBase<E> {
public abstract class AwsAppender<E> extends AppenderBase<E> {

protected AwsConfig config;
protected AWSCredentialsProvider credentialsProvider;
Expand All @@ -30,7 +30,6 @@ public abstract class AwsAppender<E> extends UnsynchronizedAppenderBase<E> {
@Override
public void start() {
try {
super.start();
if (config.getCredentialFilePath() != null
&& config.getCredentialFilePath().length() > 0) {
this.credentials = new PropertiesCredentials(getClass().getClassLoader()
Expand All @@ -40,6 +39,7 @@ public void start() {
} else {
this.credentialsProvider = DefaultAWSCredentialsProviderChain.getInstance();
}
super.start();
} catch (Exception e) {
addWarn("Could not initialize " + AwsAppender.class.getCanonicalName()
+ " ( will try to initialize again later ): " + e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ public void setEncoder(Encoder<E> encoder) {

@Override
public void start() {
super.start();
if (logGroupName == null || logGroupName.length() == 0 || logStreamName == null) {
throw new IllegalArgumentException("logGroupName and logStreamName must be defined.");
}
this.emitter = new IntervalEmitter<E, InputLogEvent>(emitInterval,
new CloudWatchEventMapper(), new CloudWatchIntervalAppender());
super.start();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import ch.qos.logback.core.Layout;
import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.core.encoder.LayoutWrappingEncoder;
import org.fluentd.logger.sender.ExponentialDelayReconnector;

public class DataFluentAppender<E> extends FluentdAppenderBase<E> {
private FluentLogger fluentLogger;
Expand All @@ -39,6 +40,8 @@ public void setEncoder(Encoder<E> encoder) {
this.encoder = encoder;
}

public void setMessageFieldKeyName(String messageFieldKeyName) { this.messageFieldKeyName = messageFieldKeyName; }

public void addAdditionalField(Field field) {
if (additionalFields == null) {
additionalFields = new HashMap<String, String>();
Expand All @@ -51,8 +54,8 @@ public void addAdditionalField(Field field) {

@Override
public void start() {
fluentLogger = FluentLogger.getLogger(label != null ? tag : null, remoteHost, port, getTimeout(), getBufferCapacity());
super.start();
fluentLogger = FluentLogger.getLogger(label != null ? tag : null, remoteHost, port);
}


Expand Down Expand Up @@ -85,6 +88,8 @@ protected void append(E event) {
private String remoteHost;
private int port;
private boolean useEventTime;
private Integer timeout;
private Integer bufferCapacity;

public String getTag() {
return tag;
Expand Down Expand Up @@ -125,4 +130,20 @@ public boolean isUseEventTime() {
public void setUseEventTime(boolean useEventTime) {
this.useEventTime = useEventTime;
}

public void setTimeout(Integer timeout) {
this.timeout = timeout;
}

public int getTimeout() {
return timeout != null ? timeout : 1000;
}

public void setBufferCapacity(Integer bufferCapacity) {
this.bufferCapacity = bufferCapacity;
}

public int getBufferCapacity() {
return bufferCapacity != null ? bufferCapacity : 16777216;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public void setEncoder(Encoder<E> encoder) {
this.encoder = encoder;
}

public void setMessageFieldKeyName(String messageFieldKeyName) { this.messageFieldKeyName = messageFieldKeyName; }

public void addAdditionalField(Field field) {
if (additionalFields == null) {
additionalFields = new HashMap<String, String>();
Expand All @@ -64,15 +66,14 @@ public void addAdditionalField(Field field) {

@Override
public void start() {
super.start();

try {
FluencyBuilderForFluentd builder = configureFluency();
if (remoteHost != null && port > 0 && (remoteServers == null || remoteServers.getRemoteServers().size() == 0)) {
this.fluency = builder.build(remoteHost, port);
} else {
this.fluency = builder.build(configureServers());
}
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -81,7 +82,6 @@ public void start() {
@Override
protected void append(E event) {
Map<String, Object> data = createData(event);

try {
String tag = this.tag == null ? "" : this.tag;
if (this.isUseEventTime()) {
Expand Down Expand Up @@ -111,6 +111,9 @@ public void stop() {
super.stop();
} finally {
try {
fluency.flush();
long maxWaitMillis = Math.min((waitUntilBufferFlushed != null ? waitUntilBufferFlushed : 1) + (waitUntilFlusherTerminated != null ? waitUntilFlusherTerminated : 1), 5) * 1000;
Thread.sleep(maxWaitMillis);
fluency.close();
} catch (Exception e) {
// pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@
*/
package ch.qos.logback.more.appenders;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Marker;
import ch.qos.logback.classic.pattern.CallerDataConverter;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.ThrowableProxyUtil;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import ch.qos.logback.core.encoder.EchoEncoder;
import ch.qos.logback.core.AppenderBase;
import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.more.appenders.marker.MapMarker;
import org.slf4j.Marker;

abstract class FluentdAppenderBase<E> extends UnsynchronizedAppenderBase<E> {
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

abstract class FluentdAppenderBase<E> extends AppenderBase<E> {
private static final String DATA_MESSAGE = "message";
private static final String DATA_LOGGER = "logger";
private static final String DATA_THREAD = "thread";
Expand All @@ -35,16 +34,16 @@ abstract class FluentdAppenderBase<E> extends UnsynchronizedAppenderBase<E> {
private static final String DATA_CALLER = "caller";
private static final String DATA_THROWABLE = "throwable";

protected Encoder<E> encoder = new EchoEncoder<E>();
protected Encoder<E> encoder;
protected Map<String, String> additionalFields;
protected boolean flattenMapMarker;
protected String messageFieldKeyName = DATA_MESSAGE;

protected Map<String, Object> createData(E event) {
Map<String, Object> data = new HashMap<String, Object>();

if (event instanceof ILoggingEvent) {
ILoggingEvent loggingEvent = (ILoggingEvent) event;
data.put(DATA_MESSAGE, loggingEvent.getFormattedMessage());
data.put(messageFieldKeyName, encoder != null ? encoder.encode(event) : loggingEvent.getFormattedMessage());
data.put(DATA_LOGGER, loggingEvent.getLoggerName());
data.put(DATA_THREAD, loggingEvent.getThreadName());
data.put(DATA_LEVEL, loggingEvent.getLevel().levelStr);
Expand Down Expand Up @@ -77,7 +76,7 @@ protected Map<String, Object> createData(E event) {
data.put(entry.getKey(), entry.getValue());
}
} else {
data.put(DATA_MESSAGE, encoder.encode(event));
data.put(messageFieldKeyName, encoder != null ? encoder.encode(event) : event.toString());
}

if (additionalFields != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ public abstract class KinesisStreamAppenderBase<E> extends AwsAppender<E> {

@Override
public void start() {
super.start();
if (streamName == null || streamName.length() == 0) {
throw new IllegalArgumentException("streamName must be defined.");
}
super.start();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ public void setEncoder(Encoder<ILoggingEvent> encoder) {

@Override
public void start() {
super.start();
this.emitter = new IntervalEmitter<ILoggingEvent, PutRecordsRequestEntry>(emitInterval,
new KinesisEventMapper(), new KinesisIntervalAppender());
super.start();
}

@Override
Expand Down
Loading

0 comments on commit 892ceec

Please sign in to comment.