From 01f834d3e373e20ad5acc252ac1ff4557cae19ce Mon Sep 17 00:00:00 2001 From: jenniew Date: Wed, 4 Dec 2019 13:39:36 -0800 Subject: [PATCH 01/10] add zoo train --- AR_mem/model.py | 14 +++++++++---- train_mem_model_zoo.py | 47 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 4 deletions(-) create mode 100644 train_mem_model_zoo.py diff --git a/AR_mem/model.py b/AR_mem/model.py index a78bfa4..99c4889 100644 --- a/AR_mem/model.py +++ b/AR_mem/model.py @@ -4,12 +4,15 @@ # AR_memory class Model(object): - def __init__(self, config): + def __init__(self, config, input_x=None, memories=None, targets=None): self.config = config self.global_step = tf.Variable(0, trainable=False, name="global_step") self.regularizer = layers.l2_regularizer(self.config.l2_lambda) self.sess = None self.saver = None + self.input_x = input_x + self.memories = memories + self.targets = targets self._build_model() def _build_model(self): @@ -54,9 +57,12 @@ def _build_model(self): self.initialize_session() def add_placeholder(self): - self.input_x = tf.placeholder(shape=[None, self.config.nsteps, self.config.nfeatures],dtype=tf.float32, name="x") - self.targets = tf.placeholder(shape=[None, self.config.nfeatures], dtype=tf.float32, name="targets") - self.memories = tf.placeholder(shape=[None, (self.config.nsteps+1) * self.config.msteps, self.config.nfeatures], dtype=tf.float32, + if self.input_x is None: + self.input_x = tf.placeholder(shape=[None, self.config.nsteps, self.config.nfeatures],dtype=tf.float32, name="x") + if self.targets is None: + self.targets = tf.placeholder(shape=[None, self.config.nfeatures], dtype=tf.float32, name="targets") + if self.memories is None: + self.memories = tf.placeholder(shape=[None, (self.config.nsteps+1) * self.config.msteps, self.config.nfeatures], dtype=tf.float32, name="memories") # self.targets = tf.placeholder(shape=[None], dtype=tf.int32, name="targets") self.dropout = tf.placeholder(dtype=tf.float32, name="dropout") diff --git a/train_mem_model_zoo.py b/train_mem_model_zoo.py new file mode 100644 index 0000000..2e2203a --- /dev/null +++ b/train_mem_model_zoo.py @@ -0,0 +1,47 @@ +from zoo import init_nncontext +from zoo.tfpark import TFOptimizer, TFDataset +from bigdl.optim.optimizer import * +from data_utils import load_agg_selected_data_mem_train +from AR_mem.config import Config +from AR_mem.model import Model + + +if __name__ == "__main__": + + data_path = sys.argv[1] + batch_size = int(sys.argv[2]) + num_epochs = int(sys.argv[3]) + model_dir = sys.argv[4] + + config = Config() + config.data_path = data_path + config.latest_model=False + + # init or get SparkContext + sc = init_nncontext() + + # create train data + train_x, dev_x, test_x, train_y, dev_y, test_y, train_m, dev_m, test_m, test_dt = \ + load_agg_selected_data_mem_train(data_path=config.data_path, + x_len=config.x_len, + y_len=config.y_len, + foresight=config.foresight, + cell_ids=config.train_cell_ids, + dev_ratio=config.dev_ratio, + test_len=config.test_len, + seed=config.seed) + + print("train_x shape: ", train_x.shape) + print("train_m shape: ", train_m.shape) + print("train_y shape: ", train_y.shape) + + + # model_dir = config.model_dir + + dataset = TFDataset.from_ndarrays([train_x, train_m, train_y], batch_size=batch_size, val_tensors=[dev_x, dev_m, dev_y],) + + model = Model(config, dataset.tensors[0], dataset.tensors[1], dataset.tensors[2]) + optimizer = TFOptimizer.from_loss(model.loss, Adam(config.lr), metrics={"rse": model.rse, "smape": model.smape, "mae": model.mae}, 
model_dir=model_dir) + + optimizer.optimize(end_trigger=MaxEpoch(num_epochs)) + From 8eb7a39ea1dcd96864d5e7a131d4454ee9518d82 Mon Sep 17 00:00:00 2001 From: jenniew Date: Wed, 18 Dec 2019 00:58:01 -0800 Subject: [PATCH 02/10] update train code --- train_mem_model_zoo.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/train_mem_model_zoo.py b/train_mem_model_zoo.py index 2e2203a..b61d840 100644 --- a/train_mem_model_zoo.py +++ b/train_mem_model_zoo.py @@ -4,6 +4,8 @@ from data_utils import load_agg_selected_data_mem_train from AR_mem.config import Config from AR_mem.model import Model +import tensorflow as tf +from zoo.common import set_core_number if __name__ == "__main__": @@ -31,17 +33,12 @@ test_len=config.test_len, seed=config.seed) - print("train_x shape: ", train_x.shape) - print("train_m shape: ", train_m.shape) - print("train_y shape: ", train_y.shape) - - - # model_dir = config.model_dir - dataset = TFDataset.from_ndarrays([train_x, train_m, train_y], batch_size=batch_size, val_tensors=[dev_x, dev_m, dev_y],) model = Model(config, dataset.tensors[0], dataset.tensors[1], dataset.tensors[2]) - optimizer = TFOptimizer.from_loss(model.loss, Adam(config.lr), metrics={"rse": model.rse, "smape": model.smape, "mae": model.mae}, model_dir=model_dir) + optimizer = TFOptimizer.from_loss(model.loss, Adam(config.lr), + metrics={"rse": model.rse, "smape": model.smape, "mae": model.mae}, + model_dir=model_dir) optimizer.optimize(end_trigger=MaxEpoch(num_epochs)) From 7d1d868add47231cbb1c3d783eea78e89d499e93 Mon Sep 17 00:00:00 2001 From: jenniew Date: Wed, 18 Dec 2019 01:24:13 -0800 Subject: [PATCH 03/10] update core-num and thread-num --- train_mem_model_zoo.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/train_mem_model_zoo.py b/train_mem_model_zoo.py index b61d840..bdbb303 100644 --- a/train_mem_model_zoo.py +++ b/train_mem_model_zoo.py @@ -4,6 +4,7 @@ from data_utils import load_agg_selected_data_mem_train from AR_mem.config import Config from AR_mem.model import Model +from time import time import tensorflow as tf from zoo.common import set_core_number @@ -14,6 +15,14 @@ batch_size = int(sys.argv[2]) num_epochs = int(sys.argv[3]) model_dir = sys.argv[4] + if len(sys.argv) > 5: + core_num = sys.argv[5] + else: + core_num = 4 + if len(sys.argv) > 6: + thread_num = sys.argv[6] + else: + thread_num = 10 config = Config() config.data_path = data_path @@ -21,6 +30,7 @@ # init or get SparkContext sc = init_nncontext() + set_core_number(core_num) # create train data train_x, dev_x, test_x, train_y, dev_y, test_y, train_m, dev_m, test_m, test_dt = \ @@ -38,7 +48,15 @@ model = Model(config, dataset.tensors[0], dataset.tensors[1], dataset.tensors[2]) optimizer = TFOptimizer.from_loss(model.loss, Adam(config.lr), metrics={"rse": model.rse, "smape": model.smape, "mae": model.mae}, - model_dir=model_dir) + model_dir=model_dir, + session_config=tf.ConfigProto(inter_op_parallelism_threads=1, + intra_op_parallelism_threads=thread_num) + ) + start_time = time() optimizer.optimize(end_trigger=MaxEpoch(num_epochs)) + end_time = time() + + print("Elapsed training time {} secs".format(end_time - start_time)) + From d3c1da48ae1a49d9dd13f4ebd9359c703fc91d2f Mon Sep 17 00:00:00 2001 From: jenniew Date: Wed, 18 Dec 2019 13:18:30 -0800 Subject: [PATCH 04/10] add run script --- run_train_mem_model_zoo.sh | 5 +++++ train_mem_model_zoo.py | 8 ++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) create mode 100755 run_train_mem_model_zoo.sh 
diff --git a/run_train_mem_model_zoo.sh b/run_train_mem_model_zoo.sh new file mode 100755 index 0000000..e453367 --- /dev/null +++ b/run_train_mem_model_zoo.sh @@ -0,0 +1,5 @@ +export OMP_NUM_THREADS=10 +${ANALYTICS_ZOO_HOME}/bin/spark-submit-python-with-zoo.sh \ + --master local[4] \ + --driver-memory 20g \ + train_mem_model_zoo.py /home/nvkvs/data/aggregated_5min_scaled.csv 128 10 /home/nvkvs/ARMemNet-jennie/model diff --git a/train_mem_model_zoo.py b/train_mem_model_zoo.py index bdbb303..9c01421 100644 --- a/train_mem_model_zoo.py +++ b/train_mem_model_zoo.py @@ -15,12 +15,14 @@ batch_size = int(sys.argv[2]) num_epochs = int(sys.argv[3]) model_dir = sys.argv[4] + + # For tuning if len(sys.argv) > 5: - core_num = sys.argv[5] + core_num = int(sys.argv[5]) else: core_num = 4 if len(sys.argv) > 6: - thread_num = sys.argv[6] + thread_num = int(sys.argv[6]) else: thread_num = 10 @@ -30,6 +32,8 @@ # init or get SparkContext sc = init_nncontext() + + # tuning set_core_number(core_num) # create train data From d01c77e810580a83df1ff65f142987e3154a907f Mon Sep 17 00:00:00 2001 From: jenniew Date: Wed, 18 Dec 2019 13:20:39 -0800 Subject: [PATCH 05/10] update run script name --- run_train_mem_model_zoo.sh | 5 ----- 1 file changed, 5 deletions(-) delete mode 100755 run_train_mem_model_zoo.sh diff --git a/run_train_mem_model_zoo.sh b/run_train_mem_model_zoo.sh deleted file mode 100755 index e453367..0000000 --- a/run_train_mem_model_zoo.sh +++ /dev/null @@ -1,5 +0,0 @@ -export OMP_NUM_THREADS=10 -${ANALYTICS_ZOO_HOME}/bin/spark-submit-python-with-zoo.sh \ - --master local[4] \ - --driver-memory 20g \ - train_mem_model_zoo.py /home/nvkvs/data/aggregated_5min_scaled.csv 128 10 /home/nvkvs/ARMemNet-jennie/model From d88a26d566c6256c28ba9a87f5f7df9734fe74f4 Mon Sep 17 00:00:00 2001 From: jenniew Date: Wed, 18 Dec 2019 13:21:19 -0800 Subject: [PATCH 06/10] update run script name --- run_train_mem_model_zoo_local.sh | 5 +++++ 1 file changed, 5 insertions(+) create mode 100755 run_train_mem_model_zoo_local.sh diff --git a/run_train_mem_model_zoo_local.sh b/run_train_mem_model_zoo_local.sh new file mode 100755 index 0000000..e453367 --- /dev/null +++ b/run_train_mem_model_zoo_local.sh @@ -0,0 +1,5 @@ +export OMP_NUM_THREADS=10 +${ANALYTICS_ZOO_HOME}/bin/spark-submit-python-with-zoo.sh \ + --master local[4] \ + --driver-memory 20g \ + train_mem_model_zoo.py /home/nvkvs/data/aggregated_5min_scaled.csv 128 10 /home/nvkvs/ARMemNet-jennie/model From 310fbb14273080111f46586b0b0833fa2be72e42 Mon Sep 17 00:00:00 2001 From: jenniew Date: Thu, 16 Jan 2020 15:19:02 +0800 Subject: [PATCH 07/10] change preprocess with spark --- data_utils.py | 221 +++++++++++++++++++++++++++++++ run_train_mem_model_zoo_local.sh | 6 +- train_mem_model_zoo.py | 52 ++++++-- 3 files changed, 263 insertions(+), 16 deletions(-) diff --git a/data_utils.py b/data_utils.py index 33bb016..b2b8026 100644 --- a/data_utils.py +++ b/data_utils.py @@ -3,6 +3,10 @@ import numpy as np import logging import pickle +import os +import math +import tqdm + logger = logging.getLogger() @@ -469,5 +473,222 @@ def load_agg_data_all(data_path='../data/aggregated_data_5min_scaled.csv', ncell return full_x +# get dataset from given preprocessed_dir +def get_datasets_from_dir(preprocessed_dir, batch_size, train_cells=1.0, valid_cells=0, test_cells=0): + # logger + logger = logging.getLogger() + + # load preprocessed files from dir & get total rows + preprocessed_files = os.listdir(preprocessed_dir) + n_preprocessed_files = len(preprocessed_files) + + # split train / 
valid / test set + if train_cells <= 1.0: + n_train_set = round(n_preprocessed_files * train_cells) + else: + n_train_set = int(train_cells) + + if valid_cells <= 1.0: + n_valid_set = round(n_preprocessed_files * valid_cells) + else: + n_valid_set = int(valid_cells) + + if test_cells <= 1.0: + n_test_set = round(n_preprocessed_files * test_cells) + else: + n_test_set = int(test_cells) + + # split by index + idx_cells = np.random.permutation(n_preprocessed_files) + idx_train = idx_cells[:n_train_set] + idx_valid = idx_cells[n_train_set:n_train_set + n_valid_set] + idx_test = idx_cells[n_train_set + n_valid_set:n_train_set + n_valid_set + n_test_set] + + train_files = [preprocessed_files[j] for j in idx_train] + valid_files = [preprocessed_files[j] for j in idx_valid] + test_files = [preprocessed_files[j] for j in idx_test] + + assert n_train_set + n_valid_set + n_test_set <= n_preprocessed_files + + # get valid sets & test sets + valid_X, valid_Y, valid_M = read_npz_files(preprocessed_dir, valid_files) + test_X, test_Y, test_M = read_npz_files(preprocessed_dir, test_files) + + # define train_set properties + n_rows_per_file = np.load(os.path.join(preprocessed_dir, train_files[0]))['X'].shape[0] + n_total_rows = n_train_set * n_rows_per_file + + # log dataset info + logger.info('') + logger.info('Dataset Summary') + logger.info(' - Used {:6d} cells of {:6d} total cells ({:2.2f}%)'.format(n_train_set + n_valid_set + n_test_set, + n_preprocessed_files, ( + n_train_set + n_valid_set + n_test_set) / n_preprocessed_files * 100)) + logger.info(' - Train Dataset: {:6d} cells ({:02.2f}% of used cells)'.format(n_train_set, n_train_set / ( + n_train_set + n_valid_set + n_test_set) * 100)) + logger.info(' - Valid Dataset: {:6d} cells ({:02.2f}% of used cells)'.format(n_valid_set, n_valid_set / ( + n_train_set + n_valid_set + n_test_set) * 100)) + logger.info(' - Test Dataset : {:6d} cells ({:02.2f}% of used cells)'.format(n_test_set, n_test_set / ( + n_train_set + n_valid_set + n_test_set) * 100)) + logger.info('') + logger.info('Trainset Summary') + logger.info(' - Row / Cell: {:9d} rows / cell'.format(n_rows_per_file)) + logger.info(' - Train Cell: {:9d} cells'.format(n_train_set)) + logger.info(' - Total Rows: {:9d} rows'.format(n_total_rows)) + logger.info(' - Batch Size: {:9d} rows / batch'.format(batch_size)) + logger.info(' - Batch Step: {:9d} batches / epoch'.format(math.ceil(n_total_rows / batch_size))) + logger.info('') + + # iter trainset + # for i in tqdm(range(0, n_total_rows, batch_size)): + for i in tqdm(range(0, n_total_rows, n_total_rows)): + row_idx_s = i # start row's index for batch + # row_idx_e = i + batch_size # end row's index for batch + row_idx_e = i + n_total_rows + + # for last iter + if row_idx_e >= n_total_rows: + row_idx_e = n_total_rows + + file_read_idx_s = math.floor( + row_idx_s / n_rows_per_file) # file index which contains start row index (aka start file) + file_read_idx_e = math.ceil( + row_idx_e / n_rows_per_file) # file index which contains end row index (aka end file) + + rows_read_idx_s = row_idx_s % n_rows_per_file # start row index on start file + rows_read_idx_e = row_idx_e % n_rows_per_file # end row index on end file + + train_X, train_Y, train_M = None, None, None + + # read files for batch + for j in range(file_read_idx_s, file_read_idx_e): + read_npz = np.load(os.path.join(preprocessed_dir, train_files[j])) + + if j == file_read_idx_s: + train_X = read_npz['X'][rows_read_idx_s:] + train_Y = read_npz['Y'][rows_read_idx_s:] + train_M = 
read_npz['M'][rows_read_idx_s:] + elif j == file_read_idx_e - 1: + train_X = np.vstack((train_X, read_npz['X'][:rows_read_idx_e])) + train_Y = np.vstack((train_Y, read_npz['Y'][:rows_read_idx_e])) + train_M = np.vstack((train_M, read_npz['M'][:rows_read_idx_e])) + else: + train_X = np.vstack((train_X, read_npz['X'])) + train_Y = np.vstack((train_Y, read_npz['Y'])) + train_M = np.vstack((train_M, read_npz['M'])) + + train_X, train_Y, train_M = train_X.reshape(-1, 10, 8), train_Y.reshape(-1, 8), train_M.reshape(-1, 77, 8) + + # # log + # logger.info('X : {}, {}, {}'.format(train_X.shape, valid_X.shape, test_X.shape)) + # logger.info('Y : {}, {}, {}'.format(train_Y.shape, valid_Y.shape, test_Y.shape)) + # logger.info('M : {}, {}, {}'.format(train_M.shape, valid_M.shape, test_M.shape)) + # logger.info('Feed data : X{}, Y{}, M{}'.format(train_X.shape, train_Y.shape, train_M.shape)) + + # return current batch + yield train_X, train_Y, train_M, valid_X, valid_Y, valid_M, test_X, test_Y, test_M + +# get dataset from given filenames +def read_npz_files(preprocessed_dir, files_to_read): + X, Y, M = None, None, None + + for filename in files_to_read: + read_npz = np.load(os.path.join(preprocessed_dir, filename)) + + if X is None: + X, Y, M = read_npz['X'], read_npz['Y'], read_npz['M'] + else: + X = np.vstack((X, read_npz['X'])) + Y = np.vstack((Y, read_npz['Y'])) + M = np.vstack((M, read_npz['M'])) + + return X.reshape(-1, 10, 8), Y.reshape(-1, 8), M.reshape(-1, 77, 8) + +# get dataset from given filename +def read_npz_file(preprocessed_dir, filename): + read_npz = np.load(os.path.join(preprocessed_dir, filename)) + return read_npz['X'].reshape(-1, 10, 8), read_npz['Y'].reshape(-1, 8), read_npz['M'].reshape(-1, 77, 8) + +# get feature label list for training +def get_feature_label_list(data_seq): + X, Y, M = data_seq + length = X.shape[0] + return [([X[i], M[i]], Y[i]) for i in range(length)] + +# get dataset from given preprocessed_dir parallelly by spark +def get_datasets_from_dir_spark(sc, preprocessed_dir, batch_size, train_cells=1.0, valid_cells=0, test_cells=0): + # logger + logger = logging.getLogger() + + # load preprocessed files from dir & get total rows + preprocessed_files = os.listdir(preprocessed_dir) + n_preprocessed_files = len(preprocessed_files) + + # split train / valid / test set + if train_cells <= 1.0: + n_train_set = round(n_preprocessed_files * train_cells) + else: + n_train_set = int(train_cells) + + if valid_cells <= 1.0: + n_valid_set = round(n_preprocessed_files * valid_cells) + else: + n_valid_set = int(valid_cells) + + if test_cells <= 1.0: + n_test_set = round(n_preprocessed_files * test_cells) + else: + n_test_set = int(test_cells) + + # split by index + idx_cells = np.random.permutation(n_preprocessed_files) + idx_train = idx_cells[:n_train_set] + idx_valid = idx_cells[n_train_set:n_train_set + n_valid_set] + idx_test = idx_cells[n_train_set + n_valid_set:n_train_set + n_valid_set + n_test_set] + + train_files = [preprocessed_files[j] for j in idx_train] + valid_files = [preprocessed_files[j] for j in idx_valid] + test_files = [preprocessed_files[j] for j in idx_test] + + assert n_train_set + n_valid_set + n_test_set <= n_preprocessed_files + + # define train_set properties + n_rows_per_file = np.load(os.path.join(preprocessed_dir, train_files[0]))['X'].shape[0] + n_total_rows = n_train_set * n_rows_per_file + + # log dataset info + logger.info('') + logger.info('Dataset Summary') + logger.info(' - Used {:6d} cells of {:6d} total cells ({:2.2f}%)'.format(n_train_set + 
n_valid_set + n_test_set, + n_preprocessed_files, ( + n_train_set + n_valid_set + n_test_set) / n_preprocessed_files * 100)) + logger.info(' - Train Dataset: {:6d} cells ({:02.2f}% of used cells)'.format(n_train_set, n_train_set / ( + n_train_set + n_valid_set + n_test_set) * 100)) + logger.info(' - Valid Dataset: {:6d} cells ({:02.2f}% of used cells)'.format(n_valid_set, n_valid_set / ( + n_train_set + n_valid_set + n_test_set) * 100)) + logger.info(' - Test Dataset : {:6d} cells ({:02.2f}% of used cells)'.format(n_test_set, n_test_set / ( + n_train_set + n_valid_set + n_test_set) * 100)) + logger.info('') + logger.info('Trainset Summary') + logger.info(' - Row / Cell: {:9d} rows / cell'.format(n_rows_per_file)) + logger.info(' - Train Cell: {:9d} cells'.format(n_train_set)) + logger.info(' - Total Rows: {:9d} rows'.format(n_total_rows)) + logger.info(' - Batch Size: {:9d} rows / batch'.format(batch_size)) + logger.info(' - Batch Step: {:9d} batches / epoch'.format(math.ceil(n_total_rows / batch_size))) + logger.info('') + + train_data = sc.parallelize(train_files).\ + map(lambda file: read_npz_file(preprocessed_dir, file)).\ + flatMap(lambda data_seq: get_feature_label_list(data_seq)) + val_data = sc.parallelize(valid_files). \ + map(lambda file: read_npz_file(preprocessed_dir, file)).\ + flatMap(lambda data_seq: get_feature_label_list(data_seq)) + test_data = sc.parallelize(test_files). \ + map(lambda file: read_npz_file(preprocessed_dir, file)).\ + flatMap(lambda data_seq: get_feature_label_list(data_seq)) + + return train_data, val_data, test_data + + if __name__ == "__main__": full_x = load_agg_data_all() \ No newline at end of file diff --git a/run_train_mem_model_zoo_local.sh b/run_train_mem_model_zoo_local.sh index e453367..a68f7e9 100755 --- a/run_train_mem_model_zoo_local.sh +++ b/run_train_mem_model_zoo_local.sh @@ -1,5 +1,7 @@ +#!/usr/bin/env bash export OMP_NUM_THREADS=10 + ${ANALYTICS_ZOO_HOME}/bin/spark-submit-python-with-zoo.sh \ - --master local[4] \ + --master local[*] \ --driver-memory 20g \ - train_mem_model_zoo.py /home/nvkvs/data/aggregated_5min_scaled.csv 128 10 /home/nvkvs/ARMemNet-jennie/model + train_mem_model_zoo.py /Users/wangjiao/data/skt 128 10 /Users/wangjiao/git/ARMemNet-jennie/model diff --git a/train_mem_model_zoo.py b/train_mem_model_zoo.py index 9c01421..c6cc671 100644 --- a/train_mem_model_zoo.py +++ b/train_mem_model_zoo.py @@ -1,7 +1,7 @@ from zoo import init_nncontext from zoo.tfpark import TFOptimizer, TFDataset from bigdl.optim.optimizer import * -from data_utils import load_agg_selected_data_mem_train +from data_utils import load_agg_selected_data_mem_train, get_datasets_from_dir, get_datasets_from_dir_spark from AR_mem.config import Config from AR_mem.model import Model from time import time @@ -29,6 +29,7 @@ config = Config() config.data_path = data_path config.latest_model=False + config.batch_size = batch_size # init or get SparkContext sc = init_nncontext() @@ -37,19 +38,42 @@ set_core_number(core_num) # create train data - train_x, dev_x, test_x, train_y, dev_y, test_y, train_m, dev_m, test_m, test_dt = \ - load_agg_selected_data_mem_train(data_path=config.data_path, - x_len=config.x_len, - y_len=config.y_len, - foresight=config.foresight, - cell_ids=config.train_cell_ids, - dev_ratio=config.dev_ratio, - test_len=config.test_len, - seed=config.seed) - - dataset = TFDataset.from_ndarrays([train_x, train_m, train_y], batch_size=batch_size, val_tensors=[dev_x, dev_m, dev_y],) - - model = Model(config, dataset.tensors[0], dataset.tensors[1], 
dataset.tensors[2]) + # train_x, dev_x, test_x, train_y, dev_y, test_y, train_m, dev_m, test_m, test_dt = \ + # load_agg_selected_data_mem_train(data_path=config.data_path, + # x_len=config.x_len, + # y_len=config.y_len, + # foresight=config.foresight, + # cell_ids=config.train_cell_ids, + # dev_ratio=config.dev_ratio, + # test_len=config.test_len, + # seed=config.seed) + + # config.batch_size is useless as we force get_datasets_from_dir return the entire data + # train_X, train_Y, train_M, valid_X, valid_Y, valid_M, _, _, _ =\ + # get_datasets_from_dir(sc, config.data_path, config.batch_size, + # train_cells=config.num_cells_train, + # valid_cells=config.num_cells_valid, + # test_cells=config.num_cells_test)[0] + # + # dataset = TFDataset.from_ndarrays([train_X, train_M, train_Y], batch_size=batch_size, + # val_tensors=[valid_X, valid_M, valid_Y],) + # + # model = Model(config, dataset.tensors[0], dataset.tensors[1], dataset.tensors[2]) + + train_rdd, val_rdd, test_rdd = \ + get_datasets_from_dir_spark(sc, config.data_path, config.batch_size, + train_cells=config.num_cells_train, + valid_cells=config.num_cells_valid, + test_cells=config.num_cells_test) + + dataset = TFDataset.from_rdd(train_rdd, + features=[(tf.float32, [10, 8]), (tf.float32, [77, 8])], + labels=(tf.float32, [8]), + batch_size=config.batch_size, + val_rdd=val_rdd) + + model = Model(config, dataset.tensors[0][0], dataset.tensors[0][1], dataset.tensors[1]) + optimizer = TFOptimizer.from_loss(model.loss, Adam(config.lr), metrics={"rse": model.rse, "smape": model.smape, "mae": model.mae}, model_dir=model_dir, From fefbecfbfe0a249892666fb5c3375a53b1a9f1f4 Mon Sep 17 00:00:00 2001 From: jenniew Date: Wed, 22 Jan 2020 11:39:51 +0800 Subject: [PATCH 08/10] update preprocess --- data_utils.py | 155 ++++++++++++++++++------------- run_train_mem_model_zoo_local.sh | 5 +- 2 files changed, 94 insertions(+), 66 deletions(-) diff --git a/data_utils.py b/data_utils.py index b2b8026..aaabb20 100644 --- a/data_utils.py +++ b/data_utils.py @@ -7,7 +7,6 @@ import math import tqdm - logger = logging.getLogger() @@ -29,7 +28,6 @@ def load_agg_data( test_len=7, seed=None, ): - data = pd.read_csv(data_path, index_col=0) data.index = pd.to_datetime(data.index) @@ -42,9 +40,9 @@ def load_agg_data( full_y_ = [] full_dt_ = [] - for cell_id in range(ncells): # config + for cell_id in range(ncells): # config - cell_data = data[data['CELL_NUM']==cell_id] + cell_data = data[data['CELL_NUM'] == cell_id] grouped = cell_data.groupby(pd.Grouper(freq='D')) @@ -61,14 +59,15 @@ def load_agg_data( source_x = group[col_list].sort_index().values.reshape(-1, ndim_x).astype('float32') source_y = group[col_list].sort_index().values.reshape(-1, ndim_y).astype('float32') - slided_x = np.array([source_x[i:i + x_len] for i in range(0, len(source_x) - x_len - foresight - y_len + 1)]) + slided_x = np.array( + [source_x[i:i + x_len] for i in range(0, len(source_x) - x_len - foresight - y_len + 1)]) y_start_idx = x_len + foresight slided_y = np.array([source_y[i:i + y_len] for i in range(y_start_idx, len(source_y) - y_len + 1)]) slided_dt = np.array([group_index[i:i + y_len] for i in range(y_start_idx, len(source_y) - y_len + 1)]) - cell_x = np.concatenate([cell_x, slided_x],axis=0) - cell_y = np.concatenate([cell_y, slided_y],axis=0) - cell_dt = np.concatenate([cell_dt, slided_dt],axis=0) + cell_x = np.concatenate([cell_x, slided_x], axis=0) + cell_y = np.concatenate([cell_y, slided_y], axis=0) + cell_dt = np.concatenate([cell_dt, slided_dt], axis=0) full_x_.append(cell_x) 
full_y_.append(cell_y) @@ -99,10 +98,10 @@ def load_agg_data( if y_len == 1: d = full_dt_tmp[i] else: - d = full_dt[i,0] + d = full_dt[i, 0] if d == str(start_dt): - test_ind = i+1 + test_ind = i + 1 break assert test_ind != -1 @@ -110,7 +109,7 @@ def load_agg_data( tr_x = full_x[:test_ind] tr_y = full_y[:test_ind] - if seed : + if seed: np.random.seed(seed) dev_len = int(len(tr_x) * dev_ratio) dev_ind = np.random.permutation(len(tr_x))[:dev_len] @@ -139,7 +138,6 @@ def load_agg_selected_data_mem_train( test_len=7, seed=None, ): - data = pd.read_csv(data_path, index_col=0) data.index = pd.to_datetime(data.index) @@ -148,14 +146,14 @@ def load_agg_selected_data_mem_train( ndim_x = len(col_list) ndim_y = ndim_x - full_m_lst=[] + full_m_lst = [] full_y_lst = [] full_x_lst = [] full_cell_lst = [] cell_list = cell_ids - for cell_id in cell_list: # config - cell_data = data[data['CELL_NUM']==cell_id] + for cell_id in cell_list: # config + cell_data = data[data['CELL_NUM'] == cell_id] grouped = cell_data.groupby(pd.Grouper(freq='D')) m_lst = [] @@ -169,9 +167,10 @@ def load_agg_selected_data_mem_train( source_x = group[col_list].sort_index().values.reshape(-1, ndim_x).astype('float32') source_y = group[col_list].sort_index().values.reshape(-1, ndim_y).astype('float32') - slided_x = np.array([source_x[i:i + x_len] for i in range(0, len(source_x) - x_len - foresight - y_len + 1)]) + slided_x = np.array( + [source_x[i:i + x_len] for i in range(0, len(source_x) - x_len - foresight - y_len + 1)]) y_start_idx = x_len + foresight - slided_y = np.array([source_y[i:i + y_len] for i in range(y_start_idx, len(source_y) - y_len+1)]) + slided_y = np.array([source_y[i:i + y_len] for i in range(y_start_idx, len(source_y) - y_len + 1)]) slided_dt = np.array([group_index[i:i + y_len] for i in range(y_start_idx, len(source_y) - y_len + 1)]) m_lst.append(slided_x) @@ -193,22 +192,22 @@ def load_agg_selected_data_mem_train( # [slided, ncells, day, nsteps, nfeatures] print("after day window sliding") - full_m = np.stack(full_m_lst, axis=1) # [slided, ncells, day, nsteps+1, nfeatures] - full_x = np.stack(full_x_lst, axis=1) # [slided, ncells, day, nsteps, nfeatures] - full_y = np.stack(full_y_lst, axis=1) # [slided, ncells, day, 1, nfeatures] + full_m = np.stack(full_m_lst, axis=1) # [slided, ncells, day, nsteps+1, nfeatures] + full_x = np.stack(full_x_lst, axis=1) # [slided, ncells, day, nsteps, nfeatures] + full_y = np.stack(full_y_lst, axis=1) # [slided, ncells, day, 1, nfeatures] full_cell = np.stack(full_cell_lst, axis=1) # [slided, ncells, day, 1] for arg in [full_m, full_x, full_y, full_cell]: print(arg.shape) # memory sliding for each cell - x_start_day = mem_len+1 + x_start_day = mem_len + 1 total_m = [] total_x = [] total_y = [] total_cell = [] for i in range(x_start_day, full_m.shape[2]): - total_m.append(full_m[:,:,i-mem_len:i,:,:]) + total_m.append(full_m[:, :, i - mem_len:i, :, :]) total_x.append(full_x[:, :, i, :, :]) total_y.append(full_y[:, :, i, :, :]) total_cell.append(full_cell[:, :, i, :]) @@ -228,7 +227,7 @@ def load_agg_selected_data_mem_train( # squeezing total_y = np.squeeze(total_y) # total_y = np.expand_dims(total_y, axis=1) ## warning : only when using 1 cell !! 
- total_cell= np.squeeze(total_cell) + total_cell = np.squeeze(total_cell) print("after memory sliding") for arg in [total_x, total_y, total_cell, total_m]: print(arg.shape) @@ -237,7 +236,7 @@ def _time_concat(arg): '''making shape [slided * ncells, nsteps, nfeatures]''' shapes = arg.shape right = [shapes[i] for i in range(2, len(shapes))] - out = np.reshape(arg, [-1]+right) + out = np.reshape(arg, [-1] + right) return out tr_x = _time_concat(total_x) @@ -245,7 +244,7 @@ def _time_concat(arg): tr_c = _time_concat(total_cell) tr_m = _time_concat(total_m) - if seed : + if seed: np.random.seed(seed) dev_len = int(len(tr_x) * dev_ratio) @@ -279,7 +278,6 @@ def load_agg_selected_data_mem( test_len=7, seed=None, ): - data = pd.read_csv(data_path, index_col=0) data.index = pd.to_datetime(data.index) @@ -288,15 +286,15 @@ def load_agg_selected_data_mem( ndim_x = len(col_list) ndim_y = ndim_x - full_m_lst=[] + full_m_lst = [] full_y_lst = [] full_dt_lst = [] full_x_lst = [] full_cell_lst = [] cell_list = [18] - for cell_id in cell_list: # config + for cell_id in cell_list: # config - cell_data = data[data['CELL_NUM']==cell_id] + cell_data = data[data['CELL_NUM'] == cell_id] grouped = cell_data.groupby(pd.Grouper(freq='D')) m_lst = [] @@ -310,9 +308,10 @@ def load_agg_selected_data_mem( source_x = group[col_list].sort_index().values.reshape(-1, ndim_x).astype('float32') source_y = group[col_list].sort_index().values.reshape(-1, ndim_y).astype('float32') - slided_x = np.array([source_x[i:i + x_len] for i in range(0, len(source_x) - x_len - foresight - y_len + 1)]) + slided_x = np.array( + [source_x[i:i + x_len] for i in range(0, len(source_x) - x_len - foresight - y_len + 1)]) y_start_idx = x_len + foresight - slided_y = np.array([source_y[i:i + y_len] for i in range(y_start_idx, len(source_y) - y_len+1)]) + slided_y = np.array([source_y[i:i + y_len] for i in range(y_start_idx, len(source_y) - y_len + 1)]) slided_dt = np.array([group_index[i:i + y_len] for i in range(y_start_idx, len(source_y) - y_len + 1)]) m_lst.append(slided_x) @@ -335,9 +334,9 @@ def load_agg_selected_data_mem( # [slided, ncells, day, nsteps, nfeatures] print("after day window sliding") - full_m = np.stack(full_m_lst, axis=1) # [slided, ncells, day, nsteps+1, nfeatures] - full_x = np.stack(full_x_lst, axis=1) # [slided, ncells, day, nsteps, nfeatures] - full_y = np.stack(full_y_lst, axis=1) # [slided, ncells, day, 1, nfeatures] + full_m = np.stack(full_m_lst, axis=1) # [slided, ncells, day, nsteps+1, nfeatures] + full_x = np.stack(full_x_lst, axis=1) # [slided, ncells, day, nsteps, nfeatures] + full_y = np.stack(full_y_lst, axis=1) # [slided, ncells, day, 1, nfeatures] full_dt = np.stack(full_dt_lst, axis=1) # [slided, ncells, day, 1] full_cell = np.stack(full_cell_lst, axis=1) # [slided, ncells, day, 1] @@ -345,14 +344,14 @@ def load_agg_selected_data_mem( print(arg.shape) # memory sliding for each cell - x_start_day = mem_len+1 + x_start_day = mem_len + 1 total_m = [] total_x = [] total_y = [] total_dt = [] total_cell = [] for i in range(x_start_day, full_m.shape[2]): - total_m.append(full_m[:,:,i-mem_len:i,:,:]) + total_m.append(full_m[:, :, i - mem_len:i, :, :]) total_x.append(full_x[:, :, i, :, :]) total_y.append(full_y[:, :, i, :, :]) total_dt.append(full_dt[:, :, i, :]) @@ -373,10 +372,10 @@ def load_agg_selected_data_mem( # squeezing total_y = np.squeeze(total_y) - total_y = np.expand_dims(total_y, axis=1) ## warning : only when using 1 cell !! 
+ total_y = np.expand_dims(total_y, axis=1) ## warning : only when using 1 cell !! total_dt_cell0 = np.squeeze(total_dt_cell0) total_dt = np.squeeze(total_dt) - total_cell= np.squeeze(total_cell) + total_cell = np.squeeze(total_cell) print("after memory sliding") for arg in [total_x, total_y, total_dt, total_cell, total_m]: print(arg.shape) @@ -391,10 +390,10 @@ def load_agg_selected_data_mem( if y_len == 1: d = total_dt_cell0[i] else: - d = total_dt_cell0[i,0] + d = total_dt_cell0[i, 0] if d == str(start_dt): - test_ind = i+1 + test_ind = i + 1 break assert test_ind != -1 print("test ind: {}".format(test_ind)) @@ -408,7 +407,7 @@ def _time_concat(arg): '''making shape [slided * ncells, nsteps, nfeatures]''' shapes = arg.shape right = [shapes[i] for i in range(2, len(shapes))] - out = np.reshape(arg, [-1]+right) + out = np.reshape(arg, [-1] + right) return out # [slided * ncells, nsteps, nf] @@ -422,7 +421,7 @@ def _time_concat(arg): te_m = _time_concat(total_m[test_ind:]) te_c = _time_concat(total_cell[test_ind:]) - if seed : + if seed: np.random.seed(seed) dev_len = int(len(tr_x) * dev_ratio) dev_ind = np.random.permutation(len(tr_x))[:dev_len] @@ -446,23 +445,22 @@ def _time_concat(arg): def load_agg_data_all(data_path='../data/aggregated_data_5min_scaled.csv', ncells=20, test_len=7): - data = pd.read_csv(data_path, index_col=0) data.index = pd.to_datetime(data.index) full_x = [] - for cell_id in range(ncells): # config - cell_data = data[data['CELL_NUM']==cell_id] + for cell_id in range(ncells): # config + cell_data = data[data['CELL_NUM'] == cell_id] # Find last test_len days to generate cell vectors end_dt = cell_data.index.date[-1] from datetime import timedelta - start_dt = end_dt-timedelta(days=6) - cell_x = cell_data[start_dt : end_dt+timedelta(days=1)] + start_dt = end_dt - timedelta(days=6) + cell_x = cell_data[start_dt: end_dt + timedelta(days=1)] full_x.append(cell_x) - full_x = np.stack(full_x, axis=0) # [ncells, t, d] + full_x = np.stack(full_x, axis=0) # [ncells, t, d] full_x = np.expand_dims(full_x, axis=0) full_x = full_x[:, :, :, :-1] @@ -523,13 +521,13 @@ def get_datasets_from_dir(preprocessed_dir, batch_size, train_cells=1.0, valid_c logger.info('Dataset Summary') logger.info(' - Used {:6d} cells of {:6d} total cells ({:2.2f}%)'.format(n_train_set + n_valid_set + n_test_set, n_preprocessed_files, ( - n_train_set + n_valid_set + n_test_set) / n_preprocessed_files * 100)) + n_train_set + n_valid_set + n_test_set) / n_preprocessed_files * 100)) logger.info(' - Train Dataset: {:6d} cells ({:02.2f}% of used cells)'.format(n_train_set, n_train_set / ( - n_train_set + n_valid_set + n_test_set) * 100)) + n_train_set + n_valid_set + n_test_set) * 100)) logger.info(' - Valid Dataset: {:6d} cells ({:02.2f}% of used cells)'.format(n_valid_set, n_valid_set / ( - n_train_set + n_valid_set + n_test_set) * 100)) + n_train_set + n_valid_set + n_test_set) * 100)) logger.info(' - Test Dataset : {:6d} cells ({:02.2f}% of used cells)'.format(n_test_set, n_test_set / ( - n_train_set + n_valid_set + n_test_set) * 100)) + n_train_set + n_valid_set + n_test_set) * 100)) logger.info('') logger.info('Trainset Summary') logger.info(' - Row / Cell: {:9d} rows / cell'.format(n_rows_per_file)) @@ -588,6 +586,7 @@ def get_datasets_from_dir(preprocessed_dir, batch_size, train_cells=1.0, valid_c # return current batch yield train_X, train_Y, train_M, valid_X, valid_Y, valid_M, test_X, test_Y, test_M + # get dataset from given filenames def read_npz_files(preprocessed_dir, files_to_read): X, Y, M = 
None, None, None @@ -604,17 +603,20 @@ def read_npz_files(preprocessed_dir, files_to_read): return X.reshape(-1, 10, 8), Y.reshape(-1, 8), M.reshape(-1, 77, 8) + # get dataset from given filename -def read_npz_file(preprocessed_dir, filename): - read_npz = np.load(os.path.join(preprocessed_dir, filename)) +def read_npz_file(filename): + read_npz = np.load(filename) return read_npz['X'].reshape(-1, 10, 8), read_npz['Y'].reshape(-1, 8), read_npz['M'].reshape(-1, 77, 8) + # get feature label list for training def get_feature_label_list(data_seq): X, Y, M = data_seq length = X.shape[0] return [([X[i], M[i]], Y[i]) for i in range(length)] + # get dataset from given preprocessed_dir parallelly by spark def get_datasets_from_dir_spark(sc, preprocessed_dir, batch_size, train_cells=1.0, valid_cells=0, test_cells=0): # logger @@ -661,13 +663,13 @@ def get_datasets_from_dir_spark(sc, preprocessed_dir, batch_size, train_cells=1. logger.info('Dataset Summary') logger.info(' - Used {:6d} cells of {:6d} total cells ({:2.2f}%)'.format(n_train_set + n_valid_set + n_test_set, n_preprocessed_files, ( - n_train_set + n_valid_set + n_test_set) / n_preprocessed_files * 100)) + n_train_set + n_valid_set + n_test_set) / n_preprocessed_files * 100)) logger.info(' - Train Dataset: {:6d} cells ({:02.2f}% of used cells)'.format(n_train_set, n_train_set / ( - n_train_set + n_valid_set + n_test_set) * 100)) + n_train_set + n_valid_set + n_test_set) * 100)) logger.info(' - Valid Dataset: {:6d} cells ({:02.2f}% of used cells)'.format(n_valid_set, n_valid_set / ( - n_train_set + n_valid_set + n_test_set) * 100)) + n_train_set + n_valid_set + n_test_set) * 100)) logger.info(' - Test Dataset : {:6d} cells ({:02.2f}% of used cells)'.format(n_test_set, n_test_set / ( - n_train_set + n_valid_set + n_test_set) * 100)) + n_train_set + n_valid_set + n_test_set) * 100)) logger.info('') logger.info('Trainset Summary') logger.info(' - Row / Cell: {:9d} rows / cell'.format(n_rows_per_file)) @@ -677,18 +679,43 @@ def get_datasets_from_dir_spark(sc, preprocessed_dir, batch_size, train_cells=1. logger.info(' - Batch Step: {:9d} batches / epoch'.format(math.ceil(n_total_rows / batch_size))) logger.info('') - train_data = sc.parallelize(train_files).\ - map(lambda file: read_npz_file(preprocessed_dir, file)).\ + # train_data = sc.parallelize(train_files).\ + # map(lambda file: read_npz_file(preprocessed_dir, file)).\ + # flatMap(lambda data_seq: get_feature_label_list(data_seq)) + train_data = sc.parallelize(train_files). \ + mapPartition(copy_to_local_file). \ + map(lambda file: read_npz_file(file)). \ flatMap(lambda data_seq: get_feature_label_list(data_seq)) + # train_data = sc.binaryFiles(preprocessed_dir). \ + # map(lambda stream: np.frombuffer(stream.toArray())). \ + # flatMap(lambda data_seq: get_feature_label_list(data_seq)) val_data = sc.parallelize(valid_files). \ - map(lambda file: read_npz_file(preprocessed_dir, file)).\ + mapPartition(copy_to_local_file). \ + map(lambda file: read_npz_file(file)). \ flatMap(lambda data_seq: get_feature_label_list(data_seq)) test_data = sc.parallelize(test_files). \ - map(lambda file: read_npz_file(preprocessed_dir, file)).\ + mapPartition(copy_to_local_file). \ + map(lambda file: read_npz_file(file)). 
\ flatMap(lambda data_seq: get_feature_label_list(data_seq)) - return train_data, val_data, test_data +def copy_to_local_file(list_of_files): + final_iterator = [] + import os + for file in list_of_files: + import tempfile + dir = tempfile.mkdtemp() + basename = os.path.basename(file) + destfile = os.path.join(dir, basename) + cmd = "hdfs dfs -get " + file + " " + destfile + status = os.system(cmd) + if status != 0: + print("error code: " + status) + exit(status) + final_iterator.append(destfile) + return iter(final_iterator) + + if __name__ == "__main__": - full_x = load_agg_data_all() \ No newline at end of file + full_x = load_agg_data_all() diff --git a/run_train_mem_model_zoo_local.sh b/run_train_mem_model_zoo_local.sh index a68f7e9..7b58079 100755 --- a/run_train_mem_model_zoo_local.sh +++ b/run_train_mem_model_zoo_local.sh @@ -1,7 +1,8 @@ #!/usr/bin/env bash +export PYSPARK_PYTHON=python3 export OMP_NUM_THREADS=10 ${ANALYTICS_ZOO_HOME}/bin/spark-submit-python-with-zoo.sh \ - --master local[*] \ + --master local[2] \ --driver-memory 20g \ - train_mem_model_zoo.py /Users/wangjiao/data/skt 128 10 /Users/wangjiao/git/ARMemNet-jennie/model + train_mem_model_zoo.py hdfs://localhost:9000/skt 128 10 /Users/wangjiao/git/ARMemNet-jennie/model From 2648b8348c1a5bbee35b7cd147ca90a75174be1a Mon Sep 17 00:00:00 2001 From: jenniew Date: Wed, 29 Jan 2020 12:22:05 +0800 Subject: [PATCH 09/10] update yarn support --- data_utils.py | 49 ++++++++++++++++++++++++--------- run_train_mem_model_zoo_yarn.sh | 13 +++++++++ train_mem_model_zoo.py | 14 +++++++--- 3 files changed, 59 insertions(+), 17 deletions(-) create mode 100755 run_train_mem_model_zoo_yarn.sh diff --git a/data_utils.py b/data_utils.py index 521314a..75c4432 100644 --- a/data_utils.py +++ b/data_utils.py @@ -202,8 +202,17 @@ def get_datasets_from_dir_spark(sc, preprocessed_dir, batch_size, train_cells=1. logger = logging.getLogger() # load preprocessed files from dir & get total rows - preprocessed_files = os.listdir(preprocessed_dir) + + cmd = "hdfs dfs -ls " + preprocessed_dir + out = os.popen(cmd) + preprocessed_files = [fileline.split(' ')[-1][:-1] for fileline in out.readlines()[1:]] + # for fileline in out.readlines()[1:]: + # file_path = fileline.split(' ')[-1] + # preprocessed_files.append(file_path) + + # preprocessed_files = os.listdir(preprocessed_dir) n_preprocessed_files = len(preprocessed_files) + print("n_preprocessed_files: ", n_preprocessed_files) # split train / valid / test set if train_cells <= 1.0: @@ -234,9 +243,21 @@ def get_datasets_from_dir_spark(sc, preprocessed_dir, batch_size, train_cells=1. assert n_train_set + n_valid_set + n_test_set <= n_preprocessed_files # define train_set properties - n_rows_per_file = np.load(os.path.join(preprocessed_dir, train_files[0]))['X'].shape[0] + if not os.path.exists("/tmp/skt"): + os.mkdir("/tmp/skt") + basename = os.path.basename(train_files[0]) + # destfile = os.path.join(dir, basename) + destfile = os.path.join("/tmp/skt", basename) + cmd = ' '.join(["hdfs dfs -get", train_files[0], destfile]) + status = os.system(cmd) + if status != 0: + print("error code: " + str(status)) + exit(status) + + n_rows_per_file = np.load(destfile)['X'].shape[0] n_total_rows = n_train_set * n_rows_per_file + # log dataset info logger.info('') logger.info('Dataset Summary') @@ -262,18 +283,18 @@ def get_datasets_from_dir_spark(sc, preprocessed_dir, batch_size, train_cells=1. 
# map(lambda file: read_npz_file(preprocessed_dir, file)).\ # flatMap(lambda data_seq: get_feature_label_list(data_seq)) train_data = sc.parallelize(train_files). \ - mapPartition(copy_to_local_file). \ + mapPartitions(copy_to_local_file). \ map(lambda file: read_npz_file(file)). \ flatMap(lambda data_seq: get_feature_label_list(data_seq)) # train_data = sc.binaryFiles(preprocessed_dir). \ # map(lambda stream: np.frombuffer(stream.toArray())). \ # flatMap(lambda data_seq: get_feature_label_list(data_seq)) val_data = sc.parallelize(valid_files). \ - mapPartition(copy_to_local_file). \ + mapPartitions(copy_to_local_file). \ map(lambda file: read_npz_file(file)). \ flatMap(lambda data_seq: get_feature_label_list(data_seq)) test_data = sc.parallelize(test_files). \ - mapPartition(copy_to_local_file). \ + mapPartitions(copy_to_local_file). \ map(lambda file: read_npz_file(file)). \ flatMap(lambda data_seq: get_feature_label_list(data_seq)) return train_data, val_data, test_data @@ -283,15 +304,17 @@ def copy_to_local_file(list_of_files): final_iterator = [] import os for file in list_of_files: - import tempfile - dir = tempfile.mkdtemp() + if not os.path.exists("/tmp/skt"): + os.mkdir("/tmp/skt") basename = os.path.basename(file) - destfile = os.path.join(dir, basename) - cmd = "hdfs dfs -get " + file + " " + destfile - status = os.system(cmd) - if status != 0: - print("error code: " + status) - exit(status) + destfile = os.path.join("/tmp/skt", basename) + if not os.path.exists(destfile): + cmd = "hdfs dfs -get " + file + " " + destfile + status = os.system(cmd) + if status != 0: + print("error code: " + status) + exit(status) + print("download hdfs file", destfile) final_iterator.append(destfile) return iter(final_iterator) diff --git a/run_train_mem_model_zoo_yarn.sh b/run_train_mem_model_zoo_yarn.sh new file mode 100755 index 0000000..8d3351b --- /dev/null +++ b/run_train_mem_model_zoo_yarn.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +export PYSPARK_PYTHON=python3 +export OMP_NUM_THREADS=10 +export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop + +${ANALYTICS_ZOO_HOME}/bin/spark-submit-python-with-zoo.sh \ + --master yarn \ + --deploy-mode client \ + --driver-memory 6g \ + --executor-memory 6g \ + --executor-cores 2 \ + --num-executors 1 \ + train_mem_model_zoo.py hdfs://localhost:9000/skt 128 10 /Users/wangjiao/git/ARMemNet-jennie/model diff --git a/train_mem_model_zoo.py b/train_mem_model_zoo.py index c6cc671..3765201 100644 --- a/train_mem_model_zoo.py +++ b/train_mem_model_zoo.py @@ -1,7 +1,7 @@ from zoo import init_nncontext from zoo.tfpark import TFOptimizer, TFDataset from bigdl.optim.optimizer import * -from data_utils import load_agg_selected_data_mem_train, get_datasets_from_dir, get_datasets_from_dir_spark +from data_utils import get_datasets_from_dir, get_datasets_from_dir_spark from AR_mem.config import Config from AR_mem.model import Model from time import time @@ -60,11 +60,17 @@ # # model = Model(config, dataset.tensors[0], dataset.tensors[1], dataset.tensors[2]) + # train_rdd, val_rdd, test_rdd = \ + # get_datasets_from_dir_spark(sc, config.data_path, config.batch_size, + # train_cells=config.num_cells_train, + # valid_cells=config.num_cells_valid, + # test_cells=config.num_cells_test) + train_rdd, val_rdd, test_rdd = \ get_datasets_from_dir_spark(sc, config.data_path, config.batch_size, - train_cells=config.num_cells_train, - valid_cells=config.num_cells_valid, - test_cells=config.num_cells_test) + train_cells=0.8, + valid_cells=0.2, + test_cells=0) dataset = 
TFDataset.from_rdd(train_rdd, features=[(tf.float32, [10, 8]), (tf.float32, [77, 8])], From ebc8eae18bd4beb2ef3e53dc24a071e4e20e0a1f Mon Sep 17 00:00:00 2001 From: jenniew Date: Wed, 12 Feb 2020 02:01:00 +0800 Subject: [PATCH 10/10] update for yarn script --- AR_mem/__init__.py | 0 run_train_mem_model_zoo_yarn.sh | 6 ++++++ 2 files changed, 6 insertions(+) create mode 100644 AR_mem/__init__.py diff --git a/AR_mem/__init__.py b/AR_mem/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/run_train_mem_model_zoo_yarn.sh b/run_train_mem_model_zoo_yarn.sh index 8d3351b..b1d9086 100755 --- a/run_train_mem_model_zoo_yarn.sh +++ b/run_train_mem_model_zoo_yarn.sh @@ -3,6 +3,11 @@ export PYSPARK_PYTHON=python3 export OMP_NUM_THREADS=10 export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop +if [ -f train.zip ] + then rm train.zip +fi +zip -r train.zip *.py */*.py + ${ANALYTICS_ZOO_HOME}/bin/spark-submit-python-with-zoo.sh \ --master yarn \ --deploy-mode client \ @@ -10,4 +15,5 @@ ${ANALYTICS_ZOO_HOME}/bin/spark-submit-python-with-zoo.sh \ --executor-memory 6g \ --executor-cores 2 \ --num-executors 1 \ + --py-files train.zip \ train_mem_model_zoo.py hdfs://localhost:9000/skt 128 10 /Users/wangjiao/git/ARMemNet-jennie/model
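
Note on the preprocessed input format assumed by this series: the loaders added above (read_npz_file / get_datasets_from_dir_spark) expect one .npz archive per cell whose X, Y and M arrays reshape to [-1, 10, 8], [-1, 8] and [-1, 77, 8], matching the feature/label shapes passed to TFDataset.from_rdd in train_mem_model_zoo.py. A minimal sketch for generating a dummy cell file to smoke-test the loading path — the row count and file name below are illustrative placeholders, not values taken from the patches:

    import numpy as np

    # Hypothetical smoke-test data: 100 rows for one cell, shaped to match
    # read_npz_file(): X -> (-1, 10, 8), Y -> (-1, 8), M -> (-1, 77, 8).
    n_rows = 100
    X = np.random.rand(n_rows, 10, 8).astype("float32")   # AR input windows
    Y = np.random.rand(n_rows, 8).astype("float32")       # prediction targets
    M = np.random.rand(n_rows, 77, 8).astype("float32")   # memory windows

    np.savez("cell_0000.npz", X=X, Y=Y, M=M)  # one archive per cell

Placing a few such files in the directory passed as the first argument of train_mem_model_zoo.py (an HDFS path such as hdfs://localhost:9000/skt in the run scripts) should be enough to exercise the Spark-based loading and training path end to end.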