Skip to content

Commit

Permalink
Merge pull request #29 from mdeland/stream_parallel
Browse files Browse the repository at this point in the history
Parallel apply
  • Loading branch information
dkrasner committed Apr 3, 2014
2 parents dedd207 + 3385c8e commit dcdd19c
Show file tree
Hide file tree
Showing 5 changed files with 351 additions and 64 deletions.
70 changes: 69 additions & 1 deletion rosetta/parallel/parallel_easy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
and a more effective way of handling Ctrl-C exit (we add a timeout).
"""
import itertools
from multiprocessing import Pool, cpu_count
from multiprocessing import cpu_count, Pool, Process, Manager, Lock
from multiprocessing.pool import IMapUnorderedIterator, IMapIterator
import cPickle
import sys
Expand All @@ -25,6 +25,74 @@
# Functions
###############################################################################

def _do_work_off_queue(lock, in_q, func, out_q, sep):
while True:
x = in_q.get()

if x is None:
out_q.put(x)
return

result = func(x)
out_q.put(str(result) + sep)


def _write_to_output(out_q, stream, n_jobs):
ends_seen = 0
while True:
x = out_q.get()
if not x:
ends_seen += 1
if ends_seen == n_jobs:
stream.flush()
return
else:
continue
stream.write(x)


def parallel_apply(func, iterable, n_jobs, sep='\n', out_stream=sys.stdout):
    """
    Writes the result of applying func to iterable using n_jobs to out_stream

    Parameters
    ----------
    func : function of one variable
    iterable : iterable, yields arguments for func
    n_jobs : int
        number of worker processes; 1 means run serially in this process.
        NOTE(review): values < 1 start no workers but still spawn and join
        the output process, which would block forever -- presumably callers
        always pass n_jobs >= 1; confirm.
    sep : str
        appended to str(result) for each item written to out_stream
    out_stream : open file, buffer, standard stream
        must have .write() and .flush() methods
    """
    # if there is only one job, simply read from iterable, apply function
    # and write to output
    if n_jobs == 1:
        for each in iterable:
            out_stream.write(str(func(each)) + sep)
        out_stream.flush()
        return

    # if there is more than one job, use a queue manager to communicate
    # between processes.
    manager = Manager()
    # Bounded queues (2 * n_jobs) keep the producer from racing far ahead
    # of the workers and the workers from racing ahead of the writer.
    in_q = manager.Queue(maxsize=2 * n_jobs)
    out_q = manager.Queue(maxsize=2 * n_jobs)
    # NOTE(review): this lock is passed to the workers but
    # _do_work_off_queue never uses it.
    lock = Lock()

    # start pool workers
    pool = []
    for i in xrange(n_jobs):
        p = Process(target=_do_work_off_queue,
                    args=(lock, in_q, func, out_q, sep))
        p.start()
        pool.append(p)

    # start output worker; it exits after seeing n_jobs None sentinels
    out_p = Process(target=_write_to_output,
                    args=(out_q, out_stream, n_jobs))
    out_p.start()

    # put data on input queue, followed by one None sentinel per worker
    # so every worker sees exactly one shutdown signal
    iters = itertools.chain(iterable, (None,) * n_jobs)
    for each in iters:
        in_q.put(each)

    # finish job: wait for workers first, then for the writer to drain out_q
    for p in pool:
        p.join()
    out_p.join()


def imap_easy(func, iterable, n_jobs, chunksize, ordered=True):
"""
Expand Down
120 changes: 120 additions & 0 deletions rosetta/parallel/threading_easy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""
Functions to assist in multithreaded processing with Python 2.7.
"""
import sys
import threading


class LockIterateApply(threading.Thread):
    """
    Wraps an iterable into a "locked" iterable for threading, applies
    function and writes to out_stream.

    Each thread repeatedly (1) takes the shared lock, (2) pulls the next
    item off the shared iterator, (3) releases the lock, then (4) applies
    func and writes the result outside the lock.  Threads exit when the
    iterator is exhausted.
    """
    def __init__(self, func, iterable, lock, sep='\n', out_stream=sys.stdout):
        """
        Parameters
        ----------
        func : function of one variable
        iterable : iterable, yields func arg
        lock : threading.Lock()
            shared by every thread consuming the same iterable
        sep : str
            for writing to out_stream
        out_stream : open, buff, standard stream
            must have a .write() method
        """
        self.lock = lock
        self.func = func
        self.out_stream = out_stream
        self.myiter = iterable
        self.sep = sep
        threading.Thread.__init__(self)

    def run(self):
        # Keep pulling and applying until the shared iterator is exhausted.
        keep_going = True
        while keep_going:
            keep_going = self.read_apply()

    def read_apply(self):
        """
        Locks the shared iterator, pulls the next element, then applies
        self.transform and writes the result outside the lock.

        Returns
        -------
        bool
            False when the iterator is exhausted, True otherwise.
        """
        try:
            # 'with' guarantees the lock is released even if the iterator
            # raises something other than StopIteration; the previous
            # explicit acquire/release pair would leave the lock held in
            # that case and deadlock the remaining threads.
            with self.lock:
                # next(it) works on both Python 2 and 3, unlike it.next()
                x = next(self.myiter)
        except StopIteration:
            return False
        y = self.transform(x)
        self.output(y)
        return True

    def transform(self, x):
        # Hook point: subclasses may override to change what is applied.
        return self.func(x)

    def output(self, y):
        """
        Writes str(y) + sep to out_stream.
        """
        self.out_stream.write(str(y) + self.sep)


def threading_easy(func, iterable, n_threads, sep='\n', out_stream=sys.stdout):
    """
    Wraps the python threading library; takes an iterator, a function which
    acts on each element the iterator yields and starts up the prescribed
    number of threads.  The output of each thread's work is passed to an
    out_stream.

    Parameters
    ----------
    func : function of one variable
    iterable : iterable which yields function argument
    n_threads : int or None
        None or <= 1 runs serially in the calling thread
    sep : string
        for concatenating results to write
    out_stream : open file, buffer, standard stream
        must have a .write() method

    Returns
    -------
    None
        writes str(func(item)) + sep to out_stream for every item

    Examples
    --------
    Function of one variable:

    >>> from time import sleep
    >>> import rosetta.parallel.threading_easy as te
    >>> my_iter = (x for x in range(10))
    >>> def double(n):
    ...     sleep(1)
    ...     return 2 * n
    >>> te.threading_easy(double, my_iter, n_threads=10)

    Function of more than one variable:

    >>> from functools import partial
    >>> def double2(n, t):
    ...     sleep(t)
    ...     return 2 * n
    >>> double = partial(double2, t=1)
    >>> te.threading_easy(double, my_iter, n_threads=10)

    Notes: in order to support the default sys.stdout out_stream, all results
    are converted to string before writing.
    """
    # Serial fallback: no lock or threads needed for a single consumer.
    if n_threads is None or n_threads <= 1:
        for each in iterable:
            out_stream.write(('%s'+sep)%func(each))
    else:
        # One lock shared by all threads serializes access to the iterator.
        lock = threading.Lock()
        threads = []
        for i in range(n_threads):
            t = LockIterateApply(func, iterable, lock, sep, out_stream)
            threads.append(t)

        for t in threads:
            t.start()

        for t in threads:
            t.join()
58 changes: 58 additions & 0 deletions rosetta/tests/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
import pandas as pd
from pandas.util.testing import assert_frame_equal, assert_series_equal
import numpy as np
import threading
from StringIO import StringIO

from rosetta.parallel import parallel_easy, pandas_easy
from rosetta.parallel.threading_easy import threading_easy, LockIterateApply


# A couple functions for testing parallel easy
Expand Down Expand Up @@ -179,3 +182,58 @@ def test_groupby_to_series_to_frame_2(self):
result = pandas_easy.groupby_to_series_to_frame(
df, frame_to_series, 1, use_apply=False, by=labels)
assert_frame_equal(result, benchmark)


class TestLockIterateApply(unittest.TestCase):
    """
    Test the Locked Iterator Class
    """
    def setUp(self):
        self.data = ['my', 'name', 'is', 'daniel']
        self.num_threads = 4

        def bytwo(x):
            return 2 * x

        self.func = bytwo
        self.myiter = iter(self.data)

    def _assert_doubled(self, out):
        # Every word doubled, plus the '' produced by the trailing separator.
        expected = set(['mymy', 'namename', 'danieldaniel', 'isis', ''])
        self.assertEqual(set(out.getvalue().split(',')), expected)

    def test_locked_iterator(self):
        lock = threading.Lock()
        out = StringIO()
        workers = [
            LockIterateApply(self.func, self.myiter, lock, ',', out)
            for _ in range(self.num_threads)
        ]

        for w in workers:
            w.start()

        for w in workers:
            w.join()

        self._assert_doubled(out)

    def test_threading_easy(self):
        out = StringIO()
        threading_easy(self.func, self.myiter, self.num_threads, ',', out)
        self._assert_doubled(out)

    def test_threading_easy_single(self):
        out = StringIO()
        threading_easy(self.func, self.myiter, 1, ',', out)
        self._assert_doubled(out)

Loading

0 comments on commit dcdd19c

Please sign in to comment.