This repository has been archived by the owner on May 10, 2023. It is now read-only.

Analytics Zoo training in Python #1

Closed
wants to merge 11 commits
Empty file added AR_mem/__init__.py
14 changes: 10 additions & 4 deletions AR_mem/model.py
@@ -4,12 +4,15 @@

# AR_memory
class Model(object):
-    def __init__(self, config):
+    def __init__(self, config, input_x=None, memories=None, targets=None):
self.config = config
self.global_step = tf.Variable(0, trainable=False, name="global_step")
self.regularizer = layers.l2_regularizer(self.config.l2_lambda)
self.sess = None
self.saver = None
+        self.input_x = input_x
+        self.memories = memories
+        self.targets = targets
self._build_model()

def _build_model(self):
@@ -54,9 +57,12 @@ def _build_model(self):
self.initialize_session()

def add_placeholder(self):
-        self.input_x = tf.placeholder(shape=[None, self.config.nsteps, self.config.nfeatures],dtype=tf.float32, name="x")
-        self.targets = tf.placeholder(shape=[None, self.config.nfeatures], dtype=tf.float32, name="targets")
-        self.memories = tf.placeholder(shape=[None, (self.config.nsteps+1) * self.config.msteps, self.config.nfeatures], dtype=tf.float32,
+        if self.input_x is None:
+            self.input_x = tf.placeholder(shape=[None, self.config.nsteps, self.config.nfeatures],dtype=tf.float32, name="x")
+        if self.targets is None:
+            self.targets = tf.placeholder(shape=[None, self.config.nfeatures], dtype=tf.float32, name="targets")
+        if self.memories is None:
+            self.memories = tf.placeholder(shape=[None, (self.config.nsteps+1) * self.config.msteps, self.config.nfeatures], dtype=tf.float32,
name="memories")
# self.targets = tf.placeholder(shape=[None], dtype=tf.int32, name="targets")
self.dropout = tf.placeholder(dtype=tf.float32, name="dropout")
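Note (not part of the diff): with the optional input_x, memories, and targets arguments, the graph can be driven by externally created tensors (for example the batched tensors of a TFPark TFDataset) instead of the default placeholders, which add_placeholder now only creates for inputs that were not supplied. A minimal sketch of the tensor-driven path, assuming the usual Config fields (nsteps, msteps, nfeatures) and an arbitrary batch size of 32:

import tensorflow as tf

from AR_mem.config import Config
from AR_mem.model import Model

config = Config()

# Pre-built input tensors (zeros as stand-ins); shapes mirror the placeholders
# defined in add_placeholder(). The batch size of 32 is arbitrary.
x = tf.zeros([32, config.nsteps, config.nfeatures])                        # input window
m = tf.zeros([32, (config.nsteps + 1) * config.msteps, config.nfeatures])  # memories
y = tf.zeros([32, config.nfeatures])                                       # targets

# Because the tensors are supplied, add_placeholder() only creates the remaining
# dropout placeholder; Model(config) with no tensors keeps the old behaviour.
model = Model(config, input_x=x, memories=m, targets=y)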
137 changes: 137 additions & 0 deletions data_utils.py
@@ -182,6 +182,143 @@ def get_datasets_from_dir(preprocessed_dir, batch_size, train_cells=1.0, valid_c
# return current batch
yield train_X, train_Y, train_M, valid_X, valid_Y, valid_M, test_X, test_Y, test_M


# get dataset from given filename
def read_npz_file(filename):
read_npz = np.load(filename)
return read_npz['X'].reshape(-1, 10, 8), read_npz['Y'].reshape(-1, 8), read_npz['M'].reshape(-1, 77, 8)


# get feature label list for training
def get_feature_label_list(data_seq):
X, Y, M = data_seq
length = X.shape[0]
return [([X[i], M[i]], Y[i]) for i in range(length)]


# get dataset from given preprocessed_dir parallelly by spark
def get_datasets_from_dir_spark(sc, preprocessed_dir, batch_size, train_cells=1.0, valid_cells=0, test_cells=0):
# logger
logger = logging.getLogger()

# load preprocessed files from dir & get total rows

cmd = "hdfs dfs -ls " + preprocessed_dir
out = os.popen(cmd)
preprocessed_files = [fileline.split(' ')[-1][:-1] for fileline in out.readlines()[1:]]
# for fileline in out.readlines()[1:]:
# file_path = fileline.split(' ')[-1]
# preprocessed_files.append(file_path)

# preprocessed_files = os.listdir(preprocessed_dir)
n_preprocessed_files = len(preprocessed_files)
print("n_preprocessed_files: ", n_preprocessed_files)

# split train / valid / test set
if train_cells <= 1.0:
n_train_set = round(n_preprocessed_files * train_cells)
else:
n_train_set = int(train_cells)

if valid_cells <= 1.0:
n_valid_set = round(n_preprocessed_files * valid_cells)
else:
n_valid_set = int(valid_cells)

if test_cells <= 1.0:
n_test_set = round(n_preprocessed_files * test_cells)
else:
n_test_set = int(test_cells)

# split by index
idx_cells = np.random.permutation(n_preprocessed_files)
idx_train = idx_cells[:n_train_set]
idx_valid = idx_cells[n_train_set:n_train_set + n_valid_set]
idx_test = idx_cells[n_train_set + n_valid_set:n_train_set + n_valid_set + n_test_set]

train_files = [preprocessed_files[j] for j in idx_train]
valid_files = [preprocessed_files[j] for j in idx_valid]
test_files = [preprocessed_files[j] for j in idx_test]

assert n_train_set + n_valid_set + n_test_set <= n_preprocessed_files

# define train_set properties
if not os.path.exists("/tmp/skt"):
os.mkdir("/tmp/skt")
basename = os.path.basename(train_files[0])
# destfile = os.path.join(dir, basename)
destfile = os.path.join("/tmp/skt", basename)
cmd = ' '.join(["hdfs dfs -get", train_files[0], destfile])
status = os.system(cmd)
if status != 0:
print("error code: " + str(status))
exit(status)

n_rows_per_file = np.load(destfile)['X'].shape[0]
n_total_rows = n_train_set * n_rows_per_file


# log dataset info
logger.info('')
logger.info('Dataset Summary')
logger.info(' - Used {:6d} cells of {:6d} total cells ({:2.2f}%)'.format(n_train_set + n_valid_set + n_test_set,
n_preprocessed_files, (
n_train_set + n_valid_set + n_test_set) / n_preprocessed_files * 100))
logger.info(' - Train Dataset: {:6d} cells ({:02.2f}% of used cells)'.format(n_train_set, n_train_set / (
n_train_set + n_valid_set + n_test_set) * 100))
logger.info(' - Valid Dataset: {:6d} cells ({:02.2f}% of used cells)'.format(n_valid_set, n_valid_set / (
n_train_set + n_valid_set + n_test_set) * 100))
logger.info(' - Test Dataset : {:6d} cells ({:02.2f}% of used cells)'.format(n_test_set, n_test_set / (
n_train_set + n_valid_set + n_test_set) * 100))
logger.info('')
logger.info('Trainset Summary')
logger.info(' - Row / Cell: {:9d} rows / cell'.format(n_rows_per_file))
logger.info(' - Train Cell: {:9d} cells'.format(n_train_set))
logger.info(' - Total Rows: {:9d} rows'.format(n_total_rows))
logger.info(' - Batch Size: {:9d} rows / batch'.format(batch_size))
logger.info(' - Batch Step: {:9d} batches / epoch'.format(math.ceil(n_total_rows / batch_size)))
logger.info('')

# train_data = sc.parallelize(train_files).\
# map(lambda file: read_npz_file(preprocessed_dir, file)).\
# flatMap(lambda data_seq: get_feature_label_list(data_seq))
train_data = sc.parallelize(train_files). \
mapPartitions(copy_to_local_file). \
map(lambda file: read_npz_file(file)). \
flatMap(lambda data_seq: get_feature_label_list(data_seq))
# train_data = sc.binaryFiles(preprocessed_dir). \
# map(lambda stream: np.frombuffer(stream.toArray())). \
# flatMap(lambda data_seq: get_feature_label_list(data_seq))
val_data = sc.parallelize(valid_files). \
mapPartitions(copy_to_local_file). \
map(lambda file: read_npz_file(file)). \
flatMap(lambda data_seq: get_feature_label_list(data_seq))
test_data = sc.parallelize(test_files). \
mapPartitions(copy_to_local_file). \
map(lambda file: read_npz_file(file)). \
flatMap(lambda data_seq: get_feature_label_list(data_seq))
return train_data, val_data, test_data


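# copy the given hdfs files to /tmp/skt on the local executor and return an iterator of the local paths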
def copy_to_local_file(list_of_files):
final_iterator = []
import os
for file in list_of_files:
if not os.path.exists("/tmp/skt"):
os.mkdir("/tmp/skt")
basename = os.path.basename(file)
destfile = os.path.join("/tmp/skt", basename)
if not os.path.exists(destfile):
cmd = "hdfs dfs -get " + file + " " + destfile
status = os.system(cmd)
if status != 0:
print("error code: " + status)
exit(status)
print("download hdfs file", destfile)
final_iterator.append(destfile)
return iter(final_iterator)


##
# Legacy util.py functions

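Aside (not part of the diff): a minimal sketch of the element layout these helpers hand to Spark, using tiny zero arrays as stand-ins. The shapes follow the reshapes in read_npz_file (X -> (10, 8), M -> (77, 8), Y -> (8,)), which is also the features/labels spec that TFDataset.from_rdd receives in train_mem_model_zoo.py below:

import numpy as np

from data_utils import get_feature_label_list

# Two fake rows shaped like one cell's preprocessed .npz contents.
X = np.zeros((2, 10, 8), dtype=np.float32)   # (rows, nsteps, nfeatures)
Y = np.zeros((2, 8), dtype=np.float32)       # (rows, nfeatures)
M = np.zeros((2, 77, 8), dtype=np.float32)   # (rows, (nsteps + 1) * msteps, nfeatures)

samples = get_feature_label_list((X, Y, M))
features, label = samples[0]
assert features[0].shape == (10, 8)   # input window
assert features[1].shape == (77, 8)   # memories
assert label.shape == (8,)            # target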
8 changes: 8 additions & 0 deletions run_train_mem_model_zoo_local.sh
@@ -0,0 +1,8 @@
#!/usr/bin/env bash
export PYSPARK_PYTHON=python3
export OMP_NUM_THREADS=10

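# positional args consumed by train_mem_model_zoo.py: data_path batch_size num_epochs model_dir [core_num] [thread_num]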
${ANALYTICS_ZOO_HOME}/bin/spark-submit-python-with-zoo.sh \
--master local[2] \
--driver-memory 20g \
train_mem_model_zoo.py hdfs://localhost:9000/skt 128 10 /Users/wangjiao/git/ARMemNet-jennie/model
19 changes: 19 additions & 0 deletions run_train_mem_model_zoo_yarn.sh
@@ -0,0 +1,19 @@
#!/usr/bin/env bash
export PYSPARK_PYTHON=python3
export OMP_NUM_THREADS=10
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop

if [ -f train.zip ]
then rm train.zip
fi
zip -r train.zip *.py */*.py

${ANALYTICS_ZOO_HOME}/bin/spark-submit-python-with-zoo.sh \
--master yarn \
--deploy-mode client \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 2 \
--num-executors 1 \
--py-files train.zip \
train_mem_model_zoo.py hdfs://localhost:9000/skt 128 10 /Users/wangjiao/git/ARMemNet-jennie/model
96 changes: 96 additions & 0 deletions train_mem_model_zoo.py
@@ -0,0 +1,96 @@
from zoo import init_nncontext
from zoo.tfpark import TFOptimizer, TFDataset
from bigdl.optim.optimizer import *
from data_utils import get_datasets_from_dir, get_datasets_from_dir_spark
from AR_mem.config import Config
from AR_mem.model import Model
from time import time
import sys
import tensorflow as tf
from zoo.common import set_core_number


if __name__ == "__main__":

data_path = sys.argv[1]
batch_size = int(sys.argv[2])
num_epochs = int(sys.argv[3])
model_dir = sys.argv[4]

# For tuning
if len(sys.argv) > 5:
core_num = int(sys.argv[5])
else:
core_num = 4
if len(sys.argv) > 6:
thread_num = int(sys.argv[6])
else:
thread_num = 10

config = Config()
config.data_path = data_path
    config.latest_model = False
config.batch_size = batch_size

# init or get SparkContext
sc = init_nncontext()

# tuning
set_core_number(core_num)

# create train data
# train_x, dev_x, test_x, train_y, dev_y, test_y, train_m, dev_m, test_m, test_dt = \
# load_agg_selected_data_mem_train(data_path=config.data_path,
# x_len=config.x_len,
# y_len=config.y_len,
# foresight=config.foresight,
# cell_ids=config.train_cell_ids,
# dev_ratio=config.dev_ratio,
# test_len=config.test_len,
# seed=config.seed)

    # config.batch_size is not used here because we force get_datasets_from_dir to return the entire dataset
# train_X, train_Y, train_M, valid_X, valid_Y, valid_M, _, _, _ =\
# get_datasets_from_dir(sc, config.data_path, config.batch_size,
# train_cells=config.num_cells_train,
# valid_cells=config.num_cells_valid,
# test_cells=config.num_cells_test)[0]
#
# dataset = TFDataset.from_ndarrays([train_X, train_M, train_Y], batch_size=batch_size,
# val_tensors=[valid_X, valid_M, valid_Y],)
#
# model = Model(config, dataset.tensors[0], dataset.tensors[1], dataset.tensors[2])

# train_rdd, val_rdd, test_rdd = \
# get_datasets_from_dir_spark(sc, config.data_path, config.batch_size,
# train_cells=config.num_cells_train,
# valid_cells=config.num_cells_valid,
# test_cells=config.num_cells_test)

train_rdd, val_rdd, test_rdd = \
get_datasets_from_dir_spark(sc, config.data_path, config.batch_size,
train_cells=0.8,
valid_cells=0.2,
test_cells=0)

dataset = TFDataset.from_rdd(train_rdd,
features=[(tf.float32, [10, 8]), (tf.float32, [77, 8])],
labels=(tf.float32, [8]),
batch_size=config.batch_size,
val_rdd=val_rdd)

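    # dataset.tensors is laid out as ([input_x, memories], targets), matching the
    # features/labels spec above, so the batched tensors feed Model directly.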
model = Model(config, dataset.tensors[0][0], dataset.tensors[0][1], dataset.tensors[1])

optimizer = TFOptimizer.from_loss(model.loss, Adam(config.lr),
metrics={"rse": model.rse, "smape": model.smape, "mae": model.mae},
model_dir=model_dir,
session_config=tf.ConfigProto(inter_op_parallelism_threads=1,
intra_op_parallelism_threads=thread_num)
)

start_time = time()
optimizer.optimize(end_trigger=MaxEpoch(num_epochs))
end_time = time()

print("Elapsed training time {} secs".format(end_time - start_time))