diff --git a/lib/zkruby/rubyio.rb b/lib/zkruby/rubyio.rb index 4c26471..d1f326d 100644 --- a/lib/zkruby/rubyio.rb +++ b/lib/zkruby/rubyio.rb @@ -23,6 +23,16 @@ # All interaction with the session is synchronized # # Client synchronous code is implemented with a condition variable that waits on the callback/errback +# + +# JRuby does not define Errno::NOERROR +unless defined? Errno::NOERROR + class Errno::NOERROR < SystemCallError + Errno = 0 + end + +end + module ZooKeeper::RubyIO class Connection @@ -30,39 +40,61 @@ class Connection include Slf4r::Logger include Socket::Constants + HAS_NONBLOCKING_CONNECT = RUBY_PLATFORM == "java" && Gem::Version.new(JRUBY_VERSION.dup) >= Gem::Version.new("1.6.7") + SOL_TCP = IPPROTO_TCP unless defined? ::Socket::SOL_TCP + def initialize(host,port,timeout,session) @session = session @write_queue = Queue.new() - # JRuby cannot do non-blocking connects, which means there is - # no way to properly implement the connection-timeout - # See http://jira.codehaus.org/browse/JRUBY-5165 - # In any case this should be encapsulated in TCPSocket.open(host,port,timeout) - if RUBY_PLATFORM == "java" - begin - sock = TCPSocket.new(host,port.to_i) - rescue Errno::ECONNREFUSED - logger.warn("TCP Connection refused to #{host}:#{port}") - sock = nil - end - else + if HAS_NONBLOCKING_CONNECT addr = Socket.getaddrinfo(host, nil) sock = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0) sock.setsockopt(SOL_SOCKET, SO_LINGER, [0,-1].pack("ii")) - sock.setsockopt(SOL_TCP, TCP_NODELAY,[0].pack("i_")) + begin + sock.setsockopt(SOL_TCP, TCP_NODELAY,[0].pack("i_")) + rescue + # JRuby defines SOL_TCP, but it doesn't work + sock.setsockopt(IPPROTO_TCP, TCP_NODELAY,[0].pack("i_")) + end + sockaddr = Socket.pack_sockaddr_in(port, addr[0][3]) begin sock.connect_nonblock(sockaddr) rescue Errno::EINPROGRESS - resp = IO.select(nil, [sock], nil, timeout) begin - sock.connect_nonblock(sockaddr) - rescue Errno::ECONNREFUSED - logger.warn("Connection refused to #{ host }:#{ port }") + read,write,errors = IO.select(nil, [sock], nil, timeout) + rescue Exception => ex + #JRuby raises Connection Refused instead of populating error array + logger.warn { "Exception #{ex.inspect} from non blocking select" } + end + optval = sock.getsockopt(Socket::SOL_SOCKET,Socket::SO_ERROR) + sockerr = (optval.unpack "i")[0] + if sockerr == Errno::NOERROR::Errno + #Woohoo! we're connected (lots of example code here will call + #connect_nonblock again to demonstrate EISCONN but I don't think + #this is strictly necessary + else + if sockerr == Errno::ECONNREFUSED::Errno + logger.warn("Connection refused to #{ host }:#{ port }") + else + logger.warn("Connection to #{ host }:#{ port } failed: #{sockerr}") + end sock = nil - rescue Errno::EISCONN end end + else + # JRuby prior to 1.6.7 cannot do non-blocking connects, which means there is + # no way to properly implement the connection-timeout + # See http://jira.codehaus.org/browse/JRUBY-5165 + # In any case this should be encapsulated in TCPSocket.open(host,port,timeout) + logger.warn { "Using blocking connect (JRuby < 1.6.7)" } + begin + sock = TCPSocket.new(host,port.to_i) + rescue Errno::ECONNREFUSED + logger.warn("TCP Connection refused to #{host}:#{port}") + sock = nil + end end @socket = sock Thread.new(sock) { |sock| write_loop(sock) } if sock diff --git a/spec/server_helper.rb b/spec/server_helper.rb index e4a255e..5f95fb1 100644 --- a/spec/server_helper.rb +++ b/spec/server_helper.rb @@ -4,15 +4,18 @@ module ZooKeeperServerHelper include Slf4r::Logger + JRUBY_COMPAT_SYSTEM = (RUBY_PLATFORM == "java" && Gem::Version.new(JRUBY_VERSION.dup) < Gem::Version.new("1.6.5")) + + def jruby_safe_system(arg) + arg = "#{arg} &" if JRUBY_COMPAT_SYSTEM + system(arg) + sleep(3) if JRUBY_COMPAT_SYSTEM + end + def restart_cluster(delay=0) - system("../../bin/zkServer.sh stop >> zk.out") + jruby_safe_system("../../bin/zkServer.sh stop >> zk.out") Kernel::sleep(delay) if delay > 0 - if (::RUBY_PLATFORM == "java" && Gem::Version.new(JRUBY_VERSION) < Gem::Version.new("1.7.0")) - #in JRuby 1.6.3 system does not return - system("../../bin/zkServer.sh start >> zk.out &") - else - system("../../bin/zkServer.sh start >> zk.out") - end + jruby_safe_system("../../bin/zkServer.sh start >> zk.out") end def get_addresses()