Skip to content

Commit

Permalink
Fixes #2 around multi-platform socket code
Browse files Browse the repository at this point in the history
As per suggestions from @makuk66
Yet to confirm this works on MacOSX or with IPv6
Also enable non-blocking connect for JRuby 1.6.7+
  • Loading branch information
lwoggardner committed Dec 15, 2012
1 parent 69e33c2 commit a75fcb7
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 25 deletions.
68 changes: 50 additions & 18 deletions lib/zkruby/rubyio.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,78 @@
# All interaction with the session is synchronized
#
# Client synchronous code is implemented with a condition variable that waits on the callback/errback
#

# JRuby does not define Errno::NOERROR
unless defined? Errno::NOERROR
class Errno::NOERROR < SystemCallError
Errno = 0
end

end

module ZooKeeper::RubyIO

class Connection
include ZooKeeper::Protocol
include Slf4r::Logger
include Socket::Constants

HAS_NONBLOCKING_CONNECT = RUBY_PLATFORM == "java" && Gem::Version.new(JRUBY_VERSION.dup) >= Gem::Version.new("1.6.7")
SOL_TCP = IPPROTO_TCP unless defined? ::Socket::SOL_TCP

def initialize(host,port,timeout,session)
@session = session
@write_queue = Queue.new()

# JRuby cannot do non-blocking connects, which means there is
# no way to properly implement the connection-timeout
# See http://jira.codehaus.org/browse/JRUBY-5165
# In any case this should be encapsulated in TCPSocket.open(host,port,timeout)
if RUBY_PLATFORM == "java"
begin
sock = TCPSocket.new(host,port.to_i)
rescue Errno::ECONNREFUSED
logger.warn("TCP Connection refused to #{host}:#{port}")
sock = nil
end
else
if HAS_NONBLOCKING_CONNECT
addr = Socket.getaddrinfo(host, nil)
sock = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
sock.setsockopt(SOL_SOCKET, SO_LINGER, [0,-1].pack("ii"))
sock.setsockopt(SOL_TCP, TCP_NODELAY,[0].pack("i_"))
begin
sock.setsockopt(SOL_TCP, TCP_NODELAY,[0].pack("i_"))
rescue
# JRuby defines SOL_TCP, but it doesn't work
sock.setsockopt(IPPROTO_TCP, TCP_NODELAY,[0].pack("i_"))
end

sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EINPROGRESS
resp = IO.select(nil, [sock], nil, timeout)
begin
sock.connect_nonblock(sockaddr)
rescue Errno::ECONNREFUSED
logger.warn("Connection refused to #{ host }:#{ port }")
read,write,errors = IO.select(nil, [sock], nil, timeout)
rescue Exception => ex
#JRuby raises Connection Refused instead of populating error array
logger.warn { "Exception #{ex.inspect} from non blocking select" }
end
optval = sock.getsockopt(Socket::SOL_SOCKET,Socket::SO_ERROR)
sockerr = (optval.unpack "i")[0]
if sockerr == Errno::NOERROR::Errno
#Woohoo! we're connected (lots of example code here will call
#connect_nonblock again to demonstrate EISCONN but I don't think
#this is strictly necessary
else
if sockerr == Errno::ECONNREFUSED::Errno
logger.warn("Connection refused to #{ host }:#{ port }")
else
logger.warn("Connection to #{ host }:#{ port } failed: #{sockerr}")
end
sock = nil
rescue Errno::EISCONN
end
end
else
# JRuby prior to 1.6.7 cannot do non-blocking connects, which means there is
# no way to properly implement the connection-timeout
# See http://jira.codehaus.org/browse/JRUBY-5165
# In any case this should be encapsulated in TCPSocket.open(host,port,timeout)
logger.warn { "Using blocking connect (JRuby < 1.6.7)" }
begin
sock = TCPSocket.new(host,port.to_i)
rescue Errno::ECONNREFUSED
logger.warn("TCP Connection refused to #{host}:#{port}")
sock = nil
end
end
@socket = sock
Thread.new(sock) { |sock| write_loop(sock) } if sock
Expand Down
17 changes: 10 additions & 7 deletions spec/server_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ module ZooKeeperServerHelper

include Slf4r::Logger

JRUBY_COMPAT_SYSTEM = (RUBY_PLATFORM == "java" && Gem::Version.new(JRUBY_VERSION.dup) < Gem::Version.new("1.6.5"))

def jruby_safe_system(arg)
arg = "#{arg} &" if JRUBY_COMPAT_SYSTEM
system(arg)
sleep(3) if JRUBY_COMPAT_SYSTEM
end

def restart_cluster(delay=0)
system("../../bin/zkServer.sh stop >> zk.out")
jruby_safe_system("../../bin/zkServer.sh stop >> zk.out")
Kernel::sleep(delay) if delay > 0
if (::RUBY_PLATFORM == "java" && Gem::Version.new(JRUBY_VERSION) < Gem::Version.new("1.7.0"))
#in JRuby 1.6.3 system does not return
system("../../bin/zkServer.sh start >> zk.out &")
else
system("../../bin/zkServer.sh start >> zk.out")
end
jruby_safe_system("../../bin/zkServer.sh start >> zk.out")
end

def get_addresses()
Expand Down

0 comments on commit a75fcb7

Please sign in to comment.