diff --git a/negotiator.py b/negotiator.py
index f4351cf..0a0679f 100644
--- a/negotiator.py
+++ b/negotiator.py
@@ -65,6 +65,16 @@ class NoProtocol(BaseProtocol):
     def __init__(self, num_regions, num_discrete_action_levels) -> None:
         self.stages = []
         super().__init__(num_regions, num_discrete_action_levels)
+
+    def reset(self):
+        self.minimum_mitigation_rate_all_regions = np.zeros(self.num_regions)
+
+    def get_protocol_state(self):
+        protocol_state = {
+            "minimum_mitigation_rate_all_regions": self.minimum_mitigation_rate_all_regions
+            / self.num_discrete_action_levels,
+        }
+        return protocol_state


 class BilateralNegotiatorWithOnlyTariff(BaseProtocol):
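Aside (not part of the patch): a minimal sketch of what the new NoProtocol.get_protocol_state returns, assuming NumPy and illustrative sizes of num_regions=3 and num_discrete_action_levels=10. Rates are stored as discrete action levels and reported normalized to [0, 1]:

    import numpy as np

    num_regions, num_discrete_action_levels = 3, 10
    minimum_mitigation_rate_all_regions = np.zeros(num_regions)

    # Levels divided by the number of discrete levels -> values in [0, 1]
    protocol_state = {
        "minimum_mitigation_rate_all_regions": minimum_mitigation_rate_all_regions
        / num_discrete_action_levels,
    }
    print(protocol_state)  # {'minimum_mitigation_rate_all_regions': array([0., 0., 0.])}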
@@ -1423,7 +1433,7 @@ class BasicClub(BaseProtocol):
     """

-    def __init__(self, rice):
+    def __init__(self, num_regions, num_discrete_action_levels):
         """
         Defines necessary parameters for communication with rice class

@@ -1431,235 +1441,115 @@ def __init__(self, rice):
         Args:
             Rice: instance of RICE-N model
         """
-        self.rice = rice
         self.stages = [
             {
                 "function": self.proposal_step,
-                "numberActions": (
-                    [self.rice.num_discrete_action_levels]
-                ),
+                "action_space": [num_discrete_action_levels],
             },
             {
                 "function": self.evaluation_step,
-                "numberActions": [2] * self.rice.num_regions,
+                "action_space": [2] * num_regions,
             },
         ]
-        self.num_negotiation_stages = len(self.stages)
+        super().__init__(num_regions, num_discrete_action_levels)

     def reset(self):
         """
         Add any negotiator specific values to global state
         """
+        self.minimum_mitigation_rate_all_regions = np.zeros(self.num_regions)
+        self.proposed_mitigation_rate = np.zeros(self.num_regions)
+        self.proposal_decisions = np.zeros((self.num_regions, self.num_regions))

-        for key in [
-            "proposed_mitigation_rate",
-        ]:
-            self.rice.set_global_state(
-                key = key,
-                value = np.zeros(self.rice.num_regions),
-                timestep = self.rice.timestep,
-            )
-
-    def proposal_step(self, actions=None):
+    def get_protocol_state(self):
+        protocol_state = {
+            "stage": np.array([self.stage_idx]) / self.num_stages,
+            "proposed_mitigation_rate": self.proposed_mitigation_rate
+            / self.num_discrete_action_levels,
+            "minimum_mitigation_rate_all_regions": self.minimum_mitigation_rate_all_regions
+            / self.num_discrete_action_levels,
+            "proposal_decisions": self.proposal_decisions,
+            "received_proposal_decisions": self.proposal_decisions.T,
+        }
+        return protocol_state
+
+    def get_pub_priv_features(self):
+        public_features = ["stage", "proposed_mitigation_rate", "minimum_mitigation_rate_all_regions"]
+        private_features = [
+            # "proposal_decisions",
+            "received_proposal_decisions",
+        ]
+        return public_features, private_features
+
+    def proposal_step(self, actions: dict):
         """
         Update Proposal States and Observations using proposal actions
         Update Stage to 1 - Evaluation
         """
-        assert self.rice.negotiation_on
-        assert self.rice.stage == 1
-
-        assert isinstance(actions, dict)
-        assert len(actions) == self.rice.num_regions
-
-        action_offset_index = len(
-            self.rice.savings_action_nvec
-            + self.rice.mitigation_rate_action_nvec
-            + self.rice.export_action_nvec
-            + self.rice.import_actions_nvec
-            + self.rice.tariff_actions_nvec
-        )
+        assert self.stage_idx == 0

         #each country proposes a mitigation rate 0-num_discrete_action_levels (10)
-        proposed_mitigation_rate = [
-            actions[region_id][
-                action_offset_index
-            ]
-            / self.rice.num_discrete_action_levels
-            for region_id in range(self.rice.num_regions)
-        ]
-
-        self.rice.set_global_state(
-            "proposed_mitigation_rate", np.array(proposed_mitigation_rate), self.rice.timestep
-        )
-
-        obs = self.rice.generate_observation()
-        rew = {region_id: 0.0 for region_id in range(self.rice.num_regions)}
-        done = {"__all__": 0}
-        info = {}
-        return obs, rew, done, info
+        self.proposed_mitigation_rate = np.array(
+            [actions[region_id] for region_id in range(self.num_regions)]
+        ).squeeze()

-    def evaluation_step(self, actions=None):
+    def evaluation_step(self, actions: dict):
         """
         Update minimum mitigation rates
         """
-        assert self.rice.negotiation_on
-
-        assert self.rice.stage == 2
-
-        assert isinstance(actions, dict)
-        assert len(actions) == self.rice.num_regions
-
-        action_offset_index = len(
-            self.rice.savings_action_nvec
-            + self.rice.mitigation_rate_action_nvec
-            + self.rice.export_action_nvec
-            + self.rice.import_actions_nvec
-            + self.rice.tariff_actions_nvec
-            + self.rice.proposal_actions_nvec
-        )
-        num_evaluation_actions = len(self.stages[1]["numberActions"])
+        assert self.stage_idx == 1

-        proposal_decisions = np.array(
-            [
-                actions[region_id][
-                    action_offset_index : action_offset_index + num_evaluation_actions
-                ]
-                for region_id in range(self.rice.num_regions)
-            ]
+        self.proposal_decisions = np.array(
+            [actions[region_id] for region_id in range(self.num_regions)]
         )
         # Force set the evaluation for own proposal to accept
-        for region_id in range(self.rice.num_regions):
-            proposal_decisions[region_id, region_id] = 1
-
-        #update global states
-        self.rice.set_global_state("proposal_decisions", proposal_decisions, self.rice.timestep)
+        for region_id in range(self.num_regions):
+            self.proposal_decisions[region_id, region_id] = 1

-        proposed_mitigation_rates = np.array([self.rice.global_state["proposed_mitigation_rate"]["value"][
-            self.rice.timestep, j
-        ] for j in range(self.rice.num_regions)])
-        for region_id in range(self.rice.num_regions):
-
-            result_mitigation = proposed_mitigation_rates * proposal_decisions[region_id, :]
-            self.rice.global_state["minimum_mitigation_rate_all_regions"]["value"][
-                self.rice.timestep, region_id
-            ] = max(
-                result_mitigation
-            )
-        obs = self.rice.generate_observation()
-        rew = {region_id: 0.0 for region_id in range(self.rice.num_regions)}
-        done = {"__all__": 0}
-        info = {}
-        return obs, rew, done, info
+        mmrar = (self.proposed_mitigation_rate * self.proposal_decisions).max(axis=1)
+        self.minimum_mitigation_rate_all_regions = mmrar

-    def generate_action_mask(self):
+    def get_partial_action_mask(self):
         """
         Generate action masks.
         """
-        mask_dict = {region_id: None for region_id in range(self.rice.num_regions)}
-        for region_id in range(self.rice.num_regions):
-            mask = self.rice.default_agent_action_mask.copy()
-            if self.rice.negotiation_on:
-
-                #mask mitigation as per own club
-                minimum_mitigation_rate = int(
-                    round(
-                        self.rice.global_state["minimum_mitigation_rate_all_regions"][
-                            "value"
-                        ][self.rice.timestep, region_id]
-                        * self.rice.num_discrete_action_levels
-                    )
-                )
-                mitigation_mask = np.array(
-                    [0 for _ in range(minimum_mitigation_rate)]
-                    + [
-                        1
-                        for _ in range(
-                            self.rice.num_discrete_action_levels
-                            - minimum_mitigation_rate
-                        )
-                    ]
-                )
-                mask_start = sum(self.rice.savings_action_nvec)
-                mask_end = mask_start + sum(self.rice.mitigation_rate_action_nvec)
-                mask[mask_start:mask_end] = mitigation_mask
-
-                # tariff masks
-                tariff_masks = []
-                for other_region_id in range(self.rice.num_regions):
-
-                    other_region_mitigation_rate = int(round(self.rice.global_state["minimum_mitigation_rate_all_regions"]["value"][self.rice.timestep, other_region_id]\
-                        * self.rice.num_discrete_action_levels))
-
-                    # if other region is self or self not in bilateral negotiation
-                    if (other_region_id == region_id):
-
-                        # make no change to tariff policy
-                        regional_tariff_mask = np.array(
-                            [1 for _ in range(self.rice.num_discrete_action_levels)]
-                        )
-
-                    # if other region's mitigation rate less than yours
-                    elif other_region_mitigation_rate < minimum_mitigation_rate:
-
-                        # tariff the 1-mitigation rate
-                        region_tariff_rate = int(
-                            round(
-                                self.rice.num_discrete_action_levels
-                                - self.rice.global_state[
-                                    "minimum_mitigation_rate_all_regions"
-                                ]["value"][self.rice.timestep, other_region_id]
-                            )
-                        )
+        action_mask_dict = defaultdict(dict)
+        for region_id in range(self.num_regions):
+            minimum_mitigation_rate = int(self.minimum_mitigation_rate_all_regions[region_id])
+            mitigation_mask = [0] * int(minimum_mitigation_rate) + [1] * int(
+                self.num_discrete_action_levels - minimum_mitigation_rate
+            )

-                        regional_tariff_mask = np.array(
-                            [0 for _ in range(region_tariff_rate)]
-                            + [
-                                1
-                                for _ in range(
-                                    self.rice.num_discrete_action_levels
-                                    - region_tariff_rate
-                                )
-                            ]
-                        )
+            # tariff masks
+            tariff_mask = []
+            for other_region_id in range(self.num_regions):
+                other_region_mitigation_rate = int(self.minimum_mitigation_rate_all_regions[other_region_id])

-                    # if other regions mitigation >= your own bonus
-                    else:
-                        # set tarrif cap
-                        region_tariff_rate = int(
-                            round(
-                                self.rice.num_discrete_action_levels
-                                - self.rice.global_state[
-                                    "minimum_mitigation_rate_all_regions"
-                                ]["value"][self.rice.timestep, other_region_id]
-                            )
-                        )
+                # if other region is self
+                if (other_region_id == region_id):
+                    # make no change to tariff policy
+                    regional_tariff_mask = [1] * self.num_discrete_action_levels

-                        regional_tariff_mask = np.array(
-                            [1 for _ in range(region_tariff_rate)]
-                            + [
-                                0
-                                for _ in range(
-                                    self.rice.num_discrete_action_levels
-                                    - region_tariff_rate
-                                )
-                            ]
-                        )
-                    tariff_masks.append(regional_tariff_mask)
+                # if other region's mitigation rate less than yours
+                elif other_region_mitigation_rate < minimum_mitigation_rate:
+                    # tariff at (1 - mitigation rate)
+                    region_tariff_rate = self.num_discrete_action_levels - other_region_mitigation_rate
+                    regional_tariff_mask = [0] * region_tariff_rate + [1] * other_region_mitigation_rate

-                mask_start_tariff = sum(
-                    self.rice.savings_action_nvec
-                    + self.rice.mitigation_rate_action_nvec
-                    + self.rice.export_action_nvec
-                    + self.rice.import_actions_nvec
-                )
+                # if other region's mitigation rate >= your own
+                else:
+                    # set tariff cap
+                    region_tariff_rate = self.num_discrete_action_levels - other_region_mitigation_rate
+                    regional_tariff_mask = [1] * region_tariff_rate + [0] * other_region_mitigation_rate

-                mask_end_tariff = mask_start_tariff + sum(self.rice.tariff_actions_nvec)
-                mask[mask_start_tariff:mask_end_tariff] = np.concatenate(tariff_masks)
+                tariff_mask.extend(regional_tariff_mask)

-            mask_dict[region_id] = mask
+            action_mask_dict[region_id]["tariff"] = tariff_mask
+            action_mask_dict[region_id]["mitigation"] = mitigation_mask

-        return mask_dict
+        return action_mask_dict


 class BilateralNegotiator(BaseProtocol):
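Aside (not part of the patch): the club logic above replaces the per-region loop with one vectorized expression. A toy NumPy run, with made-up proposals and decisions, showing that each region's minimum mitigation rate becomes the highest proposal it accepted, and how the mitigation mask then forbids levels below that commitment:

    import numpy as np

    num_discrete_action_levels = 10
    proposed_mitigation_rate = np.array([3, 7, 5])  # one proposal per region, in action levels
    proposal_decisions = np.array([
        [1, 0, 1],  # region 0 accepts proposals from regions 0 and 2
        [0, 1, 0],  # region 1 accepts only its own (the diagonal is forced to 1)
        [1, 1, 1],  # region 2 accepts all proposals
    ])

    # Same expression as in evaluation_step(): element [i, j] is proposal j
    # scaled by region i's decision, and the row-wise max is region i's commitment.
    minimum_mitigation_rate_all_regions = (proposed_mitigation_rate * proposal_decisions).max(axis=1)
    print(minimum_mitigation_rate_all_regions)  # [5 7 7]

    # Mitigation mask for region 0, as built in get_partial_action_mask():
    mmr = int(minimum_mitigation_rate_all_regions[0])
    print([0] * mmr + [1] * (num_discrete_action_levels - mmr))
    # [0, 0, 0, 0, 0, 1, 1, 1, 1, 1]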
""" - mask_dict = {region_id: None for region_id in range(self.rice.num_regions)} - for region_id in range(self.rice.num_regions): - mask = self.rice.default_agent_action_mask.copy() - if self.rice.negotiation_on: - - #mask mitigation as per own club - minimum_mitigation_rate = int( - round( - self.rice.global_state["minimum_mitigation_rate_all_regions"][ - "value" - ][self.rice.timestep, region_id] - * self.rice.num_discrete_action_levels - ) - ) - mitigation_mask = np.array( - [0 for _ in range(minimum_mitigation_rate)] - + [ - 1 - for _ in range( - self.rice.num_discrete_action_levels - - minimum_mitigation_rate - ) - ] - ) - mask_start = sum(self.rice.savings_action_nvec) - mask_end = mask_start + sum(self.rice.mitigation_rate_action_nvec) - mask[mask_start:mask_end] = mitigation_mask - - # tariff masks - tariff_masks = [] - for other_region_id in range(self.rice.num_regions): - - other_region_mitigation_rate = int(round(self.rice.global_state["minimum_mitigation_rate_all_regions"]["value"][self.rice.timestep, other_region_id]\ - * self.rice.num_discrete_action_levels)) - - # if other region is self or self not in bilateral negotiation - if (other_region_id == region_id): - - # make no change to tariff policy - regional_tariff_mask = np.array( - [1 for _ in range(self.rice.num_discrete_action_levels)] - ) - - # if other region's mitigation rate less than yours - elif other_region_mitigation_rate < minimum_mitigation_rate: - - # tariff the 1-mitigation rate - region_tariff_rate = int( - round( - self.rice.num_discrete_action_levels - - self.rice.global_state[ - "minimum_mitigation_rate_all_regions" - ]["value"][self.rice.timestep, other_region_id] - ) - ) + action_mask_dict = defaultdict(dict) + for region_id in range(self.num_regions): + minimum_mitigation_rate = int(self.minimum_mitigation_rate_all_regions[region_id]) + mitigation_mask = [0] * int(minimum_mitigation_rate) + [1] * int( + self.num_discrete_action_levels - minimum_mitigation_rate + ) - regional_tariff_mask = np.array( - [0 for _ in range(region_tariff_rate)] - + [ - 1 - for _ in range( - self.rice.num_discrete_action_levels - - region_tariff_rate - ) - ] - ) + # tariff masks + tariff_mask = [] + for other_region_id in range(self.num_regions): + other_region_mitigation_rate = int(self.minimum_mitigation_rate_all_regions[other_region_id]) - # if other regions mitigation >= your own bonus - else: - # set tarrif cap - region_tariff_rate = int( - round( - self.rice.num_discrete_action_levels - - self.rice.global_state[ - "minimum_mitigation_rate_all_regions" - ]["value"][self.rice.timestep, other_region_id] - ) - ) + # if other region is self or self not in bilateral negotiation + if (other_region_id == region_id): + # make no change to tariff policy + regional_tariff_mask = [1] * self.num_discrete_action_levels - regional_tariff_mask = np.array( - [1 for _ in range(region_tariff_rate)] - + [ - 0 - for _ in range( - self.rice.num_discrete_action_levels - - region_tariff_rate - ) - ] - ) - tariff_masks.append(regional_tariff_mask) + # if other region's mitigation rate less than yours + elif other_region_mitigation_rate < minimum_mitigation_rate: + # tariff the 1-mitigation rate + region_tariff_rate = self.num_discrete_action_levels - other_region_mitigation_rate + regional_tariff_mask = [0] * region_tariff_rate + [1] * other_region_mitigation_rate - mask_start_tariff = sum( - self.rice.savings_action_nvec - + self.rice.mitigation_rate_action_nvec - + self.rice.export_action_nvec - + self.rice.import_actions_nvec - ) + # if 
diff --git a/scripts/create_submission_zip.py b/scripts/create_submission_zip.py
index d0fc112..acd9251 100644
--- a/scripts/create_submission_zip.py
+++ b/scripts/create_submission_zip.py
@@ -8,13 +8,30 @@
 """
 Script to create the zipped submission file from the results directory
 """
+import logging
+import os
 import shutil
+from argparse import ArgumentParser
 from pathlib import Path

-from scripts.evaluate_submission import get_results_dir, validate_dir
+import yaml
+
+BACKWARDS_COMPAT_CONFIG = """
+trainer:
+  num_envs: 1 # number of environment replicas
+  rollout_fragment_length: 100 # divide episodes into fragments of this many steps each during rollouts.
+  train_batch_size: 2000 # total batch size used for training per iteration (across all the environments)
+  num_episodes: 100 # number of episodes to run the training for
+  framework: torch # framework setting.
+  # Note: RLlib supports TF as well, but our end-to-end pipeline is built for Pytorch only.
+  # === Hardware Settings ===
+  num_workers: 1 # number of rollout worker actors to create for parallel sampling.
+  # Note: Setting the num_workers to 0 will force rollouts to be done in the trainer actor.
+  num_gpus: 0 # number of GPUs to allocate to the trainer process. This can also be fractional (e.g., 0.3 GPUs).
+"""


-def prepare_submission(results_dir: Path):
+def prepare_submission(results_dir: Path) -> Path:
     """
     # Validate all the submission files and compress into a .zip.
     Note: This method is also invoked in the trainer script itself!
@@ -24,25 +41,117 @@ def prepare_submission(results_dir: Path):
     assert isinstance(results_dir, Path)

     # Validate the results directory
-    # validate_dir(results_dir)
+    _, success, comment = validate_dir(results_dir)
+    if not success:
+        raise FileNotFoundError(comment)
+
+    # Remove all the checkpoint state files from the tmp directory except for the last one
+    policy_models = list(results_dir.glob("*.state_dict"))
+    policy_models = sorted(policy_models, key=lambda x: x.stat().st_mtime)
+
+    # assemble list of files to copy
+    files_to_copy = list(results_dir.glob("*.py"))
+    files_to_copy.extend(list(results_dir.glob(".*")))
+    files_to_copy.append(results_dir / "rice_rllib.yaml")
+    files_to_copy.append(policy_models[-1])

     # Make a temporary copy of the results directory for zipping
     results_dir_copy = results_dir.parent / "tmp_copy"
-    shutil.copytree(results_dir, results_dir_copy)
+    results_dir_copy.mkdir(parents=True)

-    # Remove all the checkpoint state files from the tmp directory except for the last one
-    policy_models = list(results_dir_copy.glob("*.state_dict"))
-    policy_models = sorted(policy_models, key=lambda x: x.stat().st_mtime)
-    _ = [policy_model.unlink() for policy_model in policy_models[:-1]]
+    for file in files_to_copy:
+        shutil.copy(file, results_dir_copy / file.name)

     # Create the submission file and delete the temporary copy
     submission_file = Path("submissions") / results_dir.name
     shutil.make_archive(submission_file, "zip", results_dir_copy)
-    print("NOTE: The submission file is created at:", submission_file.with_suffix(".zip"))
+    print("NOTE: The submission file is created at:\t\t\t", submission_file.with_suffix(".zip"))
+
+    # open rice config yaml file in copied directory
+    config_path = results_dir_copy / "rice_rllib.yaml"
+    with open(config_path, "r", encoding="utf8") as fp:
+        run_config = yaml.safe_load(fp)
+
+    # modify the rice_config yaml to work with the original code
+    backwards_config = yaml.safe_load(BACKWARDS_COMPAT_CONFIG)
+    run_config["trainer"] = backwards_config["trainer"]
+    del run_config["logging"]
+
+    # write rice_config yaml file to tmp directory
+    with open(config_path, "w", encoding="utf8") as fp:
+        yaml.dump(run_config, fp, default_flow_style=False)
+
+    # Create the backwards compatible submission file and delete the temporary copy
+    submission_file = Path("submissions") / "backwards_compatible" / results_dir.name
+    shutil.make_archive(submission_file, "zip", results_dir_copy)
+    print("NOTE: The backwards compatible submission file is created at:\t", submission_file.with_suffix(".zip"))
+
+    # delete temporary directory
     shutil.rmtree(results_dir_copy)

-    return submission_file
+    return submission_file.with_suffix(".zip")
+
+
+def validate_dir(results_dir: Path):
+    """
+    Validate that all the required files are present in the 'results' directory.
+    """
+    assert isinstance(results_dir, Path)
+    framework = None
+    files = set(os.listdir(results_dir))
+    if ".warpdrive" in files:
+        framework = "warpdrive"
+        # Warpdrive was used for training
+        for file in [
+            "rice.py",
+            "rice_helpers.py",
+            "rice_cuda.py",
+            "rice_step.cu",
+            "rice_warpdrive.yaml",
+        ]:
+            if file not in files:
+                success = False
+                logging.error(
+                    "%s is not present in the results directory: %s!", file, results_dir
+                )
+                comment = f"{file} is not present in the results directory!"
+                break
+            success = True
+            comment = "Valid submission"
+    elif ".rllib" in files:
+        framework = "rllib"
+        # RLlib was used for training
+        for file in ["rice.py", "rice_helpers.py", "rice_rllib.yaml"]:
+            if file not in files:
+                success = False
+                logging.error(
+                    "%s is not present in the results directory: %s!", file, results_dir
+                )
+                comment = f"{file} is not present in the results directory!"
+                break
+            success = True
+            comment = "Valid submission"
+    else:
+        success = False
+        logging.error(
+            "Missing identifier file! Either the .rllib or the .warpdrive "
+            "file must be present in the results directory: %s",
+            results_dir,
+        )
+        comment = "Missing identifier file!"
+    return framework, success, comment


 if __name__ == "__main__":
-    prepare_submission(results_dir=get_results_dir()[0])
+    parser = ArgumentParser()
+    parser.add_argument(
+        "--results_dir",
+        "-r",
+        type=str,
+        help="the directory where all the submission files are saved. Can also be "
+        "the zipped file containing all the submission files.",
+        required=True,
+    )
+    args = parser.parse_args()
+    results_dir = Path(args.results_dir)
+    prepare_submission(results_dir)
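Aside (not part of the patch): intended usage of the relocated helpers, with an illustrative run directory. prepare_submission now validates first (raising FileNotFoundError on a bad directory), writes submissions/<name>.zip, then rewrites the trainer section of rice_rllib.yaml and writes submissions/backwards_compatible/<name>.zip, returning that latter path:

    from pathlib import Path

    from create_submission_zip import prepare_submission, validate_dir

    results_dir = Path("results/my_run")  # illustrative path
    framework, success, comment = validate_dir(results_dir)
    if success:
        submission_zip = prepare_submission(results_dir)
        print(framework, submission_zip)
    else:
        print("Invalid submission:", comment)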
diff --git a/scripts/evaluate_submission.py b/scripts/evaluate_submission.py
index 6a99824..008fadb 100644
--- a/scripts/evaluate_submission.py
+++ b/scripts/evaluate_submission.py
@@ -24,6 +24,7 @@
 import pickle as pkl
 from pathlib import Path
 from visualizeOutputs import construct_stacked_bar_chart
+from create_submission_zip import prepare_submission, validate_dir

 _path = Path(os.path.abspath(__file__))
@@ -37,9 +38,6 @@
 import pickle as pkl

 import numpy as np
-from collections import Counter
-import matplotlib.pyplot as plt
-import pandas as pd

 # Set logger level e.g., DEBUG, INFO, WARNING, ERROR.
 logging.getLogger().setLevel(logging.ERROR)
@@ -100,12 +98,12 @@ def get_imports(framework=None):
     assert framework is not None
     if framework == "rllib":
         from train_with_rllib import (
-            create_trainer,
             fetch_episode_states,
             fetch_episode_states_freerider,
             fetch_episode_states_tariff,
             load_model_checkpoints,
         )
+        from train import create_trainer
     elif framework == "warpdrive":
         from train_with_warp_drive import (
             create_trainer,
@@ -171,89 +169,7 @@ def get_results_dir():
         raise ValueError("Cannot obtain the results directory") from err


-def validate_dir(results_dir=None):
-    """
-    Validate that all the required files are present in the 'results' directory.
-    """
-    assert results_dir is not None
-    framework = None
-
-    files = os.listdir(results_dir)
-    if ".warpdrive" in files:
-        framework = "warpdrive"
-        # Warpdrive was used for training
-        for file in [
-            "rice.py",
-            "rice_helpers.py",
-            "rice_cuda.py",
-            "rice_step.cu",
-            "rice_warpdrive.yaml",
-        ]:
-            if file not in files:
-                success = False
-                logging.error(
-                    "%s is not present in the results directory: %s!", file, results_dir
-                )
-                comment = f"{file} is not present in the results directory!"
-                break
-            success = True
-            comment = "Valid submission"
-    elif ".rllib" in files:
-        framework = "rllib"
-        # RLlib was used for training
-        for file in ["rice.py", "rice_helpers.py", "rice_rllib.yaml"]:
-            if file not in files:
-                success = False
-                logging.error(
-                    "%s is not present in the results directory: %s!", file, results_dir
-                )
-                comment = f"{file} is not present in the results directory!"
-                break
-            success = True
-            comment = "Valid submission"
-    else:
-        success = False
-        logging.error(
-            "Missing identifier file! Either the .rllib or the .warpdrive "
-            "file must be present in the results directory: %s",
-            results_dir,
-        )
-        comment = "Missing identifier file!"
-
-    return framework, success, comment
-
-
-def prepare_submission(results_dir: Path):
-    """
-    # Validate all the submission files and compress into a .zip.
-    Note: This method is also invoked in the trainer script itself!
-    So if you ran the training script, you may not need to re-run this.
-    Args results_dir: the directory where all the training files were saved.
-    """
-    assert isinstance(results_dir, Path)
-
-    # Validate the results directory
-    # validate_dir(results_dir)
-
-    # Make a temporary copy of the results directory for zipping
-    results_dir_copy = results_dir.parent / "tmp_copy"
-    shutil.copytree(results_dir, results_dir_copy)
-
-    # Remove all the checkpoint state files from the tmp directory except for the last one
-    policy_models = list(results_dir_copy.glob("*.state_dict"))
-    policy_models = sorted(policy_models, key=lambda x: x.stat().st_mtime)
-    _ = [policy_model.unlink() for policy_model in policy_models[:-1]]
-
-    # Create the submission file and delete the temporary copy
-    submission_file = Path("submissions") / results_dir.name
-    shutil.make_archive(submission_file, "zip", results_dir_copy)
-    print("NOTE: The submission file is created at:", submission_file.with_suffix(".zip"))
-    shutil.rmtree(results_dir_copy)
-
-    return submission_file.with_suffix(".zip")
-
-
-def compute_metrics(fetch_episode_states, trainer, framework, submission_file, env_config, log_config=None, num_episodes=1):
+def compute_metrics(fetch_episode_states, trainer, framework, submission_file, log_config=None, num_episodes=1, include_c_e_idx=True):
     """
     Generate episode rollouts and compute metrics.
     """
@@ -460,7 +376,7 @@ def perform_evaluation(
     eval_metrics = {}
     assert num_episodes > 0

-    framework, success, comment = validate_dir(results_directory)
+    framework, success, comment = validate_dir(Path(results_directory))
     submission_file = prepare_submission(Path(results_directory))

     if success:
@@ -507,9 +423,7 @@ def perform_evaluation(

     # Create trainer object
     try:
-        trainer, _ = create_trainer(
-            run_config, source_dir=results_directory, seed=eval_seed
-        )
+        trainer = create_trainer(run_config, seed=eval_seed)

         # Load model checkpoints
         try:
@@ -522,7 +436,6 @@ def perform_evaluation(
             trainer,
             framework,
             submission_file,
-            run_config["env"],
             run_config["logging"],
             num_episodes=num_episodes,
         )
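Aside (not part of the patch): the trainer plumbing change in brief. The RLlib-specific factory used to copy sources and return a (trainer, _) tuple; the patch instead calls train.create_trainer with just the run config and a seed and receives the trainer alone. A sketch with an illustrative config path and seed:

    import yaml
    from train import create_trainer  # new location of the factory

    # run_config as loaded by the evaluation script (illustrative path)
    with open("results/my_run/rice_rllib.yaml", "r", encoding="utf8") as fp:
        run_config = yaml.safe_load(fp)

    # Old call: trainer, _ = create_trainer(run_config, source_dir=..., seed=...)
    trainer = create_trainer(run_config, seed=12345)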
diff --git a/scripts/visualizeOutputs.py b/scripts/visualizeOutputs.py
index 5242e14..429e560 100644
--- a/scripts/visualizeOutputs.py
+++ b/scripts/visualizeOutputs.py
@@ -6,7 +6,7 @@
 import os
 from fixed_paths import PUBLIC_REPO_DIR

-def construct_stacked_bar_chart(global_states, wandb,
+def construct_stacked_bar_chart(global_states,
                                 num_discrete_actions = 10,
                                 field = "mitigation_rate_all_regions"):

@@ -52,7 +52,7 @@
     plt.ylabel(f"# of Countries of a Given {y_axis_field}")
     plt.xlabel("Timesteps")
     plt.title(f"{field} Distribution")
-    wandb.log({f"{y_axis_field} Counts Across Time":plt})
+    return plt
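Aside (not part of the patch): construct_stacked_bar_chart no longer takes a wandb handle or logs the chart itself; it returns the matplotlib module so the caller decides where the figure goes. A hypothetical caller, where global_states is assumed to come from an evaluation rollout and the log key is illustrative:

    import wandb
    from visualizeOutputs import construct_stacked_bar_chart

    chart = construct_stacked_bar_chart(
        global_states,  # assumed available from an evaluation rollout
        field="mitigation_rate_all_regions",
    )
    wandb.log({"Mitigation Rate Counts Across Time": chart})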