forked from voiler/PopulationBasedTraining
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
133 lines (122 loc) · 5.26 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import argparse
import os
import pathlib
import numpy as np
import torch
import torch.nn as nn
import torch.multiprocessing as _mp
from torchvision.datasets import MNIST
import torchvision.transforms as transforms
from model import Net
from trainer import Trainer
from utils import get_optimizer, exploit_and_explore
mp = _mp.get_context('spawn')
class Worker(mp.Process):
def __init__(self, batch_size, epoch, max_epoch, train_data, test_data, population, finish_tasks,
device):
super().__init__()
self.epoch = epoch
self.population = population
self.finish_tasks = finish_tasks
self.max_epoch = max_epoch
self.batch_size = batch_size
self.device = device
model = Net().to(device)
optimizer = get_optimizer(model)
self.trainer = Trainer(model=model,
optimizer=optimizer,
loss_fn=nn.CrossEntropyLoss(),
train_data=train_data,
test_data=test_data,
batch_size=self.batch_size,
device=self.device)
def run(self):
while True:
if self.epoch.value > self.max_epoch:
break
# Train
task = self.population.get()
self.trainer.set_id(task['id'])
checkpoint_path = "checkpoints/task-%03d.pth" % task['id']
if os.path.isfile(checkpoint_path):
self.trainer.load_checkpoint(checkpoint_path)
try:
self.trainer.train()
score = self.trainer.eval()
self.trainer.save_checkpoint(checkpoint_path)
self.finish_tasks.put(dict(id=task['id'], score=score))
except KeyboardInterrupt:
break
class Explorer(mp.Process):
def __init__(self, epoch, max_epoch, population, finish_tasks, hyper_params):
super().__init__()
self.epoch = epoch
self.population = population
self.finish_tasks = finish_tasks
self.max_epoch = max_epoch
self.hyper_params = hyper_params
def run(self):
while True:
if self.epoch.value > self.max_epoch:
break
if self.population.empty() and self.finish_tasks.full():
print("Exploit and explore")
tasks = []
while not self.finish_tasks.empty():
tasks.append(self.finish_tasks.get())
tasks = sorted(tasks, key=lambda x: x['score'], reverse=True)
print('Best score on', tasks[0]['id'], 'is', tasks[0]['score'])
print('Worst score on', tasks[-1]['id'], 'is', tasks[-1]['score'])
fraction = 0.2
cutoff = int(np.ceil(fraction * len(tasks)))
tops = tasks[:cutoff]
bottoms = tasks[len(tasks) - cutoff:]
for bottom in bottoms:
top = np.random.choice(tops)
top_checkpoint_path = "checkpoints/task-%03d.pth" % top['id']
bot_checkpoint_path = "checkpoints/task-%03d.pth" % bottom['id']
exploit_and_explore(top_checkpoint_path, bot_checkpoint_path, self.hyper_params)
with self.epoch.get_lock():
self.epoch.value += 1
for task in tasks:
self.population.put(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Population Based Training")
parser.add_argument("--device", type=str, default='cuda',
help="")
parser.add_argument("--population_size", type=int, default=10,
help="")
parser.add_argument("--batch_size", type=int, default=20,
help="")
args = parser.parse_args()
# mp.set_start_method("spawn")
mp = mp.get_context('forkserver')
device = args.device
if not torch.cuda.is_available():
device = 'cpu'
population_size = args.population_size
batch_size = args.batch_size
max_epoch = 20
pathlib.Path('checkpoints').mkdir(exist_ok=True)
checkpoint_str = "checkpoints/task-%03d.pth"
population = mp.Queue(maxsize=population_size)
finish_tasks = mp.Queue(maxsize=population_size)
epoch = mp.Value('i', 0)
for i in range(population_size):
population.put(dict(id=i, score=0))
hyper_params = {'optimizer': ["lr", "momentum"], "batch_size": True}
train_data_path = test_data_path = './data'
train_data = MNIST(train_data_path, True, transforms.ToTensor(), download=True)
test_data = MNIST(test_data_path, False, transforms.ToTensor(), download=True)
workers = [Worker(batch_size, epoch, max_epoch, train_data, test_data, population, finish_tasks, device)
for _ in range(3)]
workers.append(Explorer(epoch, max_epoch, population, finish_tasks, hyper_params))
[w.start() for w in workers]
[w.join() for w in workers]
task = []
while not finish_tasks.empty():
task.append(finish_tasks.get())
while not population.empty():
task.append(population.get())
task = sorted(task, key=lambda x: x['score'], reverse=True)
print('best score on', task[0]['id'], 'is', task[0]['score'])