diff --git a/abcpy/backends/mpi.py b/abcpy/backends/mpi.py
index 479d1443..f93f6e03 100644
--- a/abcpy/backends/mpi.py
+++ b/abcpy/backends/mpi.py
@@ -1,5 +1,6 @@
 import numpy as np
 import cloudpickle
+import time
 import pickle
 
 from mpi4py import MPI
@@ -17,8 +18,19 @@ class BackendMPIMaster(Backend):
     OP_PARALLELIZE, OP_MAP, OP_COLLECT, OP_BROADCAST, OP_DELETEPDS, OP_DELETEBDS, OP_FINISH = [1, 2, 3, 4, 5, 6, 7]
     finalized = False
 
-    def __init__(self, master_node_ranks=[0]):
-
+    def __init__(self, master_node_ranks=[0],chunk_size=1):
+        """
+        Parameters
+        ----------
+        master_node_ranks: Python list
+            list of ranks computation should not happen on.
+            Should include the master so it doesn't get
+            overwhelmed with work.
+
+        chunk_size: Integer
+            size of one block of data to be sent to free
+            executors
+        """
         self.comm = MPI.COMM_WORLD
         self.size = self.comm.Get_size()
         self.rank = self.comm.Get_rank()
@@ -31,10 +43,17 @@ def __init__(self, master_node_ranks=[0]):
 
         #Initialize a BDS store for both master & slave.
         self.bds_store = {}
+        self.pds_store = {}
+
+        #Initialize a store for the pds data that
+        #.. hasn't been sent to the workers yet
+        self.pds_pending_store = {}
+
+        self.chunk_size = chunk_size
 
 
     def __command_slaves(self, command, data):
-        """
+        """Tell slaves to enter relevant execution block
         This method handles the sending of the command to the slaves
         telling them what operation to perform next.
 
@@ -55,7 +74,8 @@ def __command_slaves(self, command, data):
         elif command == self.OP_MAP:
             #In map we receive data as (pds_id,pds_id_new,func)
             #Use cloudpickle to dump the function into a string.
-            function_packed = cloudpickle.dumps(data[2], pickle.HIGHEST_PROTOCOL)
+            # function_packed = self.__sanitize_and_pack_func()
+            function_packed = cloudpickle.dumps(data[2],pickle.HIGHEST_PROTOCOL)
             data_packet = (command, data[0], data[1], function_packed)
 
         elif command == self.OP_BROADCAST:
@@ -75,6 +95,7 @@ def __command_slaves(self, command, data):
 
         _ = self.comm.bcast(data_packet, root=0)
 
+
     def __generate_new_pds_id(self):
         """
         This method generates a new pds_id to associate a PDS with it's remote counterpart
@@ -130,21 +151,61 @@ def parallelize(self, python_list):
 
         pds_id = self.__generate_new_pds_id()
         self.__command_slaves(self.OP_PARALLELIZE, (pds_id,))
 
-        #Initialize empty data lists for the processes on the master node
-        rdd_masters = [[] for i in range(len(self.master_node_ranks))]
-
-        #Split the data only amongst the number of workers
-        rdd_slaves = np.array_split(python_list, self.size - len(self.master_node_ranks), axis=0)
+        #Don't send any data. Just keep it as a queue we're going to pop.
+        self.pds_store[pds_id] = list(python_list)
 
-        #Combine the lists into the final rdd before we split it across all ranks.
-        rdd = rdd_masters + rdd_slaves
-
-        data_chunk = self.comm.scatter(rdd, root=0)
-
-        pds = PDSMPI(data_chunk, pds_id, self)
+        pds = PDSMPI([], pds_id, self)
 
         return pds
 
+    def orchestrate_map(self,pds_id):
+        """Orchestrates the slaves/workers to perform a map function
+
+        This works by keeping track of the workers who haven't finished executing,
+        waiting for them to request the next chunk of data when they are free,
+        responding to them with the data and then sending them a Sentinel
+        signalling that they can exit.
+        """
+        is_map_done = [True if i in self.master_node_ranks else False for i in range(self.size)]
+        status = MPI.Status()
+
+        #Copy it to the pending. This is so when master accesses
+        #the PDS data it's not empty.
+        self.pds_pending_store[pds_id] = list(self.pds_store[pds_id])
+
+        #While we have some ranks that haven't finished
+        while sum(is_map_done)