Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to chain distributables together #274

Merged
merged 5 commits into from
Apr 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dipla/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,13 @@ class Promise:
def __init__(self, promise_uid):
self.task_uid = promise_uid

def distribute(self, function, *args):
return Dipla.apply_distributable(
function, *([self] + [x for x in args]))

def get(self):
return Dipla.get(self)


class UnsupportedInput(Exception):
"""
Expand Down
4 changes: 1 addition & 3 deletions examples/breadth_first_search_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ def bfs(grid, index, count):

# Apply a distributable function to the stream of values from
# the above source
bfs_result = Dipla.apply_distributable(bfs, [[grid]])

out = Dipla.get(bfs_result)
out = Dipla.apply_distributable(bfs, [[grid]]).get()

u = []
for y in out:
Expand Down
4 changes: 1 addition & 3 deletions examples/explorer_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@ def add_ten(input_value, discovered):
discovered.append(8)
return input_value + 10

added_ten = Dipla.apply_distributable(add_ten, [1, 2, 3])

# The inputs of add_ten are 1, 2, 3. Since there is a 1 here, it should
# discover another input 8. This means the output should add ten to all
# four inputs, outputting 11, 12, 13, 18

out = Dipla.get(added_ten)
out = Dipla.apply_distributable(add_ten, [1, 2, 3]).get()

for o in out:
print(o)
5 changes: 1 addition & 4 deletions examples/hello_world.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ def get_factors(n):

# Apply a distributable function to the stream of values from
# the above source
factor_results = Dipla.apply_distributable(get_factors,
data_source)

out = Dipla.get(factor_results)
out = data_source.distribute(get_factors).get()

for o in out:
print(o)
2 changes: 1 addition & 1 deletion examples/invert_matrix_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def cofactors_matrix(matrix, index, count):
print("Matrix is not invertible!")
sys.exit()

cofactors = Dipla.get(Dipla.apply_distributable(cofactors_matrix, [[matrix]]))
cofactors = Dipla.apply_distributable(cofactors_matrix, [[matrix]]).get()

transposed = transpose([x for x in cofactors if len(x) > 0])

Expand Down
4 changes: 1 addition & 3 deletions examples/scoped_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ def get_evens(input_value, index, count):

# Apply a distributable function to the stream of values from
# the above source
evens_results = Dipla.apply_distributable(get_evens, [(1, 12)])

out = Dipla.get(evens_results)
out = Dipla.apply_distributable(get_evens, [(1, 12)]).get()

for o in out:
print(o)
35 changes: 35 additions & 0 deletions tests/api/dipla_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,5 +183,40 @@ def __eq__(self, other):
self.mock_task_queue.push_task.assert_called_with(
TaskWithDiscoveredSignal())

def test_apply_distributable_twice_on_same_task(self):
@Dipla.data_source
def read(input_value):
return input_value + " "

@Dipla.distributable()
def func(input_value):
return input_value

input_data = ["1", "2", "3", "4", "5"]

promise = Dipla.read_data_source(func, input_data)
result = promise.distribute(func)
self.assertIsNotNone(result.task_uid)

def test_apply_distributable_with_dependent_tasks(self):
@Dipla.data_source
def func1(input_value):
return input_value*2

@Dipla.data_source
def func2(input_value):
return input_value//2

@Dipla.distributable()
def add(a, b):
return a+b

inputs1 = [1, 2, 3, 4, 5]
inputs2 = [6, 7, 8, 9, 10]
result1 = Dipla.read_data_source(func1, inputs1)
result2 = Dipla.read_data_source(func2, inputs2)
promise = result1.distribute(add, result2)
self.assertIsNotNone(promise.task_uid)

def tearDown(self):
Dipla._task_creators = dict()