Commit

refactor, styling, and add some tests
Quantisan committed Feb 16, 2013
1 parent fa7a9e5 commit 03d07ac
Showing 4 changed files with 149 additions and 126 deletions.
9 changes: 5 additions & 4 deletions project.clj
@@ -7,13 +7,14 @@
:uberjar-name "copa.jar"
:aot [copa.core]
:main copa.core
:min-lein-version "2.0.0"
:source-paths ["src/main/clj"]
:dependencies [[org.clojure/clojure "1.4.0"]
[cascalog "1.10.0"]
[cascalog "1.10.1-SNAPSHOT"]
[cascalog-more-taps "0.3.1-SNAPSHOT"]
[clojure-csv/clojure-csv "1.3.2"]
[clojure-csv/clojure-csv "2.0.0-alpha2"]
[org.clojars.sunng/geohash "1.0.1"]
[org.clojure/clojure-contrib "1.2.0"]
[date-clj "1.0.1"] ]
[date-clj "1.0.1"]]
:exclusions [org.clojure/clojure]
:profiles {:dev {:dependencies [[midje-cascalog "0.4.0"]]}
:provided {:dependencies [[org.apache.hadoop/hadoop-core "0.20.2-dev"]]}})
252 changes: 131 additions & 121 deletions src/main/clj/copa/core.clj
@@ -1,121 +1,110 @@
(ns copa.core
(:use [cascalog.api]
[cascalog.more-taps :only (hfs-delimited)]
[clojure.contrib.generic.math-functions]
[date-clj])
(:require [clojure.string :as s]
[cascalog [ops :as c] [vars :as v]]
[cascalog [ops :as c]]
[clojure-csv.core :as csv]
[geohash.core :as geo])
(:gen-class))

(defn parse-gis [line]
"leverages parse-csv to parse complex CSV format in the (unclean) GIS export"
(first (csv/parse-csv line)))
(def parse-csv
"parse complex CSV format in the unclean GIS export"
(comp first csv/parse-csv))
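
For reference (not part of the commit), the refactored parse-csv composes clojure-csv's parser with first, so one raw line yields one row of field strings; a REPL sketch, assuming clojure-csv's default quoting rules:

  (parse-csv "foo,\"bar, baz\",qux")
  ;; => ["foo" "bar, baz" "qux"]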

(defn etl-gis [gis trap]
"subquery to parse data sets from the GIS source tap"
(defn load-gis
"Parse GIS csv data"
[in trap]
(<- [?blurb ?misc ?geo ?kind]
(gis ?line)
(parse-gis ?line :> ?blurb ?misc ?geo ?kind)
((hfs-textline in) ?line)
(parse-csv ?line :> ?blurb ?misc ?geo ?kind) ;; hfs-delimited is not forgiving
(:trap (hfs-textline trap))))

(defn avg [a b]
"calculates the average of two decimals"
(/ (+ (read-string a) (read-string b)) 2.0))
(defn avg [& more]
"calculates the average of given numbers"
(div (apply + more) (count more)))

(defn geohash [lat lng]
"calculates a geohash, at a common resolution"
(geo/encode lat lng 6))

(defn parse-tree [misc]
(defn re-seq-chunks [pattern s]
(rest (first (re-seq pattern s))))

(def parse-tree
"parses the special fields in the tree format"
(let [x (re-seq
#"^\s+Private\:\s+(\S+)\s+Tree ID\:\s+(\d+)\s+.*Situs Number\:\s+(\d+)\s+Tree Site\:\s+(\d+)\s+Species\:\s+(\S.*\S)\s+Source.*"
misc)]
(> (count x) 0)
(> (count (first x)) 1)
(first x))) ;; backflips to trap data quality issues in the GIS export

(defn geo-tree [geo]
(partial re-seq-chunks
#"^\s+Private\:\s+(\S+)\s+Tree ID\:\s+(\d+)\s+.*Situs Number\:\s+(\d+)\s+Tree Site\:\s+(\d+)\s+Species\:\s+(\S.*\S)\s+Source.*"))

(def geo-tree
"parses geolocation for tree format"
(let [x (re-seq #"^(\S+),(\S+),(\S+)\s*$" geo)]
(> (count x) 0)
(> (count (first x)) 1)
(first x))) ;; backflips to trap data quality issues in the GIS export
(partial re-seq-chunks #"^(\S+),(\S+),(\S+)\s*$"))
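
The new re-seq-chunks helper returns only the capture groups of the first match, so geo-tree now yields the raw lat/lng/alt strings directly; this mirrors the geo-tree-test added below:

  (geo-tree "37.4409634615283,-122.15648458861,0.0 ")
  ;; => ("37.4409634615283" "-122.15648458861" "0.0")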

(def trees-fields ["?blurb" "?tree_id" "?situs" "?tree_site" "?species" "?wikipedia"
"?calflora" "?avg_height" "?tree_lat" "?tree_lng" "?tree_alt" "?geohash"])

(defn get-trees [src trap tree_meta]
(defn get-trees [src tree-meta trap]
"subquery to parse/filter the tree data"
(<- [?blurb ?tree_id ?situs ?tree_site
?species ?wikipedia ?calflora ?avg_height
?tree_lat ?tree_lng ?tree_alt ?geohash
]
(<- trees-fields
(src ?blurb ?misc ?geo ?kind)
(re-matches #"^\s+Private.*Tree ID.*" ?misc)
(parse-tree ?misc :> _ ?priv ?tree_id ?situs ?tree_site ?raw_species)
(parse-tree ?misc :> ?priv ?tree_id ?situs ?tree_site ?raw_species)
((c/comp s/trim s/lower-case) ?raw_species :> ?species)
(tree_meta ?species ?wikipedia ?calflora ?min_height ?max_height)
(tree-meta ?species ?wikipedia ?calflora ?min_height ?max_height)
(avg ?min_height ?max_height :> ?avg_height)
(geo-tree ?geo :> _ ?tree_lat ?tree_lng ?tree_alt)
(read-string ?tree_lat :> ?lat)
(read-string ?tree_lng :> ?lng)
(geo-tree ?geo :> ?tree_lat ?tree_lng ?tree_alt)
((c/each read-string) ?tree_lat ?tree_lng :> ?lat ?lng)
(geohash ?lat ?lng :> ?geohash)
(:trap (hfs-textline trap))))

(defn parse-road [misc]
(def parse-road
"parses the special fields in the road format"
(let [x (re-seq
#"^\s+Sequence.*Traffic Count\:\s+(\d+)\s+Traffic Index\:\s+(\w.*\w)\s+Traffic Class\:\s+(\w.*\w)\s+Traffic Date.*\s+Paving Length\:\s+(\d+)\s+Paving Width\:\s+(\d+)\s+Paving Area\:\s+(\d+)\s+Surface Type\:\s+(\w.*\w)\s+Surface Thickness.*\s+Overlay Year\:\s+(\d+)\s.*Bike Lane\:\s+(\w+)\s+Bus Route\:\s+(\w+)\s+Truck Route\:\s+(\w+)\s+Remediation.*$"
misc)]
(> (count x) 0)
(> (count (first x)) 1)
(first x))) ;; backflips to trap data quality issues in the GIS export
(partial re-seq-chunks
#"^\s+Sequence.*Traffic Count\:\s+(\d+)\s+Traffic Index\:\s+(\w.*\w)\s+Traffic Class\:\s+(\w.*\w)\s+Traffic Date.*\s+Paving Length\:\s+(\d+)\s+Paving Width\:\s+(\d+)\s+Paving Area\:\s+(\d+)\s+Surface Type\:\s+(\w.*\w)\s+Surface Thickness.*\s+Overlay Year\:\s+(\d+)\s.*Bike Lane\:\s+(\w+)\s+Bus Route\:\s+(\w+)\s+Truck Route\:\s+(\w+)\s+Remediation.*$"))

(defn estimate-albedo [overlay_year albedo_new albedo_worn]
"calculates an estimator for road albedo, based on road surface age"
(cond
(>= (read-string overlay_year) (- (year (today)) 10.0))
(read-string albedo_new)
:else
(read-string albedo_worn)))
(if (>= overlay_year (- (year (today)) 10.0)) ;; TODO use clj-time
albedo_new
albedo_worn))

(defmapcatop bigram [s]
"generator for bi-grams, from a space-separated list"
(partition 2 1 (s/split s #"\s")))
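
The bigram custom op pairs consecutive points from a space-separated geo string; the underlying partition call behaves like this (illustrative tokens, not from the data set):

  (partition 2 1 (s/split "p0 p1 p2" #"\s"))
  ;; => (("p0" "p1") ("p1" "p2"))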

(defn point->coord
"Takes a point string returns [lng lat alt]"
[p]
(map read-string (s/split p #",")))
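
point->coord replaces the hand-rolled string slicing in midpoint and returns numbers in lng/lat/alt order, matching the point->coord-test added below:

  (point->coord "-122.138274762396,37.4228142494056,0.0")
  ;; => (-122.138274762396 37.4228142494056 0.0)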

(defn midpoint [pt0 pt1]
"calculates the midpoint of two geolocation coordinates"
(let [l0 (s/split pt0 #",")
l1 (s/split pt1 #",")
lat0 (read-string (nth l0 1))
lng0 (read-string (nth l0 0))
alt0 (read-string (nth l0 2))
lat1 (read-string (nth l1 1))
lng1 (read-string (nth l1 0))
alt1 (read-string (nth l1 2)) ]
[ (/ (+ lat0 lat1) 2.0) (/ (+ lng0 lng1) 2.0) (/ (+ alt0 alt1) 2.0) ]))

(defn get-roads [src trap road_meta]
(let [[lng0 lat0 alt0] (point->coord pt0)
[lng1 lat1 alt1] (point->coord pt1)]
[(avg lat0 lat1) (avg lng0 lng1) (avg alt0 alt1)]))

(def roads-fields ["?road_name" "?bike_lane" "?bus_route" "?truck_route" "?albedo"
"?road_lat" "?road_lng" "?road_alt" "?geohash"
"?traffic_count" "?traffic_index" "?traffic_class"
"?paving_length" "?paving_width" "?paving_area" "?surface_type"])

(defn get-roads [src road-meta trap]
"subquery to parse/filter the road data"
(<- [?blurb ?bike_lane ?bus_route ?truck_route ?albedo
?min_lat ?min_lng ?min_alt ?geohash
?traffic_count ?traffic_index ?traffic_class
?paving_length ?paving_width ?paving_area ?surface_type ]
(src ?blurb ?misc ?geo ?kind)
(<- roads-fields
(src ?road_name ?misc ?geo ?kind)
(re-matches #"^\s+Sequence.*Traffic Count.*" ?misc)
(parse-road ?misc :> _
?traffic_count ?traffic_index ?traffic_class
?paving_length ?paving_width ?paving_area ?surface_type
?overlay_year ?bike_lane ?bus_route ?truck_route)
(road_meta ?surface_type ?albedo_new ?albedo_worn)
(parse-road ?misc :>
?traffic_count ?traffic_index ?traffic_class
?paving_length ?paving_width ?paving_area ?surface_type
?overlay_year_str ?bike_lane ?bus_route ?truck_route)
(road-meta ?surface_type ?albedo_new ?albedo_worn)
((c/each read-string) ?overlay_year_str :> ?overlay_year)
(estimate-albedo ?overlay_year ?albedo_new ?albedo_worn :> ?albedo)
(bigram ?geo :> ?pt0 ?pt1)
(midpoint ?pt0 ?pt1 :> ?lat ?lng ?alt)
;; why filter for min? because there are geo duplicates..
(c/min ?lat :> ?min_lat)
(c/min ?lng :> ?min_lng)
(c/min ?alt :> ?min_alt)
(geohash ?min_lat ?min_lng :> ?geohash)
((c/each c/min) ?lat ?lng ?alt :> ?road_lat ?road_lng ?road_alt)
(geohash ?road_lat ?road_lng :> ?geohash)
(:trap (hfs-textline trap))))

(defn get-parks [src trap]
@@ -126,53 +115,55 @@

(defn tree-distance [tree_lat tree_lng road_lat road_lng]
"calculates distance from a tree to the midpoint of a road segment; TODO IMPROVE GEO MODEL"
(let [y (- (read-string tree_lat) (read-string road_lat))
x (- (read-string tree_lng) (read-string road_lng)) ]
(sqrt (+ (pow y 2.0) (pow x 2.0)))))
(let [y (- tree_lat road_lat)
x (- tree_lng road_lng)]
(Math/sqrt (+ (Math/pow y 2.0) (Math/pow x 2.0)))))

(defn road-metric [traffic_class traffic_count albedo]
"calculates a metric for comparing road segments, approximating a decision tree; TODO USE PMML"
[[(cond
(= traffic_class "local residential") 1.0
(= traffic_class "local business district") 0.5
:else 0.0)
;; scale traffic_count based on distribution mean
(/ (log (/ (read-string traffic_count) 200.0)) 5.0)
(- 1.0 (read-string albedo))]])
;; in practice, we'd probably train a predictive model using decision trees,
[[(condp = traffic_class
"local residential" 1.0
"local business district" 0.5
0.0)
(-> traffic_count (/ 200.0) (Math/log) (/ 5.0)) ;; scale traffic_count based on distribution mean
(- 1.0 albedo)]])
;; in practice, we'd probably train a predictive model using decision trees,
;; regression, etc., plus incorporate customer feedback, QA of the data, etc.


(defn get-shade [trees roads]
"subquery to join the tree and road estimates, to maximize for shade"
(<- [?road_name ?geohash ?road_lat ?road_lng ?road_alt ?road_metric ?tree_metric]
(roads ?road_name _ _ _ ?albedo ?road_lat ?road_lng ?road_alt ?geohash ?traffic_count _ ?traffic_class _ _ _ _)
((select-fields roads ["?road_name" "?albedo" "?road_lat" "?road_lng" "?road_alt" "?geohash" "?traffic_count" "?traffic_class"])
?road_name ?albedo ?road_lat ?road_lng ?road_alt ?geohash ?traffic_count ?traffic_class)
(road-metric ?traffic_class ?traffic_count ?albedo :> ?road_metric)
(trees _ _ _ _ _ _ _ ?avg_height ?tree_lat ?tree_lng ?tree_alt ?geohash)
(read-string ?avg_height :> ?height)
;; limit to trees which are higher than people
(> ?height 2.0)
((select-fields trees ["?avg_height" "?tree_lat" "?tree_lng" "?tree_alt" "?geohash"])
?height ?tree_lat ?tree_lng ?tree_alt ?geohash)
(> ?height 2.0) ;; limit to trees which are higher than people
(tree-distance ?tree_lat ?tree_lng ?road_lat ?road_lng :> ?distance)
;; limit to trees within a one-block radius (not meters)
(<= ?distance 25.0)
(<= ?distance 25.0) ;; limit to trees within a one-block radius (not meters)
(/ ?height ?distance :> ?tree_moment)
(c/sum ?tree_moment :> ?sum_tree_moment)
;; magic number 200000.0 used to scale tree moment, based on median
(/ ?sum_tree_moment 200000.0 :> ?tree_metric)))

(defn date-num [date]
(defn date-num [date] ;; TODO use clj-time
"converts an RFC 3339 timestamp to a monotonically increasing number"
(apply
(fn [yr mon day hr min sec]
(+ (* (+ (* (+ (* (+ (* (+ (* yr 366) mon) 31) day) 24) hr) 60) min) 60) sec))
(map #(Integer/parseInt %) (re-seq #"\d+" date))))
(fn [yr mon day hr min sec]
(-> yr (* 366)
(+ mon) (* 31)
(+ day) (* 24)
(+ hr) (* 60)
(+ min) (* 60)
(+ sec)))
(map #(Integer/parseInt %) (re-seq #"\d+" date))))
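
date-num only needs to be monotonically increasing, not a real epoch offset; the date-num-test added below pins the expected value:

  (date-num "2012-09-02T16:35:17Z")
  ;; => 1972376670917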

(def gps-fields ["?uuid" "?geohash" "?gps_count" "?recent_visit"])

(defn get-gps [gps_logs trap]
"subquery to aggregate and rank GPS tracks per user"
(<- [?uuid ?geohash ?gps_count ?recent_visit]
(gps_logs ?date ?uuid ?gps_lat ?gps_lng ?alt ?speed ?heading ?elapsed ?distance)
(read-string ?gps_lat :> ?lat)
(read-string ?gps_lng :> ?lng)
(<- gps-fields
(gps_logs ?date ?uuid ?lat ?lng ?alt ?speed ?heading ?elapsed ?distance)
(geohash ?lat ?lng :> ?geohash)
(c/count :> ?gps_count)
(date-num ?date :> ?visit)
@@ -181,29 +172,48 @@
(defn get-reco [tracks shades]
"subquery to recommend road segments based on GPS tracks"
(<- [?uuid ?road ?geohash ?lat ?lng ?alt ?gps_count ?recent_visit ?road_metric ?tree_metric]
(tracks ?uuid ?geohash ?gps_count ?recent_visit)
(tracks :>> gps-fields)
(shades ?road ?geohash ?lat ?lng ?alt ?road_metric ?tree_metric)))

(defn -main
[in meta_tree meta_road logs trap park tree road shade gps reco & args]
(let [gis (hfs-delimited in)
tree_meta (hfs-delimited meta_tree :skip-header? true)
road_meta (hfs-delimited meta_road :skip-header? true)
gps_logs (hfs-delimited logs :delimiter "," :skip-header? true)
src (etl-gis gis (s/join "/" [trap "gis"]))]
(?- (hfs-delimited tree)
(get-trees src (s/join "/" [trap "tree"]) tree_meta))
(?- (hfs-delimited road)
(get-roads src (s/join "/" [trap "road"]) road_meta))
(?- (hfs-delimited park)
(get-parks src (s/join "/" [trap "park"])))
(?- (hfs-delimited shade)
(let [trees (hfs-delimited tree)
roads (hfs-delimited road) ]
(get-shade trees roads)))
(?- (hfs-delimited gps)
(get-gps gps_logs (s/join "/" [trap "logs"])))
(?- (hfs-delimited reco)
(let [tracks (hfs-delimited gps)
shades (hfs-delimited shade) ]
(get-reco tracks shades)))))
(let [gis-stage "out/gis"] ;; should use workflow but local hadoop can't run concurrent tasks
(?- "etl gis data"
(hfs-seqfile gis-stage)
(load-gis in (str trap "/" "gis")))
(?- "parse tree data"
(hfs-delimited tree)
(get-trees (hfs-seqfile gis-stage)
(hfs-delimited meta_tree :skip-header? true
:classes [String String String Integer Integer])
(str trap "/" "tree")))
(?- "parse road data"
(hfs-delimited road)
(get-roads (hfs-seqfile gis-stage)
(hfs-delimited meta_road :skip-header? true
:classes [String Float Float])
(str trap "/" "road")))
(?- "parse parks data"
(hfs-delimited park)
(get-parks (hfs-seqfile gis-stage)
(str trap "/" "park")))
(?- "calculate shades"
(hfs-delimited shade)
(get-shade (name-vars
(hfs-delimited tree ;; save as hfs-seqfile to keep field types and avoid :classes
:classes [String Integer Integer Integer String String String
Double Double Double Double String])
trees-fields)
(name-vars
(hfs-delimited road
:classes [String String String String Double Double Double Double
String Integer String String Integer Integer Integer String])
roads-fields)))
(?- "parse gps data"
(hfs-delimited gps)
(get-gps (hfs-delimited logs :delimiter "," :skip-header? true
:classes [String String Double Double Double Double Double Double Double])
(str trap "/" "logs")))
(?- "recommend road segments"
(hfs-delimited reco)
(get-reco (hfs-delimited gps) (hfs-delimited shade)))))
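
With the pipeline now staged through out/gis, -main still takes the same eleven path arguments; a hypothetical local invocation (paths are made up for illustration, not taken from the repo) would look like:

  (copa.core/-main "data/copa.csv" "meta/meta_tree.tsv" "meta/meta_road.tsv" "data/gps_logs.csv"
                   "out/trap" "out/park" "out/tree" "out/road" "out/shade" "out/gps" "out/reco")
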
2 changes: 1 addition & 1 deletion src/scripts/copa.R
@@ -4,7 +4,7 @@
library(ggplot2)

# point this to your Hadoop "output" directory
dat_folder <- "~/src/concur/CoPA/output"
dat_folder <- "out" # if pwd is root of project

# examine the "tree" results
d <- read.table(file=paste(dat_folder, "tree/part-00000", sep="/"), sep="\t", quote="", na.strings="NULL", header=TRUE, encoding="UTF8")
12 changes: 12 additions & 0 deletions test/copa/core_test.clj
@@ -2,3 +2,15 @@
(:use [copa.core]
[midje sweet cascalog]
[clojure.test]))

(deftest parse-tree-test
(is (= ["-1" "29" "203" "2" "Celtis australis"] (parse-tree " Private: -1 Tree ID: 29 Street_Name: ADDISON AV Situs Number: 203 Tree Site: 2 Species: Celtis australis Source: davey tree Protected: Designated: Heritage: Appraised Value: Hardscape: None Identifier: 40 Active Numeric: 1 Location Feature ID: 13872 Provisional: Install Date: "))))

(deftest geo-tree-test
(is (= ["37.4409634615283" "-122.15648458861" "0.0"] (geo-tree "37.4409634615283,-122.15648458861,0.0 "))))

(deftest point->coord-test
(is (= [-122.138274762396 37.4228142494056 0.0] (point->coord "-122.138274762396,37.4228142494056,0.0"))))

(deftest date-num-test
(is (= 1972376670917 (date-num "2012-09-02T16:35:17Z"))))
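
These deftests exercise the newly extracted pure helpers and should run under plain lein test; one more test in the same style (a sketch, not part of this commit) could cover parse-csv's quoted-field handling:

  (deftest parse-csv-test
    (is (= ["a" "b, c" "d"] (parse-csv "a,\"b, c\",d"))))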
