Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow VW() instances to connect to existing VW daemons #4

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions test/test_wabbit_wappa.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,12 @@ def test_training():
vw.close()
vw2.close()
os.remove(filename)


def test_daemons():
# Launch one VW in daemon_mode
vw = VW(loss_function='logistic', daemon_mode=True, port=30000)
# Connect to that one without launching another
vw2 = VW(daemon_ip='127.0.0.1', port=30000)


21 changes: 16 additions & 5 deletions wabbit_wappa/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ def __str__(self):

class VW():
"""Wrapper for VW executable, handling online input and outputs."""
def __init__(self, command=None, active_mode=False, dummy_mode=False, **kwargs):
def __init__(self, command=None, active_mode=False, dummy_mode=False, daemon_mode=False,
daemon_ip=None, **kwargs):
"""'command' is the full command-line necessary to run VW. E.g.
vw --loss_function logistic -p /dev/stdout --quiet
-p /dev/stdout --quiet is mandatory for compatibility,
Expand All @@ -220,32 +221,42 @@ def __init__(self, command=None, active_mode=False, dummy_mode=False, **kwargs):
a simulated subprocess.
dummy_mode: Don't actually start any VW process. (Used for assembling
VW command lines separately.)
daemon_mode: (Forced to True if active_mode is set). Communicate with
VW as a server instead of as a subprocess.
NOTE: This is much faster in tests, and will become default in
a future version of WW.
daemon_ip: If given, attach to an already-running VW daemon, ignoring
all other command-related arguments (other than `port`).

If no command is given, any additional keyword arguments are passed to
make_command_line() and the resulting command is used. (This provides
sensible defaults.)
"""
daemon_mode = daemon_mode or active_mode
if command is None:
if active_mode:
active_settings = active_learner.get_active_default_settings()
# Overwrite active settings with kwargs
active_settings.update(kwargs)
kwargs = active_settings
port = kwargs.get('port')
command = make_command_line(**kwargs)
port = kwargs.get('port')
self.active_mode = active_mode
self.dummy_mode = dummy_mode
self.daemon_mode = daemon_mode
if dummy_mode:
self.vw_process = None
else:
if active_mode:
self.vw_process = active_learner.ActiveVWProcess(command, port=port)
if active_mode or daemon_ip or daemon_mode:
self.vw_process = active_learner.DaemonVWProcess(command,
port=port,
ip=daemon_ip)
else:
self.vw_process = pexpect.spawn(command)
# Turn off delaybeforesend; this is necessary only in non-applicable cases
self.vw_process.delaybeforesend = 0
self.vw_process.setecho(False)
logging.info("Started VW({})".format(command))
logging.info("Started VW({})".format(command))
self.command = command
self.namespaces = []
self._line = None
Expand Down
41 changes: 30 additions & 11 deletions wabbit_wappa/active_learner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

"""
Interface for VW's active learning mode, which must be communicated with
over a socked.
over a socket.

Derived in great part from
https://github.com/JohnLangford/vowpal_wabbit/blob/master/utl/active_interactor.py
Expand All @@ -13,11 +13,13 @@

import socket
import time
import logging

import pexpect


DEFAULT_PORT = 26542
DEFAULT_IP = '127.0.0.1'
CONNECTION_WAIT = 0.1 # Time between socket connection attempts
MAX_CONNECTION_ATTEMPTS = 50

Expand All @@ -30,30 +32,46 @@ def get_active_default_settings():
return result


class ActiveVWProcess():
"""Class for spawning and interacting with a WV process
in active learning mode. This class implements a subset of the interface
class DaemonVWProcess():
"""Class for spawning and/or interacting with a WV process
in daemon mode. This class implements a subset of the interface
of a pexpect.spawn() object so that it can be a drop-in replacement
for the VW.vw_process member.
"""

_buffer = b''

def __init__(self, command, port=DEFAULT_PORT):
def __init__(self, command=None, port=None, ip=None):
"""'command' is assumed to have the necessary options for use with this
class, which should be guaranteed in the calling context."""
# Launch the VW process, which we will communicate with only
# via its socket
self.vw_process = pexpect.spawn(command)
class (such as a consistent value for `port`),
which should be guaranteed in the calling context.

If 'command' is not given, or if 'ip' is given, assume that a
daemonized VW process has already been launched, and attach
to it with the given ip and port."""
if command and not ip:
# Launch the VW process, which we will communicate with only
# via its socket
self.vw_process = pexpect.spawn(command)
logging.info("Started VW({})".format(command))
else:
self.vw_process = None
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
connection_tries = 0
while connection_tries < MAX_CONNECTION_ATTEMPTS:
try:
self.sock.connect(('127.0.0.1', port))
if not ip:
ip = DEFAULT_IP
if not port:
port = DEFAULT_PORT
self.sock.connect((ip, port))
logging.info("Connected to VW daemon ({}:{})".format(ip, port))
break # Quit this loop once successful
except socket.error:
connection_tries += 1
time.sleep(CONNECTION_WAIT)
else:
raise
self.before = None

def sendline(self, line):
Expand Down Expand Up @@ -89,6 +107,7 @@ def expect_exact(self, *args, **kwargs):

def close(self):
self.sock.close()
self.vw_process.close()
if self.vw_process:
self.vw_process.close()