From 2d96a598ac837722722987bf0a058d822b4d0ac8 Mon Sep 17 00:00:00 2001 From: Kenji Okimoto Date: Thu, 24 Dec 2015 12:12:09 +0900 Subject: [PATCH 1/2] Use nonblocking write See #30 --- lib/fluent/logger/fluent_logger.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/logger/fluent_logger.rb b/lib/fluent/logger/fluent_logger.rb index ebccf55..d0d12b2 100644 --- a/lib/fluent/logger/fluent_logger.rb +++ b/lib/fluent/logger/fluent_logger.rb @@ -185,7 +185,7 @@ def send_data(data) unless connect? connect! end - @con.write data + @con.write_nonblock data #while true # puts "sending #{data.length} bytes" # if data.length > 32*1024 From 93903e821f882396c4df1176e80e86668d39b816 Mon Sep 17 00:00:00 2001 From: Kenji Okimoto Date: Thu, 24 Dec 2015 12:13:08 +0900 Subject: [PATCH 2/2] Add `:wait_writeable` option When detect EAGAIN or EWOULDBLOCK, `:wait_writeable` is true: store the data in buffer and try to write next time. `:wait_writeable` is false: Raise IO::EAGAINWaitWritable --- lib/fluent/logger/fluent_logger.rb | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/lib/fluent/logger/fluent_logger.rb b/lib/fluent/logger/fluent_logger.rb index d0d12b2..aa19378 100644 --- a/lib/fluent/logger/fluent_logger.rb +++ b/lib/fluent/logger/fluent_logger.rb @@ -75,6 +75,9 @@ def initialize(tag_prefix = nil, *args) end end + @wait_writeable = true + @wait_writeable = options[:wait_writeable] if options.key?(:wait_writeable) + @last_error = {} begin @@ -168,6 +171,9 @@ def write(msg) @pending = nil true rescue => e + unless wait_writeable?(e) + raise e + end set_last_error(e) if @pending.bytesize > @limit @logger.error("FluentLogger: Can't send logs to #{@host}:#{@port}: #{$!}") @@ -237,6 +243,14 @@ def set_last_error(e) # TODO: Check non GVL env @last_error[Thread.current.object_id] = e end + + def wait_writeable?(e) + if e.instance_of?(IO::EAGAINWaitWritable) + @wait_writeable + else + true + end + end end end end