Skip to content

Commit

Permalink
add components to create user clustered matrix factorizarion
Browse files Browse the repository at this point in the history
  • Loading branch information
ukclivecox committed Mar 26, 2017
1 parent 6de67e9 commit ff28aa2
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 3 deletions.
34 changes: 34 additions & 0 deletions docker/seldon-control/seldon.conf.in
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
"config": [],
"zk_activate_node": "/config/mf"
},
"mfUserClustersRecommender": {
"config": [],
"zk_activate_node": "/config/mfclusters"
},
"mostPopularRecommender": {
"config": []
},
Expand Down Expand Up @@ -97,6 +101,36 @@
"job_type": "spark"
}
},
"matrix-factorization-clusters": {
"config": {
"activate": true,
"alpha": 1,
"days": 1,
"inputPath": "%SELDON_MODELS%",
"iterations": 5,
"lambda": 0.01,
"outputPath": "%SELDON_MODELS%",
"rank": 30,
"startDay": 1
},
"training": {
"job_info": {
"cmd": "%SPARK_HOME%/bin/spark-submit",
"cmd_args": [
"--class",
"io.seldon.spark.mllib.MfUserClusters",
"--master",
"spark://spark-master:7077",
"%SELDON_SPARK_HOME%/seldon-spark-%SELDON_VERSION%-jar-with-dependencies.jar",
"--client",
"%CLIENT_NAME%",
"--zookeeper",
"%ZK_HOSTS%"
]
},
"job_type": "spark"
}
},
"semvec": {},
"similar-items": {
"config": {
Expand Down
6 changes: 6 additions & 0 deletions kubernetes/conf/models/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ jobs/matrix-factorization-${CLIENT}-${DAY}.json:
mkdir -p jobs
cat matrix-factorization.json.in | sed -e 's|%CLIENT%|$(CLIENT)|' | sed -e 's|%DAY%|$(DAY)|' > jobs/matrix-factorization-${CLIENT}-${DAY}.json

jobs/matrix-factorization-clusters-${CLIENT}-${DAY}.json:
mkdir -p jobs
cat matrix-factorization-clusters.json.in | sed -e 's|%CLIENT%|$(CLIENT)|' | sed -e 's|%DAY%|$(DAY)|' > jobs/matrix-factorization-clusters-${CLIENT}-${DAY}.json

jobs/item-similarity-${CLIENT}-${DAY}.json:
mkdir -p jobs
cat item-similarity.json.in | sed -e 's|%CLIENT%|$(CLIENT)|' | sed -e 's|%DAY%|$(DAY)|' > jobs/item-similarity-${CLIENT}-${DAY}.json
Expand All @@ -24,6 +28,8 @@ jobs/stream-itemsim-dbupload-${CLIENT}.json:

matrix-factorization:jobs/matrix-factorization-${CLIENT}-${DAY}.json

matrix-factorization-clusters:jobs/matrix-factorization-clusters-${CLIENT}-${DAY}.json

item-similarity:jobs/item-similarity-${CLIENT}-${DAY}.json

streaming-itemsim:jobs/stream-itemsim-create-${CLIENT}.json jobs/stream-itemsim-dbupload-${CLIENT}.json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class MfUserClusters(private val sc : SparkContext,config : UCMfConfig) {
val curator = new ZkCuratorHandler(zkServer)
if(curator.getCurator.getZookeeperClient.blockUntilConnectedOrTimedOut())
{
val zkPath = "/all_clients/"+client+"/mf"
val zkPath = "/all_clients/"+client+"/mfclusters"
val ensurePath = new EnsurePath(zkPath)
ensurePath.ensure(curator.getCurator.getZookeeperClient)
curator.getCurator.setData().forPath(zkPath,(outputFilesLocation+date).getBytes())
Expand Down Expand Up @@ -297,7 +297,7 @@ object MfUserClusters {
if (config.zkHosts.nonEmpty)
{
val curator = new ZkCuratorHandler(config.zkHosts)
val path = "/all_clients/"+config.client+"/offline/matrix-factorization"
val path = "/all_clients/"+config.client+"/offline/matrix-factorization-clusters"
if (curator.getCurator.checkExists().forPath(path) != null)
{
val bytes = curator.getCurator.getData().forPath(path)
Expand Down
26 changes: 26 additions & 0 deletions python/seldon/luigi/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,29 @@ def run(self):
params = ["seldon-cli","model","--action","train","--client-name",self.client,"--model-name","matrix-factorization"]
res = call(params)
return res


class SeldonMatrixFactorizationClusters(luigi.Task):
"""
User Clustered Matrix factorization using Spark
"""
inputPath = luigi.Parameter(default="/seldon-data/seldon-models/")
outputPath = luigi.Parameter(default="/seldon-data/seldon-models/")
client = luigi.Parameter(default="test")
startDay = luigi.IntParameter(default=1)
days = luigi.IntParameter(default=1)

rank = luigi.IntParameter(default=30)
mf_lambda = luigi.FloatParameter(default=0.01)
alpha = luigi.FloatParameter(default=1)
iterations = luigi.IntParameter(default=5)

def output(self):
return luigi.LocalTarget("{}/{}/matrix-factorization-clusters/{}".format(self.outputPath,self.client,self.startDay))

def run(self):
params = ["seldon-cli","model","--action","add","--client-name",self.client,"--model-name","matrix-factorization-clusters","--inputPath",self.inputPath,"--outputPath",self.outputPath,"--startDay",str(self.startDay),"--days",str(self.days),"--rank",str(self.rank),"--lambda",str(self.mf_lambda),"--alpha",str(self.alpha),"--iterations",str(self.iterations)]
res = call(params)
params = ["seldon-cli","model","--action","train","--client-name",self.client,"--model-name","matrix-factorization-clusters"]
res = call(params)
return res
2 changes: 1 addition & 1 deletion server/src/io/seldon/mf/MfUserClustersModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class MfUserClustersModelManager extends ModelManager<MfUserClustersMode

private static Logger logger = Logger.getLogger(MfUserClustersModelManager.class.getName());
private final ExternalResourceStreamer featuresFileHandler;
public static final String LOC_PATTERN = "mfcluster";
public static final String LOC_PATTERN = "mfclusters";

@Autowired
public MfUserClustersModelManager(ExternalResourceStreamer featuresFileHandler,
Expand Down

0 comments on commit ff28aa2

Please sign in to comment.