-
Notifications
You must be signed in to change notification settings - Fork 92
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enable some asynchronous logging (revised) #49
Open
cosmo0920
wants to merge
20
commits into
fluent:master
Choose a base branch
from
cosmo0920:asynchronous-logging
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
476ee0d
add getLogger(..., Sender) method to the factory; add an asynchronous…
1c57d42
add getLogger(..., Sender) method to the factory; add an asynchronous…
477f8b9
Unify override set/remove ErrorHandler implementation in AsyncRawSock…
cosmo0920 3054ddc
Fix wrongly rebased lines
cosmo0920 1f14952
Fix a typo
cosmo0920 e9af4d1
Remove needless null check
cosmo0920 ecb42a5
Don't static
cosmo0920 671450c
Rename flusher -> senderTask
cosmo0920 f8aa594
Remove unused getLogger overload
cosmo0920 2b06c14
Make sender.isConnected to be thread safe
cosmo0920 9454a59
Add test cases for AsyncRawSocketSender
cosmo0920 0018efc
Add test case for buffering and resending
cosmo0920 ef3307b
Try to add reconnect after buffer full test case for async sender
cosmo0920 52ed53f
Make more readable assertion constraints
cosmo0920 acb72b6
Ensure starting mock Fluentd
cosmo0920 7392010
Remove unused import
cosmo0920 4c27f76
Add experimental disclaimer note
cosmo0920 a0fa629
Add a newline in javadoc
cosmo0920 af5eac6
Return true always
cosmo0920 046b798
Adjust test cases
cosmo0920 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
139 changes: 139 additions & 0 deletions
139
src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
|
||
package org.fluentd.logger.sender; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
|
||
import org.fluentd.logger.errorhandler.ErrorHandler; | ||
import org.fluentd.logger.sender.ExponentialDelayReconnector; | ||
import org.fluentd.logger.sender.RawSocketSender; | ||
import org.fluentd.logger.sender.Reconnector; | ||
import org.fluentd.logger.sender.Sender; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* An asynchronous wrapper around RawSocketSender | ||
* <br> | ||
* This feature is highly experimental. | ||
* | ||
* @author mxk | ||
* | ||
*/ | ||
public class AsyncRawSocketSender implements Sender { | ||
|
||
private final class EmitRunnable implements Runnable { | ||
private final String tag; | ||
private final Map<String, Object> data; | ||
private final RawSocketSender sender; | ||
private final long timestamp; | ||
|
||
private EmitRunnable(String tag, Map<String, Object> data, | ||
RawSocketSender sender, long timestamp) { | ||
this.tag = tag; | ||
this.data = data; | ||
this.sender = sender; | ||
this.timestamp = timestamp; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
sender.emit(tag, timestamp, data); | ||
} | ||
} | ||
|
||
private final class FlushRunnable implements Runnable { | ||
private final RawSocketSender sender; | ||
|
||
private FlushRunnable(RawSocketSender sender) { | ||
this.sender = sender; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
sender.flush(); | ||
} | ||
} | ||
|
||
private RawSocketSender sender; | ||
private Reconnector reconnector; | ||
|
||
@SuppressWarnings("unused") | ||
private static final Logger logger = LoggerFactory.getLogger(AsyncRawSocketSender.class); | ||
|
||
private final ExecutorService senderTask = Executors.newSingleThreadExecutor(); | ||
|
||
private static final ErrorHandler DEFAULT_ERROR_HANDLER = new ErrorHandler() {}; | ||
|
||
private ErrorHandler errorHandler = DEFAULT_ERROR_HANDLER; | ||
|
||
public AsyncRawSocketSender() { | ||
this("localhost", 24224); | ||
} | ||
|
||
public AsyncRawSocketSender(String host, int port) { | ||
this(host, port, 3 * 1000, 8 * 1024 * 1024); | ||
} | ||
|
||
public AsyncRawSocketSender(String host, int port, int timeout, | ||
int bufferCapacity) { | ||
this(host, port, timeout, bufferCapacity, | ||
new ExponentialDelayReconnector()); | ||
} | ||
|
||
public AsyncRawSocketSender(String host, int port, int timeout, | ||
int bufferCapacity, Reconnector reconnector) { | ||
this.reconnector = reconnector; | ||
this.sender = new RawSocketSender(host, port, timeout, bufferCapacity, | ||
reconnector); | ||
} | ||
|
||
@Override | ||
public synchronized void flush() { | ||
final RawSocketSender sender = this.sender; | ||
senderTask.execute(new FlushRunnable(sender)); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
sender.close(); | ||
} | ||
|
||
@Override | ||
public boolean emit(String tag, Map<String, Object> data) { | ||
return emit(tag, System.currentTimeMillis() / 1000, data); | ||
} | ||
|
||
@Override | ||
public boolean emit(final String tag, final long timestamp, final Map<String, Object> data) { | ||
final RawSocketSender sender = this.sender; | ||
senderTask.execute(new EmitRunnable(tag, data, sender, timestamp)); | ||
|
||
return true; | ||
} | ||
|
||
@Override | ||
public String getName() { | ||
return sender.getName(); | ||
} | ||
|
||
@Override | ||
public synchronized boolean isConnected() { | ||
return sender.isConnected(); | ||
} | ||
|
||
@Override | ||
public void setErrorHandler(ErrorHandler errorHandler) { | ||
if (errorHandler == null) { | ||
throw new IllegalArgumentException("errorHandler is null"); | ||
} | ||
|
||
this.errorHandler = errorHandler; | ||
} | ||
|
||
@Override | ||
public void removeErrorHandler() { | ||
this.errorHandler = DEFAULT_ERROR_HANDLER; | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait until a running thread finishes if it exists
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tried to implement waiting until finishing running threads.
I'm wondering which is better: