From 87bf13642cabbe1ca2cd159ba1e5048025a26607 Mon Sep 17 00:00:00 2001
From: Olivier Mastropietro
Date: Tue, 6 Jun 2017 22:11:13 -0400
Subject: [PATCH 1/4] sync resnet

---
 .../synchronous_resnet/resnet_controller.py   | 150 +++++
 example/synchronous_resnet/resnet_worker.py   | 566 ++++++++++++++++++
 2 files changed, 716 insertions(+)
 create mode 100644 example/synchronous_resnet/resnet_controller.py
 create mode 100644 example/synchronous_resnet/resnet_worker.py

diff --git a/example/synchronous_resnet/resnet_controller.py b/example/synchronous_resnet/resnet_controller.py
new file mode 100644
index 0000000..6970ff8
--- /dev/null
+++ b/example/synchronous_resnet/resnet_controller.py
@@ -0,0 +1,150 @@
+from __future__ import absolute_import, print_function
+import os
+import sys
+import time
+
+import numpy
+
+sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
+from platoon.channel import Controller
+
+
+class ResNetController(Controller):
+    """
+    This multi-process controller implements patience-based early-stopping SGD
+    """
+
+    def __init__(self, seed, patience, default_args):
+        """
+        Initialize the ResNetController
+
+        Parameters
+        ----------
+        seed : int
+            Seed sent to every worker so that they all initialize their
+            model parameters identically.
+        patience : int
+            Training stops after this many epochs without any reported
+            improvement of the averaged validation error.
+        default_args : dict
+            Keyword arguments passed on to the parent
+            platoon.channel.Controller class.
+        """
+        super(ResNetController, self).__init__(**default_args)
+        self.nb_worker = len(self._devices)
+        # map ids to members of range(nb_worker)
+        self.worker_ids_dict = dict(zip(self._workers, [i for i in range(len(self._workers))]))
+
+        self.patience = patience
+        self.seed = seed
+
+        self.valid_history_errs = [[None for i in range(self.nb_worker)]]
+        self.test_history_errs = [[None for i in range(self.nb_worker)]]
+        self.bad_counter = 0
+        self._epoch = 0
+        self.best_dict = dict(best_epoch=-1, best_valid=numpy.inf)
+
+
+    def handle_control(self, req, worker_id, req_info):
+        """
+        Handles a control_request received from a worker
+
+        Parameters
+        ----------
+        req : str
+            Control request received from a worker.
+            The control request can be one of the following
+            1) 'seed' : request by a worker for the common random seed, so
+            that every worker initializes its parameters the same way. The
+            server responds with the seed.
+            2) 'splits' : request by a worker, with the dataset lengths in
+            req_info, for the slice of the training, validation and test
+            sets it should work on. The controller never loads the data
+            itself, so it only computes start/end indices from the lengths
+            and the number of workers and responds with them in a dict.
+            3) 'pred_errors' : used by a worker, with req_info of format
+            {"valid_err":x, "test_err":x2, "epoch":n}, to inform the
+            server that it has performed a monitoring step and obtained
+            the included errors on the monitoring datasets. Once every
+            worker has reported, the server responds "best" if the averaged
+            validation error is the best reported so far, and 'stop' once
+            the patience has been exceeded.
+        """
+        control_response = ""
+        worker_id = self.worker_ids_dict[worker_id]
+
+        if req == 'pred_errors':
+            if self.valid_history_errs[self._epoch][worker_id] is not None:
+                # a worker may only fill the single empty (None) slot reserved
+                # for it in the current epoch; anything else means it is out of step
+                raise RuntimeError('Worker got out of sync!')
+            self.valid_history_errs[self._epoch][worker_id] = req_info['valid_err']
+            self.test_history_errs[self._epoch][worker_id] = req_info['test_err']
+
+            if not any([i is None for i in self.valid_history_errs[self._epoch]]):
+                print('Epoch %d is done' % req_info['epoch'])
+                valid_err = sum(self.valid_history_errs[self._epoch]) / float(self.nb_worker)
+
+                if valid_err <= self.best_dict['best_valid']:
+                    self.best_dict['best_epoch'] = self._epoch
+                    self.best_dict['best_valid'] = valid_err
+                    self.bad_counter = 0
+                    control_response = 'best'
+                    print("Best error valid:", valid_err)
+                else:
+                    self.bad_counter += 1
+                self.valid_history_errs += [[None for i in range(self.nb_worker)]]
+                self.test_history_errs += [[None for i in range(self.nb_worker)]]
+                self._epoch += 1
+
+        elif req == 'splits':
+            # the controller never loads the dataset but the worker doesn't
+            # know how many workers there are
+            train_len = req_info['train_len'] // self.nb_worker
+            valid_len = req_info['valid_len'] // self.nb_worker
+            test_len = req_info['test_len'] // self.nb_worker
+            splits = dict(train_splits=[train_len * worker_id, train_len * (worker_id + 1)],
+                          valid_splits=[valid_len * worker_id, valid_len * (worker_id + 1)],
+                          test_splits=[test_len * worker_id, test_len * (worker_id + 1)])
+            control_response = splits
+
+            # approximate start of training (set when a worker asks for its splits)
+            self.start_time = time.time()
+
+        elif req == 'seed':
+            control_response = self.seed
+
+        if self.bad_counter > self.patience:
+            print("Early stopping!")
+            end_time = time.time() - self.start_time
+            # report the best results before telling the workers to stop;
+            # dumping the best parameters is left to the workers
+            print("Best error valid:", self.best_dict['best_valid'])
+            test_err = sum(self.test_history_errs[self.best_dict['best_epoch']]) / \
+                float(self.nb_worker)
+            print("Best error test:", test_err)
+            print("Training took %.1fs" % end_time, file=sys.stderr)
+            control_response = 'stop'
+
+        return control_response
+
+
+def resnet_control():
+    parser = Controller.default_parser()
+    parser.add_argument('--seed', default=1234, type=int, required=False,
+                        help='Seed shared with every worker for parameter initialization.')
+    parser.add_argument('--patience', default=10, type=int, required=False,
+                        help='Number of epochs without validation improvement before early stopping.')
+    args = parser.parse_args()
+
+    controller = ResNetController(seed=args.seed,
+                                  patience=args.patience,
+                                  default_args=Controller.default_arguments(args))
+
+    print("Controller is ready")
+    return controller.serve()
+
+if __name__ == '__main__':
+    rcode = resnet_control()
+    if rcode != 0:
+        sys.exit(rcode)
diff --git a/example/synchronous_resnet/resnet_worker.py b/example/synchronous_resnet/resnet_worker.py
new file mode 100644
index 0000000..10e119e
--- /dev/null
+++ b/example/synchronous_resnet/resnet_worker.py
@@ -0,0 +1,566 @@
+from __future__ import absolute_import, print_function
+from collections import OrderedDict
+import six
+from six import iteritems
+from six.moves import range
+
+import numpy
+import theano
+import theano.tensor as tensor
+from theano import config
+
+import lasagne
+from lasagne.utils import floatX
+from lasagne.layers import InputLayer
+from 
lasagne.layers import Conv2DLayer as ConvLayer +from lasagne.layers import BatchNormLayer +from lasagne.layers import Pool2DLayer as PoolLayer +from lasagne.layers import NonlinearityLayer +from lasagne.layers import ElemwiseSumLayer +from lasagne.layers import DenseLayer +from lasagne.nonlinearities import rectify, softmax + +from platoon.channel import Worker +from platoon.training.global_dynamics import AverageSGD + +worker = None + +def load_data(): + """ + create synthetic data + """ + def trgt_reshape(trgt): + return trgt.reshape((trgt.shape[0],1)) + + targets = numpy.arange(1000) + train_targets = trgt_reshape(numpy.repeat(targets, 2)) + train_data = numpy.random.random((train_targets.shape[0],3,224,224)) + valid_targets = trgt_reshape(numpy.repeat(targets, 1)) + valid_data = numpy.random.random((valid_targets.shape[0],3,224,224)) + test_targets = trgt_reshape(numpy.repeat(targets, 1)) + test_data = numpy.random.random((test_targets.shape[0],3,224,224)) + + rval = ([numpy_floatX(train_data), numpy_int32(train_targets)], + [numpy_floatX(valid_data), numpy_int32(valid_targets)], + [numpy_floatX(test_data), numpy_int32(test_targets)]) + return rval + + +def numpy_floatX(data): + return numpy.asarray(data, dtype=config.floatX) + +def numpy_int32(data): + return data.astype(numpy.int32) + + +def get_minibatches_idx(n, minibatch_size, shuffle=False): + """ + Used to shuffle the dataset at each iteration. + """ + + idx_list = numpy.arange(n, dtype="int32") + + if shuffle: + numpy.random.shuffle(idx_list) + + minibatches = [] + minibatch_start = 0 + for i in range(n // minibatch_size): + minibatches.append(idx_list[minibatch_start: + minibatch_start + minibatch_size]) + minibatch_start += minibatch_size + + if (minibatch_start != n): + # Make a minibatch out of what is left + minibatches.append(idx_list[minibatch_start:]) + + return zip(range(len(minibatches)), minibatches) + + +def infer_bc_pattern(shape): + pattern = [True if i == 1 else False for i in shape] + return tuple(pattern) + + +def sgd(lr, tparams, grads, x, y, cost): + """ Stochastic Gradient Descent + + :note: A more complicated version of sgd then needed. This is + done like that for adadelta and rmsprop. + + """ + # New set of shared variable that will contain the gradient + # for a mini-batch. + gshared = [theano.shared(p.get_value() * 0., name='%s_grad' % p.name, + broadcastable=infer_bc_pattern(p.get_value().shape)) + for p in tparams] + gsup = [(gs, g) for gs, g in zip(gshared, grads)] + #import ipdb; ipdb.set_trace() + #for i, gpair in enumerate(gsup): + # g = gpair[0] + # u = gpair[1] + # if g.broadcastable != u.broadcastable: + # #gsup[i] = (tensor.patternbroadcast(g, u.broadcastable), u) + # u.broadcastable = g.broadcastable + #import ipdb; ipdb.set_trace() + + # Function that computes gradients for a mini-batch, but do not + # updates the weights. + f_grad_shared = theano.function([x, y], cost, updates=gsup, + name='sgd_f_grad_shared') + + pup = [(p, p - lr * g) for p, g in zip(tparams, gshared)] + + # Function that updates the weights from the previously computed + # gradient. 
+ f_update = theano.function([lr], [], updates=pup, + name='sgd_f_update') + + return f_grad_shared, f_update + + +def adadelta(lr, tparams, grads, x, y, cost): + """ + An adaptive learning rate optimizer + + Parameters + ---------- + lr : Theano SharedVariable + Initial learning rate + tpramas: Theano SharedVariable + Model parameters + grads: Theano variable + Gradients of cost w.r.t to parameres + x: Theano variable + Model inputs + mask: Theano variable + Sequence mask + y: Theano variable + Targets + cost: Theano variable + Objective fucntion to minimize + + Notes + ----- + For more information, see [ADADELTA]_. + + .. [ADADELTA] Matthew D. Zeiler, *ADADELTA: An Adaptive Learning + Rate Method*, arXiv:1212.5701. + """ + + zipped_grads = [theano.shared(p.get_value() * numpy_floatX(0.), + name='%s_grad' % k) + for k, p in iteritems(tparams)] + running_up2 = [theano.shared(p.get_value() * numpy_floatX(0.), + name='%s_rup2' % k) + for k, p in iteritems(tparams)] + running_grads2 = [theano.shared(p.get_value() * numpy_floatX(0.), + name='%s_rgrad2' % k) + for k, p in iteritems(tparams)] + + zgup = [(zg, g) for zg, g in zip(zipped_grads, grads)] + rg2up = [(rg2, 0.95 * rg2 + 0.05 * (g ** 2)) + for rg2, g in zip(running_grads2, grads)] + + f_grad_shared = theano.function([x, y], cost, updates=zgup + rg2up, + name='adadelta_f_grad_shared') + + updir = [-tensor.sqrt(ru2 + 1e-6) / tensor.sqrt(rg2 + 1e-6) * zg + for zg, ru2, rg2 in zip(zipped_grads, + running_up2, + running_grads2)] + ru2up = [(ru2, 0.95 * ru2 + 0.05 * (ud ** 2)) + for ru2, ud in zip(running_up2, updir)] + param_up = [(p, p + ud) for p, ud in zip(tparams.values(), updir)] + + f_update = theano.function([lr], [], updates=ru2up + param_up, + on_unused_input='ignore', + name='adadelta_f_update') + + return f_grad_shared, f_update + + +def rmsprop(lr, tparams, grads, x, y, cost): + """ + A variant of SGD that scales the step size by running average of the + recent step norms. + + Parameters + ---------- + lr : Theano SharedVariable + Initial learning rate + tpramas: Theano SharedVariable + Model parameters + grads: Theano variable + Gradients of cost w.r.t to parameres + x: Theano variable + Model inputs + mask: Theano variable + Sequence mask + y: Theano variable + Targets + cost: Theano variable + Objective fucntion to minimize + + Notes + ----- + For more information, see [Hint2014]_. + + .. 
[Hint2014] Geoff Hinton, *Neural Networks for Machine Learning*, + lecture 6a, + http://cs.toronto.edu/~tijmen/csc321/slides/lecture_slides_lec6.pdf + """ + + zipped_grads = [theano.shared(p.get_value() * numpy_floatX(0.), + name='%s_grad' % k) + for k, p in iteritems(tparams)] + running_grads = [theano.shared(p.get_value() * numpy_floatX(0.), + name='%s_rgrad' % k) + for k, p in iteritems(tparams)] + running_grads2 = [theano.shared(p.get_value() * numpy_floatX(0.), + name='%s_rgrad2' % k) + for k, p in iteritems(tparams)] + + zgup = [(zg, g) for zg, g in zip(zipped_grads, grads)] + rgup = [(rg, 0.95 * rg + 0.05 * g) for rg, g in zip(running_grads, grads)] + rg2up = [(rg2, 0.95 * rg2 + 0.05 * (g ** 2)) + for rg2, g in zip(running_grads2, grads)] + + f_grad_shared = theano.function([x, y], cost, + updates=zgup + rgup + rg2up, + name='rmsprop_f_grad_shared') + + updir = [theano.shared(p.get_value() * numpy_floatX(0.), + name='%s_updir' % k) + for k, p in iteritems(tparams)] + updir_new = [(ud, 0.9 * ud - 1e-4 * zg / tensor.sqrt(rg2 - rg ** 2 + 1e-4)) + for ud, zg, rg, rg2 in zip(updir, zipped_grads, running_grads, + running_grads2)] + param_up = [(p, p + udn[1]) + for p, udn in zip(tparams.values(), updir_new)] + f_update = theano.function([lr], [], updates=updir_new + param_up, + on_unused_input='ignore', + name='rmsprop_f_update') + + return f_grad_shared, f_update + +def build_simple_block(incoming_layer, names, + num_filters, filter_size, stride, pad, + use_bias=False, nonlin=rectify): + """Creates stacked Lasagne layers ConvLayer -> BN -> (ReLu) + + Parameters: + ---------- + incoming_layer : instance of Lasagne layer + Parent layer + + names : list of string + Names of the layers in block + + num_filters : int + Number of filters in convolution layer + + filter_size : int + Size of filters in convolution layer + + stride : int + Stride of convolution layer + + pad : int + Padding of convolution layer + + use_bias : bool + Whether to use bias in conlovution layer + + nonlin : function + Nonlinearity type of Nonlinearity layer + + Returns + ------- + tuple: (net, last_layer_name) + net : dict + Dictionary with stacked layers + last_layer_name : string + Last layer name + """ + net = [] + net.append(( + names[0], + ConvLayer(incoming_layer, num_filters, filter_size, stride, pad, + flip_filters=False, nonlinearity=None) if use_bias + else ConvLayer(incoming_layer, num_filters, filter_size, stride, pad, b=None, + flip_filters=False, nonlinearity=None) + )) + + net.append(( + names[1], + BatchNormLayer(net[-1][1]) + )) + if nonlin is not None: + net.append(( + names[2], + NonlinearityLayer(net[-1][1], nonlinearity=nonlin) + )) + + return dict(net), net[-1][0] + + + +simple_block_name_pattern = ['res%s_branch%i%s', 'bn%s_branch%i%s', 'res%s_branch%i%s_relu'] + +def build_residual_block(incoming_layer, ratio_n_filter=1.0, ratio_size=1.0, has_left_branch=False, + upscale_factor=4, ix=''): + """Creates two-branch residual block + + Parameters: + ---------- + incoming_layer : instance of Lasagne layer + Parent layer + + ratio_n_filter : float + Scale factor of filter bank at the input of residual block + + ratio_size : float + Scale factor of filter size + + has_left_branch : bool + if True, then left branch contains simple block + + upscale_factor : float + Scale factor of filter bank at the output of residual block + + ix : int + Id of residual block + + Returns + ------- + tuple: (net, last_layer_name) + net : dict + Dictionary with stacked layers + last_layer_name : string + Last layer name + 
""" + net = {} + + # right branch + net_tmp, last_layer_name = build_simple_block( + incoming_layer, map(lambda s: s % (ix, 2, 'a'), simple_block_name_pattern), + int(lasagne.layers.get_output_shape(incoming_layer)[1]*ratio_n_filter), + 1, int(1.0/ratio_size), 0) + net.update(net_tmp) + + net_tmp, last_layer_name = build_simple_block( + net[last_layer_name], map(lambda s: s % (ix, 2, 'b'), simple_block_name_pattern), + lasagne.layers.get_output_shape(net[last_layer_name])[1], 3, 1, 1) + net.update(net_tmp) + + net_tmp, last_layer_name = build_simple_block( + net[last_layer_name], map(lambda s: s % (ix, 2, 'c'), simple_block_name_pattern), + lasagne.layers.get_output_shape(net[last_layer_name])[1]*upscale_factor, 1, 1, 0, + nonlin=None) + net.update(net_tmp) + + right_tail = net[last_layer_name] + left_tail = incoming_layer + + # left branch + if has_left_branch: + net_tmp, last_layer_name = build_simple_block( + incoming_layer, map(lambda s: s % (ix, 1, ''), simple_block_name_pattern), + int(lasagne.layers.get_output_shape(incoming_layer)[1]*4*ratio_n_filter), + 1, int(1.0/ratio_size), 0, + nonlin=None) + net.update(net_tmp) + left_tail = net[last_layer_name] + + net['res%s' % ix] = ElemwiseSumLayer([left_tail, right_tail], coeffs=1) + net['res%s_relu' % ix] = NonlinearityLayer(net['res%s' % ix], nonlinearity=rectify) + + return net, 'res%s_relu' % ix + + +def build_resnet(): + net = {} + net['input'] = InputLayer((None, 3, 224, 224)) + sub_net, parent_layer_name = build_simple_block( + net['input'], ['conv1', 'bn_conv1', 'conv1_relu'], + 64, 7, 2, 3, use_bias=True) + net.update(sub_net) + net['pool1'] = PoolLayer(net[parent_layer_name], pool_size=3, stride=2, + pad=0, mode='max', ignore_border=False) + + block_size = list('abc') + parent_layer_name = 'pool1' + for c in block_size: + if c == 'a': + sub_net, parent_layer_name = build_residual_block(net[parent_layer_name], + 1, 1, True, 4, ix='2%s' % c) + else: + sub_net, parent_layer_name = build_residual_block(net[parent_layer_name], + 1.0/4, 1, False, 4, ix='2%s' % c) + net.update(sub_net) + + block_size = list('abcd') + for c in block_size: + if c == 'a': + sub_net, parent_layer_name = build_residual_block(net[parent_layer_name], + 1.0/2, 1.0/2, True, 4, ix='3%s' % c) + else: + sub_net, parent_layer_name = build_residual_block(net[parent_layer_name], + 1.0/4, 1, False, 4, ix='3%s' % c) + net.update(sub_net) + + block_size = list('abcdef') + for c in block_size: + if c == 'a': + sub_net, parent_layer_name = build_residual_block(net[parent_layer_name], + 1.0/2, 1.0/2, True, 4, ix='4%s' % c) + else: + sub_net, parent_layer_name = build_residual_block(net[parent_layer_name], + 1.0/4, 1, False, 4, ix='4%s' % c) + net.update(sub_net) + + block_size = list('abc') + for c in block_size: + if c == 'a': + sub_net, parent_layer_name = build_residual_block(net[parent_layer_name], + 1.0/2, 1.0/2, True, 4, ix='5%s' % c) + else: + sub_net, parent_layer_name = build_residual_block(net[parent_layer_name], + 1.0/4, 1, False, 4, ix='5%s' % c) + net.update(sub_net) + + net['pool5'] = PoolLayer(net[parent_layer_name], pool_size=7, stride=1, pad=0, + mode='average_exc_pad', ignore_border=False) + net['fc1000'] = DenseLayer(net['pool5'], num_units=1000, nonlinearity=None) + net['prob'] = NonlinearityLayer(net['fc1000'], nonlinearity=softmax) + + return net + + +def split_data_for_worker(dataset, splits, name): + data = dataset[0][splits[name][0]:splits[name][1]] + targets = dataset[1][splits[name][0]:splits[name][1]] + return [data, targets] + + +def 
pred_error(f_pred, data, iterator):
+    """
+    Just compute the error
+    f_pred: Theano function computing the prediction
+    """
+    valid_err = 0.
+    i = 0
+    for _, valid_index in iterator:
+        x = [data[0][t] for t in valid_index]
+        y = [data[1][t] for t in valid_index]
+        valid_err += f_pred(x, y)
+        i += 1
+
+    return valid_err / i
+
+
+def train_resnet(
+    batch_size=12, # The batch size during training.
+    valid_batch_size=12, # The batch size used for validation/test set.
+    validFreq=5,
+    lrate=1e-4,
+    optimizer=sgd,
+):
+
+    # Each worker needs the same seed in order to draw the same parameters.
+    # This will also make them shuffle the batches the same way, but splits are
+    # different so doesnt matter
+    seed = worker.send_req('seed')
+    numpy.random.seed(seed)
+
+    print('Loading data')
+    train, valid, test = load_data()
+
+    print('Building model')
+    resnet = build_resnet()
+    params = lasagne.layers.get_all_params(resnet.values(), trainable=True)
+
+    print("Using all_reduce worker's interface!")
+    asgd = AverageSGD(worker)
+    asgd.make_rule(params)
+    print("Params init done")
+
+    x = tensor.ftensor4('x')
+    y = tensor.imatrix('y')
+
+    prob = lasagne.layers.get_output(resnet['prob'], x, deterministic=False)
+    cost = tensor.nnet.categorical_crossentropy(prob, y.flatten()).mean()
+
+    grads = tensor.grad(cost, wrt=params)
+    lr = tensor.scalar(name='lr')
+    f_grad_shared, f_update = optimizer(lr, params, grads,
+                                        x, y, cost)
+
+    v_prob = lasagne.layers.get_output(resnet['prob'], x, deterministic=True)
+    v_mc = tensor.mean(tensor.neq(tensor.argmax(v_prob, axis=1), y.flatten()))
+    f_pred = theano.function([x, y], v_mc)
+
+    print('Optimization')
+    splits = worker.send_req('splits', {'train_len': train[0].shape[0],
+                                        'valid_len': valid[0].shape[0],
+                                        'test_len' : test[0].shape[0]})
+    train = split_data_for_worker(train, splits, 'train_splits')
+    valid = split_data_for_worker(valid, splits, 'valid_splits')
+    test = split_data_for_worker(test, splits, 'test_splits')
+
+    kf_valid = list(get_minibatches_idx(valid[0].shape[0], valid_batch_size))
+    kf_test = list(get_minibatches_idx(test[0].shape[0], valid_batch_size))
+
+    def train_iter():
+        while True:
+            kf = get_minibatches_idx(train[0].shape[0], batch_size, shuffle=True)
+            for _, train_index in kf:
+                y = [train[1][t] for t in train_index]
+                x = [train[0][t] for t in train_index]
+                yield x, y
+
+    train_it = train_iter()
+    nb_train = train[0].shape[0] // batch_size
+
+    epoch = 0
+    while True:
+        for i in range(nb_train):
+            x, y = next(train_it)
+            cost = f_grad_shared(x, y)
+            f_update(lrate)
+            asgd()

+        print('Train cost:', cost)
+
+        if numpy.mod(epoch, validFreq) == 0:
+            # do validation
+            # trick: each worker can do its validation without talking to the
+            # controller; even if it finishes before another worker, it will
+            # wait at the next all_reduce call when they need to sync again
+            valid_err = pred_error(f_pred, valid, kf_valid)
+            test_err = pred_error(f_pred, test, kf_test)
+
+            # they do need to send the result to the controller
+            res = worker.send_req('pred_errors', dict(test_err=float(test_err),
+                                                      valid_err=float(valid_err), epoch=epoch))
+
+            if res == 'best':
+                # should save the param at best
+                pass
+
+                if res == 'stop':
+                    break
+        epoch += 1
+
+    # Release all shared resources.
+    worker.close()
+
+
+if __name__ == '__main__':
+    # See function train_resnet for all possible parameters and their definitions.
+ parser = Worker.default_parser() + args = parser.parse_args() + + worker = Worker(**Worker.default_arguments(args)) + train_resnet() From 188ddba9758af8b2d6dc86ac99e2ba08470e63d4 Mon Sep 17 00:00:00 2001 From: Olivier Mastropietro Date: Tue, 20 Jun 2017 17:49:52 -0400 Subject: [PATCH 2/4] some small timing changes --- .../synchronous_resnet/resnet_controller.py | 4 ++ example/synchronous_resnet/resnet_worker.py | 42 +++++++++---------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/example/synchronous_resnet/resnet_controller.py b/example/synchronous_resnet/resnet_controller.py index 6970ff8..6d639db 100644 --- a/example/synchronous_resnet/resnet_controller.py +++ b/example/synchronous_resnet/resnet_controller.py @@ -97,6 +97,10 @@ def handle_control(self, req, worker_id, req_info): self.test_history_errs += [[None for i in range(self.nb_worker)]] self._epoch += 1 + elif req == 'time': + print("Epoch time", time.time() - self.start_time) + control_response = 'stop' + elif req == 'splits': # the controller never loads the dataset but the worker doesn't # know how many workers there are diff --git a/example/synchronous_resnet/resnet_worker.py b/example/synchronous_resnet/resnet_worker.py index 10e119e..56a4960 100644 --- a/example/synchronous_resnet/resnet_worker.py +++ b/example/synchronous_resnet/resnet_worker.py @@ -1,5 +1,6 @@ from __future__ import absolute_import, print_function from collections import OrderedDict +import time import six from six import iteritems from six.moves import range @@ -29,15 +30,11 @@ def load_data(): """ create synthetic data """ - def trgt_reshape(trgt): - return trgt.reshape((trgt.shape[0],1)) - - targets = numpy.arange(1000) - train_targets = trgt_reshape(numpy.repeat(targets, 2)) + train_targets = numpy.random.randint(1000, size=(2048,1)) train_data = numpy.random.random((train_targets.shape[0],3,224,224)) - valid_targets = trgt_reshape(numpy.repeat(targets, 1)) + valid_targets = numpy.random.randint(1000, size=(1024,1)) valid_data = numpy.random.random((valid_targets.shape[0],3,224,224)) - test_targets = trgt_reshape(numpy.repeat(targets, 1)) + test_targets = numpy.random.randint(1000, size=(1024,1)) test_data = numpy.random.random((test_targets.shape[0],3,224,224)) rval = ([numpy_floatX(train_data), numpy_int32(train_targets)], @@ -95,14 +92,6 @@ def sgd(lr, tparams, grads, x, y, cost): broadcastable=infer_bc_pattern(p.get_value().shape)) for p in tparams] gsup = [(gs, g) for gs, g in zip(gshared, grads)] - #import ipdb; ipdb.set_trace() - #for i, gpair in enumerate(gsup): - # g = gpair[0] - # u = gpair[1] - # if g.broadcastable != u.broadcastable: - # #gsup[i] = (tensor.patternbroadcast(g, u.broadcastable), u) - # u.broadcastable = g.broadcastable - #import ipdb; ipdb.set_trace() # Function that computes gradients for a mini-batch, but do not # updates the weights. @@ -461,13 +450,13 @@ def pred_error(f_pred, data, iterator): def train_resnet( - batch_size=12, # The batch size during training. - valid_batch_size=12, # The batch size used for validation/test set. + batch_size=64, # The batch size during training. + valid_batch_size=64, # The batch size used for validation/test set. validFreq=5, lrate=1e-4, optimizer=sgd, ): - + print(theano.config.profile) # Each worker needs the same seed in order to draw the same parameters. 
# This will also make them shuffle the batches the same way, but splits are # different so doesnt matter @@ -514,7 +503,7 @@ def train_resnet( def train_iter(): while True: - kf = get_minibatches_idx(train[0].shape[0], batch_size, shuffle=True) + kf = get_minibatches_idx(train[0].shape[0], batch_size, shuffle=False) for _, train_index in kf: y = [train[1][t] for t in train_index] x = [train[0][t] for t in train_index] @@ -523,13 +512,24 @@ def train_iter(): train_it = train_iter() nb_train = train[0].shape[0] // batch_size + # first pass in function so it doesnt bias the next time count + # because of the dnn flags + dummy_x = numpy_floatX(numpy.random.random((batch_size,3,224,224))) + dummy_y = numpy_int32(numpy.random.randint(1000, size=(batch_size,1))) + dumz = f_grad_shared(dummy_x, dummy_y) + epoch = 0 while True: for i in range(nb_train): x, y = next(train_it) + func_time = time.time() cost = f_grad_shared(x, y) f_update(lrate) + print("Func call time", time.time() - func_time) + overhead_time = time.time() asgd() + print("Overhead time", time.time() - overhead_time) + res = worker.send_req('time') print('Train cost:', cost) @@ -549,8 +549,8 @@ def train_iter(): # should save the param at best pass - if res == 'stop': - break + if res == 'stop': + break epoch += 1 # Release all shared resources. From 7ec91674f127677f98b7faa5365ee1ccd7409b7b Mon Sep 17 00:00:00 2001 From: Reyhane Askari Date: Mon, 28 Aug 2017 10:40:47 -0400 Subject: [PATCH 3/4] sync_shared added --- example/synchronous_resnet/resnet_worker.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/example/synchronous_resnet/resnet_worker.py b/example/synchronous_resnet/resnet_worker.py index 56a4960..ed56c99 100644 --- a/example/synchronous_resnet/resnet_worker.py +++ b/example/synchronous_resnet/resnet_worker.py @@ -444,6 +444,7 @@ def pred_error(f_pred, data, iterator): x = [data[0][t] for t in valid_index] y = [data[1][t] for t in valid_index] valid_err += f_pred(x, y) + f_pred.sync_shared() i += 1 return valid_err / i @@ -524,7 +525,9 @@ def train_iter(): x, y = next(train_it) func_time = time.time() cost = f_grad_shared(x, y) + cost.sync_shared() f_update(lrate) + f_update.sync_shared() print("Func call time", time.time() - func_time) overhead_time = time.time() asgd() From 663bdd8805b8ce8c025c1165c1a9c22a68cc44ea Mon Sep 17 00:00:00 2001 From: Reyhane Askari Date: Mon, 28 Aug 2017 12:45:03 -0400 Subject: [PATCH 4/4] minor fix --- example/synchronous_resnet/resnet_worker.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/example/synchronous_resnet/resnet_worker.py b/example/synchronous_resnet/resnet_worker.py index ed56c99..e08b739 100644 --- a/example/synchronous_resnet/resnet_worker.py +++ b/example/synchronous_resnet/resnet_worker.py @@ -444,7 +444,6 @@ def pred_error(f_pred, data, iterator): x = [data[0][t] for t in valid_index] y = [data[1][t] for t in valid_index] valid_err += f_pred(x, y) - f_pred.sync_shared() i += 1 return valid_err / i @@ -525,13 +524,13 @@ def train_iter(): x, y = next(train_it) func_time = time.time() cost = f_grad_shared(x, y) - cost.sync_shared() f_update(lrate) - f_update.sync_shared() print("Func call time", time.time() - func_time) overhead_time = time.time() asgd() print("Overhead time", time.time() - overhead_time) + f_grad_shared.sync_shared() + f_update.sync_shared() res = worker.send_req('time') print('Train cost:', cost)
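
Editor's note, not part of the patches above: for readers who only want the control-flow contract between resnet_worker.py and resnet_controller.py, the sketch below exercises the three request types the worker sends ('seed', 'splits', 'pred_errors') against a stand-in for the controller, so it runs without Platoon, Theano or a GPU. The FakeController class, its defaults and the numeric lengths are hypothetical; only the request names, the send_req call and the split arithmetic are taken from the patches.

from __future__ import print_function


class FakeController(object):
    """Answers send_req() the way ResNetController.handle_control would."""

    def __init__(self, seed=1234, nb_worker=2, worker_id=0):
        self.seed = seed
        self.nb_worker = nb_worker
        self.worker_id = worker_id

    def send_req(self, req, req_info=None):
        if req == 'seed':
            # every worker gets the same seed for identical parameter init
            return self.seed
        if req == 'splits':
            # same arithmetic as the 'splits' branch of handle_control
            splits = {}
            for name in ('train', 'valid', 'test'):
                per_worker = req_info['%s_len' % name] // self.nb_worker
                splits['%s_splits' % name] = [per_worker * self.worker_id,
                                              per_worker * (self.worker_id + 1)]
            return splits
        if req == 'pred_errors':
            # the real controller answers '' most of the time, 'best' when the
            # averaged validation error improves, and 'stop' once patience runs out
            return 'best'
        raise ValueError("unknown control request: %s" % req)


if __name__ == '__main__':
    channel = FakeController()
    print(channel.send_req('seed'))
    print(channel.send_req('splits', {'train_len': 2048,
                                      'valid_len': 1024,
                                      'test_len': 1024}))
    print(channel.send_req('pred_errors',
                           dict(valid_err=0.42, test_err=0.40, epoch=0)))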