Skip to content

Commit

Permalink
Merge pull request #284 from CPSSD/n/reduce#266
Browse files Browse the repository at this point in the history
Add reduce functionality and example
  • Loading branch information
iandioch authored Apr 21, 2017
2 parents 3f1bd8c + d9cfb92 commit c4feaab
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 9 deletions.
78 changes: 72 additions & 6 deletions dipla/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class Dipla:
# signal registered under the function. Check the comments for
# Dipla_task_creators for details on how the id is determined.
_task_input_script_info = dict()
_reduce_task_group_sizes = dict()

_use_control_webpage = False

Expand Down Expand Up @@ -85,15 +86,19 @@ def _read_by_consuming(collection, current_location):
def _any_data_available(collection, current_location):
return len(collection) - current_location > 0

def _create_clientside_task(task_instructions):
def _create_clientside_task(task_instructions,
is_reduce=False,
reduce_group_size=2):
task_uid = uid_generator.generate_uid(
length=8,
existing_uids=Dipla.task_queue.get_task_ids())
return Task(
task_uid,
task_instructions,
MachineType.client,
complete_check=Dipla.complete_when_unavailable)
complete_check=Dipla.complete_when_unavailable,
is_reduce=is_reduce,
reduce_group_size=reduce_group_size)

def _generate_uid_in_list(uids_list):
new_uid = uid_generator.generate_uid(length=8, existing_uids=uids_list)
Expand All @@ -120,12 +125,17 @@ def _add_sources_to_task(sources, task, create_data_source_function):
task.add_data_source(
create_data_source_function(source, data_source_creator))

def _create_normal_task(sources, task_instructions):
def _create_normal_task(sources,
task_instructions,
is_reduce=False,
reduce_group_size=2):
"""
sources are objects that can be used to create a data source,
e.g. an iterable or another task
"""
task = Dipla._create_clientside_task(task_instructions)
task = Dipla._create_clientside_task(task_instructions,
is_reduce,
reduce_group_size)

source_uids = []

Expand Down Expand Up @@ -156,6 +166,35 @@ def distributable_decorator(function):

return distributable_decorator

@staticmethod
def reduce_distributable(n=2):
    """Decorator factory that registers a reduce function with Dipla.

    A reduce function takes a list of values and collapses them into a
    single value; that result is fed back into the input pool and the
    process repeats until one value remains, which becomes the overall
    result. The decorated function receives a single parameter: the list
    of values to reduce. The keyword `n` caps how many inputs are handed
    to the function per invocation (default 2, the canonical reduce;
    larger values may improve throughput in some workloads).

    Raises ReduceBadSize if `n` is not greater than 1.
    """
    if n <= 1:
        raise ReduceBadSize(
            "Input size for a reduce function must be greater than 1")

    def decorator(function):
        # Compile/register the function binary, then record it as a
        # normal-task creator with its reduce group size.
        Dipla._process_decorated_function(function, None)
        function_key = id(function)
        Dipla._task_creators[function_key] = Dipla._create_normal_task
        Dipla._reduce_task_group_sizes[function_key] = n
        return function

    return decorator

@staticmethod
def scoped_distributable(count, verifier=None):
"""
Expand Down Expand Up @@ -337,6 +376,12 @@ def apply_distributable(function, *raw_args):
if id(function) not in Dipla._task_creators:
raise KeyError("Provided function was not decorated using Dipla")

is_reduce = id(function) in Dipla._reduce_task_group_sizes

if is_reduce and len(raw_args) != 1:
s = "Incorrect number of arguments given for reduce distributable"
raise KeyError(s)

args = []
for arg in raw_args:
if isinstance(arg, Promise):
Expand All @@ -346,7 +391,15 @@ def apply_distributable(function, *raw_args):
else:
raise UnsupportedInput()
function_id = id(function)
tasks = Dipla._task_creators[function_id](args, function.__name__)
tasks = None
if is_reduce:
size = Dipla._reduce_task_group_sizes[function_id]
tasks = Dipla._task_creators[function_id](args,
function.__name__,
is_reduce=True,
reduce_group_size=size)
else:
tasks = Dipla._task_creators[function_id](args, function.__name__)
for task in tasks:
task.signals = {
'TERMINATE': lambda server, uid, _: server.terminate_task(uid)
Expand Down Expand Up @@ -425,7 +478,12 @@ def get(promise, run_on_server=False):
else:
server.start(password=Dipla._password)

return get_task.task_output
if Dipla.task_queue.get_task(promise.task_uid).is_reduce:
# The task that has been requested is a reduce task,
# so we only care about the very last value returned.
return get_task.task_output[-1]
else:
return get_task.task_output

@staticmethod
def set_password(password):
Expand Down Expand Up @@ -592,6 +650,14 @@ class DiscoveryBadRequest(RuntimeError):
pass


class ReduceBadSize(RuntimeError):
    """Raised when a reduce distributable is given an invalid input size
    (the reduce group size must be greater than 1)."""


# Remember that the function's __name__ is the task name in apply_distributable
# task_name = function.__name__

Expand Down
60 changes: 57 additions & 3 deletions dipla/server/task_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ def push_task(self, item):
"""
if item.uid is None:
raise AttributeError("Added task to TaskQueue with no id")
self._nodes[item.uid] = TaskQueueNode(item)
if item.is_reduce:
group_size = item.reduce_group_size
self._nodes[item.uid] = ReduceTaskQueueNode(item, group_size)
else:
self._nodes[item.uid] = TaskQueueNode(item)

# Add this task as a dependant of all its prerequisite tasks
active = True
Expand Down Expand Up @@ -155,6 +159,16 @@ def add_result(self, task_id, result):
"Attempted to add result for a task not present in the queue")

