
Analytics Zoo training in Python #11

Open · wants to merge 17 commits into analytics-zoo
14 changes: 10 additions & 4 deletions ARMem/model.py
@@ -4,12 +4,15 @@

 # AR_memory
 class Model(object):
-    def __init__(self, config):
+    def __init__(self, config, input_x=None, memories=None, targets=None):
         self.config = config
         self.global_step = tf.Variable(0, trainable=False, name="global_step")
         self.regularizer = layers.l2_regularizer(self.config.l2_lambda)
         self.sess = None
         self.saver = None
+        self.input_x = input_x
+        self.memories = memories
+        self.targets = targets
         self._build_model()

     def _build_model(self):
@@ -54,9 +57,12 @@ def _build_model(self):
         self.initialize_session()

     def add_placeholder(self):
-        self.input_x = tf.placeholder(shape=[None, self.config.nsteps, self.config.nfeatures], dtype=tf.float32, name="x")
-        self.targets = tf.placeholder(shape=[None, self.config.nfeatures], dtype=tf.float32, name="targets")
-        self.memories = tf.placeholder(shape=[None, (self.config.nsteps+1) * self.config.msteps, self.config.nfeatures], dtype=tf.float32,
+        if self.input_x is None:
+            self.input_x = tf.placeholder(shape=[None, self.config.nsteps, self.config.nfeatures], dtype=tf.float32, name="x")
+        if self.targets is None:
+            self.targets = tf.placeholder(shape=[None, self.config.nfeatures], dtype=tf.float32, name="targets")
+        if self.memories is None:
+            self.memories = tf.placeholder(shape=[None, (self.config.nsteps+1) * self.config.msteps, self.config.nfeatures], dtype=tf.float32,
                                            name="memories")
         # self.targets = tf.placeholder(shape=[None], dtype=tf.int32, name="targets")
         self.dropout = tf.placeholder(dtype=tf.float32, name="dropout")
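The constructor change above makes the graph inputs injectable: when no tensors are passed, add_placeholder() creates tf.placeholder inputs exactly as before; when tensors are supplied, the placeholder branches are skipped. A minimal sketch of the two construction modes (the tf.zeros tensors are illustrative stand-ins for dataset.tensors, not part of this PR):

# Sketch only: assumes the repo's Config provides nsteps, msteps, and nfeatures,
# matching the shapes in the diff above.
import tensorflow as tf
from ARMem.config import Config
from ARMem.model import Model

config = Config()

# Mode 1: no tensors passed; add_placeholder() builds feed_dict-style placeholders
# (the pre-PR behaviour, used for standalone inference).
inference_model = Model(config)

# Mode 2: tensors injected up front (train_mem_model_zoo.py passes dataset.tensors).
x = tf.zeros([32, config.nsteps, config.nfeatures])                        # stand-in input_x
m = tf.zeros([32, (config.nsteps + 1) * config.msteps, config.nfeatures])  # stand-in memories
y = tf.zeros([32, config.nfeatures])                                       # stand-in targets
training_model = Model(config, input_x=x, memories=m, targets=y)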
4 changes: 2 additions & 2 deletions flashbase-ml-pipeline/pom.xml
@@ -43,8 +43,8 @@
         </dependency>
         <dependency>
             <groupId>com.intel.analytics.zoo</groupId>
-            <artifactId>analytics-zoo-bigdl_0.8.0-spark_${spark.version}</artifactId>
-            <version>0.5.1</version>
+            <artifactId>analytics-zoo-bigdl_0.9.1-spark_${spark.version}</artifactId>
+            <version>0.6.0</version>
             <scope>provided</scope>
         </dependency>

5 changes: 4 additions & 1 deletion run_inference_mem_model_zoo.sh
@@ -5,4 +5,7 @@ if [ -z "${ANALYTICS_ZOO_HOME}" ]; then
 fi

 # bash $ANALYTICS_ZOO_HOME/bin/spark-submit-with-zoo.sh --master local[4] inference_mem_model_zoo.py
-bash $ANALYTICS_ZOO_HOME/bin/spark-submit-with-zoo.sh --master local[36] --driver-memory 32g inference_mem_model_zoo.py
+bash $ANALYTICS_ZOO_HOME/bin/spark-submit-python-with-zoo.sh \
+    --master local[36] \
+    --driver-memory 32g \
+    inference_mem_model_zoo.py
5 changes: 5 additions & 0 deletions run_train_mem_model_zoo.sh
@@ -0,0 +1,5 @@

${ANALYTICS_ZOO_HOME}/bin/spark-submit-python-with-zoo.sh \
    --master local[4] \
    --driver-memory 20g \
    train_mem_model_zoo.py /home/jwang/git/ARMemNet-BigDL_jennie/data/aggregated_5min_scaled.csv 2700 1000
4 changes: 1 addition & 3 deletions scala-inference/bin/run-scala-inference.sh
@@ -8,7 +8,5 @@ fi
 full_path=$(realpath $0)
 dir_path=$(dirname $full_path)

-bash $dir_path/spark-submit-scala-with-zoo.sh --driver-memory 20g --class Main target/mem-inference-1.0-SNAPSHOT-jar-with-dependencies.jar ../tfnet ../data/test_x.npy ../data/test_m.npy 65536 false
-
-
+bash $ANALYTICS_ZOO_HOME/bin/spark-submit-scala-with-zoo.sh --driver-memory 20g --class Main target/mem-inference-1.0-SNAPSHOT-jar-with-dependencies.jar ../tfnet ../data/test_x.npy ../data/test_m.npy 65536 false

41 changes: 41 additions & 0 deletions train_mem_model_zoo.py
@@ -0,0 +1,41 @@
import sys

from zoo import init_nncontext
from zoo.tfpark import TFOptimizer, TFDataset
from bigdl.optim.optimizer import *  # provides Adam and MaxEpoch
from data_utils import load_agg_selected_data_mem
from ARMem.config import Config
from ARMem.model import Model


if __name__ == "__main__":
    # positional args: data csv path, batch size, number of epochs
    data_path = sys.argv[1]
    batch_size = int(sys.argv[2])
    num_epochs = int(sys.argv[3])

    config = Config()
    config.data_path = data_path
    config.latest_model = False

    # init or get SparkContext
    sc = init_nncontext()

    # create train data
    train_x, dev_x, test_x, train_y, dev_y, test_y, train_m, dev_m, test_m, test_dt = \
        load_agg_selected_data_mem(data_path=config.data_path,
                                   x_len=config.x_len,
                                   y_len=config.y_len,
                                   foresight=config.foresight,
                                   cell_ids=config.test_cell_ids,
                                   dev_ratio=config.dev_ratio,
                                   test_len=config.test_len,
                                   seed=config.seed)

    model_dir = config.model_dir

    # wrap the ndarrays in a TFDataset so TFPark can feed them into the graph
    dataset = TFDataset.from_ndarrays([train_x, train_m, train_y],
                                      batch_size=batch_size,
                                      val_tensors=[dev_x, dev_m, dev_y])

    # inject the dataset tensors so the model skips creating placeholders
    model = Model(config, dataset.tensors[0], dataset.tensors[1], dataset.tensors[2])
    optimizer = TFOptimizer.from_loss(model.loss, Adam(config.lr),
                                      metrics={"rse": model.rse, "smape": model.smape, "mae": model.mae})

    optimizer.optimize(end_trigger=MaxEpoch(num_epochs))
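
Note, not part of this diff: optimize() runs training but the script does not persist the weights afterwards. A hedged sketch of one way to checkpoint them, assuming the tfpark TFOptimizer exposes its live tf.Session as optimizer.sess; if the version in use does not, a tf.Session can be created up front and handed to TFOptimizer.from_loss via its session argument instead:

# Hedged sketch; `optimizer.sess` is an assumption about the tfpark version in use.
import os
import tensorflow as tf

saver = tf.train.Saver()
saver.save(optimizer.sess, os.path.join(model_dir, "model.ckpt"))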