From 7704d17d73655f40b56712cac81da1ec4afeb005 Mon Sep 17 00:00:00 2001 From: Michael McGrew Date: Thu, 31 Dec 2015 00:08:25 +0000 Subject: [PATCH 1/4] IPv6 support --- laikad.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/laikad.py b/laikad.py index bbdd57c..a25c0f5 100644 --- a/laikad.py +++ b/laikad.py @@ -273,12 +273,14 @@ def run(self): # Connection for workers backend = context.socket(zmq.ROUTER) + backend.setsockopt(zmq.IPV4ONLY, 0) backend.bind(self.broker_backend_address) backend_poller = zmq.Poller() backend_poller.register(backend, zmq.POLLIN) # Connection for clients frontend = context.socket(zmq.ROUTER) + frontend.setsockopt(zmq.IPV4ONLY, 0) frontend.bind(self.broker_frontend_address) frontend_poller = zmq.Poller() frontend_poller.register(frontend, zmq.POLLIN) From 140e392ee377f86aeb100f8eedc78e0022245718 Mon Sep 17 00:00:00 2001 From: Michael McGrew Date: Tue, 5 Jan 2016 19:45:44 +0000 Subject: [PATCH 2/4] Make v6 optional --- laikad.py | 57 +++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/laikad.py b/laikad.py index a25c0f5..fc6d33f 100644 --- a/laikad.py +++ b/laikad.py @@ -129,12 +129,13 @@ class AsyncBroker(Process): returned back to the client. ''' - def __init__(self, broker_backend_address, broker_frontend_address): + def __init__(self, broker_backend_address, broker_frontend_address, use_ipv6): '''Main constructor''' super(AsyncBroker, self).__init__() self.broker_backend_address = broker_backend_address self.broker_frontend_address = broker_frontend_address self.keep_running = True + self.use_ipv6 = use_ipv6 def shutdown(self): '''Shutdown method to be called by the signal handler''' @@ -156,12 +157,24 @@ def run(self): # Connection for workers backend = context.socket(zmq.ROUTER) + if self.use_ipv6: + try: + backend.setsockopt(zmq.IPV6, 1) + except AttributeError: + # Older version that does not have ZMQ_IPV6 + backend.setsockopt(zmq.IPV4ONLY, 0) backend.bind(self.broker_backend_address) backend_poller = zmq.Poller() backend_poller.register(backend, zmq.POLLIN) # Connection for clients frontend = context.socket(zmq.PULL) + if self.use_ipv6: + try: + frontend.setsockopt(zmq.IPV6, 1) + except AttributeError: + # Older version that does not have ZMQ_IPV6 + frontend.setsockopt(zmq.IPV4ONLY, 0) frontend.bind(self.broker_frontend_address) frontend_poller = zmq.Poller() frontend_poller.register(frontend, zmq.POLLIN) @@ -247,13 +260,14 @@ class SyncBroker(Process): ''' def __init__(self, broker_backend_address, broker_frontend_address, - shutdown_grace_timeout=SHUTDOWN_GRACE_TIMEOUT_DEFAULT): + use_ipv6, shutdown_grace_timeout=SHUTDOWN_GRACE_TIMEOUT_DEFAULT): '''Main constructor''' super(SyncBroker, self).__init__() self.broker_backend_address = broker_backend_address self.broker_frontend_address = broker_frontend_address self.shutdown_grace_timeout = shutdown_grace_timeout self.keep_running = True + self.use_ipv6 = use_ipv6 def shutdown(self): '''Shutdown method to be called by the signal handler''' @@ -273,14 +287,24 @@ def run(self): # Connection for workers backend = context.socket(zmq.ROUTER) - backend.setsockopt(zmq.IPV4ONLY, 0) + if self.use_ipv6: + try: + backend.setsockopt(zmq.IPV6, 1) + except AttributeError: + # Older version that does not have ZMQ_IPV6 + backend.setsockopt(zmq.IPV4ONLY, 0) backend.bind(self.broker_backend_address) backend_poller = zmq.Poller() backend_poller.register(backend, zmq.POLLIN) # Connection for clients frontend = context.socket(zmq.ROUTER) - frontend.setsockopt(zmq.IPV4ONLY, 0) + if self.use_ipv6: + try: + frontend.setsockopt(zmq.IPV6, 1) + except AttributeError: + # Older version that does not have ZMQ_IPV6 + frontend.setsockopt(zmq.IPV4ONLY, 0) frontend.bind(self.broker_frontend_address) frontend_poller = zmq.Poller() frontend_poller.register(frontend, zmq.POLLIN) @@ -793,6 +817,10 @@ def main(): dest="gracetimeout", help="when shutting down, the timeout to allow workers to" " finish ongoing scans before being killed") + parser.add_option("-6", "--ipv6", + action="store_true", default=False, + dest="use_ipv6", + help="Enables listening on IPv6.") (options, _) = parser.parse_args() # Set the configuration file path @@ -903,9 +931,16 @@ def shutdown(signum, frame): broker_proc = None if not options.no_broker: if async: - broker_proc = AsyncBroker(broker_backend_address, broker_frontend_address) + broker_proc = AsyncBroker( + broker_backend_address, + broker_frontend_address, + options.use_ipv6) else: - broker_proc = SyncBroker(broker_backend_address, broker_frontend_address, gracetimeout) + broker_proc = SyncBroker( + broker_backend_address, + broker_frontend_address, + options.use_ipv6, + gracetimeout) broker_proc.start() # Start the workers @@ -920,9 +955,15 @@ def shutdown(signum, frame): # Ensure we have a broker if not options.no_broker and not broker_proc.is_alive(): if async: - broker_proc = AsyncBroker(broker_backend_address, broker_frontend_address) + broker_proc = AsyncBroker( + broker_backend_address, + broker_frontend_address, + options.use_ipv6) else: - broker_proc = SyncBroker(broker_backend_address, broker_frontend_address, + broker_proc = SyncBroker( + broker_backend_address, + broker_frontend_address, + options.use_ipv6, gracetimeout) broker_proc.start() From 832fd9a37c3bf1a5f2d1e3f2f036300d3ac46430 Mon Sep 17 00:00:00 2001 From: Michael McGrew Date: Tue, 5 Jan 2016 20:02:58 +0000 Subject: [PATCH 3/4] Change handling of IPV4ONLY vs IPV6' --- laikad.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/laikad.py b/laikad.py index fc6d33f..06b7c93 100644 --- a/laikad.py +++ b/laikad.py @@ -158,11 +158,12 @@ def run(self): # Connection for workers backend = context.socket(zmq.ROUTER) if self.use_ipv6: - try: + if hasattr(zmq, 'IPV6'): backend.setsockopt(zmq.IPV6, 1) - except AttributeError: - # Older version that does not have ZMQ_IPV6 + elif hasattr(zmq, 'IPV4ONLY'): backend.setsockopt(zmq.IPV4ONLY, 0) + else: + logging.error("This version of ZMQ does not support IPv6") backend.bind(self.broker_backend_address) backend_poller = zmq.Poller() backend_poller.register(backend, zmq.POLLIN) @@ -170,11 +171,12 @@ def run(self): # Connection for clients frontend = context.socket(zmq.PULL) if self.use_ipv6: - try: + if hasattr(zmq, 'IPV6'): frontend.setsockopt(zmq.IPV6, 1) - except AttributeError: - # Older version that does not have ZMQ_IPV6 + elif hasattr(zmq, 'IPV4ONLY'): frontend.setsockopt(zmq.IPV4ONLY, 0) + else: + logging.error("This version of ZMQ does not support IPv6") frontend.bind(self.broker_frontend_address) frontend_poller = zmq.Poller() frontend_poller.register(frontend, zmq.POLLIN) @@ -288,11 +290,12 @@ def run(self): # Connection for workers backend = context.socket(zmq.ROUTER) if self.use_ipv6: - try: + if hasattr(zmq, 'IPV6'): backend.setsockopt(zmq.IPV6, 1) - except AttributeError: - # Older version that does not have ZMQ_IPV6 + elif hasattr(zmq, 'IPV4ONLY'): backend.setsockopt(zmq.IPV4ONLY, 0) + else: + logging.error("This version of ZMQ does not support IPv6") backend.bind(self.broker_backend_address) backend_poller = zmq.Poller() backend_poller.register(backend, zmq.POLLIN) @@ -300,11 +303,12 @@ def run(self): # Connection for clients frontend = context.socket(zmq.ROUTER) if self.use_ipv6: - try: + if hasattr(zmq, 'IPV6'): frontend.setsockopt(zmq.IPV6, 1) - except AttributeError: - # Older version that does not have ZMQ_IPV6 + elif hasattr(zmq, 'IPV4ONLY'): frontend.setsockopt(zmq.IPV4ONLY, 0) + else: + logging.error("This version of ZMQ does not support IPv6") frontend.bind(self.broker_frontend_address) frontend_poller = zmq.Poller() frontend_poller.register(frontend, zmq.POLLIN) From a7f4fb3653bebffcbe26f41afd732ed0083ac1cf Mon Sep 17 00:00:00 2001 From: Michael McGrew Date: Tue, 5 Jan 2016 20:15:05 +0000 Subject: [PATCH 4/4] enable v6 for worker --- laikad.py | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/laikad.py b/laikad.py index 06b7c93..98f23d6 100644 --- a/laikad.py +++ b/laikad.py @@ -469,6 +469,7 @@ class Worker(Process): ''' def __init__(self, config_location, broker_address, max_scan_items, ttl, + use_ipv6, logresult=False, poll_timeout=300, shutdown_grace_timeout=SHUTDOWN_GRACE_TIMEOUT_DEFAULT): @@ -486,6 +487,7 @@ def __init__(self, config_location, broker_address, max_scan_items, ttl, self.broker_poller = zmq.Poller() self.poll_timeout = poll_timeout * 1000 # Poller uses milliseconds self.logresult = logresult + self.use_ipv6 = use_ipv6 def perform_scan(self, poll_timeout): ''' @@ -648,6 +650,13 @@ def run(self): context = zmq.Context(1) self.broker = context.socket(zmq.DEALER) self.broker.setsockopt(zmq.IDENTITY, self.identity) + if self.use_ipv6: + if hasattr(zmq, 'IPV6'): + self.broker.setsockopt(zmq.IPV6, 1) + elif hasattr(zmq, 'IPV4ONLY'): + self.broker.setsockopt(zmq.IPV4ONLY, 0) + else: + logging.error("This version of ZMQ does not support IPv6") self.broker.connect(self.broker_address) self.broker_poller.register(self.broker, zmq.POLLIN) @@ -950,8 +959,15 @@ def shutdown(signum, frame): # Start the workers workers = [] for _ in range(num_procs): - worker_proc = Worker(laikaboss_config_path, worker_connect_address, ttl, - time_ttl, logresult, int(get_option('workerpolltimeout')), gracetimeout) + worker_proc = Worker( + laikaboss_config_path, + worker_connect_address, + ttl, + time_ttl, + options.use_ipv6, + logresult, + int(get_option('workerpolltimeout')), + gracetimeout) worker_proc.start() workers.append(worker_proc) @@ -979,8 +995,15 @@ def shutdown(signum, frame): for worker_proc in dead_workers: workers.remove(worker_proc) - new_proc = Worker(laikaboss_config_path, worker_connect_address, ttl, time_ttl, - logresult, int(get_option('workerpolltimeout')), gracetimeout) + new_proc = Worker( + laikaboss_config_path, + worker_connect_address, + ttl, + time_ttl, + options.use_ipv6, + logresult, + int(get_option('workerpolltimeout')), + gracetimeout) new_proc.start() workers.append(new_proc) worker_proc.join()