diff --git a/rosetta/parallel/parallel_easy.py b/rosetta/parallel/parallel_easy.py
index fe4e29f..4bd9267 100644
--- a/rosetta/parallel/parallel_easy.py
+++ b/rosetta/parallel/parallel_easy.py
@@ -8,7 +8,7 @@ and a more effective way of handling Ctrl-C exit (we add a timeout).
 """
 import itertools
-from multiprocessing import Pool, cpu_count
+from multiprocessing import cpu_count, Pool, Process, Manager, Lock
 from multiprocessing.pool import IMapUnorderedIterator, IMapIterator
 import cPickle
 import sys
@@ -25,6 +25,74 @@
 # Functions
 ###############################################################################
 
+def _do_work_off_queue(lock, in_q, func, out_q, sep):
+    while True:
+        x = in_q.get()
+
+        if x is None:
+            out_q.put(x)
+            return
+
+        result = func(x)
+        out_q.put(str(result) + sep)
+
+
+def _write_to_output(out_q, stream, n_jobs):
+    ends_seen = 0
+    while True:
+        x = out_q.get()
+        if x is None:
+            ends_seen += 1
+            if ends_seen == n_jobs:
+                stream.flush()
+                return
+            else:
+                continue
+        stream.write(x)
+
+
+def parallel_apply(func, iterable, n_jobs, sep='\n', out_stream=sys.stdout):
+    """
+    Writes the result of applying func to iterable using n_jobs to out_stream
+    """
+    # if there is only one job, simply read from iterable, apply function
+    # and write to output
+    if n_jobs == 1:
+        for each in iterable:
+            out_stream.write(str(func(each)) + sep)
+        out_stream.flush()
+        return
+
+    # if there is more than one job, use a queue manager to communicate
+    # between processes.
+    manager = Manager()
+    in_q = manager.Queue(maxsize=2 * n_jobs)
+    out_q = manager.Queue(maxsize=2 * n_jobs)
+    lock = Lock()
+
+    # start pool workers
+    pool = []
+    for i in xrange(n_jobs):
+        p = Process(target=_do_work_off_queue,
+                    args=(lock, in_q, func, out_q, sep))
+        p.start()
+        pool.append(p)
+
+    # start output worker
+    out_p = Process(target=_write_to_output,
+                    args=(out_q, out_stream, n_jobs))
+    out_p.start()
+
+    # put data on input queue, followed by one None sentinel per worker
+    iters = itertools.chain(iterable, (None,) * n_jobs)
+    for each in iters:
+        in_q.put(each)
+
+    # finish job
+    for p in pool:
+        p.join()
+    out_p.join()
+
 
 def imap_easy(func, iterable, n_jobs, chunksize, ordered=True):
     """