self._nodes[task_id].task_item.add_result(result)
if self._nodes[task_id].task_item.is_reduce:
# This task has been marked as a reduce task, so outputs should be
# put back into the same task as an input.

# self.push_task_input() expects a series of groups of inputs,
# (one group of inputs is the things a task needs to run once)
# so we must turn this single value into that format
args = [[result]]
self.push_task_input(task_id, args)

if self.is_task_open(task_id):
self.activate_new_tasks(self._nodes[task_id].dependees)
# Check if the task is now completed
Expand Down Expand Up @@ -256,7 +270,7 @@ def next_input(self):
self.task_item.instructions,
self.task_item.machine_type,
arguments,
signals=[x for x in self.task_item.signals])
signals=list(self.task_item.signals))

def has_next_input(self):
if len(self.dependencies) == 0:
Expand All @@ -273,6 +287,42 @@ def is_machine_type(self, machine_type):
return machine_type == self.task_item.machine_type


class ReduceTaskQueueNode(TaskQueueNode):
    """A TaskQueueNode for reduce tasks.

    Instead of handing out one value per run, next_input gathers up to
    `reduce_group_size` values from the task's data source and delivers
    them to the task as a single list argument.
    """

    def __init__(self, task_item, reduce_group_size):
        """Initialise the node.

        Params:
         - task_item: the reduce Task this node wraps
         - reduce_group_size: maximum number of values handed to the
           reduce function in a single run
        """
        super().__init__(task_item)
        self.reduce_group_size = reduce_group_size

    def next_input(self):
        """Collect up to `reduce_group_size` available values from the
        first data source and package them as one TaskInput.

        Raises DataStreamerEmpty if no data is available at all.
        """
        # NOTE(review): only dependencies[0] is ever consumed here — a
        # reduce task is assumed to have exactly one data source.
        if not self.dependencies[0].data_streamer.has_available_data():
            raise DataStreamerEmpty(
                "Attempted to read input from an empty source")

        arguments = []
        for _ in range(self.reduce_group_size):
            dependency = self.dependencies[0]
            if not dependency.data_streamer.has_available_data():
                break
            # read() returns a group of values; take the first element of
            # the group as this run's next input value.
            arg = dependency.data_streamer.read()
            arguments.append(arg[0])

            # Defensive cap in case a read ever yields more than expected;
            # the range() above already bounds normal iteration.
            if len(arguments) >= self.reduce_group_size:
                break

        # The task expects a list of input groups (one group per run);
        # here a single run receives the whole batch as one list argument.
        arguments = [[arguments]]

        # Exactly one result comes back for this single run —
        # len(arguments[0]) is always 1 after the wrapping above.
        self.task_item.inc_expected_results_by(len(arguments[0]))
        return TaskInput(
            self.task_item.uid,
            self.task_item.instructions,
            self.task_item.machine_type,
            arguments,
            signals=list(self.task_item.signals))


class DataStreamerEmpty(Exception):
"""
An exception raised when an attempt is made to read a data streamer,
Expand Down Expand Up @@ -463,7 +513,9 @@ def __init__(
machine_type,
open_check=lambda x: True,
complete_check=lambda x: False,
signals={}):
signals={},
is_reduce=False,
reduce_group_size=2):
"""
Initalises the Task
Expand Down Expand Up @@ -494,6 +546,8 @@ def __init__(
self.uid = uid
self.instructions = task_instructions
self.machine_type = machine_type
self.is_reduce = is_reduce
self.reduce_group_size = reduce_group_size
self.data_instructions = []

self.open_check = open_check
Expand Down
45 changes: 45 additions & 0 deletions examples/reduce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from os import path
import sys
# Append to path so that this script can access the dipla package
sys.path.append(path.abspath('../dipla'))

from dipla.api import Dipla

# This program finds the word with the most vowels from the opening of the Communist Manifesto

communist_manifesto = """
A spectre is haunting Europe — the spectre of communism. All the powers of old Europe have entered into a holy alliance to exorcise this spectre: Pope and Tsar, Metternich and Guizot, French Radicals and German police-spies.
Where is the party in opposition that has not been decried as communistic by its opponents in power? Where is the opposition that has not hurled back the branding reproach of communism, against the more advanced opposition parties, as well as against its reactionary adversaries?
Two things result from this fact:
I. Communism is already acknowledged by all European powers to be itself a power.
II. It is high time that Communists should openly, in the face of the whole world, publish their views, their aims, their tendencies, and meet this nursery tale of the Spectre of Communism with a manifesto of the party itself.
To this end, Communists of various nationalities have assembled in London and sketched the following manifesto, to be published in the English, French, German, Italian, Flemish and Danish languages.
"""

# Normalise the text into lower-case words. str.split() with no argument
# never yields empty strings, so no length filter is needed.
input_data = [word.lower() for word in communist_manifesto.split()]

@Dipla.reduce_distributable(n=6)
def find_most_vowels(words):
    """Return the word in `words` that contains the most vowels.

    Ties are broken in favour of the earliest word in the list; an empty
    list yields the empty string.
    """
    vowels = {'a', 'e', 'i', 'o', 'u'}
    best = ""
    best_count = -1
    for word in words:
        count = sum(1 for char in word if char in vowels)
        # Strict comparison keeps the first word on a tie.
        if count > best_count:
            best_count = count
            best = word
    return best

print("input data:", input_data)

out = Dipla.apply_distributable(find_most_vowels, input_data).get()

print('result:', out)

0 comments on commit c4feaab

Please sign in to comment.