From a52820ae4d0cbe8090811693e528251b0cfe922e Mon Sep 17 00:00:00 2001 From: okkez Date: Tue, 8 Mar 2016 21:50:00 +0900 Subject: [PATCH] Add require_ack_response option --- lib/fluent/logger/fluent_logger.rb | 36 ++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/lib/fluent/logger/fluent_logger.rb b/lib/fluent/logger/fluent_logger.rb index 7cb108b..fb6972d 100644 --- a/lib/fluent/logger/fluent_logger.rb +++ b/lib/fluent/logger/fluent_logger.rb @@ -90,6 +90,9 @@ def initialize(tag_prefix = nil, *args) end @packer = @factory.packer + @require_ack_response = options[:require_ack_response] + @ack_response_timeout = options[:ack_response_timeout] || 190 + @mon = Monitor.new @pending = nil @connect_error_history = [] @@ -144,7 +147,7 @@ def close if @pending begin @pending.each do |tag, record| - send_data([tag, record].to_msgpack) + send_data(tag, record) end rescue => e set_last_error(e) @@ -232,7 +235,7 @@ def write(tag, time, map) begin @pending.each do |tag, record| - send_data([tag, record].to_msgpack) + send_data(tag, record) end @pending = nil true @@ -250,11 +253,17 @@ def write(tag, time, map) } end - def send_data(data) + def send_data(tag, record) unless connect? connect! end - @con.write data + if @require_ack_response + option = {} + option['chunk'] = generate_chunk + @con.write [tag, record, option].to_msgpack + else + @con.write [tag, record].to_msgpack + end #while true # puts "sending #{data.length} bytes" # if data.length > 32*1024 @@ -269,6 +278,21 @@ def send_data(data) # data = data[n..-1] #end + if @require_ack_response && @ack_response_timeout > 0 + if IO.select([@con], nil, nil, @ack_response_timeout) + raw_data = @con.recv(1024) + + if raw_data.empty? + raise "Closed connection" + else + response = MessagePack.unpack(raw_data) + if response['ack'] != option['chunk'] + raise "ack in response and chunk id in sent data are different" + end + end + end + end + true end @@ -307,6 +331,10 @@ def set_last_error(e) # TODO: Check non GVL env @last_error[Thread.current.object_id] = e end + + def generate_chunk + Base64.encode64(([SecureRandom.random_number(1 << 32)] * 4).pack('NNNN')).chomp + end end end end