From ff28aa231780a13e65d6610f7ff3c313e04fca62 Mon Sep 17 00:00:00 2001 From: Clive Cox Date: Sun, 26 Mar 2017 20:39:35 +0100 Subject: [PATCH] add components to create user clustered matrix factorizarion --- docker/seldon-control/seldon.conf.in | 34 +++++++++++++++++++ kubernetes/conf/models/Makefile | 6 ++++ .../seldon/spark/mllib/MfUserClusters.scala | 4 +-- python/seldon/luigi/spark.py | 26 ++++++++++++++ .../seldon/mf/MfUserClustersModelManager.java | 2 +- 5 files changed, 69 insertions(+), 3 deletions(-) diff --git a/docker/seldon-control/seldon.conf.in b/docker/seldon-control/seldon.conf.in index 92b84db2..5334e92e 100644 --- a/docker/seldon-control/seldon.conf.in +++ b/docker/seldon-control/seldon.conf.in @@ -41,6 +41,10 @@ "config": [], "zk_activate_node": "/config/mf" }, + "mfUserClustersRecommender": { + "config": [], + "zk_activate_node": "/config/mfclusters" + }, "mostPopularRecommender": { "config": [] }, @@ -97,6 +101,36 @@ "job_type": "spark" } }, + "matrix-factorization-clusters": { + "config": { + "activate": true, + "alpha": 1, + "days": 1, + "inputPath": "%SELDON_MODELS%", + "iterations": 5, + "lambda": 0.01, + "outputPath": "%SELDON_MODELS%", + "rank": 30, + "startDay": 1 + }, + "training": { + "job_info": { + "cmd": "%SPARK_HOME%/bin/spark-submit", + "cmd_args": [ + "--class", + "io.seldon.spark.mllib.MfUserClusters", + "--master", + "spark://spark-master:7077", + "%SELDON_SPARK_HOME%/seldon-spark-%SELDON_VERSION%-jar-with-dependencies.jar", + "--client", + "%CLIENT_NAME%", + "--zookeeper", + "%ZK_HOSTS%" + ] + }, + "job_type": "spark" + } + }, "semvec": {}, "similar-items": { "config": { diff --git a/kubernetes/conf/models/Makefile b/kubernetes/conf/models/Makefile index 86ab7852..6a46bcab 100644 --- a/kubernetes/conf/models/Makefile +++ b/kubernetes/conf/models/Makefile @@ -9,6 +9,10 @@ jobs/matrix-factorization-${CLIENT}-${DAY}.json: mkdir -p jobs cat matrix-factorization.json.in | sed -e 's|%CLIENT%|$(CLIENT)|' | sed -e 's|%DAY%|$(DAY)|' > jobs/matrix-factorization-${CLIENT}-${DAY}.json +jobs/matrix-factorization-clusters-${CLIENT}-${DAY}.json: + mkdir -p jobs + cat matrix-factorization-clusters.json.in | sed -e 's|%CLIENT%|$(CLIENT)|' | sed -e 's|%DAY%|$(DAY)|' > jobs/matrix-factorization-clusters-${CLIENT}-${DAY}.json + jobs/item-similarity-${CLIENT}-${DAY}.json: mkdir -p jobs cat item-similarity.json.in | sed -e 's|%CLIENT%|$(CLIENT)|' | sed -e 's|%DAY%|$(DAY)|' > jobs/item-similarity-${CLIENT}-${DAY}.json @@ -24,6 +28,8 @@ jobs/stream-itemsim-dbupload-${CLIENT}.json: matrix-factorization:jobs/matrix-factorization-${CLIENT}-${DAY}.json +matrix-factorization-clusters:jobs/matrix-factorization-clusters-${CLIENT}-${DAY}.json + item-similarity:jobs/item-similarity-${CLIENT}-${DAY}.json streaming-itemsim:jobs/stream-itemsim-create-${CLIENT}.json jobs/stream-itemsim-dbupload-${CLIENT}.json diff --git a/offline-jobs/spark/src/main/scala/io/seldon/spark/mllib/MfUserClusters.scala b/offline-jobs/spark/src/main/scala/io/seldon/spark/mllib/MfUserClusters.scala index e60b6706..65e5f2cf 100644 --- a/offline-jobs/spark/src/main/scala/io/seldon/spark/mllib/MfUserClusters.scala +++ b/offline-jobs/spark/src/main/scala/io/seldon/spark/mllib/MfUserClusters.scala @@ -244,7 +244,7 @@ class MfUserClusters(private val sc : SparkContext,config : UCMfConfig) { val curator = new ZkCuratorHandler(zkServer) if(curator.getCurator.getZookeeperClient.blockUntilConnectedOrTimedOut()) { - val zkPath = "/all_clients/"+client+"/mf" + val zkPath = "/all_clients/"+client+"/mfclusters" val ensurePath = new EnsurePath(zkPath) ensurePath.ensure(curator.getCurator.getZookeeperClient) curator.getCurator.setData().forPath(zkPath,(outputFilesLocation+date).getBytes()) @@ -297,7 +297,7 @@ object MfUserClusters { if (config.zkHosts.nonEmpty) { val curator = new ZkCuratorHandler(config.zkHosts) - val path = "/all_clients/"+config.client+"/offline/matrix-factorization" + val path = "/all_clients/"+config.client+"/offline/matrix-factorization-clusters" if (curator.getCurator.checkExists().forPath(path) != null) { val bytes = curator.getCurator.getData().forPath(path) diff --git a/python/seldon/luigi/spark.py b/python/seldon/luigi/spark.py index 891907f6..94154999 100644 --- a/python/seldon/luigi/spark.py +++ b/python/seldon/luigi/spark.py @@ -82,3 +82,29 @@ def run(self): params = ["seldon-cli","model","--action","train","--client-name",self.client,"--model-name","matrix-factorization"] res = call(params) return res + + +class SeldonMatrixFactorizationClusters(luigi.Task): + """ + User Clustered Matrix factorization using Spark + """ + inputPath = luigi.Parameter(default="/seldon-data/seldon-models/") + outputPath = luigi.Parameter(default="/seldon-data/seldon-models/") + client = luigi.Parameter(default="test") + startDay = luigi.IntParameter(default=1) + days = luigi.IntParameter(default=1) + + rank = luigi.IntParameter(default=30) + mf_lambda = luigi.FloatParameter(default=0.01) + alpha = luigi.FloatParameter(default=1) + iterations = luigi.IntParameter(default=5) + + def output(self): + return luigi.LocalTarget("{}/{}/matrix-factorization-clusters/{}".format(self.outputPath,self.client,self.startDay)) + + def run(self): + params = ["seldon-cli","model","--action","add","--client-name",self.client,"--model-name","matrix-factorization-clusters","--inputPath",self.inputPath,"--outputPath",self.outputPath,"--startDay",str(self.startDay),"--days",str(self.days),"--rank",str(self.rank),"--lambda",str(self.mf_lambda),"--alpha",str(self.alpha),"--iterations",str(self.iterations)] + res = call(params) + params = ["seldon-cli","model","--action","train","--client-name",self.client,"--model-name","matrix-factorization-clusters"] + res = call(params) + return res diff --git a/server/src/io/seldon/mf/MfUserClustersModelManager.java b/server/src/io/seldon/mf/MfUserClustersModelManager.java index 90f430ac..f66bb028 100644 --- a/server/src/io/seldon/mf/MfUserClustersModelManager.java +++ b/server/src/io/seldon/mf/MfUserClustersModelManager.java @@ -45,7 +45,7 @@ public class MfUserClustersModelManager extends ModelManager