diff --git a/doranet/modules/enzymatic/generate_network.py b/doranet/modules/enzymatic/generate_network.py index 9127b82..2fa6bbe 100644 --- a/doranet/modules/enzymatic/generate_network.py +++ b/doranet/modules/enzymatic/generate_network.py @@ -442,16 +442,12 @@ def generate_network( engine = dn.create_engine() network = engine.new_network() - for key in cofactors_dict: + for key, value in cofactors_dict.items(): if excluded_cofactors is None or key not in excluded_cofactors: # add cofactors to network, they're like helpers in chem expansion network.add_mol( - engine.mol.rdkit(cofactors_dict[key]), - meta={ - "SMILES": Chem.MolToSmiles( - Chem.MolFromSmiles(cofactors_dict[key]) - ) - }, + engine.mol.rdkit(value), + meta={"SMILES": Chem.MolToSmiles(Chem.MolFromSmiles(value))}, ) my_start_i = -1 diff --git a/doranet/modules/post_processing/post_processing.py b/doranet/modules/post_processing/post_processing.py index 8dd5e4e..b81e447 100644 --- a/doranet/modules/post_processing/post_processing.py +++ b/doranet/modules/post_processing/post_processing.py @@ -633,8 +633,9 @@ def pathway_finder( for pro in products: producers_dict[pro][rxn_idx] = products_weight[pro] / total_weight - for smiles in producers_dict: # number of producers, used to reduce forks - producers_dict_len[smiles] = len(producers_dict[smiles]) + # number of producers, used to reduce forks + for smiles, value in producers_dict.items(): + producers_dict_len[smiles] = len(value) distance_to_startrs_dict = dict() # min distance to starters for smiles in starters_set: @@ -930,6 +931,7 @@ def find_cyc( f_result.write( "place holder " + path["pathway_by-product"] + "\n" ) + f_result.write("place holder " + "empty" + "\n") f_result.write( "reaction SMILES stoichiometry " + str(path["stoi"]) @@ -1046,9 +1048,9 @@ def SMILES_rm_keku(_SMILES): # remove reaxys unsuported mols, idx * max_rxns_per_batch : ] - for key in output_dict: + for key, value in output_dict.items(): with open(key + ".txt", "w", encoding="utf-8") as f_result: - for query in output_dict[key]: + for query in value: f_result.write("SMILES='" + query + "'") f_result.write("\n") # Save a templet csv file, user can copy Reaxys query result over @@ -1235,6 +1237,36 @@ def __call__(self, recipe: interfaces.ReactionExplicit) -> bool: return False +@typing.final +@dataclasses.dataclass(frozen=True) +class Allowed_Elements_Filter(metadata.ReactionFilterBase): + # only allow reactions with specified elements in reactants. + # does not check hydrogen + def __call__(self, recipe: interfaces.ReactionExplicit) -> bool: + if ( + recipe.operator.meta is None + or recipe.operator.meta["allowed_elements"][0] == "All" + ): + return True + for mol in recipe.reactants: + if not isinstance(mol.item, interfaces.MolDatRDKit): + raise NotImplementedError( + f"""Filter only implemented for molecule type \ + MolDatRDKit, not {type(mol.item)}""" + ) + for atom in mol.item.rdkitmol.GetAtoms(): + if ( + atom.GetSymbol() + not in recipe.operator.meta["allowed_elements"] + ): + return False + return True + + @property + def meta_required(self) -> interfaces.MetaKeyPacket: + return interfaces.MetaKeyPacket(operator_keys={"allowed_elements"}) + + def Byproduct_index( path_idx, _input_list, molecule_thermo_calculator, max_rxn_thermo_change ): @@ -1262,6 +1294,7 @@ def Byproduct_index( "kekulize_flag": smarts.kekulize_flag, "Retro_Not_Aromatic": smarts.Retro_Not_Aromatic, "number_of_steps": smarts.number_of_steps, + "allowed_elements": smarts.allowed_elements, }, ) if smarts.kekulize_flag is True: @@ -1279,6 +1312,7 @@ def Byproduct_index( "kekulize_flag": smarts.kekulize_flag, "Retro_Not_Aromatic": smarts.Retro_Not_Aromatic, "number_of_steps": smarts.number_of_steps, + "allowed_elements": smarts.allowed_elements, }, ) @@ -1290,6 +1324,7 @@ def Byproduct_index( Ring_Issues_Filter() # >> H_calc # >> thermo_filter + >> Allowed_Elements_Filter() >> Chem_Rxn_dH_Calculator("dH", "forward", molecule_thermo_calculator) >> Rxn_dH_Filter(max_rxn_thermo_change, "dH") ) @@ -1335,6 +1370,7 @@ def Inter_byproduct( "kekulize_flag": smarts.kekulize_flag, "Retro_Not_Aromatic": smarts.Retro_Not_Aromatic, "number_of_steps": smarts.number_of_steps, + "allowed_elements": smarts.allowed_elements, }, ) if smarts.kekulize_flag is True: @@ -1352,6 +1388,7 @@ def Inter_byproduct( "kekulize_flag": smarts.kekulize_flag, "Retro_Not_Aromatic": smarts.Retro_Not_Aromatic, "number_of_steps": smarts.number_of_steps, + "allowed_elements": smarts.allowed_elements, }, ) @@ -1364,6 +1401,7 @@ def Inter_byproduct( >> Ring_Issues_Filter() # >> H_calc # >> thermo_filter + >> Allowed_Elements_Filter() >> Chem_Rxn_dH_Calculator("dH", "forward", molecule_thermo_calculator) >> Rxn_dH_Filter(max_rxn_thermo_change, "dH") ) @@ -1392,6 +1430,7 @@ def pathway_ranking( cool_reactions=None, molecule_thermo_calculator=None, # for by-product calculator max_rxn_thermo_change=15, + chemicals_prices_name=None, # csv file with SMILES and price by mole ): if not starters or not target: raise Exception("Starters and target are" " needed to rank pathways") @@ -1426,6 +1465,7 @@ def pathway_ranking( "salt_score": 0, "in_reaxys": 0, "coolness": 0, + "profit": 0, } if reaxys_result_name is None: reaxys_result_name = f"{job_name}_reaxys_batch_result.csv" @@ -1479,10 +1519,10 @@ def pathway_ranking( for idx, i in enumerate(clean_list): if "pathway number" in i: pathway_marker.append(idx) - + smiles_line_pos = 7 for idx, marker in enumerate(pathway_marker): temp_dict = dict() - temp_dict["stoi"] = eval(clean_list[marker + 4][30:]) # list of str + temp_dict["stoi"] = eval(clean_list[marker + 5][30:]) # list of str if idx + 1 < len(pathway_marker): # 1, 2, 3 next_marker = pathway_marker[idx + 1] @@ -1490,17 +1530,23 @@ def pathway_ranking( next_marker = len(clean_list) temp_dict["SMILES"] = clean_list[ - marker + 6 : marker + 6 + int((next_marker - (marker + 6)) / 3) + marker + smiles_line_pos : marker + + smiles_line_pos + + int((next_marker - (marker + smiles_line_pos)) / 3) ] temp_dict["name"] = clean_list[ - marker + 6 + int((next_marker - (marker + 6)) / 3) : marker - + 6 - + int((next_marker - (marker + 6)) / 3) * 2 + marker + + smiles_line_pos + + int((next_marker - (marker + smiles_line_pos)) / 3) : marker + + smiles_line_pos + + int((next_marker - (marker + smiles_line_pos)) / 3) * 2 ] temp_dict["enthalpy"] = clean_list[ - marker + 6 + int((next_marker - (marker + 6)) / 3) * 2 : marker - + 6 - + int((next_marker - (marker + 6)) / 3) * 3 + marker + + smiles_line_pos + + int((next_marker - (marker + smiles_line_pos)) / 3) * 2 : marker + + smiles_line_pos + + int((next_marker - (marker + smiles_line_pos)) / 3) * 3 ] pathways_list.append(temp_dict) @@ -1726,13 +1772,13 @@ def path_eco( if timecount == timeout: return None by_product_weight = 0 - for i in right_dict: + for i, value in right_dict.items(): if i in has_bio: by_product_weight += 0 else: by_product_weight += ( Descriptors.MolWt(rdkit.Chem.rdmolfiles.MolFromSmiles(i)) - * right_dict[i] + * value ) to_return = target_weight / (target_weight + by_product_weight) @@ -1786,6 +1832,253 @@ def path_eco( eco_score_list = [0] * len(data) eco_list = [0] * len(data) + # Profitability Index ############################### + # Profitability Index = Product price / Feedstock price + # Also referred to as the Return on Investment Ratio (ROI Ratio) + # A value greater than 1 indicates a profitable process + # Used as a preliminary metric when other cost is unknown + # modified from atom economy calculator, uses the prices of molecules + # instead of molecular weight + # for bio pathways ignore cofactors cost + # if a path has both chem and bio rxns, the result might be + # incorrect if a chem helper is also a bio cofactor + + profit_list = list() + profit_score_list = list() + + def path_profit( + _path, + has_bio, + prices_dict, + ): # assuming no circular loops; if a middle rxn produces a starter + # or intermediat consumed in upstream, it's considered recycled; + _path = list(_path) + left_dict = dict() # smiles: number of mol + right_dict = dict() + target_cost = prices_dict[target_smiles] + idx_remove = 0 + for idx, rxn in enumerate( + _path + ): # find rxn for final target, add reas in left, + # by-product in right, remove rxn + pros = rxn.split(">")[3].split(".") + reas = rxn.split(">")[0].split(".") + rea_stoi = eval(rxn.split(">")[2].split("$")[1]) + pro_stoi = eval(rxn.split(">")[2].split("$")[2]) + + if target_smiles in pros: + idx_remove = idx + # target_idx = pros.index(target_smiles) + target_stoi = 0 + for idx2, pro in enumerate(pros): + if pro == target_smiles: + target_stoi += pro_stoi[idx2] + for idx2, pro in enumerate(pros): + if pro != target_smiles: + if pro not in right_dict: + right_dict[pro] = pro_stoi[idx2] / target_stoi + else: + right_dict[pro] = ( + right_dict[pro] + pro_stoi[idx2] / target_stoi + ) + for idx2, rea in enumerate(reas): + if rea not in left_dict: + left_dict[rea] = rea_stoi[idx2] / target_stoi + else: + left_dict[rea] = ( + left_dict[rea] + rea_stoi[idx2] / target_stoi + ) + break + del _path[idx_remove] + + repeat_flag = True + # explored_rxns = set() + timeout = 100 # stop if encounter loops + timecount = 0 + while ( + repeat_flag is True and len(_path) > 0 + ): # repeat until left only contain starters; + # check if all parents are starters + timecount += 1 + popkey = 99 + for ( + i + ) in left_dict: # find idx for a intermediate molecule to produce + if i not in starters | set(has_bio): + popkey = i + break + # num_mol = left_dict.pop(popkey) + num_mol = left_dict[popkey] + for idx, rxn in enumerate(_path): + pros = rxn.split(">")[3].split(".") + reas = rxn.split(">")[0].split(".") + rea_stoi = eval(rxn.split(">")[2].split("$")[1]) + pro_stoi = eval(rxn.split(">")[2].split("$")[2]) + # if popkey in pros and rxn not in explored_rxns: + # find a rxn that product the molecule + if popkey in pros: + # explored_rxns.add(rxn) + # popmol_idx = pros.index(popkey) + popmol_stoi = 0 + for idx2, pro in enumerate(pros): + if pro == popkey: + popmol_stoi += pro_stoi[idx2] + for idx2, pro in enumerate(pros): + # if pro != popkey: # add all reactants to left + if pro not in right_dict: + right_dict[pro] = ( + num_mol / popmol_stoi * pro_stoi[idx2] + ) + else: + right_dict[pro] = ( + right_dict[pro] + + num_mol / popmol_stoi * pro_stoi[idx2] + ) + for idx2, rea in enumerate(reas): + if rea not in left_dict: + left_dict[rea] = ( + num_mol / popmol_stoi * rea_stoi[idx2] + ) + else: + left_dict[rea] = ( + left_dict[rea] + + num_mol / popmol_stoi * rea_stoi[idx2] + ) + + # move used rxn to end of list to avoid + # code loop if a loop in pathway + del _path[idx] + _path.append(rxn) + break + + key_list = list() + for key in left_dict: # balance left and right dict + if key not in starters | set(has_bio) and key in right_dict: + key_list.append(key) + for key in key_list: + if left_dict[key] == right_dict[key]: + # print(left_dict) + left_dict.pop(key) + # print(left_dict) + right_dict.pop(key) + elif left_dict[key] < right_dict[key]: + right_dict[key] = right_dict[key] - left_dict[key] + left_dict.pop(key) + else: + left_dict[key] = left_dict[key] - right_dict[key] + right_dict.pop(key) + + done_flag = True + for key in left_dict: + if key not in starters | set(has_bio): + # check if left only contain starters + done_flag = False + if done_flag is True or timecount == timeout: + repeat_flag = False + + if timecount == timeout: + return None + + feedstocks_cost = 0 + for i, value in left_dict.items(): + if i in has_bio: + feedstocks_cost += 0 + else: + feedstocks_cost += prices_dict[i] * value + + to_return = target_cost / feedstocks_cost + return to_return + + if weights["profit"] != 0: + print("Pathway profitability index ranking working") + + if chemicals_prices_name is None: + chemicals_prices_name = f"{job_name}_prices.csv" + try: + price_result = np.genfromtxt( + chemicals_prices_name, + comments="?", + dtype=[("column1", "U1000"), ("column2", "f8")], + delimiter=",", + skip_header=0, + usecols=(0, 1), + ) + except FileNotFoundError: + print("Prices file not found, exiting pathway ranking") + raise + + price_result = np.atleast_1d(price_result) + price_d = dict() # {smiles: float} + for row in price_result: + smiles = str(row[0]) + price = float(row[1]) + price_d[Chem.MolToSmiles(Chem.MolFromSmiles(smiles))] = price + + # check all pathways and gether all used starters + used_starters = set() + for path in data: + for rxn in path: + reas = rxn.split(">")[0].split(".") + for rea in reas: + if rea in starters: + used_starters.add(rea) + need_price = set() + for starter in used_starters: + if starter not in price_d: + need_price.add(starter) + if len(need_price) != 0: + raise ValueError( + "Prices of the following SMILES are needed:", need_price + ) + # get scores + none_PI = 0 + min_PI = float("inf") + max_PI = 0 + for idx, path in enumerate(data): + has_bio_rxn_flag = False + for rxn in path: + name = rxn.split(">")[1] + if name in bio_rxn_names: + has_bio_rxn_flag = True + + price_zero = set() # if bio rxn skip cofactors not in helpers + if has_bio_rxn_flag: + price_zero = cofactors_set - starters + try: + path_PI = path_profit(path, price_zero, price_d) + except KeyError: + path_PI = 0 + print( + "Profitability index calculation error for pathway", idx + 1 + ) + # print(idx) + if path_PI is None: + none_PI += 1 + profit_list.append(None) + else: + min_PI = min(min_PI, path_PI) + max_PI = max(max_PI, path_PI) + profit_list.append(path_PI) + + for idx, i in enumerate(profit_list): + if i is None: + profit_list[idx] = min_PI + + print("Min_PI", min_PI) + print("Max_PI", max_PI) + print("None_PI", none_PI) + diff_PI = max_PI - min_PI + for i in profit_list: + if diff_PI != 0: + profit_score_list.append((i - min_PI) / diff_PI) + else: + profit_score_list.append((i - min_PI) / 0.001) + print("Pathway profitability index ranking finished") + + else: + profit_score_list = [0] * len(data) + profit_list = [0] * len(data) + # Number of steps ############################### if weights["number_of_steps"] != 0: print("Number of steps ranking working") @@ -2088,6 +2381,7 @@ def SMILES_rm_keku( weight.append(weights["salt_score"]) weight.append(weights["in_reaxys"]) weight.append(weights["coolness"]) + weight.append(weights["profit"]) final_score = list() @@ -2100,6 +2394,7 @@ def SMILES_rm_keku( + salt_score_list[i] * weight[4] + reaxys_score_list[i] * weight[5] + cool_score_list[i] * weight[6] + + profit_score_list[i] * weight[7] ) # print("len(final_score)", len(final_score)) @@ -2120,6 +2415,8 @@ def SMILES_rm_keku( salt_score_list, reaxys_score_list, cool_score_list, + profit_score_list, + profit_list, ) = zip( *sorted( zip( @@ -2135,6 +2432,8 @@ def SMILES_rm_keku( salt_score_list, reaxys_score_list, cool_score_list, + profit_score_list, + profit_list, strict=False, ) ), @@ -2149,7 +2448,6 @@ def clean_rxn_strings( dH_list = list() for rxn in _path: dH = rxn.split(">")[2].split("$")[0] - rxn_str = rxn.split(">")[0] + ">>" + rxn.split(">")[3] clean_path.append(rxn_str) names.append(rxn.split(">")[1]) @@ -2227,6 +2525,14 @@ def clean_rxn_strings( " = ", cool_score_list[idx] * weight[6], ) + print( + "Profitability idex score", + profit_score_list[idx], + " x ", + weight[7], + " = ", + profit_score_list[idx] * weight[7], + ) f_result.write("final score " + str(final_score[idx])) f_result.write("\n") @@ -2236,6 +2542,9 @@ def clean_rxn_strings( print("pathway by-product", by_pro_list[idx]) f_result.write("pathway by-product " + str(by_pro_list[idx])) f_result.write("\n") + print("profitability idex", profit_list[idx]) + f_result.write("profitability idex " + str(profit_list[idx])) + f_result.write("\n") print( "intermediate by-product", intermediate_by_product_dict_list[idx], @@ -2276,6 +2585,7 @@ def create_page( _reaxys_rxn_color, _normal_rxn_color, use_custom_layout, + price_d, ): font_path = Path(__file__).parent / "OpenSans-Regular.ttf" @@ -2326,11 +2636,19 @@ def add_margin(pil_img): # add white margin on top and bottom of image return result def add_text(img, msg): # add a text on top-right of image + font_size = 25 img_w, img_h = img.size I1 = ImageDraw.Draw(img) - myFont = ImageFont.truetype(str(font_path), 25) + myFont = ImageFont.truetype(str(font_path), font_size) new_box = I1.textbbox((0, 0), msg, font=myFont) - I1.text((img_w - new_box[2], 0), msg, font=myFont, fill=(0, 0, 255)) + if new_box[2] > img_w: + myFont = ImageFont.truetype(str(font_path), 10) + new_box = I1.textbbox((0, 0), msg, font=myFont) + I1.text( + (img_w - new_box[2], 15), msg, font=myFont, fill=(0, 0, 255) + ) + else: + I1.text((img_w - new_box[2], 0), msg, font=myFont, fill=(0, 0, 255)) return img def SMILES_rm_keku( @@ -2416,6 +2734,9 @@ def SMILES_rm_keku( + "\n" + "Pathway by-product " + pathway_dict["pathway_by-product"] + + "\n" + + "Profitability Idex " + + pathway_dict["profitability_idex"] ) # to display on each page split_list = pathway_dict["enthalpy"] @@ -2444,6 +2765,7 @@ def SMILES_rm_keku( "intermediate_by-product" ] # intermediate by product number, dict, key smiles, value int inter_py_pro["O=C=O"] = "" # for reagent node (CO2) + # rxn_node = 0 node_labels_dict = dict() node_labels_reagent = dict() @@ -2491,7 +2813,12 @@ def addjust_mol_image(mol): ) ) ), - str(inter_py_pro.get(rea, "")), + str(inter_py_pro.get(rea, "")) + + ( + " $" + price_d.get(rea, "") + if price_d.get(rea, "") + else "" + ), ), ) G.add_edge(rea, rxn_node, arror_margin=10) @@ -2509,7 +2836,12 @@ def addjust_mol_image(mol): ) ) ), - str(inter_py_pro.get(pro, "")), + str(inter_py_pro.get(pro, "")) + + ( + " $" + price_d.get(pro, "") + if price_d.get(pro, "") + else "" + ), ), ) G.add_edge(rxn_node, pro, arror_margin=120) @@ -2542,7 +2874,12 @@ def addjust_mol_image(mol): ) ) ), - str(inter_py_pro.get(i, "")), + str(inter_py_pro.get(i, "")) + + ( + " $" + price_d.get(i, "") + if price_d.get(i, "") + else "" + ), ), ) G2.add_edge( @@ -2653,6 +2990,7 @@ def pathway_visualization( exclude_smiles=None, reaxys_rxn_color="blue", normal_rxn_color="black", + chemicals_prices_name=None, ): start_time = time.time() print(f"Job name: {job_name}") @@ -2778,6 +3116,7 @@ def pathway_visualization( if "ranking" in i: pathway_marker.append(idx) + smiles_line_pos = 7 for idx, marker in enumerate(pathway_marker): temp_dict = dict() temp_dict["ranking"] = str(pathway_num) @@ -2787,7 +3126,10 @@ def pathway_visualization( str("%.1f" % float(float(clean_list[marker + 2][15:]) * 100)) + "%" ) temp_dict["pathway_by-product"] = clean_list[marker + 3][19:] - temp_dict["intermediate_by-product"] = eval(clean_list[marker + 4][24:]) + temp_dict["profitability_idex"] = str( + round(float(clean_list[marker + 4][19:]), 2) + ) + temp_dict["intermediate_by-product"] = eval(clean_list[marker + 5][24:]) if idx + 1 < len(pathway_marker): # 1, 2, 3 next_marker = pathway_marker[idx + 1] @@ -2795,7 +3137,9 @@ def pathway_visualization( next_marker = len(clean_list) temp_dict["SMILES"] = clean_list[ - marker + 6 : marker + 6 + int((next_marker - (marker + 6)) / 3) + marker + smiles_line_pos : marker + + smiles_line_pos + + int((next_marker - (marker + smiles_line_pos)) / 3) ] all_reas = list() @@ -2804,14 +3148,18 @@ def pathway_visualization( temp_dict["all_reactants"] = set(all_reas) temp_dict["name"] = clean_list[ - marker + 6 + int((next_marker - (marker + 6)) / 3) : marker - + 6 - + int((next_marker - (marker + 6)) / 3) * 2 + marker + + smiles_line_pos + + int((next_marker - (marker + smiles_line_pos)) / 3) : marker + + smiles_line_pos + + int((next_marker - (marker + smiles_line_pos)) / 3) * 2 ] temp_dict["enthalpy"] = clean_list[ - marker + 6 + int((next_marker - (marker + 6)) / 3) * 2 : marker - + 6 - + int((next_marker - (marker + 6)) / 3) * 3 + marker + + smiles_line_pos + + int((next_marker - (marker + smiles_line_pos)) / 3) * 2 : marker + + smiles_line_pos + + int((next_marker - (marker + smiles_line_pos)) / 3) * 3 ] pathways_list.append(temp_dict) @@ -2846,6 +3194,27 @@ def pathway_visualization( ) ] = helpers[key] + # read chemicals_prices file + if chemicals_prices_name is None: + chemicals_prices_name = f"{job_name}_prices.csv" + try: + price_result = np.genfromtxt( + chemicals_prices_name, + comments="?", + dtype=[("column1", "U1000"), ("column2", "f8")], + delimiter=",", + skip_header=0, + usecols=(0, 1), + ) + price_result = np.atleast_1d(price_result) + price_d = dict() # {smiles: float} + for row in price_result: + smiles = str(row[0]) + price = str(row[1]) + price_d[Chem.MolToSmiles(Chem.MolFromSmiles(smiles))] = price + except FileNotFoundError: + price_d = dict() + print("Working on creating pages") print("You can adjust multi-processing number to speed up PDF generation") with Pool(processes=num_process) as pool: @@ -2862,6 +3231,7 @@ def pathway_visualization( reaxys_rxn_color, normal_rxn_color, use_custom_layout, + price_d, ), ) for work in work_list @@ -2920,6 +3290,7 @@ def one_step( max_rxn_thermo_change=15, consider_name_difference=True, reaxys_result_name=None, + chemicals_prices_name=None, exclude_smiles=None, reaxys_rxn_color="blue", normal_rxn_color="black", @@ -2961,6 +3332,7 @@ def one_step( "salt_score": 0, "in_reaxys": 0, "coolness": 0, + "profit": 0, } if reaxys_result_name is None: reaxys_result_name = f"{job_name}_reaxys_batch_result.csv" @@ -2976,6 +3348,7 @@ def one_step( cool_reactions=cool_reactions, molecule_thermo_calculator=molecule_thermo_calculator, max_rxn_thermo_change=max_rxn_thermo_change, + chemicals_prices_name=chemicals_prices_name, ) pathway_visualization( @@ -2987,4 +3360,5 @@ def one_step( exclude_smiles=exclude_smiles, reaxys_rxn_color=reaxys_rxn_color, normal_rxn_color=normal_rxn_color, + chemicals_prices_name=chemicals_prices_name, ) diff --git a/doranet/modules/synthetic/Reaction_Smarts_Forward.py b/doranet/modules/synthetic/Reaction_Smarts_Forward.py index 8aba1af..6ec3e24 100644 --- a/doranet/modules/synthetic/Reaction_Smarts_Forward.py +++ b/doranet/modules/synthetic/Reaction_Smarts_Forward.py @@ -31,6 +31,9 @@ class OperatorSmarts: reaction_type: typing.Optional[str] = ( "Catalytic" # Catalytic for chemical reactions. the other option is Enzymatic, for bio rxn rules ) + regioselectivity: typing.Optional[tuple[str, int, int]] = ( + None # If not None, use a tuple like ("Markovnikov", 0, 0), can be "Markovnikov", "Anti-Markovnikov", "Zaitsev", "Hofmann", and "Baeyer-Villiger". Followed by the index of the main reactant in reactants and the index of main product in products + ) op_smarts = ( @@ -51,48 +54,65 @@ class OperatorSmarts: (1, 1), ), # Below are some typical cracking/pyrolysis reactions, use when necessary - # Propane Cracking - OperatorSmarts( - "Propane Cracking", - "[C+0H3:1][C+0H2:2][C+0H3:3]>>[*:1]=[*:2].[*:3]", - (1,), - (1, 1), - ), - # Butane Cracking - OperatorSmarts( - "Butane Cracking 1", - "[C+0H3:1][C+0H2:2][C+0H2:3][C+0H3:4]>>[*:1][*:2]=[*:3].[*:4]", + # # Propane Cracking (A special case of Alkane Cracking) + # OperatorSmarts( + # "Propane Cracking", + # "[C+0H3:1][C+0H2:2][C+0H3:3]>>[*:1]=[*:2].[*:3]", + # (1,), + # (1, 1), + # ), + # # Butane Cracking (A special case of Alkane Cracking) + # OperatorSmarts( + # "Butane Cracking 1", + # "[C+0H3:1][C+0H2:2][C+0H2:3][C+0H3:4]>>[*:1][*:2]=[*:3].[*:4]", + # (1,), + # (1, 1), + # ), + # OperatorSmarts( + # "Butane Cracking 2", + # "[C+0H3:1][C+0H2:2][C+0H2:3][C+0H3:4]>>[*:1]=[*:2].[*:3][*:4]", + # (1,), + # (1, 1), + # ), + # # Pentane Cracking (A special case of Alkane Cracking) + # OperatorSmarts( + # "Pentane Cracking", + # "[C+0H3:1][C+0H2:2][C+0H2:3][C+0H2:4][C+0H3:5]>>[*:1][*:2][*:3]=[*:4].[*:5]", + # (1,), + # (1, 1), + # ), + # Alkane Cracking + OperatorSmarts( + "Alkane Cracking", + "[C+0!H0:1][C+0:2]!@[C+0:3]>>[*:1]=[*:2].[*:3]", (1,), (1, 1), + allowed_elements=("C", "H"), ), OperatorSmarts( - "Butane Cracking 2", - "[C+0H3:1][C+0H2:2][C+0H2:3][C+0H3:4]>>[*:1]=[*:2].[*:3][*:4]", + "Alkane Cracking, Intramolecular", + "[C+0!H0:1][C+0:2]@[C+0:3]>>([*:1]=[*:2].[*:3])", (1,), - (1, 1), - ), - # Pentane Cracking - OperatorSmarts( - "Pentane Cracking", - "[C+0H3:1][C+0H2:2][C+0H2:3][C+0H2:4][C+0H3:5]>>[*:1][*:2][*:3]=[*:4].[*:5]", (1,), - (1, 1), + allowed_elements=("C", "H"), ), - # Catalytic Reforming + # Alkane Cyclization (a type of catalytic reforming) OperatorSmarts( - "Catalytic Reforming", - "[C+0H3:1][C+0H2:2][C+0H2:3][C+0H2:4][C+0H2:5][C+0H3:6]>>[*:1]1[*:2][*:3][*:4][*:5][*:6]1.[H][H]", + "Alkane Cyclization", + "[C+0!H0:1][C+0:2][C+0:3][C+0:4][C+0:5][C+0!H0:6]>>[*:1]1[*:2][*:3][*:4][*:5][*:6]1.[H][H]", (1,), (1, 1), + allowed_elements=("C", "H"), ), - # Isomerization of Butane + # Alkane Isomerization OperatorSmarts( - "Isomerization of Butane", - "[C+0H3:1][C+0H2:2][C+0H2:3][C+0H3:4]>>[*:1][*:2]([*:3])[*:4]", + "Alkane Isomerization", + "[C+0:1][C+0!H0:2][C+0:3][C+0H3:4]>>[*:1][*:2]([*:3])[*:4]", (1,), (1,), + allowed_elements=("C", "H"), ), - # Alkane Dehydrogenation these are used in the cracking process but may not suit general organic chemistry applications, use when necessary + # Alkane Dehydrogenation OperatorSmarts( "Alkane Dehydrogenation", "[C+0!H0:1][C+0!H0:2]>>[*:1]=[*:2].[H][H]", @@ -181,6 +201,7 @@ class OperatorSmarts: "[C+0:1]=[C+0:2].[O+0!H0:3]>>[*:1][*:2][*:3]", (1, 1), (1,), + regioselectivity=("Markovnikov", 0, 0), ), OperatorSmarts( "Addition of Alcohols or Acids to Alkenes, Intramolecular", @@ -202,6 +223,7 @@ class OperatorSmarts: "[C+0:1]=[C+0:2].[F,Cl,Br,I;+0H:3]>>[*:1][*:2][*:3]", (1, 1), (1,), + regioselectivity=("Markovnikov", 0, 0), ), # Halogenation of Alkenes OperatorSmarts( @@ -423,6 +445,7 @@ class OperatorSmarts: "[C+0:1]#[C+0:2].[F,Cl,Br,I;+0H:3]>>[*:1]([*:3])=[*:2]", (1, 1), (1,), + regioselectivity=("Markovnikov", 0, 0), ), # Hydration of Alkynes OperatorSmarts( @@ -430,6 +453,7 @@ class OperatorSmarts: "[C+0:1]#[C+0:2].[O+0H2:3]>>[*:1]([*:3])=[*:2]", (1, 1), (1,), + regioselectivity=("Markovnikov", 0, 0), ), # Hydrogenation of Alkynes OperatorSmarts( @@ -562,6 +586,7 @@ class OperatorSmarts: "[C+0:1]([F,Cl,Br,I;+0:2])[C+0!H0:3]>>[*:1]=[*:3].[*:2]", (1,), (1, 1), + regioselectivity=("Zaitsev", 0, 0), ), # Aromatic Halogenation OperatorSmarts( @@ -772,6 +797,7 @@ class OperatorSmarts: "[C+0!H0:1][C+0:2][O+0H:3]>>[*:1]=[*:2].[*:3]", (1,), (1, 1), + regioselectivity=("Zaitsev", 0, 0), ), # Dehydration of Alcohols, 2-step OperatorSmarts( @@ -1003,6 +1029,7 @@ class OperatorSmarts: "[C,c;+0:5][C+0:1](=[O+0:2])[C,c;+0:3].[O+0:4]=[O+0]>>[*:5][*:1](=[*:2])[*:4][*:3]", (1, 0.5), (1,), + regioselectivity=("Baeyer-Villiger", 0, 0), ), # may use kekulize_flag = True but needs confirmation OperatorSmarts( "Baeyer-Villiger Oxidation (Aldehydes)", # with aldehydes doi.org/10.1002/0471264180.or043.03 @@ -1784,6 +1811,7 @@ class OperatorSmarts: "[C+0X4!H0:1][CX4+0:2][N+0H2:3].[C+0:4][F,Cl,Br,I;+0:5]>>[*:1]=[*:2].[*:3]([*:4])([*:4])[*:4].[*:5]", (1, 3), (1, 1, 3), + regioselectivity=("Hofmann", 0, 0), ), OperatorSmarts( "Hofmann Elimination R-NHR", @@ -1822,7 +1850,7 @@ class OperatorSmarts: (1, 1, 1), (1, 2), ), - # Hydroamination of Alkenes + # Hydroamination of Alkenes can be Markovnikov or anti-Markovnikov depending on the catalyst OperatorSmarts( "Hydroamination of Alkenes", "[C+0:1]=[C+0:2].[N+0X3!H0:3]>>[*:1][*:2][*:3]", @@ -1899,7 +1927,7 @@ class OperatorSmarts: (1, 1, 1), (1, 2, 1), ), - # Primary Amines with Nitrous Acid to Alkenes not including possible rearrangement + # Primary Amines with Nitrous Acid to Alkenes not including possible rearrangement; Zaitsev? OperatorSmarts( "Primary Amines with Nitrous Acid to Alkenes", "[C+0!H0:6][C+0:1][N+0H2:2].[O+0H][N+0:4]=[O+0:5]>>[*:6]=[*:1].[*:5].[*:2]#[*:4]", @@ -2496,7 +2524,7 @@ class OperatorSmarts: (1, 1), (1,), ), - # Thiol-ene Reaction + # Thiol-ene Reaction Anti-Markovnikov OperatorSmarts( "Thiol-ene Reaction", "[C,c;+0:1][SX2+0H:2].[C+0:3]=[C+0:4]>>[*:1][*:2][*:3][*:4]", @@ -2666,7 +2694,7 @@ class OperatorSmarts: (1, 1), (1, 1), ), - # Alkenes Addition by Sulfuric Acid + # Alkenes Addition by Sulfuric Acid Markovnikov OperatorSmarts( "Alkenes Addition by Sulfuric Acid", "[C+0:1]=[C+0:2].[O+0:3]=[SX4+0:4](=[O+0:5])([O+0H:6])[O+0H:7]>>[*:1][*:2][*:6][*:4](=[*:3])(=[*:5])[*:7]", @@ -2679,7 +2707,7 @@ class OperatorSmarts: (1,), (1, 1), ), - # Alkenes Addition by Bisulfites + # Alkenes Addition by Bisulfites Markovnikov? OperatorSmarts( "Alkenes Addition by Bisulfites", "[C+0:1]=[C+0:2].[O+0:3]=[SX3+0:4]([O+0H:5])[O+0H:6]>>[*:1][*:2][*:4](=[*:3])(=[*:5])[*:6]", diff --git a/doranet/modules/synthetic/Reaction_Smarts_Retro.py b/doranet/modules/synthetic/Reaction_Smarts_Retro.py index 980adc6..b35178f 100644 --- a/doranet/modules/synthetic/Reaction_Smarts_Retro.py +++ b/doranet/modules/synthetic/Reaction_Smarts_Retro.py @@ -31,6 +31,9 @@ class OperatorSmarts: reaction_type: typing.Optional[str] = ( "Catalytic" # Catalytic for chemical reactions. the other option is Enzymatic, for bio rxn rules ) + regioselectivity: typing.Optional[tuple[str, int, int]] = ( + None # If not None, use a tuple like ("Markovnikov", 0, 0), can be "Markovnikov", "Anti-Markovnikov", "Zaitsev", "Hofmann", and "Baeyer-Villiger". Followed by the index of the main reactant in reactants and the index of main product in products + ) op_retro_smarts = ( @@ -51,48 +54,68 @@ class OperatorSmarts: (1, 1), ), # Below are some typical cracking/pyrolysis reactions, use when necessary - # Propane Cracking - OperatorSmarts( - "Propane Cracking", - "[C+0H2:1]=[C+0H2:2].[C+0H4:3]>>[*:1][*:2][*:3]", + # # Propane Cracking (A special case of Alkane Cracking) + # OperatorSmarts( + # "Propane Cracking", + # "[C+0H2:1]=[C+0H2:2].[C+0H4:3]>>[*:1][*:2][*:3]", + # (1, 1), + # (1,), + # ), + # # Butane Cracking (A special case of Alkane Cracking) + # OperatorSmarts( + # "Butane Cracking 1", + # "[C+0H3:1][C+0H:2]=[C+0H2:3].[C+0H4:4]>>[*:1][*:2][*:3][*:4]", + # (1, 1), + # (1,), + # ), + # OperatorSmarts( + # "Butane Cracking 2", + # "[C+0H2:1]=[C+0H2:2].[C+0H3:3][C+0H3:4]>>[*:1][*:2][*:3][*:4]", + # (1, 1), + # (1,), + # ), + # # Pentane Cracking (A special case of Alkane Cracking) + # OperatorSmarts( + # "Pentane Cracking", + # "[C+0H3:1][C+0H2:2][C+0H:3]=[C+0H2:4].[C+0H4:5]>>[*:1][*:2][*:3][*:4][*:5]", + # (1, 1), + # (1,), + # ), + # Alkane Cracking + OperatorSmarts( + "Alkane Cracking", + "[C+0:1]=[C+0:2].[C+0!H0:3]>>[*:1][*:2][*:3]", (1, 1), (1,), + kekulize_flag=True, + allowed_elements=("C", "H"), ), - # Butane Cracking OperatorSmarts( - "Butane Cracking 1", - "[C+0H3:1][C+0H:2]=[C+0H2:3].[C+0H4:4]>>[*:1][*:2][*:3][*:4]", - (1, 1), + "Alkane Cracking, Intramolecular", + "([C+0:1]=[C+0:2].[C+0!H0:3])>>[*:1][*:2][*:3]", (1,), - ), - OperatorSmarts( - "Butane Cracking 2", - "[C+0H2:1]=[C+0H2:2].[C+0H3:3][C+0H3:4]>>[*:1][*:2][*:3][*:4]", - (1, 1), - (1,), - ), - # Pentane Cracking - OperatorSmarts( - "Pentane Cracking", - "[C+0H3:1][C+0H2:2][C+0H:3]=[C+0H2:4].[C+0H4:5]>>[*:1][*:2][*:3][*:4][*:5]", - (1, 1), (1,), + ring_issue=True, + kekulize_flag=True, + allowed_elements=("C", "H"), ), - # Catalytic Reforming + # Alkane Cyclization (a type of catalytic reforming) OperatorSmarts( - "Catalytic Reforming", - "[C+0H2:1]1[C+0H2:2][C+0H2:3][C+0H2:4][C+0H2:5][C+0H2:6]1.[H][H]>>[*:1][*:2][*:3][*:4][*:5][*:6]", + "Alkane Cyclization", + "[C+0:1]1[C+0:2][C+0:3][C+0:4][C+0:5][C+0:6]1.[H][H]>>[*:1][*:2][*:3][*:4][*:5][*:6]", (1, 1), (1,), + allowed_elements=("C", "H"), ), - # Isomerization of Butane + # Alkane Isomerization OperatorSmarts( - "Isomerization of Butane", - "[C+0H3:1][C+0H:2]([C+0H3:3])[C+0H3:4]>>[*:1][*:2][*:3][*:4]", + "Alkane Isomerization", + "[C+0:1][C+0:2]([C+0!H0:3])[C+0H3:4]>>[*:1][*:2][*:3][*:4]", (1,), (1,), + allowed_elements=("C", "H"), ), - # Alkane Dehydrogenation these are used in the cracking process but may not suit general organic chemistry applications, use when necessary + # Alkane Dehydrogenation OperatorSmarts( "Alkane Dehydrogenation", "[C+0:1]=[C+0:2].[H][H]>>[*:1][*:2]", @@ -187,6 +210,7 @@ class OperatorSmarts: (1,), (1, 1), Retro_Not_Aromatic=True, + regioselectivity=("Markovnikov", 0, 0), ), OperatorSmarts( "Addition of Alcohols or Acids to Alkenes, Intramolecular", @@ -211,6 +235,7 @@ class OperatorSmarts: (1,), (1, 1), Retro_Not_Aromatic=True, + regioselectivity=("Markovnikov", 0, 0), ), # Halogenation of Alkenes OperatorSmarts( @@ -440,6 +465,7 @@ class OperatorSmarts: "[C+0:1]([F,Cl,Br,I;+0:3])=[C+0!H0:2]>>[*:1]#[*:2].[*:3]", (1,), (1, 1), + regioselectivity=("Markovnikov", 0, 0), ), # Hydration of Alkynes OperatorSmarts( @@ -447,6 +473,7 @@ class OperatorSmarts: "[C+0:1]([O+0H:3])=[C+0!H0:2]>>[*:1]#[*:2].[*:3]", (1,), (1, 1), + regioselectivity=("Markovnikov", 0, 0), ), # Hydrogenation of Alkynes OperatorSmarts( @@ -573,6 +600,7 @@ class OperatorSmarts: (1, 1), (1,), kekulize_flag=True, + regioselectivity=("Zaitsev", 0, 0), ), # Aromatic Halogenation OperatorSmarts( @@ -798,6 +826,7 @@ class OperatorSmarts: (1, 1), (1,), kekulize_flag=True, + regioselectivity=("Zaitsev", 0, 0), ), # Dehydration of Alcohols, 2-step OperatorSmarts( @@ -1040,6 +1069,7 @@ class OperatorSmarts: (1, 0.5), kekulize_flag=True, Retro_Not_Aromatic=True, + regioselectivity=("Baeyer-Villiger", 0, 0), ), OperatorSmarts( "Baeyer-Villiger Oxidation (Aldehydes)", # with aldehydes doi.org/10.1002/0471264180.or043.03 @@ -1859,6 +1889,7 @@ class OperatorSmarts: (1, 1, 3), (1, 3), kekulize_flag=True, + regioselectivity=("Hofmann", 0, 0), ), OperatorSmarts( "Hofmann Elimination R-NHR", # similar problem as above, so limit to N(CH3)(CH3)R @@ -1902,7 +1933,7 @@ class OperatorSmarts: (1, 2), (1, 1, 1), ), - # Hydroamination of Alkenes + # Hydroamination of Alkenes can be Markovnikov or anti-Markovnikov depending on the catalyst OperatorSmarts( "Hydroamination of Alkenes", "[C+0!H0:1][C+0:2]-!@[N+0X3:3]>>[*:1]=[*:2].[*:3]", diff --git a/doranet/modules/synthetic/generate_network.py b/doranet/modules/synthetic/generate_network.py index ad98c1f..b69c717 100644 --- a/doranet/modules/synthetic/generate_network.py +++ b/doranet/modules/synthetic/generate_network.py @@ -8,7 +8,7 @@ from datetime import datetime from rdkit import Chem -from rdkit.Chem import rdqueries +from rdkit.Chem import rdqueries, rdRascalMCES from rdkit.Chem.rdMolDescriptors import CalcMolFormula import doranet as dn @@ -420,6 +420,349 @@ def meta_required(self) -> interfaces.MetaKeyPacket: return interfaces.MetaKeyPacket() +@typing.final +@dataclasses.dataclass(frozen=True) +class Regioselectivity_filter(metadata.ReactionFilterBase): + __slots__ = ("regio_user", "direction") + regio_user: typing.Optional[typing.Collection] + direction: str + + def __call__(self, recipe: interfaces.ReactionExplicit) -> bool: + if self.regio_user is None: # if user doesn't use it + return True + regio_rxn_tup = recipe.operator.meta["regioselectivity"] + if regio_rxn_tup is None: # if rxn doesn't have it + return True + Markovnikov, Anti_Markovnikov, Baeyer_Villiger, Zaitsev, Hofmann = ( + False, + False, + False, + False, + False, + ) + if regio_rxn_tup[0] in self.regio_user: + if regio_rxn_tup[0] == "Markovnikov": + Markovnikov = True + elif regio_rxn_tup[0] == "Anti-Markovnikov": + Anti_Markovnikov = True + elif regio_rxn_tup[0] == "Baeyer-Villiger": + Baeyer_Villiger = True + elif regio_rxn_tup[0] == "Zaitsev": + Zaitsev = True + elif regio_rxn_tup[0] == "Hofmann": + Hofmann = True + else: + return True + main_rea = recipe.reactants[regio_rxn_tup[1]].item + if not isinstance(main_rea, interfaces.MolDatRDKit): + raise NotImplementedError( + f"""Filter only implemented for molecule type \ + MolDatRDKit, not {type(main_rea)}""" + ) + main_pro = recipe.products[regio_rxn_tup[2]].item + if not isinstance(main_pro, interfaces.MolDatRDKit): + raise NotImplementedError( + f"""Filter only implemented for molecule type \ + MolDatRDKit, not {type(main_pro)}""" + ) + reactant = main_rea.rdkitmol + product = main_pro.rdkitmol + if self.direction == "retro": + reactant, product = product, reactant + if Zaitsev or Hofmann: + reactant, product = product, reactant + mol1 = reactant # reactant + num_atoms1 = mol1.GetNumAtoms() + # add Hs so terminal double bonds like C=CC, and + # small mols like CC=CC can be properly matched + mol1 = Chem.AddHs(mol1) + mol2 = product + num_atoms2 = mol2.GetNumAtoms() + mol2 = Chem.AddHs(mol2) + # rdRascalMCES opts + opts = rdRascalMCES.RascalOptions() + opts.similarityThreshold = 0 # so it works for small mols + opts.maxBondMatchPairs = 3000 # default 1k. + opts.ringMatchesRingOnly = True + # Find MCES + results = rdRascalMCES.FindMCES(mol1, mol2, opts) + try: + res = results[0] + except IndexError: + return True # if molecule too big to match, let it pass + matching_bonds = res.bondMatches() + matching_atoms = res.atomMatches() + mol1_all_bond_indices = [bond.GetIdx() for bond in mol1.GetBonds()] + mol1_matching_bonds = {t[0] for t in matching_bonds} + mol1_rxn_bond_index_list = [ + item + for item in mol1_all_bond_indices + if item not in mol1_matching_bonds + ] + if len(mol1_rxn_bond_index_list) != 1: + raise ValueError( + "Number of transformed bonds in the reactant is not 1" + ) + mol1_rxn_bond_index = mol1_rxn_bond_index_list[0] + mol1_rxn_bond_obj = mol1.GetBondWithIdx(mol1_rxn_bond_index) + mol1_atom1_idx = mol1_rxn_bond_obj.GetBeginAtomIdx() + mol1_atom1 = mol1_rxn_bond_obj.GetBeginAtom() + mol1_atom2_idx = mol1_rxn_bond_obj.GetEndAtomIdx() + mol1_atom2 = mol1_rxn_bond_obj.GetEndAtom() + + def find_ketone_carbon(mol, atom1, atom2, index1, index2): + oxygen_atom_num = 8 + CO_atom = [] + for neighbor in atom1.GetNeighbors(): + # Check if neighbor is Oxygen + if neighbor.GetAtomicNum() == oxygen_atom_num: + bond = mol.GetBondBetweenAtoms(index1, neighbor.GetIdx()) + if bond.GetBondType() == Chem.rdchem.BondType.DOUBLE: + CO_atom.append((atom1, index2)) + for neighbor in atom2.GetNeighbors(): + # Check if neighbor is Oxygen + if neighbor.GetAtomicNum() == oxygen_atom_num: + bond = mol.GetBondBetweenAtoms(index2, neighbor.GetIdx()) + if bond.GetBondType() == Chem.rdchem.BondType.DOUBLE: + CO_atom.append((atom2, index1)) + if len(CO_atom) == 1: + return CO_atom[0] + diketones_case = 2 + return len(CO_atom) == diketones_case + # elif len(CO_atom) == 2: # 1,2-diketones + # return True + # else: # if no ketone carbon is found + # return False + + if Markovnikov or Anti_Markovnikov: + if num_atoms2 - num_atoms1 == 1: + mol1_atom1_non_h_neighbors = [ + nbr.GetIdx() + for nbr in mol1_atom1.GetNeighbors() + if nbr.GetAtomicNum() != 1 + ] + num_mol1_atom1_non_h_neighbors = len(mol1_atom1_non_h_neighbors) + mol1_atom2_non_h_neighbors = [ + nbr.GetIdx() + for nbr in mol1_atom2.GetNeighbors() + if nbr.GetAtomicNum() != 1 + ] + num_mol1_atom2_non_h_neighbors = len(mol1_atom2_non_h_neighbors) + + mol2_atom1_idx = next( + second + for first, second in matching_atoms + if first == mol1_atom1_idx + ) + mol2_atom2_idx = next( + second + for first, second in matching_atoms + if first == mol1_atom2_idx + ) + mol2_atom1 = mol2.GetAtomWithIdx(mol2_atom1_idx) + mol2_atom2 = mol2.GetAtomWithIdx(mol2_atom2_idx) + mol2_atom1_non_h_neighbors = [ + nbr.GetIdx() + for nbr in mol2_atom1.GetNeighbors() + if nbr.GetAtomicNum() != 1 + ] + num_mol2_atom1_non_h_neighbors = len(mol2_atom1_non_h_neighbors) + mol2_atom2_non_h_neighbors = [ + nbr.GetIdx() + for nbr in mol2_atom2.GetNeighbors() + if nbr.GetAtomicNum() != 1 + ] + num_mol2_atom2_non_h_neighbors = len(mol2_atom2_non_h_neighbors) + if ( + num_mol1_atom1_non_h_neighbors + == num_mol1_atom2_non_h_neighbors + ): + return True + mo1_more_sub = ( + "atom1" + if num_mol1_atom1_non_h_neighbors + > num_mol1_atom2_non_h_neighbors + else "atom2" + ) + # check which atom in mol2 gets new group + if ( + num_mol2_atom1_non_h_neighbors + > num_mol1_atom1_non_h_neighbors + ): + atom_increased = "atom1" + elif ( + num_mol2_atom2_non_h_neighbors + > num_mol1_atom2_non_h_neighbors + ): + atom_increased = "atom2" + else: + raise ValueError( + "Something wrong with the addition reaction" + ) + if Markovnikov: + return mo1_more_sub == atom_increased + if Anti_Markovnikov: + return mo1_more_sub != atom_increased + elif num_atoms2 - num_atoms1 > 1: # addition of alcohols + return True + else: + raise ValueError("Something wrong with the addition reaction") + + elif Baeyer_Villiger: + carbon_atom_num = 6 + oxygen_atom_num = 8 + mol1_ketone = find_ketone_carbon( + mol1, mol1_atom1, mol1_atom2, mol1_atom1_idx, mol1_atom2_idx + ) + if type(mol1_ketone) is tuple: + mol1_CO_C = mol1_ketone[0] + mol1_rebond_C_idx = mol1_ketone[1] + two_neighbors = list() + for neighbor in mol1_CO_C.GetNeighbors(): + # 2 neighboring C atoms + if neighbor.GetAtomicNum() == carbon_atom_num: + two_neighbors.append(neighbor) + # redefine mol1, mol2 as the 2 neighboring atoms of C=O + mol1_atom1, mol1_atom2 = two_neighbors + mol1_atom1_idx = mol1_atom1.GetIdx() + mol1_atom2_idx = mol1_atom2.GetIdx() + mol1_unchanged_C_idx = ( + mol1_atom1_idx + if mol1_atom2_idx == mol1_rebond_C_idx + else mol1_atom2_idx + ) + # check if unchanged is a C=O group, if so let it pass + mol1_unchanged_C = mol1.GetAtomWithIdx(mol1_unchanged_C_idx) + for neighbor in mol1_unchanged_C.GetNeighbors(): + # Check if neighbor is O + if neighbor.GetAtomicNum() == oxygen_atom_num: + bond = mol1.GetBondBetweenAtoms( + mol1_unchanged_C_idx, neighbor.GetIdx() + ) + if bond.GetBondType() == Chem.rdchem.BondType.DOUBLE: + return True + mol1_atom1_non_h_neighbors = [ + nbr.GetIdx() + for nbr in mol1_atom1.GetNeighbors() + if nbr.GetAtomicNum() == carbon_atom_num + ] + num_mol1_atom1_non_h_neighbors = len(mol1_atom1_non_h_neighbors) + mol1_atom2_non_h_neighbors = [ + nbr.GetIdx() + for nbr in mol1_atom2.GetNeighbors() + if nbr.GetAtomicNum() == carbon_atom_num + ] + num_mol1_atom2_non_h_neighbors = len(mol1_atom2_non_h_neighbors) + if ( + num_mol1_atom1_non_h_neighbors + == num_mol1_atom2_non_h_neighbors + ): + return True + if ( + num_mol1_atom1_non_h_neighbors + > num_mol1_atom2_non_h_neighbors + ): + mo1_more_sub_idx = mol1_atom1_idx + else: + mo1_more_sub_idx = mol1_atom2_idx + return mo1_more_sub_idx == mol1_rebond_C_idx + elif mol1_ketone is True: # 1,2-diketones, let it pass + return True + else: # no ketone + raise ValueError( + "No carbonyl group reacted in the Baeyer Villiger rxn" + ) + + elif Zaitsev or Hofmann: + carbon_atom_num = 6 + if num_atoms2 - num_atoms1 == 1: + mol2_atom1_idx = next( + second + for first, second in matching_atoms + if first == mol1_atom1_idx + ) + mol2_atom2_idx = next( + second + for first, second in matching_atoms + if first == mol1_atom2_idx + ) + mol2_all_atom_indices = [ + atom.GetIdx() for atom in mol2.GetAtoms() + ] + mol2_matching_atoms = {t[1] for t in matching_atoms} + mol2_leaving_group_index = next( + item + for item in mol2_all_atom_indices + if ( + item not in mol2_matching_atoms + and mol2.GetAtomWithIdx(item).GetAtomicNum() != 1 + ) + ) + mol2_base_C = next( + atom + for atom in mol2.GetAtomWithIdx( + mol2_leaving_group_index + ).GetNeighbors() + if atom.GetAtomicNum() != 1 + ) + mol2_loseH_C_idx = ( + mol2_atom1_idx + if mol2_base_C.GetIdx() == mol2_atom2_idx + else mol2_atom2_idx + ) + base_neighbors = [ # find all neighbor C who has H + atom + for atom in mol2_base_C.GetNeighbors() + if ( + atom.GetAtomicNum() == carbon_atom_num + and any( + neighbor.GetAtomicNum() == 1 + for neighbor in atom.GetNeighbors() + ) + ) + ] + num_neighbor_list = list() + for atom in base_neighbors: + num_neighbor = len( + [ + nbr.GetIdx() + for nbr in atom.GetNeighbors() + if nbr.GetAtomicNum() == carbon_atom_num + ] + ) + num_neighbor_list.append(num_neighbor) + if Zaitsev: + max_value = max(num_neighbor_list) + max_neighbor_atom_indices = { + base_neighbors[i].GetIdx() + for i, x in enumerate(num_neighbor_list) + if x == max_value + } + return mol2_loseH_C_idx in max_neighbor_atom_indices + else: # Hofmann + min_value = min(num_neighbor_list) + min_neighbor_atom_indices = { + base_neighbors[i].GetIdx() + for i, x in enumerate(num_neighbor_list) + if x == min_value + } + return mol2_loseH_C_idx in min_neighbor_atom_indices + else: + raise ValueError( + "The leaving group might be too big for the Zaitsev filter" + ) + return True + + @property + def meta_required(self) -> interfaces.MetaKeyPacket: + return interfaces.MetaKeyPacket( + operator_keys={ + "Reaction_direction", + "regioselectivity", + } + ) + + def get_smiles_from_file(file_name): def is_valid_smiles(smiles_string): try: @@ -443,6 +786,41 @@ def is_valid_smiles(smiles_string): return clean_list +def validate_regioselectivity(regioselectivity): + allowed_values = { + "Markovnikov", + "Anti-Markovnikov", + "Zaitsev", + "Hofmann", + "Baeyer-Villiger", + } + if regioselectivity is None: + return None + elif regioselectivity == "all": + return { + "Markovnikov", + "Anti-Markovnikov", + "Zaitsev", + "Hofmann", + "Baeyer-Villiger", + } + elif isinstance(regioselectivity, (set, list, tuple)): + # Check if all elements in the collection are valid + if not all(item in allowed_values for item in regioselectivity): + raise ValueError( + "Invalid regioselectivity values. Allowed values:" + "Markovnikov, Anti-Markovnikov, Zaitsev, Hofmann," + "Baeyer-Villiger" + ) + else: + raise ValueError( + "Invalid regioselectivity values. Allowed values: None, all, or a" + "collection containing Markovnikov, Anti-Markovnikov, Zaitsev," + "Hofmann, Baeyer-Villiger" + ) + return regioselectivity # If all checks pass, input is valid + + def generate_network( job_name="default_job", starters=False, @@ -454,10 +832,14 @@ def generate_network( max_atoms=None, # {"C": 20} allow_multiple_reactants="default", # forward allowed, retro no targets=None, # string or list, set, etc. + regioselectivity=None, # None, "all", or use a selection: + # ("Markovnikov","Anti-Markovnikov","Zaitsev","Hofmann","Baeyer-Villiger") ): if not starters: raise Exception("At least one starter is needed to generate a network") + user_regio = validate_regioselectivity(regioselectivity) + starters = get_smiles_from_file(starters) helpers = get_smiles_from_file(helpers) targets = get_smiles_from_file(targets) @@ -505,6 +887,7 @@ def generate_network( "allowed_elements": smarts.allowed_elements, "Reaction_type": smarts.reaction_type, "Reaction_direction": direction, + "regioselectivity": smarts.regioselectivity, }, ) if smarts.kekulize_flag is True: @@ -526,6 +909,7 @@ def generate_network( "allowed_elements": smarts.allowed_elements, "Reaction_type": smarts.reaction_type, "Reaction_direction": direction, + "regioselectivity": smarts.regioselectivity, }, ) @@ -549,6 +933,7 @@ def generate_network( >> Enol_filter_forward() >> Check_balance_filter() >> Allowed_Elements_Filter() + >> Regioselectivity_filter(user_regio, "forward") >> Chem_Rxn_dH_Calculator( "dH", "forward", molecule_thermo_calculator ) @@ -564,6 +949,7 @@ def generate_network( >> Enol_filter_retro() >> Allowed_Elements_Filter() >> Check_balance_filter() + >> Regioselectivity_filter(user_regio, "retro") >> Chem_Rxn_dH_Calculator("dH", "retro", molecule_thermo_calculator) >> Rxn_dH_Filter(max_rxn_thermo_change, "dH") )