From 84b37c80451fb4060e7ba967bff6f6180a807241 Mon Sep 17 00:00:00 2001 From: okkez Date: Mon, 22 Feb 2016 21:40:31 +0900 Subject: [PATCH 1/4] Use PackedForward instead of Message See https://github.com/fluent/fluentd/issues/671 --- lib/fluent/logger/fluent_logger.rb | 25 +++++++++++++++++-------- spec/fluent_logger_spec.rb | 13 ++++++++----- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/lib/fluent/logger/fluent_logger.rb b/lib/fluent/logger/fluent_logger.rb index b974265..3d8578f 100644 --- a/lib/fluent/logger/fluent_logger.rb +++ b/lib/fluent/logger/fluent_logger.rb @@ -20,6 +20,8 @@ require 'monitor' require 'logger' require 'json' +require 'base64' +require 'securerandom' module Fluent module Logger @@ -131,9 +133,9 @@ def post_with_time(tag, map, time) @logger.debug { "event: #{tag} #{map.to_json}" rescue nil } if @logger.debug? tag = "#{@tag_prefix}.#{tag}" if @tag_prefix if @nanosecond_precision && time.is_a?(Time) - write [tag, EventTime.new(time.to_i, time.nsec), map] + write(tag, EventTime.new(time.to_i, time.nsec), map) else - write [tag, time.to_i, map] + write(tag, time.to_i, map) end end @@ -141,7 +143,9 @@ def close @mon.synchronize { if @pending begin - send_data(@pending) + @pending.each do |tag, record| + send_data([tag, record].to_msgpack) + end rescue => e set_last_error(e) @logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}") @@ -201,20 +205,22 @@ def suppress_sec end end - def write(msg) + def write(tag, time, map) begin - data = to_msgpack(msg) + record = to_msgpack([time, map]) rescue => e set_last_error(e) + msg = [tag, time, map] @logger.error("FluentLogger: Can't convert to msgpack: #{msg.inspect}: #{$!}") return false end @mon.synchronize { if @pending - @pending << data + @pending[tag] << record else - @pending = data + @pending = Hash.new{|h, k| h[k] = "" } + @pending[tag] = record end # suppress reconnection burst @@ -225,7 +231,9 @@ def write(msg) end begin - send_data(@pending) + @pending.each do |tag, record| + send_data([tag, record].to_msgpack) + end @pending = nil true rescue => e @@ -260,6 +268,7 @@ def send_data(data) # end # data = data[n..-1] #end + true end diff --git a/spec/fluent_logger_spec.rb b/spec/fluent_logger_spec.rb index eb565f2..4098122 100644 --- a/spec/fluent_logger_spec.rb +++ b/spec/fluent_logger_spec.rb @@ -196,7 +196,7 @@ logger_io.rewind log = logger_io.read expect(log).to match /Failed to connect/ - expect(log).to match /Can't send logs to/ + expect(log).to match /Can\'t send logs to/ } it ('post limit over') do @@ -206,11 +206,11 @@ expect(fluentd.queue.last).to be_nil logger_io.rewind - expect(logger_io.read).not_to match /Can't send logs to/ + expect(logger_io.read).not_to match /Can\'t send logs to/ logger.post('tag', {'a' => ('c' * 1000)}) logger_io.rewind - expect(logger_io.read).to match /Can't send logs to/ + expect(logger_io.read).to match /Can\'t send logs to/ end it ('log connect error once') do @@ -233,8 +233,11 @@ class BufferOverflowHandler def flush(messages) @buffer ||= [] - MessagePack::Unpacker.new.feed_each(messages) do |msg| - @buffer << msg + messages.each do |tag, message| + unpacker = MessagePack::Unpacker.new(StringIO.new(message)) + unpacker.each do |time, record| + @buffer << [tag, time, record] + end end end end From e6a772b6dc07a15dd0f2294ec08c162f4d83919d Mon Sep 17 00:00:00 2001 From: okkez Date: Tue, 23 Feb 2016 17:33:23 +0900 Subject: [PATCH 2/4] Update README.md to follow previous change --- README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e7f9df3..4fd862d 100644 --- a/README.md +++ b/README.md @@ -152,8 +152,11 @@ class BufferOverflowHandler def flush(messages) @buffer ||= [] - MessagePack::Unpacker.new.feed_each(messages) do |msg| - @buffer << msg + messages.each do |tag, message| + unpacker = MessagePack::Unpacker.new(StringIO.new(message)) + unpacker.each do |time, record| + @buffer << [tag, time, record] + end end end end From 36f115f3e8fd4a21192ddf5a66fe7799822132a4 Mon Sep 17 00:00:00 2001 From: Kenji Okimoto Date: Wed, 26 Jul 2017 15:27:51 +0900 Subject: [PATCH 3/4] Use `#to_s` for Hash object Hash does not have `#bytesize` method. --- lib/fluent/logger/fluent_logger.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/logger/fluent_logger.rb b/lib/fluent/logger/fluent_logger.rb index 3d8578f..7cb108b 100644 --- a/lib/fluent/logger/fluent_logger.rb +++ b/lib/fluent/logger/fluent_logger.rb @@ -177,7 +177,7 @@ def connection_string def pending_bytesize if @pending - @pending.bytesize + @pending.to_s.bytesize else 0 end From 3ac3e136f529b8d242676ebdc76bd03c2fb678dd Mon Sep 17 00:00:00 2001 From: okkez Date: Tue, 8 Mar 2016 21:50:00 +0900 Subject: [PATCH 4/4] Add require_ack_response option Signed-off-by: Kenji Okimoto --- lib/fluent/logger/fluent_logger.rb | 36 ++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/lib/fluent/logger/fluent_logger.rb b/lib/fluent/logger/fluent_logger.rb index 7cb108b..fb6972d 100644 --- a/lib/fluent/logger/fluent_logger.rb +++ b/lib/fluent/logger/fluent_logger.rb @@ -90,6 +90,9 @@ def initialize(tag_prefix = nil, *args) end @packer = @factory.packer + @require_ack_response = options[:require_ack_response] + @ack_response_timeout = options[:ack_response_timeout] || 190 + @mon = Monitor.new @pending = nil @connect_error_history = [] @@ -144,7 +147,7 @@ def close if @pending begin @pending.each do |tag, record| - send_data([tag, record].to_msgpack) + send_data(tag, record) end rescue => e set_last_error(e) @@ -232,7 +235,7 @@ def write(tag, time, map) begin @pending.each do |tag, record| - send_data([tag, record].to_msgpack) + send_data(tag, record) end @pending = nil true @@ -250,11 +253,17 @@ def write(tag, time, map) } end - def send_data(data) + def send_data(tag, record) unless connect? connect! end - @con.write data + if @require_ack_response + option = {} + option['chunk'] = generate_chunk + @con.write [tag, record, option].to_msgpack + else + @con.write [tag, record].to_msgpack + end #while true # puts "sending #{data.length} bytes" # if data.length > 32*1024 @@ -269,6 +278,21 @@ def send_data(data) # data = data[n..-1] #end + if @require_ack_response && @ack_response_timeout > 0 + if IO.select([@con], nil, nil, @ack_response_timeout) + raw_data = @con.recv(1024) + + if raw_data.empty? + raise "Closed connection" + else + response = MessagePack.unpack(raw_data) + if response['ack'] != option['chunk'] + raise "ack in response and chunk id in sent data are different" + end + end + end + end + true end @@ -307,6 +331,10 @@ def set_last_error(e) # TODO: Check non GVL env @last_error[Thread.current.object_id] = e end + + def generate_chunk + Base64.encode64(([SecureRandom.random_number(1 << 32)] * 4).pack('NNNN')).chomp + end end end end