diff --git a/cluster_all.sh b/cluster_all.sh new file mode 100644 index 0000000..d4df88b --- /dev/null +++ b/cluster_all.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +DATA_DIR=~/datasets/seed_datasets_current +SCHEMA=/datasetDoc.json +OUTPUT_DATA=clusters/clusters.csv +OUTPUT_SCHEMA=clustersDatasetDoc.json +DATASET_FOLDER_SUFFIX=_dataset +DATASETS=(26_radon_seed 32_wikiqa 60_jester 185_baseball 196_autoMpg 313_spectrometer 38_sick 1491_one_hundred_plants_margin 27_wordLevels 57_hypothyroid 299_libras_move 534_cps_85_wages 1567_poker_hand 22_handgeometry) +HAS_HEADER=1 +CLUSTER_FUNCTION=fileupload +REST_ENDPOINT=HTTP://localhost:5000 +DATA_SERVER=HTTP://10.108.4.104 + +for DATASET in "${DATASETS[@]}" +do + echo "--------------------------------------------------------------------------------" + echo " Clustering $DATASET dataset" + echo "--------------------------------------------------------------------------------" + go run cmd/distil-cluster/main.go \ + --rest-endpoint="$REST_ENDPOINT" \ + --cluster-function="$CLUSTER_FUNCTION" \ + --dataset="$DATA_DIR/${DATASET}/TRAIN/dataset_TRAIN" \ + --media-path="$DATA_SERVER/${DATASET}" \ + --schema="$DATA_DIR/${DATASET}/TRAIN/dataset_TRAIN/$SCHEMA" \ + --output="$DATA_DIR/${DATASET}/TRAIN/dataset_TRAIN" \ + --output-data="$OUTPUT_DATA" \ + --output-schema="$OUTPUT_SCHEMA" \ + --has-header=$HAS_HEADER +done diff --git a/cmd/distil-cluster/main.go b/cmd/distil-cluster/main.go new file mode 100644 index 0000000..9534858 --- /dev/null +++ b/cmd/distil-cluster/main.go @@ -0,0 +1,158 @@ +package main + +import ( + "os" + "path" + "runtime" + "strings" + + "github.com/pkg/errors" + "github.com/unchartedsoftware/plog" + "github.com/urfave/cli" + + "github.com/unchartedsoftware/distil-ingest/feature" + "github.com/unchartedsoftware/distil-ingest/metadata" + "github.com/unchartedsoftware/distil-ingest/rest" +) + +func splitAndTrim(arg string) []string { + var res []string + if arg == "" { + return res + } + split := strings.Split(arg, ",") + 
for _, str := range split { + res = append(res, strings.TrimSpace(str)) + } + return res +} + +func main() { + + runtime.GOMAXPROCS(runtime.NumCPU()) + + app := cli.NewApp() + app.Name = "distil-cluster" + app.Version = "0.1.0" + app.Usage = "Cluster D3M datasets" + app.UsageText = "distil-cluster --rest-endpoint= --cluster-function= --dataset= --output=" + app.Flags = []cli.Flag{ + cli.StringFlag{ + Name: "rest-endpoint", + Value: "", + Usage: "The REST endpoint url", + }, + cli.StringFlag{ + Name: "cluster-function", + Value: "", + Usage: "The clustering function to use", + }, + cli.StringFlag{ + Name: "dataset", + Value: "", + Usage: "The dataset source path", + }, + cli.StringFlag{ + Name: "schema", + Value: "", + Usage: "The schema source path", + }, + cli.StringFlag{ + Name: "filetype", + Value: "csv", + Usage: "The dataset file type", + }, + cli.StringFlag{ + Name: "output", + Value: "", + Usage: "The clustering output file path", + }, + cli.StringFlag{ + Name: "media-path", + Value: "", + Usage: "The path to the folder containing the media subfolder that is accessible for clustering", + }, + cli.StringFlag{ + Name: "output-schema", + Value: "", + Usage: "The path to use as output for the clustered schema document", + }, + cli.StringFlag{ + Name: "output-data", + Value: "", + Usage: "The path to use as output for the clustered data", + }, + cli.BoolFlag{ + Name: "has-header", + Usage: "Whether or not the CSV file has a header row", + }, + } + app.Action = func(c *cli.Context) error { + if c.String("rest-endpoint") == "" { + return cli.NewExitError("missing commandline flag `--rest-endpoint`", 1) + } + if c.String("cluster-function") == "" { + return cli.NewExitError("missing commandline flag `--cluster-function`", 1) + } + if c.String("dataset") == "" { + return cli.NewExitError("missing commandline flag `--dataset`", 1) + } + + clusterFunction := c.String("cluster-function") + restBaseEndpoint := c.String("rest-endpoint") + datasetPath := 
c.String("dataset")
+		mediaPath := c.String("media-path")
+		outputSchema := c.String("output-schema")
+		outputData := c.String("output-data")
+		schemaPath := c.String("schema")
+		outputFilePath := c.String("output")
+		hasHeader := c.Bool("has-header")
+
+		// initialize REST client
+		log.Infof("Using REST interface at `%s` ", restBaseEndpoint)
+		client := rest.NewClient(restBaseEndpoint)
+
+		// create feature folder
+		clusterPath := path.Join(outputFilePath, "clusters")
+		if dirExists(clusterPath) {
+			// delete existing data to overwrite with latest
+			os.RemoveAll(clusterPath)
+			log.Infof("Deleted data at %s", clusterPath)
+		}
+		if err := os.MkdirAll(clusterPath, 0777); err != nil && !os.IsExist(err) {
+			log.Errorf("%v", err)
+			return cli.NewExitError(errors.Cause(err), 2)
+		}
+		os.Remove(path.Join(outputFilePath, "clustersDatasetDoc.json"))
+
+		// create featurizer
+		featurizer := rest.NewFeaturizer(clusterFunction, client)
+
+		// load metadata from original schema
+		meta, err := metadata.LoadMetadataFromOriginalSchema(schemaPath)
+		if err != nil {
+			log.Errorf("%v", err)
+			return cli.NewExitError(errors.Cause(err), 2)
+		}
+
+		// featurize data
+		err = feature.ClusterDataset(meta, featurizer, datasetPath, mediaPath, outputFilePath, outputData, outputSchema, hasHeader)
+		if err != nil {
+			log.Errorf("%v", err)
+			return cli.NewExitError(errors.Cause(err), 2)
+		}
+
+		log.Infof("Clustered data written to %s", outputFilePath)
+
+		return nil
+	}
+	// run app
+	app.Run(os.Args)
+}
+
+func dirExists(path string) bool {
+	if _, err := os.Stat(path); os.IsNotExist(err) {
+		return false
+	}
+	return true
+}
diff --git a/feature/cluster.go b/feature/cluster.go
new file mode 100644
index 0000000..f91e3ce
--- /dev/null
+++ b/feature/cluster.go
@@ -0,0 +1,146 @@
+package feature
+
+import (
+	"bytes"
+	"encoding/csv"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path"
+	"strconv"
+
+	"github.com/pkg/errors"
+	"github.com/unchartedsoftware/plog"
+
+	
"github.com/unchartedsoftware/distil-ingest/metadata" + "github.com/unchartedsoftware/distil-ingest/rest" +) + +// ClusterDataset clusters data based on referenced data resources +// in the metadata. The clusters are added as a variable in the metadata. +func ClusterDataset(meta *metadata.Metadata, imageFeaturizer *rest.Featurizer, sourcePath string, mediaPath string, outputFolder string, outputPathData string, outputPathSchema string, hasHeader bool) error { + // find the main data resource + mainDR := meta.GetMainDataResource() + + // cluster image columns + log.Infof("adding clusters to schema") + colsToFeaturize := addFeaturesToSchema(meta, mainDR, "_cluster_") + + // read the data to process every row + log.Infof("opening data from source") + dataPath := path.Join(sourcePath, mainDR.ResPath) + csvFile, err := os.Open(dataPath) + if err != nil { + return errors.Wrap(err, "failed to open data file") + } + defer csvFile.Close() + reader := csv.NewReader(csvFile) + + // initialize csv writer + output := &bytes.Buffer{} + writer := csv.NewWriter(output) + + // write the header as needed + if hasHeader { + header := make([]string, len(mainDR.Variables)) + for _, v := range mainDR.Variables { + header[v.Index] = v.Name + } + err = writer.Write(header) + if err != nil { + return errors.Wrap(err, "error writing header to output") + } + _, err = reader.Read() + if err != nil { + return errors.Wrap(err, "failed to read header from file") + } + } + + // build the list of files to submit for clustering + files := make([]string, 0) + lines := make([][]string, 0) + log.Infof("reading data from source") + for { + line, err := reader.Read() + if err == io.EOF { + break + } else if err != nil { + return errors.Wrap(err, "failed to read line from file") + } + lines = append(lines, line) + + // featurize the row as necessary + for index, colDR := range colsToFeaturize { + imagePath := fmt.Sprintf("%s/%s", mediaPath, path.Join(colDR.originalResPath, line[index])) + files = 
append(files, imagePath)
+		}
+	}
+
+	// cluster the files
+	log.Infof("Clustering data with featurizer")
+	clusteredImages, err := clusterImages(files, imageFeaturizer)
+	if err != nil {
+		return errors.Wrap(err, "failed to cluster images using featurizer")
+	}
+
+	// append and output the new clustered data
+	log.Infof("Adding cluster labels to source data")
+	for _, l := range lines {
+		for index, colDR := range colsToFeaturize {
+			imagePath := fmt.Sprintf("%s/%s", mediaPath, path.Join(colDR.originalResPath, l[index]))
+			l = append(l, clusteredImages[imagePath])
+		}
+
+		err = writer.Write(l)
+		if err != nil {
+			return errors.Wrap(err, "error storing featured output")
+		}
+	}
+
+	// output the data
+	log.Infof("Writing data to output")
+	dataPathToWrite := path.Join(outputFolder, outputPathData)
+	writer.Flush()
+	err = ioutil.WriteFile(dataPathToWrite, output.Bytes(), 0644)
+	if err != nil {
+		return errors.Wrap(err, "error writing feature output")
+	}
+
+	// main DR should point to new file
+	mainDR.ResPath = outputPathData
+
+	// output the schema
+	log.Infof("Writing schema to output")
+	schemaPathToWrite := path.Join(outputFolder, outputPathSchema)
+	err = meta.WriteSchema(schemaPathToWrite)
+
+	return err
+}
+
+func clusterImages(filepaths []string, featurizer *rest.Featurizer) (map[string]string, error) {
+	feature, err := featurizer.ClusterImages(filepaths)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to cluster images")
+	}
+
+	preds, ok := feature.Image["pred_class"].(map[string]interface{})
+	if !ok {
+		return nil, errors.Errorf("image feature objects in unexpected format")
+	}
+
+	clusters := make(map[string]string)
+	for i, c := range preds {
+		index, err := strconv.ParseInt(i, 10, 64)
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to parse file index")
+		}
+		cluster, ok := c.(float64)
+		if !ok {
+			return nil, errors.Errorf("failed to parse file cluster")
+		}
+		clusters[filepaths[index]] = strconv.Itoa(int(cluster))
+	}
+
+	return 
clusters, nil +} diff --git a/feature/image.go b/feature/image.go index 3422874..32d0108 100644 --- a/feature/image.go +++ b/feature/image.go @@ -42,7 +42,7 @@ func FeaturizeDataset(meta *metadata.Metadata, imageFeaturizer *rest.Featurizer, // featurize image columns log.Infof("adding features to schema") - colsToFeaturize := addFeaturesToSchema(meta, mainDR) + colsToFeaturize := addFeaturesToSchema(meta, mainDR, "_feature_") // read the data to process every row log.Infof("opening data from source") @@ -68,10 +68,6 @@ func FeaturizeDataset(meta *metadata.Metadata, imageFeaturizer *rest.Featurizer, if err != nil { return errors.Wrap(err, "error writing header to output") } - } - - // skip header - if hasHeader { _, err = reader.Read() if err != nil { return errors.Wrap(err, "failed to read header from file") @@ -125,7 +121,7 @@ func FeaturizeDataset(meta *metadata.Metadata, imageFeaturizer *rest.Featurizer, return err } -func addFeaturesToSchema(meta *metadata.Metadata, mainDR *metadata.DataResource) map[int]*potentialFeature { +func addFeaturesToSchema(meta *metadata.Metadata, mainDR *metadata.DataResource, namePrefix string) map[int]*potentialFeature { colsToFeaturize := make(map[int]*potentialFeature) for _, v := range mainDR.Variables { if v.RefersTo != nil && v.RefersTo["resID"] != nil { @@ -137,7 +133,7 @@ func addFeaturesToSchema(meta *metadata.Metadata, mainDR *metadata.DataResource) // check if needs to be featurized if res.CanBeFeaturized() { // create the new resource to hold the featured output - indexName := fmt.Sprintf("_feature_%s", v.Name) + indexName := fmt.Sprintf("%s%s", namePrefix, v.Name) // add the feature variable refVariable := &metadata.Variable{ diff --git a/featurize_all.sh b/featurize_all.sh index 8eec6b1..1124ffa 100644 --- a/featurize_all.sh +++ b/featurize_all.sh @@ -1,7 +1,7 @@ #!/bin/bash DATA_DIR=~/datasets/seed_datasets_current -SCHEMA=/datasetDoc.json +SCHEMA=/clustersDatasetDoc.json OUTPUT_DATA=features/features.csv 
OUTPUT_SCHEMA=featuresDatasetDoc.json DATASET_FOLDER_SUFFIX=_dataset diff --git a/rest/client.go b/rest/client.go index 1b75eb9..cd6bdf6 100644 --- a/rest/client.go +++ b/rest/client.go @@ -2,6 +2,7 @@ package rest import ( "bytes" + "encoding/json" "fmt" "io" "io/ioutil" @@ -118,3 +119,35 @@ func (c *Client) PostRequest(function string, params map[string]string) ([]byte, return result, nil } + +// PostRequestRaw submits a post request with the provided parameters +// submitted as a raw string. +func (c *Client) PostRequestRaw(function string, params map[string]interface{}) ([]byte, error) { + url := fmt.Sprintf("%s/%s", c.baseEndpoint, function) + b, err := json.Marshal(params) + if err != nil { + return nil, errors.Wrap(err, "unable to marshal parameters") + } + + // interface requires double marshalling to have a raw string + b, err = json.Marshal(string(b)) + if err != nil { + return nil, errors.Wrap(err, "unable to marshal (*2) parameters") + } + + res, err := http.Post(url, "application/json", bytes.NewReader(b)) + if err != nil { + return nil, errors.Wrap(err, "Unable to post request") + } + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("bad status: %s", res.Status) + } + + result, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, errors.Wrap(err, "Unable to read result") + } + + return result, nil +} diff --git a/rest/image.go b/rest/image.go index 4c522b9..edf176b 100644 --- a/rest/image.go +++ b/rest/image.go @@ -2,10 +2,15 @@ package rest import ( "encoding/json" + "fmt" "github.com/pkg/errors" ) +const ( + minClusterCount = 5 +) + // ImageResult represents a REST image feature result. type ImageResult struct { Image map[string]interface{} `json:"image"` @@ -45,3 +50,36 @@ func (f *Featurizer) FeaturizeImage(filename string) (*ImageResult, error) { Image: imageData, }, nil } + +// ClusterImages places images into similar clusters. 
+func (f *Featurizer) ClusterImages(filenames []string) (*ImageResult, error) {
+	if len(filenames) < minClusterCount {
+		imageClusters := make(map[string]interface{})
+		for i := range filenames {
+			imageClusters[fmt.Sprintf("%d", i)] = float64(i)
+		}
+		images := &ImageResult{
+			Image: make(map[string]interface{}),
+		}
+		images.Image["pred_class"] = imageClusters
+		return images, nil
+	}
+
+	params := map[string]interface{}{
+		"image_paths": filenames,
+	}
+	result, err := f.client.PostRequestRaw(f.functionName, params)
+	if err != nil {
+		return nil, errors.Wrap(err, "Unable to cluster file")
+	}
+
+	// response is a json mapping file indices to cluster assignments
+	imageData := make(map[string]interface{}, 0)
+	err = json.Unmarshal(result, &imageData)
+	if err != nil {
+		return nil, errors.Wrap(err, "Unable to unmarshal image response")
+	}
+	return &ImageResult{
+		Image: imageData,
+	}, nil
+}