Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use PackedForward instead of Message #50

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,11 @@ class BufferOverflowHandler

def flush(messages)
@buffer ||= []
MessagePack::Unpacker.new.feed_each(messages) do |msg|
@buffer << msg
messages.each do |tag, message|
unpacker = MessagePack::Unpacker.new(StringIO.new(message))
unpacker.each do |time, record|
@buffer << [tag, time, record]
end
end
end
end
Expand Down
27 changes: 18 additions & 9 deletions lib/fluent/logger/fluent_logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
require 'monitor'
require 'logger'
require 'json'
require 'base64'
require 'securerandom'

module Fluent
module Logger
Expand Down Expand Up @@ -131,17 +133,19 @@ def post_with_time(tag, map, time)
@logger.debug { "event: #{tag} #{map.to_json}" rescue nil } if @logger.debug?
tag = "#{@tag_prefix}.#{tag}" if @tag_prefix
if @nanosecond_precision && time.is_a?(Time)
write [tag, EventTime.new(time.to_i, time.nsec), map]
write(tag, EventTime.new(time.to_i, time.nsec), map)
else
write [tag, time.to_i, map]
write(tag, time.to_i, map)
end
end

def close
@mon.synchronize {
if @pending
begin
send_data(@pending)
@pending.each do |tag, record|
send_data([tag, record].to_msgpack)
end
rescue => e
set_last_error(e)
@logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}")
Expand Down Expand Up @@ -173,7 +177,7 @@ def connection_string

def pending_bytesize
if @pending
@pending.bytesize
@pending.to_s.bytesize
else
0
end
Expand Down Expand Up @@ -201,20 +205,22 @@ def suppress_sec
end
end

def write(msg)
def write(tag, time, map)
begin
data = to_msgpack(msg)
record = to_msgpack([time, map])
rescue => e
set_last_error(e)
msg = [tag, time, map]
@logger.error("FluentLogger: Can't convert to msgpack: #{msg.inspect}: #{$!}")
return false
end

@mon.synchronize {
if @pending
@pending << data
@pending[tag] << record
else
@pending = data
@pending = Hash.new{|h, k| h[k] = "" }
@pending[tag] = record
end

# suppress reconnection burst
Expand All @@ -225,7 +231,9 @@ def write(msg)
end

begin
send_data(@pending)
@pending.each do |tag, record|
send_data([tag, record].to_msgpack)
end
@pending = nil
true
rescue => e
Expand Down Expand Up @@ -260,6 +268,7 @@ def send_data(data)
# end
# data = data[n..-1]
#end

true
end

Expand Down
13 changes: 8 additions & 5 deletions spec/fluent_logger_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@
logger_io.rewind
log = logger_io.read
expect(log).to match /Failed to connect/
expect(log).to match /Can't send logs to/
expect(log).to match /Can\'t send logs to/
}

it ('post limit over') do
Expand All @@ -206,11 +206,11 @@
expect(fluentd.queue.last).to be_nil

logger_io.rewind
expect(logger_io.read).not_to match /Can't send logs to/
expect(logger_io.read).not_to match /Can\'t send logs to/

logger.post('tag', {'a' => ('c' * 1000)})
logger_io.rewind
expect(logger_io.read).to match /Can't send logs to/
expect(logger_io.read).to match /Can\'t send logs to/
end

it ('log connect error once') do
Expand All @@ -233,8 +233,11 @@ class BufferOverflowHandler

def flush(messages)
@buffer ||= []
MessagePack::Unpacker.new.feed_each(messages) do |msg|
@buffer << msg
messages.each do |tag, message|
unpacker = MessagePack::Unpacker.new(StringIO.new(message))
unpacker.each do |time, record|
@buffer << [tag, time, record]
end
end
end
end
Expand Down