From c5be7f69374d32c7db330366242b413439c181e2 Mon Sep 17 00:00:00 2001
From: Avinash Ummadisingu
Date: Wed, 18 Oct 2017 17:00:18 +0200
Subject: [PATCH 1/3] Removed logging and minor bugfix + perf

---
 abcpy/backends/mpi.py | 155 +++++++++++++++++++++++++++---------------
 1 file changed, 100 insertions(+), 55 deletions(-)

diff --git a/abcpy/backends/mpi.py b/abcpy/backends/mpi.py
index 52d04acc..605334c9 100644
--- a/abcpy/backends/mpi.py
+++ b/abcpy/backends/mpi.py
@@ -1,5 +1,7 @@
 import numpy as np
 import cloudpickle
+import time
+import pickle
 
 from mpi4py import MPI
 from abcpy.backends import Backend, PDS, BDS
@@ -16,8 +18,9 @@ class BackendMPIMaster(Backend):
     OP_PARALLELIZE, OP_MAP, OP_COLLECT, OP_BROADCAST, OP_DELETEPDS, OP_DELETEBDS, OP_FINISH = [1, 2, 3, 4, 5, 6, 7]
     finalized = False
 
-    def __init__(self, master_node_ranks=[0]):
+    def __init__(self, master_node_ranks=[0],chunk_size=1):
 
+        print("Using dirty version from ummavi's repo for MPI based dynamic scheduling")
         self.comm = MPI.COMM_WORLD
         self.size = self.comm.Get_size()
         self.rank = self.comm.Get_rank()
@@ -30,6 +33,15 @@ def __init__(self, master_node_ranks=[0]):
 
         #Initialize a BDS store for both master & slave.
         self.bds_store = {}
+        self.pds_store = {}
+        self.pds_pending_store = {}
+
+        self.chunk_size = chunk_size
+
+        # try:
+        #     os.mkdir("logs")
+        # except Exception as e:
+        #     print("folder logs/ already exists")
 
 
     def __command_slaves(self, command, data):
@@ -95,7 +107,7 @@ def __sanitize_and_pack_func(self, func):
         #Set the backend to None to prevent it from being packed
         globals()['backend'] = {}
 
-        function_packed = cloudpickle.dumps(func)
+        function_packed = cloudpickle.dumps(func,pickle.HIGHEST_PROTOCOL)
 
         #Reset the backend to self after it's been packed
         globals()['backend'] = self
@@ -158,21 +170,53 @@ def parallelize(self, python_list):
         pds_id = self.__generate_new_pds_id()
         self.__command_slaves(self.OP_PARALLELIZE, (pds_id,))
 
-        #Initialize empty data lists for the processes on the master node
-        rdd_masters = [[] for i in range(len(self.master_node_ranks))]
-
-        #Split the data only amongst the number of workers
-        rdd_slaves = np.array_split(python_list, self.size - len(self.master_node_ranks), axis=0)
-
-        #Combine the lists into the final rdd before we split it across all ranks.
-        rdd = rdd_masters + rdd_slaves
+        #Don't send any data. Just keep it as a queue we're going to pop.
+        self.pds_store[pds_id] = list(python_list)
 
-        data_chunk = self.comm.scatter(rdd, root=0)
-        pds = PDSMPI(data_chunk, pds_id, self)
+        pds = PDSMPI([], pds_id, self)
 
         return pds
 
+    def orchestrate_map(self,pds_id):
+        """Orchestrates the slaves to perform a map function
+        """
+        is_map_done = [True if i in self.master_node_ranks else False for i in range(self.size)]
+        status = MPI.Status()
+
+        #Copy it to the pending.
+        self.pds_pending_store[pds_id] = list(self.pds_store[pds_id])
+
+        #While we have some ranks that haven't finished
+        while sum(is_map_done) < self.size:

[...]

+            if data_chunks is None:
+                break
+
+            for chunk in data_chunks:
+                data_index,data_item = chunk
+                rdd+=[(data_index,func(data_item))]
 
         pds_res = PDSMPI(rdd, pds_id_new, self)
 
+        # self.log_fd.write("MAP_START "+str(map_start)+"\nMAP_END "+str(time.time())+"\n")
+
         return pds_res
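The core idea of this patch: parallelize() no longer scatters the data eagerly; the master keeps the list as a queue, and orchestrate_map() deals out chunk_size items whenever a worker asks for more, answering None as a sentinel once the queue is empty. For illustration only, here is a self-contained mpi4py sketch of that request/sentinel protocol. It is not the patched abcpy code; the names square and TAG_WORK and the exact message contents are invented for the example.

    # Sketch of the dynamic-scheduling protocol introduced in patch 1.
    # Run with e.g.: mpirun -n 4 python dynamic_map_sketch.py
    from mpi4py import MPI

    TAG_WORK = 11  # arbitrary tag for work requests and replies

    def square(x):
        return x * x

    def master(comm, data, chunk_size=1):
        """Deal out chunks on demand; answer None as a sentinel when done."""
        pending = list(enumerate(data))        # tag items with their indices
        workers_left = comm.Get_size() - 1
        status = MPI.Status()
        while workers_left > 0:
            # Wait for any worker to request more work
            comm.recv(source=MPI.ANY_SOURCE, tag=TAG_WORK, status=status)
            worker = status.Get_source()
            if pending:
                chunk, pending = pending[:chunk_size], pending[chunk_size:]
                comm.send(chunk, dest=worker, tag=TAG_WORK)
            else:
                comm.send(None, dest=worker, tag=TAG_WORK)   # sentinel: exit
                workers_left -= 1

    def worker(comm, func):
        """Request chunks until the sentinel arrives; keep (index, result) pairs."""
        results = []
        while True:
            comm.send("ready", dest=0, tag=TAG_WORK)         # ask for work
            chunk = comm.recv(source=0, tag=TAG_WORK)
            if chunk is None:                                # no work left
                break
            for index, item in chunk:
                results.append((index, func(item)))
        return results

    if __name__ == "__main__":
        comm = MPI.COMM_WORLD
        if comm.Get_rank() == 0:
            master(comm, data=list(range(10)))
        else:
            print(comm.Get_rank(), worker(comm, square))

Because idle workers pull work instead of receiving one fixed scatter up front, uneven per-item cost no longer leaves early-finishing ranks idle.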
From 21f2133aae4b9427e6d9ac492bd8f03efbe80587 Mon Sep 17 00:00:00 2001
From: Avinash Ummadisingu
Date: Thu, 19 Oct 2017 01:16:32 +0200
Subject: [PATCH 2/3] Added logging for benchmarking

---
 abcpy/backends/mpi.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/abcpy/backends/mpi.py b/abcpy/backends/mpi.py
index 605334c9..077d940f 100644
--- a/abcpy/backends/mpi.py
+++ b/abcpy/backends/mpi.py
@@ -379,7 +379,7 @@ def __init__(self):
 
         #Initialize a BDS store for both master & slave.
         self.bds_store = {}
-        # self.log_fd = open("logs/slave_"+str(self.rank),"w")
+        self.log_fd = open("logs/slave_"+str(self.rank),"w")
 
         #Go into an infinite loop waiting for commands from the user.
         self.slave_run()
@@ -516,7 +516,7 @@ def map(self, func):
 
         pds_res = PDSMPI(rdd, pds_id_new, self)
 
-        # self.log_fd.write("MAP_START "+str(map_start)+"\nMAP_END "+str(time.time())+"\n")
+        self.log_fd.write("MAP_START "+str(map_start)+"\nMAP_END "+str(time.time())+"\n")
 
         return pds_res
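The log lines this patch enables have the shape "MAP_START <unix time>" and "MAP_END <unix time>", one pair per map call, in one file per slave rank under logs/. A small post-processing sketch under that assumption (the summary output format is invented here):

    # Summarize per-slave busy time from the logs/slave_<rank> files
    # written by patch 2. Assumes the MAP_START/MAP_END line format above.
    import glob

    for path in sorted(glob.glob("logs/slave_*")):
        starts, ends = [], []
        with open(path) as fd:
            for line in fd:
                parts = line.split()
                if len(parts) != 2:
                    continue
                label, value = parts
                if label == "MAP_START":
                    starts.append(float(value))
                elif label == "MAP_END":
                    ends.append(float(value))
        durations = [end - start for start, end in zip(starts, ends)]
        print(path, "maps:", len(durations), "busy:", round(sum(durations), 3), "s")

Note that patch 1 left the os.mkdir("logs") call commented out, so the logs/ directory has to exist before the slaves start, or the open() call above it will fail.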
From 6a7a0c4c903334d63a04883b0b313a70bd593eb6 Mon Sep 17 00:00:00 2001
From: Avinash Ummadisingu
Date: Tue, 7 Nov 2017 09:37:57 +0100
Subject: [PATCH 3/3] dyn-mpi cleanup and documentation

---
 abcpy/backends/mpi.py | 101 ++++++++++++++++--------------------------
 1 file changed, 39 insertions(+), 62 deletions(-)

diff --git a/abcpy/backends/mpi.py b/abcpy/backends/mpi.py
index 077d940f..f93f6e03 100644
--- a/abcpy/backends/mpi.py
+++ b/abcpy/backends/mpi.py
@@ -19,8 +19,18 @@ class BackendMPIMaster(Backend):
     finalized = False
 
     def __init__(self, master_node_ranks=[0],chunk_size=1):
-
-        print("Using dirty version from ummavi's repo for MPI based dynamic scheduling")
+        """
+        Parameters
+        ----------
+        master_node_ranks: Python list
+            list of ranks computation should not happen on.
+            Should include the master so it doesn't get
+            overwhelmed with work.
+
+        chunk_size: Integer
+            size of one block of data to be sent to free
+            executors
+        """
         self.comm = MPI.COMM_WORLD
         self.size = self.comm.Get_size()
         self.rank = self.comm.Get_rank()
@@ -34,18 +44,16 @@ def __init__(self, master_node_ranks=[0],chunk_size=1):
 
         #Initialize a BDS store for both master & slave.
         self.bds_store = {}
         self.pds_store = {}
+
+        #Initialize a store for the pds data that
+        #.. hasn't been sent to the workers yet
         self.pds_pending_store = {}
 
         self.chunk_size = chunk_size
 
-        # try:
-        #     os.mkdir("logs")
-        # except Exception as e:
-        #     print("folder logs/ already exists")
-
 
     def __command_slaves(self, command, data):
-        """
+        """Tell slaves to enter relevant execution block
 
         This method handles the sending of the command to the slaves
         telling them what operation to perform next.
@@ -66,7 +74,8 @@ def __command_slaves(self, command, data):
         elif command == self.OP_MAP:
             #In map we receive data as (pds_id,pds_id_new,func)
             #Use cloudpickle to dump the function into a string.
-            function_packed = self.__sanitize_and_pack_func(data[2])
+            # function_packed = self.__sanitize_and_pack_func()
+            function_packed = cloudpickle.dumps(data[2],pickle.HIGHEST_PROTOCOL)
             data_packet = (command, data[0], data[1], function_packed)
 
         elif command == self.OP_BROADCAST:
@@ -86,34 +95,6 @@ def __command_slaves(self, command, data):
 
         _ = self.comm.bcast(data_packet, root=0)
 
-    def __sanitize_and_pack_func(self, func):
-        """
-        Prevents the function from packing the backend by temporarily
-        setting it to another variable and then uses cloudpickle
-        to pack it into a string to be sent.
-
-        Parameters
-        ----------
-        func: Python Function
-            The function we are supposed to pack while sending it along to the slaves
-            during the map function
-
-        Returns
-        -------
-        Returns a string of the function packed by cloudpickle
-
-        """
-
-        #Set the backend to None to prevent it from being packed
-        globals()['backend'] = {}
-
-        function_packed = cloudpickle.dumps(func,pickle.HIGHEST_PROTOCOL)
-
-        #Reset the backend to self after it's been packed
-        globals()['backend'] = self
-
-        return function_packed
-
 
     def __generate_new_pds_id(self):
         """
@@ -179,16 +160,23 @@ def parallelize(self, python_list):
 
         return pds
 
     def orchestrate_map(self,pds_id):
-        """Orchestrates the slaves to perform a map function
+        """Orchestrates the slaves/workers to perform a map function
+
+        This works by keeping track of the workers who haven't finished executing,
+        waiting for them to request the next chunk of data when they are free,
+        responding to them with the data and then sending them a Sentinel
+        signalling that they can exit.
         """
         is_map_done = [True if i in self.master_node_ranks else False for i in range(self.size)]
         status = MPI.Status()
 
-        #Copy it to the pending.
+        #Copy it to the pending. This is so when master accesses
+        #the PDS data it's not empty.
        self.pds_pending_store[pds_id] = list(self.pds_store[pds_id])
 
         #While we have some ranks that haven't finished
         while sum(is_map_done) < self.size:

[...]

+            #If it receives a sentinel, it's done and it can exit
             if data_chunks is None:
                 break
 
+            #Accumulate the indices and *processed* chunks
             for chunk in data_chunks:
                 data_index,data_item = chunk
                 rdd+=[(data_index,func(data_item))]
 
         pds_res = PDSMPI(rdd, pds_id_new, self)
 
-        self.log_fd.write("MAP_START "+str(map_start)+"\nMAP_END "+str(time.time())+"\n")
-
         return pds_res
 
@@ -580,6 +556,7 @@ def __init__(self, master_node_ranks=[0]):
 
             raise Exception("Slaves exitted main loop.")
 
+
 class PDSMPI(PDS):
     """
     This is an MPI wrapper for a Python parallel data set.
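One detail worth noting in this cleanup: packing the map function with cloudpickle rather than the standard pickle module is what lets users pass lambdas and closures defined in their own scripts, which plain pickle refuses to serialize; cloudpickle still emits an ordinary pickle stream that any standard unpickler can load, and passing pickle.HIGHEST_PROTOCOL keeps the encoding compact. A minimal round-trip sketch (the offset lambda is invented for the example):

    # Why the patches use cloudpickle for the map function: plain pickle
    # cannot serialize a lambda like this one; cloudpickle can.
    import pickle

    import cloudpickle

    offset = 10
    func = lambda x: x + offset

    payload = cloudpickle.dumps(func, pickle.HIGHEST_PROTOCOL)  # bytes, as sent over MPI
    restored = pickle.loads(payload)  # a standard unpickler can read it
    assert restored(5) == 15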