Skip to content

Commit

Permalink
calculating shade metrics vs. geo
Browse files Browse the repository at this point in the history
  • Loading branch information
ceteri committed Sep 6, 2012
1 parent 41f6f5d commit a296a58
Show file tree
Hide file tree
Showing 8 changed files with 486 additions and 13 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
build/
output/
dot/
.gradle/

*~
*.class
.DS_Store

# Package Files #
*.jar
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ We will also draw some introductory material from these two previous talks:
* ["Intro to Data Science"](http://www.slideshare.net/pacoid/intro-to-data-science-for-enterprise-big-data)
* ["Cascading for the Impatient"](http://www.slideshare.net/pacoid/cascading-for-the-impatient)

For more details, please read the accompanying [wiki page](https://github.com/ceteri/CoPA/wiki).
For more details, please read the accompanying [wiki page](https://github.com/Cascading/CoPA/wiki).


Build Instructions
Expand All @@ -25,14 +25,14 @@ To build the sample app from the command line use:
Before running this sample app, be sure to set your `HADOOP_HOME` environment variable. Then clear the `output` directory, then to run on a desktop/laptop with Apache Hadoop in standalone mode:

rm -rf output
hadoop jar ./build/libs/copa.jar data/copa.csv data/meta_tree.tsv data/meta_road.tsv output/trap output/tsv output/tree output/road output/park
hadoop jar ./build/libs/copa.jar data/copa.csv data/meta_tree.tsv data/meta_road.tsv output/trap output/tsv output/tree output/road output/park output/shade

To view the results, for example the cleaned-up trees data:

ls output
more output/tree/part-00000
more output/shade/part-00000

An example of log captured from a successful build+run is at https://gist.github.com/3020297
An example of log captured from a successful build+run is at https://gist.github.com/3660888

About Cascading
===============
Expand Down
55 changes: 55 additions & 0 deletions src/main/java/copa/AlbedoFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2007-2012 Concurrent, Inc. All Rights Reserved.
*
* Project and contact information: http://www.cascading.org/
*
* This file is part of the Cascading project.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package copa;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;


/**
 * Cascading Function that estimates the albedo of a road surface.
 * Roads constructed in or after a given cutoff year are treated as "new"
 * pavement and use the new-surface albedo; older roads use the worn value.
 */
public class AlbedoFunction extends BaseOperation implements Function
{
  protected Integer year_new = 0;

  /**
   * @param fieldDeclaration single output field holding the chosen albedo
   * @param year_new construction-year cutoff separating "new" from "worn" pavement
   */
  public AlbedoFunction( Fields fieldDeclaration, Integer year_new )
  {
    super( 1, fieldDeclaration );
    this.year_new = year_new;
  }

  /**
   * Reads arguments (year_construct, albedo_new, albedo_worn) by position
   * and emits one tuple containing the selected albedo value.
   */
  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
  {
    TupleEntry args = functionCall.getArguments();
    Integer built = args.getInteger( 0 );
    Double fresh = args.getDouble( 1 );
    Double aged = args.getDouble( 2 );

    // pavement at or past the cutoff year still reflects like new surface
    Double albedo = ( built >= year_new ) ? fresh : aged;

    functionCall.getOutputCollector().add( new Tuple( albedo ) );
  }
}
54 changes: 54 additions & 0 deletions src/main/java/copa/GeoHashFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2007-2012 Concurrent, Inc. All Rights Reserved.
*
* Project and contact information: http://www.cascading.org/
*
* This file is part of the Cascading project.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package copa;

import org.apache.lucene.spatial.geohash.GeoHashUtils;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;


/**
 * Cascading Function that converts a (lat, lng) pair into a truncated
 * geohash string, used elsewhere in the flow as a coarse spatial join key.
 */
public class GeoHashFunction extends BaseOperation implements Function
{
  /**
   * @param fieldDeclaration single output field holding the truncated geohash
   */
  public GeoHashFunction( Fields fieldDeclaration )
  {
    super( 1, fieldDeclaration );
  }

  /**
   * Reads latitude (argument 0) and longitude (argument 1), encodes them
   * as a geohash, and emits the first 5 characters as the result tuple.
   */
  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
  {
    TupleEntry argument = functionCall.getArguments();
    Double lat = argument.getDouble( 0 );
    Double lng = argument.getDouble( 1 );

    // encode() is a static utility method -- no GeoHashUtils instance needed
    String geohash = GeoHashUtils.encode( lat, lng );

    Tuple result = new Tuple();

    // truncate to 5 characters for a coarser spatial bucket
    // NOTE(review): a 5-char geohash cell is roughly a few km across --
    // confirm this matches the intended join radius downstream
    result.add( geohash.substring( 0, 5 ) );
    functionCall.getOutputCollector().add( result );
  }
}
70 changes: 61 additions & 9 deletions src/main/java/copa/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.expression.ExpressionFilter;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexParser;
import cascading.pipe.CoGroup;
import cascading.pipe.Checkpoint;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Rename;
import cascading.pipe.assembly.Retain;
import cascading.pipe.joiner.InnerJoin;
import cascading.property.AppProps;
Expand All @@ -57,6 +60,7 @@
String treePath = args[ 5 ];
String roadPath = args[ 6 ];
String parkPath = args[ 7 ];
String shadePath = args[ 8 ];

Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );
Expand All @@ -71,6 +75,7 @@
Tap treeTap = new Hfs( new TextDelimited( true, "\t" ), treePath );
Tap roadTap = new Hfs( new TextDelimited( true, "\t" ), roadPath );
Tap parkTap = new Hfs( new TextDelimited( true, "\t" ), parkPath );
Tap shadeTap = new Hfs( new TextDelimited( true, "\t" ), shadePath );

// specify a regex to split the GIS dump into known fields
Fields fieldDeclaration = new Fields( "blurb", "misc", "geo", "kind" );
Expand All @@ -80,6 +85,11 @@
Pipe gisPipe = new Each( new Pipe( "gis" ), new Fields( "line" ), parser );
Checkpoint tsvCheck = new Checkpoint( "tsv", gisPipe );

// parse the "park" output
Pipe parkPipe = new Pipe( "park", tsvCheck );
regex = "^\\s+Community Type\\:\\s+Park.*$";
parkPipe = new Each( parkPipe, new Fields( "misc" ), new RegexFilter( regex ) );

// parse the "tree" output
Pipe treePipe = new Pipe( "tree", tsvCheck );
regex = "^\\s+Private\\:\\s+(\\S+)\\s+Tree ID\\:\\s+(\\d+)\\s+.*Situs Number\\:\\s+(\\d+)\\s+Tree Site\\:\\s+(\\d+)\\s+Species\\:\\s+(\\S.*\\S)\\s+Source.*$";
Expand All @@ -99,10 +109,21 @@
ExpressionFunction exprFunc = new ExpressionFunction( new Fields( "tree_species" ), expression, String.class );
treePipe = new Each( treePipe, new Fields( "scrub_species" ), exprFunc, Fields.ALL );

// join with tree metadata
Pipe metaTreePipe = new Pipe( "meta_tree" );
treePipe = new HashJoin( treePipe, new Fields( "tree_species" ), metaTreePipe, new Fields( "species" ), new InnerJoin() );
treePipe = new Rename( treePipe, new Fields( "blurb" ), new Fields( "tree_name" ) );

Fields fieldSelector = new Fields( "blurb", "geo", "priv", "tree_id", "situs", "tree_site", "species", "wikipedia", "calflora", "min_height", "max_height" );
regex = "^(\\S+),(\\S+),(\\S+)\\s*$";
int[] gpsGroups = { 1, 2, 3 };
parser = new RegexParser( new Fields( "lat", "lng", "alt" ), regex, gpsGroups );
treePipe = new Each( treePipe, new Fields( "geo" ), parser, Fields.ALL );

// determine a tree geohash
Fields geohashArguments = new Fields( "lat", "lng" );
treePipe = new Each( treePipe, geohashArguments, new GeoHashFunction( new Fields( "tree_geohash" ) ), Fields.ALL );

Fields fieldSelector = new Fields( "tree_name", "priv", "tree_id", "situs", "tree_site", "species", "wikipedia", "calflora", "min_height", "max_height", "lat", "lng", "alt", "tree_geohash" );
treePipe = new Retain( treePipe, fieldSelector );

// parse the "road" output
Expand All @@ -114,28 +135,59 @@
parser = new RegexParser( roadFields, regex, roadGroups );
roadPipe = new Each( roadPipe, new Fields( "misc" ), parser, Fields.ALL );

// join with road metadata
Pipe metaRoadPipe = new Pipe( "meta_road" );
roadPipe = new HashJoin( roadPipe, new Fields( "surface_type" ), metaRoadPipe, new Fields( "pavement_type" ), new InnerJoin() );
roadPipe = new Rename( roadPipe, new Fields( "blurb" ), new Fields( "road_name" ) );

// estimate albedo based on the road surface age and pavement type
Fields albedoArguments = new Fields( "year_construct", "albedo_new", "albedo_worn" );
roadPipe = new Each( roadPipe, albedoArguments, new AlbedoFunction( new Fields( "albedo" ), 2002 ), Fields.ALL );

// generate road segments, with midpoint, y=mx+b, and road_geohash for each
Fields segmentArguments = new Fields( "geo" );
Fields segmentResults = new Fields( "lat0", "lng0", "alt0", "lat1", "lng1", "alt1", "lat_mid", "lng_mid", "slope", "intercept" );
roadPipe = new Each( roadPipe, segmentArguments, new RoadSegmentFunction( segmentResults ), Fields.ALL );

fieldSelector = new Fields( "blurb", "geo", "year_construct", "traffic_count", "traffic_index", "traffic_class", "paving_length", "paving_width", "paving_area", "surface_type", "bike_lane", "bus_route", "truck_route", "albedo_new", "albedo_worn" );
geohashArguments = new Fields( "lat_mid", "lng_mid" );
roadPipe = new Each( roadPipe, geohashArguments, new GeoHashFunction( new Fields( "road_geohash" ) ), Fields.ALL );

fieldSelector = new Fields( "road_name", "year_construct", "traffic_count", "traffic_index", "traffic_class", "paving_length", "paving_width", "paving_area", "surface_type", "bike_lane", "bus_route", "truck_route", "albedo", "lat0", "lng0", "alt0", "lat1", "lng1", "alt1", "lat_mid", "lng_mid", "slope", "intercept", "road_geohash" );
roadPipe = new Retain( roadPipe, fieldSelector );

// parse the "park" output
Pipe parkPipe = new Pipe( "park", tsvCheck );
regex = "^\\s+Community Type\\:\\s+Park.*$";
parkPipe = new Each( parkPipe, new Fields( "misc" ), new RegexFilter( regex ) );
// join the tree and road pipes to estimate shade
Pipe shadePipe = new Pipe( "shade", roadPipe );
shadePipe = new CoGroup( shadePipe, new Fields( "road_geohash" ), treePipe, new Fields( "tree_geohash" ), new InnerJoin() );

// calculate a rough estimate for distance from tree to road, then filter for "< ~1 block"
Fields treeDistArguments = new Fields( "lat", "lng", "slope", "intercept" );
Fields tree_dist = new Fields( "tree_dist" );
shadePipe = new Each( shadePipe, treeDistArguments, new TreeDistanceFunction( tree_dist ), Fields.ALL );

ExpressionFilter distFilter = new ExpressionFilter( "tree_dist > 0.001", Double.class );
shadePipe = new Each( shadePipe, tree_dist, distFilter );

// expand the geohash to join with a wider radius of logged GPS tracks
expression = "tree_geohash.substring(0, 4)";
exprFunc = new ExpressionFunction( new Fields( "shade_geohash" ), expression, String.class );
shadePipe = new Each( shadePipe, new Fields( "tree_geohash" ), exprFunc, Fields.ALL );

fieldSelector = new Fields( "road_name", "year_construct", "traffic_count", "traffic_index", "traffic_class", "paving_length", "paving_width", "paving_area", "surface_type", "bike_lane", "bus_route", "truck_route", "albedo", "lat0", "lng0", "alt0", "lat1", "lng1", "alt1", "slope", "intercept", "tree_name", "priv", "tree_id", "situs", "tree_site", "species", "wikipedia", "calflora", "min_height", "max_height", "lat", "lng", "alt", "tree_dist", "shade_geohash" );
shadePipe = new Retain( shadePipe, fieldSelector );

// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
.setName( "copa" )
.addSource( gisPipe, gisTap )
.addTrap( gisPipe, trapTap )
.addCheckpoint( tsvCheck, tsvTap )
.addTailSink( parkPipe, parkTap )
.addSource( metaTreePipe, metaTreeTap )
.addSource( metaRoadPipe, metaRoadTap )
.addTailSink( treePipe, treeTap )
.addTailSink( roadPipe, roadTap )
.addTailSink( parkPipe, parkTap );
.addSink( treePipe, treeTap )
.addSink( roadPipe, roadTap )
.addTailSink( shadePipe, shadeTap )
;

// write a DOT file and run the flow
Flow copaFlow = flowConnector.connect( flowDef );
Expand Down
81 changes: 81 additions & 0 deletions src/main/java/copa/RoadSegmentFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2007-2012 Concurrent, Inc. All Rights Reserved.
*
* Project and contact information: http://www.cascading.org/
*
* This file is part of the Cascading project.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package copa;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;


/**
 * Cascading Function that expands a GIS "geo" linestring into one output
 * tuple per consecutive point pair (road segment). For each segment it
 * emits both endpoints, the midpoint, and the slope/intercept of the line
 * through the endpoints (y = m*x + b in lat/lng space).
 */
public class RoadSegmentFunction extends BaseOperation implements Function
{
  /**
   * @param fieldDeclaration ten output fields: lat0, lng0, alt0, lat1,
   *        lng1, alt1, lat_mid, lng_mid, slope, intercept
   */
  public RoadSegmentFunction( Fields fieldDeclaration )
  {
    super( 1, fieldDeclaration );
  }

  /**
   * Parses the first argument as a whitespace-separated list of
   * "lng,lat,alt" points and emits one tuple per consecutive pair.
   */
  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
  {
    TupleEntry argument = functionCall.getArguments();

    // split on runs of whitespace so repeated separators can't produce
    // empty tokens (which would break the numeric parsing below)
    String[] geo_list = argument.getString( 0 ).split( "\\s+" );

    for( int i = 0; i < ( geo_list.length - 1 ); i++ )
    {
      // each point is serialized as "lng,lat,alt"
      String[] p0 = geo_list[ i ].split( "," );
      Double lng0 = Double.parseDouble( p0[ 0 ] );
      Double lat0 = Double.parseDouble( p0[ 1 ] );
      Double alt0 = Double.parseDouble( p0[ 2 ] );

      String[] p1 = geo_list[ i + 1 ].split( "," );
      Double lng1 = Double.parseDouble( p1[ 0 ] );
      Double lat1 = Double.parseDouble( p1[ 1 ] );
      Double alt1 = Double.parseDouble( p1[ 2 ] );

      Double lat_mid = ( lat0 + lat1 ) / 2.0;
      Double lng_mid = ( lng0 + lng1 ) / 2.0;

      // line through the two endpoints in lat/lng space
      // NOTE(review): a perfectly north-south segment (lng1 == lng0) yields
      // an infinite slope -- confirm downstream distance math tolerates that
      Double slope = ( lat1 - lat0 ) / ( lng1 - lng0 );
      Double intercept = lat0 - ( slope * lng0 );

      Tuple result = new Tuple();

      result.add( lat0 );
      result.add( lng0 );
      result.add( alt0 );

      result.add( lat1 );
      result.add( lng1 );
      result.add( alt1 );

      result.add( lat_mid );
      result.add( lng_mid );

      result.add( slope );
      result.add( intercept );

      functionCall.getOutputCollector().add( result );
    }
  }
}
Loading

0 comments on commit a296a58

Please sign in to comment.