diff --git a/rosetta/parallel/threading_easy.py b/rosetta/parallel/threading_easy.py
new file mode 100644
index 0000000..b3e3d89
--- /dev/null
+++ b/rosetta/parallel/threading_easy.py
@@ -0,0 +1,120 @@
+"""
+Functions to assist in multithreaded processing with Python 2.7.
+"""
+import sys
+import threading
+
+
+class LockIterateApply(threading.Thread):
+    """
+    Wraps an iterable into a "locked" iterable for threading, applies
+    function and writes to out_stream.
+    """
+    def __init__(self, func, iterable, lock, sep='\n', out_stream=sys.stdout):
+        """
+        Parameters
+        ----------
+        func : function of one variable
+        iterable : iterable, yields func arg
+        lock : threading.Lock()
+        sep : str
+            for writing to out_stream
+        out_stream : open file, buffer or standard stream
+            must have a .write() method
+        """
+        self.lock = lock
+        self.func = func
+        self.out_stream = out_stream
+        self.myiter = iterable
+        self.sep = sep
+        threading.Thread.__init__(self)
+
+    def run(self):
+        t = True
+        while t:
+            t = self.read_apply()
+
+    def read_apply(self):
+        """
+        Locks iterable.next() and applies self.transform.
+        """
+        try:
+            self.lock.acquire()
+            x = self.myiter.next()
+            self.lock.release()
+        except StopIteration:
+            self.lock.release()
+            return False
+        y = self.transform(x)
+        self.output(y)
+        return True
+
+    def transform(self, x):
+        return self.func(x)
+
+    def output(self, y):
+        """
+        Writes to out_stream.
+ """ + self.out_stream.write(str(y) + self.sep) + + +def threading_easy(func, iterable, n_threads, sep='\n', out_stream=sys.stdout): + """ + Wraps the python threading library; takes an iterator, a function which + acts on each element the iterator yields and starts up the perscribed + number of threads. The output of each thread process is pass to an + out_stream. + + Parameters + ---------- + func : function of one variable + iterable : iterable which yields function argument + n_threads : int + sep : string + for concatenating results to write + out_stream : open file, buffer, standard stream + must have a .write() method + + Returns + ------- + writes to out_stream + + Examples + -------- + Function of one variable: + >>> from time import sleep + >>> import rosetta.parallel.threading_easy as te + >>> my_iter = (x for x in range(10)) + >>> def double(n): + sleep(1) + return 2*x + >>> te.threading_easy(my_iter, double, n_threads=10) + + Function of more than one variable: + >>> from functools import partial + >>> def double2(n, t): + sleep(t) + return 2*n + >>> double = partial(double2, t=1) + >>> te.threading_easy(my_iter, double, n_threads=10) + + Notes: in order to support the default sys.stdout out_stream, all results + are converted to string before writing. + + """ + if n_threads is None or n_threads <= 1: + for each in iterable: + out_stream.write(('%s'+sep)%func(each)) + else: + lock = threading.Lock() + threads = [] + for i in range(n_threads): + t = LockIterateApply(func, iterable, lock, sep, out_stream) + threads.append(t) + + for t in threads: + t.start() + + for t in threads: + t.join() diff --git a/rosetta/tests/test_parallel.py b/rosetta/tests/test_parallel.py index 0579e12..4affd0e 100644 --- a/rosetta/tests/test_parallel.py +++ b/rosetta/tests/test_parallel.py @@ -4,8 +4,11 @@ import pandas as pd from pandas.util.testing import assert_frame_equal, assert_series_equal import numpy as np +import threading +from StringIO import StringIO from rosetta.parallel import parallel_easy, pandas_easy +from rosetta.parallel.threading_easy import threading_easy, LockIterateApply # A couple functions for testing parallel easy @@ -179,3 +182,58 @@ def test_groupby_to_series_to_frame_2(self): result = pandas_easy.groupby_to_series_to_frame( df, frame_to_series, 1, use_apply=False, by=labels) assert_frame_equal(result, benchmark) + + +class TestLockIterateApply(unittest.TestCase): + """ + Test the Locked Iterator Class + """ + def setUp(self): + self.data = ['my', 'name', 'is', 'daniel'] + self.num_threads = 4 + + def bytwo(x): + return 2 * x + + self.func = bytwo + + def it(): + for i in self.data: + yield i + + self.myiter = it() + + def test_locked_iterator(self): + threads = [] + lock = threading.Lock() + out = StringIO() + for i in range(self.num_threads): + t = LockIterateApply(self.func, self.myiter, lock, ',', out) + threads.append(t) + + for t in threads: + t.start() + + for t in threads: + t.join() + + benchmark = set(['mymy', 'namename', 'danieldaniel', 'isis', '']) + results = set(out.getvalue().split(',')) + self.assertEqual(results, benchmark) + + def test_threading_easy(self): + out = StringIO() + threading_easy(self.func, self.myiter, self.num_threads, ',', out) + + benchmark = set(['mymy', 'namename', 'danieldaniel', 'isis', '']) + results = set(out.getvalue().split(',')) + self.assertEqual(results, benchmark) + + def test_threading_easy_single(self): + out = StringIO() + threading_easy(self.func, self.myiter, 1, ',', out) + + benchmark = set(['mymy', 'namename', 
+        results = set(out.getvalue().split(','))
+        self.assertEqual(results, benchmark)
+
diff --git a/rosetta/tests/test_streamer.py b/rosetta/tests/test_streamer.py
index 22cd076..12f18a2 100644
--- a/rosetta/tests/test_streamer.py
+++ b/rosetta/tests/test_streamer.py
@@ -32,7 +32,7 @@ def test_info_stream(self):
 
         token_benchmark = [['doomed', 'failure'], ['set', 'success']]
         text_benchmark = ['doomed to failure\n', 'set for success\n']
-        
+
         token_result = []
         text_result = []
         for each in stream.info_stream():
@@ -54,7 +54,7 @@ def test_token_stream(self):
 
         self.assertEqual(token_benchmark, token_result)
         self.assertEqual(id_benchmark, stream.__dict__['doc_id_cache'])
-        
+
     def test_to_vw(self):
         stream = TextFileStreamer(path_list = [self.doc1, self.doc2],
                                   tokenizer=self.tokenizer)
@@ -63,7 +63,7 @@ def test_to_vw(self):
 
         benchmark = " 1 doc1| failure:1 doomed:1\n 1 doc2| set:1 success:1\n"
         self.assertEqual(benchmark, result.getvalue())
