-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtrain_all_models.py
185 lines (160 loc) · 6.84 KB
/
train_all_models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import logging
import traceback
from copy import deepcopy
from functools import partial
import torch
import wandb
from torch import multiprocessing
from tqdm import tqdm
from config import get_config, seed_all
# Torch device handle: the first CUDA GPU when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Model names; train_all_models trains every (environment, model) combination.
TRAINABLE_MODELS = ["pe", "pe-discrete"]
# Environment names crossed with TRAINABLE_MODELS to build the training task list.
ENVIRONMENTS = ["oderl-pendulum", "oderl-cartpole", "oderl-cancer", "oderl-acrobot"]
# Sampling policy names. NOTE(review): not referenced by the code visible in this file.
SAMPLING_POLICIES = ["discrete_monitoring", "discrete_planning", "continuous_planning", "active_observing_control"]
# Run flags below are copied into the run configuration by the __main__ block.
RETRAIN = False
FORCE_RETRAIN = False
START_FROM_CHECKPOINT = False
# Stored in the config as "model_train_seed" and echoed in the training-failure log line.
MODEL_TRAIN_SEED = 0
PRINT_SETTINGS = False
from mppi_with_model_active_observing import mppi_with_model_evaluate_single_step_active_observing
from train_utils import train_model
def train_model_wrapper(args, **kwargs):
    """Train a single (environment, model) pair; intended as a pool worker target.

    Any exception raised while configuring or training is logged and converted
    into an ``{"errored": True}`` result instead of propagating, so one failed
    task does not abort the remaining tasks.

    Args:
        args: ``(env_name, model_name)`` tuple identifying the task.
        **kwargs: Forwarded unchanged to ``train_model``. Must contain
            ``config``, a mapping that is wrapped in ``dotdict``; its
            ``log_path`` and ``seed_start`` entries are read here.

    Returns:
        dict: The results returned by ``train_model`` (or an empty result on
        failure) extended with ``"errored"``, ``"model_name"`` and ``"env_name"``.

    Raises:
        TypeError, ValueError: If ``args`` cannot be unpacked into two items.
    """
    # Unpack outside the try so the names used by the error path and the final
    # result are always bound.
    (env_name, model_name) = args
    # Fallback logger: the error path must work even when the failure happens
    # before the per-process logger has been configured.
    logger = multiprocessing.get_logger()
    try:
        from config import dotdict, seed_all

        config = dotdict(kwargs["config"])
        kwargs["config"] = config
        logger = create_logger_in_process(config.log_path)
        logger.info(f"[Now training model] {model_name} \t {env_name}")
        seed_all(config.seed_start)
        _model, results = train_model(model_name, env_name, **kwargs)
        results["errored"] = False
    except Exception as e:
        logger.exception(f"[Error] {e}")
        # The previous message formatted an undefined name (`delay`), which
        # raised NameError inside this handler and crashed the worker.
        logger.info(
            f"[Failed training model] {env_name} {model_name} \t model_seed={MODEL_TRAIN_SEED} \t | error={e}"
        )
        traceback.print_exc()
        results = {"errored": True}
        print("")
    results.update({"model_name": model_name, "env_name": env_name})
    logger.info(f"[Training Result] {model_name} result={results}")
    return results
def mppi_with_model_evaluate_single_step_wrapper(args, **kwargs):
    """Evaluate one policy configuration; intended as a pool worker target.

    Any exception raised while configuring or evaluating is logged and
    converted into an ``{"errored": True}`` result instead of propagating.

    Args:
        args: ``(env_name, model_name, threshold_percent, sampling_policy, seed)``.
        **kwargs: Forwarded to
            ``mppi_with_model_evaluate_single_step_active_observing``. Must
            contain ``config``, a mapping that is deep-copied and wrapped in
            ``dotdict``; its ``observing_var_threshold`` is overwritten with
            ``threshold_percent`` and its ``log_path`` is read here.

    Returns:
        dict: The evaluation results (or an empty result on failure) extended
        with ``"errored"``, ``"model_name"``, ``"env_name"``, ``"seed"``,
        ``"observing_var_threshold"`` and ``"sampling_policy"``.

    Raises:
        TypeError, ValueError: If ``args`` cannot be unpacked into five items.
    """
    # Unpack outside the try so the names used by the error path and the final
    # result are always bound.
    (env_name, model_name, threshold_percent, sampling_policy, seed) = args
    # Fallback logger: previously `logger` was only assigned midway through the
    # try body, so an early failure raised UnboundLocalError in the handler and
    # hid the real error.
    logger = multiprocessing.get_logger()
    try:
        from config import dotdict, seed_all

        seed_all(seed)
        # Deep copy so the per-task threshold does not leak into the shared config.
        config = dotdict(deepcopy(kwargs["config"]))
        config.observing_var_threshold = threshold_percent
        kwargs["config"] = config
        logger = create_logger_in_process(config.log_path)
        logger.info(f"[Now evaluating policy] {(env_name, model_name, threshold_percent, sampling_policy, seed)}")
        results = mppi_with_model_evaluate_single_step_active_observing(
            model_name=model_name,
            env_name=env_name,
            sampling_policy=sampling_policy,
            seed=seed,
            planner="mppi_active_observing",
            **kwargs,
        )
        results["errored"] = False
    except Exception as e:
        logger.exception(f"[Error] {e}")
        logger.info(
            f"[Failed evaluating policy] {(env_name, model_name, threshold_percent, sampling_policy, seed)}\t| error={e}"
        )
        traceback.print_exc()
        results = {"errored": True}
        print("")
    results.update(
        {
            "model_name": model_name,
            "env_name": env_name,
            "seed": seed,
            "observing_var_threshold": threshold_percent,
            "sampling_policy": sampling_policy,
        }
    )
    return results
