-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathTransformations.java
executable file
·159 lines (128 loc) · 6 KB
/
Transformations.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// camel-k: language=java property=file:transformation.properties
// camel-k: dependency=camel:jacksonxml
// camel-k: dependency=camel:http
// camel-k: dependency=camel:gson
// camel-k: dependency=camel:jdbc
// camel-k: dependency=camel:csv
// camel-k: dependency=mvn:org.postgresql:postgresql:jar:42.2.13
// camel-k: dependency=mvn:org.apache.commons:commons-dbcp2:jar:2.7.0
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;
public class Transformations extends RouteBuilder {
/**
 * Defines the single route of this integration.
 *
 * <p>On every timer tick the route reads the CSV source, and for each row: stores the row's
 * fields as exchange properties, queries the configured HTTP service with the row's
 * coordinates, queries the database for a description of the row's pollutant, and turns the
 * collected data into one feature map. Features are aggregated in groups of five, marshalled
 * to JSON with Gson, logged, and inserted into the {@code measurements} table.
 *
 * <p>Both SQL statements bind their values through the JDBC component's named parameters
 * ({@code :?name} together with {@code useHeadersAsParameters=true}) instead of splicing them
 * into the statement text, so quotes inside the CSV data or the generated JSON can neither
 * break the statement nor inject SQL.
 *
 * @throws Exception if the route definition cannot be built
 */
@Override
public void configure() throws Exception {
    // The following processors store relevant info as properties
    Processor processCsv = new CSVProcessor();
    Processor processXML = new XMLProcessor();
    // Preparing properties to build a GeoJSON Feature
    Processor processDB = new DBProcessor();
    // Just collects all features in a collection for the final GeoJSON
    Processor buildGeoJSON = new GeoJSONProcessor();
    // Aggregate all messages into one message with the list of bodies
    AggregationStrategy aggregationStrategy = new CollectToListStrategy();

    // This is the actual route
    from("timer:java?period=100000")
        // Reference URL for air quality e-Reporting on EEA
        // https://www.eea.europa.eu/data-and-maps/data/aqereporting-2
        // We start by reading our data.csv file, looping on each row
        .to("{{source.csv}}").unmarshal("customCSV").split(body()).streaming()
        // we store on exchange properties all the data we are interested in
        .process(processCsv)
        // on each row, we query an XML API service
        .setBody().constant("").setHeader(Exchange.HTTP_METHOD, constant("GET"))
        .setHeader(Exchange.HTTP_QUERY, simple("lat=${exchangeProperty.lat}&lon=${exchangeProperty.lon}&format=xml"))
        .to("{{openstreetmap.url}}").unmarshal().jacksonXml()
        // we store on exchange properties all the data we are interested in
        .process(processXML)
        // now we query the postgres database for more data; the pollutant is bound as a
        // named parameter (set only after the HTTP call so it is not sent as an HTTP header)
        .setHeader("pollutant", simple("${exchangeProperty.pollutant}"))
        .setBody().constant("SELECT info FROM descriptions WHERE id like :?pollutant")
        .to("jdbc:postgresBean?readSize=1&useHeadersAsParameters=true")
        // we store on exchange properties all the data we are interested in
        .process(processDB)
        // we collect all rows into one message
        // NOTE(review): completion is by size only, so a trailing batch of fewer than 5 rows
        // appears never to be emitted - confirm whether a completionTimeout is wanted.
        .aggregate(constant(true), aggregationStrategy).completionSize(5).process(buildGeoJSON).marshal()
        .json(JsonLibrary.Gson)
        .to("log:info?showStreams=true")
        // and finally store the result on the postgres database, binding the JSON text as a
        // named parameter so apostrophes in the payload cannot break the statement
        // NOTE(review): if measurements.geojson is a json/jsonb column rather than a text
        // column, the bound parameter may need an explicit CAST - confirm against the schema.
        .setHeader("geojson", simple("${bodyAs(String)}"))
        .setBody(constant("INSERT INTO measurements (geojson) VALUES (:?geojson)"))
        .to("jdbc:postgresBean?useHeadersAsParameters=true")
        // Write some log to know it finishes properly
        .log("Information stored");
}
/**
 * Aggregation strategy used by the route to gather exchanges into one message.
 * The list handling comes from {@link AbstractListAggregationStrategy}; this class only
 * selects which value of each exchange is contributed.
 */
private static final class CollectToListStrategy extends AbstractListAggregationStrategy<Object> {

    /**
     * Selects the value contributed by one exchange.
     *
     * @param exchange the exchange being aggregated
     * @return the body of the exchange's current message
     */
    @Override
    public Object getValue(Exchange exchange) {
        final Object payload = exchange.getMessage().getBody();
        return payload;
    }
}
/**
 * Wraps the current message body in a map shaped like a GeoJSON feature collection.
 */
private static final class GeoJSONProcessor implements Processor {