-        
+
     def test_to_scipyspare(self):
         stream = TextFileStreamer(path_list = [self.doc1, self.doc2],
                                   tokenizer=self.tokenizer)
@@ -73,7 +73,7 @@ def test_to_scipyspare(self):
 
         compare = result.toarray() == benchmark.toarray()
         self.assertTrue(compare.all())
-        
+
     def tearDown(self):
         os.remove(self.doc1)
         os.remove(self.doc2)
@@ -84,6 +84,8 @@ def setUp(self):
         self.text_iter = [{'text': 'doomed to failure', 'doc_id': 'a'},
                           {'text': 'set for success', 'doc_id': '1'}]
         self.tokenizer = TokenizerBasic()
+        self.test_path = os.path.abspath('./rosetta/tests')
+        self.temp_vw_path = os.path.join(self.test_path, 'temp', 'test.vw')
 
     def test_info_stream(self):
         stream = TextIterStreamer(text_iter=self.text_iter,
@@ -99,7 +101,7 @@ def test_info_stream(self):
 
         self.assertEqual(token_benchmark, token_result)
         self.assertEqual(text_benchmark, text_result)
-        
+
     def test_token_stream(self):
         stream = TextIterStreamer(text_iter=self.text_iter,
                                   tokenizer=self.tokenizer)
@@ -117,18 +119,17 @@ def test_token_stream(self):
     def test_to_scipyspare(self):
         stream = TextFileStreamer(path_list = [self.doc1, self.doc2],
                                   tokenizer=self.tokenizer)
-        
+
         result = stream.to_scipysparse()
         benchmark = sparse.csr_matrix([[1, 1, 0, 0], [0, 0, 1, 1]])
-        
+
     def test_to_vw(self):
         stream = TextIterStreamer(text_iter=self.text_iter,
                                   tokenizer=self.tokenizer)
-        result = StringIO()
-        stream.to_vw(result)
-
+        stream.to_vw(open(self.temp_vw_path, 'w'))
+        result = open(self.temp_vw_path).read()
         benchmark = " 1 a| failure:1 doomed:1\n 1 1| set:1 success:1\n"
-        self.assertEqual(benchmark, result.getvalue())
+        self.assertEqual(benchmark, result)
 
     def test_to_scipyspare(self):
         stream = TextIterStreamer(text_iter=self.text_iter,
@@ -140,11 +141,16 @@ def test_to_scipyspare(self):
         compare = result.toarray() == benchmark.toarray()
         self.assertTrue(compare.all())
 
+    def tearDown(self):
+        if os.path.exists(self.temp_vw_path):
+            os.remove(self.temp_vw_path)
 
 class TestMySQLStreamer(unittest.TestCase):
     def setUp(self):
         self.query_result = [{'text': 'doomed to failure', 'doc_id': 'a'},
                              {'text': 'set for success', 'doc_id': '1'}]
+        self.test_path = os.path.abspath('./rosetta/tests')
+        self.temp_vw_path = os.path.join(self.test_path, 'temp', 'test.vw')
 
         class MockCursor(object):
             def __init__(self, my_iter):
@@ -199,11 +205,11 @@ def test_to_vw(self):
         stream = MySQLStreamer(self.db_setup,
                                tokenizer=self.tokenizer)
         stream.cursor = self.mock_cursor
-        result = StringIO()
-        stream.to_vw(result, cache_list=['doc_id'])
+        stream.to_vw(open(self.temp_vw_path, 'w'))
+        result = open(self.temp_vw_path).read()
 
         benchmark = " 1 a| failure:1 doomed:1\n 1 1| set:1 success:1\n"
-        self.assertEqual(benchmark, result.getvalue())
+        self.assertEqual(benchmark, result)
 
     def test_to_scipyspare(self):
         stream = MySQLStreamer(self.db_setup,
@@ -216,6 +222,9 @@ def test_to_scipyspare(self):
         compare = result.toarray() == benchmark.toarray()
         self.assertTrue(compare.all())
 
+    def tearDown(self):
+        if os.path.exists(self.temp_vw_path):
+            os.remove(self.temp_vw_path)
 
 class TestMongoStreamer(unittest.TestCase):
     def setUp(self):
@@ -243,6 +252,8 @@ def execute(self):
         self.db_setup['text_key'] = 'text'
         self.db_setup['translations'] = {'_id': 'doc_id'}
         self.tokenizer = TokenizerBasic()
+        self.test_path = os.path.abspath('./rosetta/tests')
+        self.temp_vw_path = os.path.join(self.test_path, 'temp', 'test.vw')
 
     def test_info_stream(self):
         stream = MongoStreamer(self.db_setup,
@@ -278,11 +289,11 @@ def test_to_vw(self):
         stream = MongoStreamer(self.db_setup,
                                tokenizer=self.tokenizer)
         stream.cursor = self.mock_cursor
-        result = StringIO()
-        stream.to_vw(result, cache_list=['doc_id'])
+        stream.to_vw(open(self.temp_vw_path, 'w'))
+        result = open(self.temp_vw_path).read()
 
         benchmark = " 1 a| failure:1 doomed:1\n 1 1| set:1 success:1\n"
-        self.assertEqual(benchmark, result.getvalue())
+        self.assertEqual(benchmark, result)
 
     def test_to_scipyspare(self):
         stream = MongoStreamer(self.db_setup,
@@ -294,3 +305,7 @@ def test_to_scipyspare(self):
 
         compare = result.toarray() == benchmark.toarray()
         self.assertTrue(compare.all())
+
+    def tearDown(self):
+        if os.path.exists(self.temp_vw_path):
+            os.remove(self.temp_vw_path)
diff --git a/rosetta/text/streamers.py b/rosetta/text/streamers.py
index eb1eee5..e9e38e1 100644
--- a/rosetta/text/streamers.py
+++ b/rosetta/text/streamers.py
@@ -13,7 +13,7 @@
 import MySQLdb.cursors
 import pymongo
 
-from rosetta.parallel.parallel_easy import imap_easy
+from rosetta.parallel.parallel_easy import imap_easy, parallel_apply
 
 from .. import common
 from ..common import lazyprop, smart_open, DocIDError
@@ -35,6 +35,10 @@ def info_stream(self, **kwargs):
         """
         return
 
+    @abc.abstractmethod
+    def record_stream(self, **kwargs):
+        return
+
     def single_stream(self, item, cache_list=None, **kwargs):
         """
         Stream a single item from source.
@@ -80,43 +84,36 @@ def token_stream(self, cache_list=None, **kwargs):
         """
         return self.single_stream('tokens', cache_list=cache_list, **kwargs)
 
-    def to_vw(self, outfile, n_jobs=-1, chunksize=1000, raise_on_bad_id=True,
-              cache_list=None, cache_list_file=None):
+    def to_vw(self, out_stream=sys.stdout, n_jobs=1, raise_on_bad_id=True,
+              cache_list=None):
         """
         Write our filestream to a VW (Vowpal Wabbit) formatted file.
 
         Parameters
         ----------
-        outfile : filepath or buffer
+        out_stream : stream, buffer or open file object
         n_jobs : Integer
-            Use n_jobs different jobs to do the processing.  Set = 4 for 4
-            jobs.  Set = -1 to use all available, -2 for all except 1,...
-        chunksize : Integer
-            Workers process this many jobs at once before pickling and sending
-            results to master.  If this is too low, communication overhead
-            will dominate.  If this is too high, jobs will not be distributed
-            evenly.
-        cache_list : List of strings
-            Write these info_stream items to file on every iteration.
-        cache_list_file : filepath or buffer
-        """
+            Number of CPU jobs
+        cache_list : List of strings
+            Cache these items as they appear.
+            E.g. self.token_stream('doc_id', 'tokens') caches
+            info['doc_id'] and info['tokens'] (assuming both are available).
+
+        Notes
+        -----
+        self.info_stream must have a 'doc_id' field, as this is used to index
+        the lines in the vw file.
+        """
+        assert self.tokenizer, 'tokenizer must be defined to use .to_vw()'
+        cache_list = [] if cache_list is None else cache_list
+        # Initialize the cached items as attributes
+        for cache_item in cache_list:
+            self.__dict__[cache_item + '_cache'] = []
         formatter = text_processors.VWFormatter()
 
-        func = partial(_to_sstr, formatter=formatter,
-                       raise_on_bad_id=raise_on_bad_id,
+        func = partial(_to_sstr, tokenizer=self.tokenizer, formatter=formatter,
+                       raise_on_bad_id=raise_on_bad_id, streamer=self,
                        cache_list=cache_list)
-        results_iterator = imap_easy(func,
-                                     self.info_stream(),
-                                     n_jobs, chunksize)
-        if cache_list_file:
-            with smart_open(outfile, 'w') as open_outfile, \
-                    smart_open(cache_list_file, 'w') as open_cache_file:
-                for result, cache_list in results_iterator:
-                    open_outfile.write(result + '\n')
-                    open_cache_file.write(str(cache_list) + '\n')
-        else:
-            with smart_open(outfile, 'w') as open_outfile:
-                for result, cache_list in results_iterator:
-                    open_outfile.write(result + '\n')
+        parallel_apply(func, self.record_stream(), n_jobs,
+                       sep='\n', out_stream=out_stream)
 
     def to_scipysparse(self, cache_list=None, **kwargs):
         """
@@ -237,6 +234,20 @@ def _sfile_stream(self, doc_id=None):
                 continue
 
             yield record_dict
 
+    def record_stream(self, doc_id=None):
+        """
+        Returns an iterator over record dicts.
+
+        Parameters
+        ----------
+        doc_id : Iterable over Strings
+            Return record dicts iff doc_id in doc_id
+        """
+        source = self.source(doc_id=doc_id)
+
+        for record_dict in source:
+            yield record_dict
+
     def info_stream(self, doc_id=None):
         """
         Returns an iterator over info dicts.
@@ -246,10 +257,9 @@ def info_stream(self, doc_id=None):
         doc_id : Iterable over Strings
             Return info dicts iff doc_id in doc_id
         """
-        source = self.source(doc_id=doc_id)
 
         # Read record_dict and convert to info by adding tokens
-        for record_dict in source:
+        for record_dict in self.record_stream(doc_id):
             record_dict['tokens'] = self.formatter._dict_to_tokens(record_dict)
             yield record_dict
 
@@ -353,6 +363,9 @@ def _file_stat(self, path):
         return {'mtime': os.path.getmtime(path), 'atime': os.path.getatime(path),
                 'size': os.path.getsize(path)}
 
+    def record_stream(self, paths=None, doc_id=None, limit=None):
+        return self.info_stream(paths, doc_id, limit)
+
     def info_stream(self, paths=None, doc_id=None, limit=None):
         """
         Returns an iterator over paths yielding dictionaries with information
@@ -461,11 +474,18 @@ def __init__(self, text_iter, tokenizer=None, tokenizer_func=None):
         if tokenizer_func:
             self.tokenizer = text_processors.MakeTokenizer(tokenizer_func)
 
+    def record_stream(self):
+        """
+        Yields a dict from self.streamer
+        """
+        for info in self.text_iter:
+            yield info
+
     def info_stream(self):
         """
         Yields a dict from self.streamer as well as "tokens".
         """
-        for info in self.text_iter:
+        for info in self.record_stream():
             info['tokens'] = self.tokenizer.text_to_token_list(info['text'])
             yield info
 
@@ -528,11 +548,15 @@ def iterate_over_query(self):
         """
         return
 
-    def info_stream(self, **kwargs):
+    def record_stream(self):
+        for info in self.iterate_over_query():
+            yield info
+
+    def info_stream(self):
         """
         Yields a dict from self.executing the query as well as "tokens".
""" - for info in self.iterate_over_query(): + for info in self.record_stream(): info['tokens'] = self.tokenizer.text_to_token_list(info['text']) yield info @@ -718,22 +742,24 @@ def _group_to_sstr(streamer, formatter, raise_on_bad_id, path_group): return group_results -def _to_sstr(info_dict, formatter, raise_on_bad_id, cache_list): +def _to_sstr(record_dict, tokenizer, formatter, raise_on_bad_id, + streamer, cache_list=None): """ - Yield a list of sstr's (sparse string representations) coming from 'tokens' - in streamer.info_stream(). - If cache_list is passed, yeilds a tuple tok_sstr, cache_dict where the latter - is a subdict of info_dict. + Yield a list of sstr's (sparse string representations); takes yield + instances of STREAMER.record_stream(), tokenizes the text, get 'doc_id' + and returns sstr. """ - doc_id = str(info_dict['doc_id']) - tokens = info_dict['tokens'] + cache_list = [] if cache_list is None else cache_list + + doc_id = str(record_dict['doc_id']) + tokens = tokenizer.text_to_token_list(record_dict['text']) feature_values = Counter(tokens) - cache_dict=None - if cache_list: - cache_dict = dict(zip(cache_list,[info_dict[key] for key in cache_list])) try: tok_sstr = formatter.get_sstr( feature_values, importance=1, doc_id=doc_id) + for cache_item in cache_list: + streamer.__dict__[cache_item + '_cache'].append( + record_dict[cache_item]) except DocIDError as e: msg = e.message + "\doc_id = %s\n" % info_dict['doc_id'] if raise_on_bad_id: @@ -741,4 +767,4 @@ def _to_sstr(info_dict, formatter, raise_on_bad_id, cache_list): else: msg = "WARNING: " + msg sys.stderr.write(msg) - return tok_sstr, cache_dict + return tok_sstr