def train_all_models(config, wandb=None):
    """Train every (environment, model) combination in parallel when enabled.

    When ``config.retrain`` is truthy, one worker process is started per
    (environment, model) pair from ``ENVIRONMENTS`` x ``TRAINABLE_MODELS`` and
    each runs ``train_model_wrapper``. Afterwards a reminder to tune the
    observation thresholds manually is logged.

    Args:
        config: Run configuration. Reads ``retrain``, ``model_seed``,
            ``start_from_checkpoint``, ``force_retrain`` and ``print_settings``,
            and is converted with ``dict(config)`` before being sent to workers.
        wandb: Unused; accepted so existing callers keep working. Workers are
            always given ``wandb=None``.

    Returns:
        list: One result dict per finished training task, in completion order.
        Empty when ``config.retrain`` is falsy.
    """
    model_training_results_l = []
    # An extra Pool used to be created here and never used or closed; it was
    # then shadowed by the `with` pool below, leaking its worker processes.
    if config.retrain:
        train_all_model_inputs = [
            (env_name, model_name) for env_name in ENVIRONMENTS for model_name in TRAINABLE_MODELS
        ]
        logger.info(f"Going to train for {len(train_all_model_inputs)} tasks")
        # NOTE(review): reads `config.model_seed`, while the __main__ block
        # stores MODEL_TRAIN_SEED under "model_train_seed" - confirm which key
        # is intended.
        multi_wrapper_train_model = partial(
            train_model_wrapper,
            config=dict(config),
            wandb=None,
            model_seed=config.model_seed,
            retrain=config.retrain,
            start_from_checkpoint=config.start_from_checkpoint,
            force_retrain=config.force_retrain,
            print_settings=config.print_settings,
            evaluate_model_when_trained=False,
        )
        # One worker per task; use a smaller pool if GPU memory cannot hold
        # all models being trained at the same time.
        with multiprocessing.Pool(len(train_all_model_inputs)) as pool_outer:
            for result in tqdm(
                pool_outer.imap_unordered(multi_wrapper_train_model, train_all_model_inputs),
                total=len(train_all_model_inputs),
                smoothing=0,
            ):
                logger.info(f"[Model Completed training] {result}")
                model_training_results_l.append(result)
    # Tune the thresholds manually following method as described in the paper.
    logger.info("[ACTION ITEM] Please now tune the thresholds manually following method as described in the paper.")
    return model_training_results_l
def generate_log_file_path(file, log_folder="logs"):
    """Return a timestamped log-file path for a script and create its folder.

    Args:
        file: Path of the script the log belongs to (typically ``__file__``).
        log_folder: Folder, relative to the working directory, that holds logs.
            It is created if it does not exist yet.

    Returns:
        str: ``"<log_folder>/<script name>-<YYYYmmdd-HHMMSS>_log.txt"``, where
        the script name is the resolved base name truncated at its first
        ``".py"``.
    """
    import os
    import time

    script_name = os.path.basename(os.path.realpath(file))
    # Keep only the text in front of the first ".py" occurrence.
    run_prefix = script_name.partition(".py")[0]
    os.makedirs(f"./{log_folder}", exist_ok=True)
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    return f"{log_folder}/{run_prefix}-{timestamp}_log.txt"
def create_logger_in_process(log_file_path):
    """Return the multiprocessing logger, configuring it on first use.

    On the first call in a process (when the logger has no handlers) a console
    handler and a file handler writing to ``log_file_path`` are attached, both
    using the same format, and the level is set to INFO. Later calls return
    the already configured logger unchanged.

    Args:
        log_file_path: File that the file handler appends log records to.

    Returns:
        logging.Logger: The logger obtained from ``multiprocessing.get_logger()``.
    """
    process_logger = multiprocessing.get_logger()
    if process_logger.hasHandlers():
        # Already configured in this process; avoid duplicating handlers.
        return process_logger

    log_format = logging.Formatter("%(processName)s| %(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s")
    console_handler = logging.StreamHandler()
    disk_handler = logging.FileHandler(log_file_path)
    for handler in (console_handler, disk_handler):
        handler.setFormatter(log_format)
    for handler in (console_handler, disk_handler):
        process_logger.addHandler(handler)
    process_logger.setLevel(logging.INFO)
    return process_logger
if __name__ == "__main__":
    # Script entry point: set up logging, build the run configuration, start a
    # wandb run and launch training of all models.
    log_path = generate_log_file_path(__file__)
    logger = create_logger_in_process(log_path)

    defaults = get_config()
    defaults["log_path"] = log_path
    defaults["multi_process_results"] = True  # debug mode
    if defaults["multi_process_results"]:
        # Worker processes are started with the "spawn" method.
        torch.multiprocessing.set_start_method("spawn")

    # Remaining overrides, applied in the same order as before.
    run_overrides = {
        "retrain": RETRAIN,
        "force_retrain": FORCE_RETRAIN,
        "start_from_checkpoint": START_FROM_CHECKPOINT,
        "print_settings": PRINT_SETTINGS,
        "model_train_seed": MODEL_TRAIN_SEED,
        "sweep_mode": True,
        # 810000 seconds: train for a long time, i.e. until models converge.
        "end_training_after_seconds": int(1350 * 6.0 * 100.0),
    }
    for option_name, option_value in run_overrides.items():
        defaults[option_name] = option_value

    wandb.init(config=defaults, project=defaults["wandb_project"])  # , mode="disabled")
    config = wandb.config
    seed_all(0)
    logger.info(f"Starting run \t | See log at : {log_path}")
    train_all_models(config, wandb)
    wandb.finish()
    logger.info("Run over. Fin.")
    logger.info(f"[Log found at] {log_path}")