    /**
     * Replaces the message body with a map holding the previous body under
     * {@code "features"} and the constant {@code "FeatureCollection"} under {@code "type"}.
     *
     * @param exchange the exchange whose body is wrapped
     * @throws Exception declared by the {@link Processor} contract
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        final Object features = exchange.getMessage().getBody();

        final Map<String, Object> collection = new HashMap<>();
        collection.put("features", features);
        collection.put("type", "FeatureCollection");

        exchange.getIn().setBody(collection);
    }
}
/**
 * Builds one feature map from the exchange properties collected earlier in the route and
 * from the result of the preceding database query, and sets it as the message body.
 */
private static final class DBProcessor implements Processor {

    /**
     * Replaces the message body with a map of the form
     * {@code {geometry: {type: "Point", coordinates: [lat, lon]}, properties: {...}, type: "Feature"}}.
     *
     * <p>The {@code properties} map carries the {@code unit}, {@code level}, {@code pollutant}
     * and {@code address} exchange properties, plus {@code info} (the string form of the first
     * query row) when the query returned at least one row.
     *
     * @param exchange the exchange carrying the query result as body and the collected properties
     * @throws Exception declared by the {@link Processor} contract
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        // Rows of the preceding query; may be null if the body is not available as a List.
        List<?> rows = exchange.getMessage().getBody(List.class);

        Map<String, Object> properties = new HashMap<>();
        properties.put("unit", exchange.getProperty("unit"));
        properties.put("level", exchange.getProperty("level"));
        properties.put("pollutant", exchange.getProperty("pollutant"));
        properties.put("address", exchange.getProperty("address"));

        // If we got any response from the DB, add it. A missing body or a null first row is
        // treated like an empty result instead of failing with a NullPointerException.
        if (rows != null && !rows.isEmpty() && rows.get(0) != null) {
            properties.put("info", rows.get(0).toString());
        }

        // NOTE(review): RFC 7946 defines a position as [longitude, latitude] numbers; this
        // emits [lat, lon] as strings. Left unchanged to keep the stored format stable -
        // confirm with the consumers of the measurements table before changing it.
        List<String> coordinates = new ArrayList<>();
        coordinates.add(exchange.getProperty("lat", String.class));
        coordinates.add(exchange.getProperty("lon", String.class));

        Map<String, Object> geometry = new HashMap<>();
        geometry.put("type", "Point");
        geometry.put("coordinates", coordinates);

        Map<String, Object> feature = new HashMap<>();
        feature.put("geometry", geometry);
        feature.put("properties", properties);
        feature.put("type", "Feature");

        exchange.getIn().setBody(feature);
    }
}
/**
 * Stores the {@code addressparts} entry of the unmarshalled HTTP response as the
 * {@code address} exchange property.
 */
private static final class XMLProcessor implements Processor {

    /**
     * Copies {@code body["addressparts"]} into the {@code address} exchange property.
     * When the body is not available as a map the exchange is left untouched, mirroring the
     * null handling of the CSV processor in this file.
     *
     * @param exchange the exchange whose body is the unmarshalled response
     * @throws Exception declared by the {@link Processor} contract
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        // Wildcard typing: the value types of the unmarshalled map are not known here, and
        // the value is only passed on as an Object.
        Map<?, ?> response = exchange.getIn().getBody(Map.class);
        if (response != null) {
            exchange.setProperty("address", response.get("addressparts"));
        }
    }
}
/**
 * Copies the interesting columns of one CSV row into exchange properties so later steps of
 * the route can read them after the body has been replaced.
 */
private static final class CSVProcessor implements Processor {

    /** Pairs of {CSV column name, exchange property name}, in the order they are copied. */
    private static final String[][] COLUMN_TO_PROPERTY = {
        {"Latitude of station", "lat"},
        {"Longitude of station", "lon"},
        {"Unit", "unit"},
        {"Air pollution level", "level"},
        {"Air pollutant", "pollutant"},
    };

    /**
     * For every known column present in the row map, sets the matching exchange property to
     * the column's value. Columns absent from the map are skipped; a null body does nothing.
     *
     * @param exchange the exchange whose body is one CSV row as a map
     * @throws Exception declared by the {@link Processor} contract
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        final Map<?, ?> row = exchange.getIn().getBody(Map.class);
        if (row == null) {
            return;
        }
        for (String[] mapping : COLUMN_TO_PROPERTY) {
            final String column = mapping[0];
            final String property = mapping[1];
            if (!row.containsKey(column)) {
                continue;
            }
            exchange.setProperty(property, row.get(column));
        }
    }
}
}