# logstash-github-covid-19-daily-reports-template.conf (forked from siscale/covid-19-elk)
input {
#Use the generator input to update the dataset when the pipeline is first started
generator {
lines => [ "first-time-run" ]
count => 1
tags => "check_github"
}
#The http_poller input plugin is used to schedule checks for dataset updates
#The "schedule" setting is defined to check for updates every new hour (at minute 0)
http_poller {
urls => {
check_github => "https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports"
}
tags => "check_github"
request_timeout => 60
schedule => { "cron" => "0 * * * * UTC" }
codec => "plain"
metadata_target => "http_poller_metadata"
}
}
filter {
#The pipeline will treat two types of events:
#The first type is the initial event that triggers the downloading, parsing, and transforming of the CSVs into time series data
#The second type is the time series data itself
#This 'if' discriminates between the two types. The time series data is handled further down
if "check_github" in [tags] {
ruby {
init => '
require "csv"
require "open-uri"
require "digest"
require "json"
#Path to store the SHA-256 hashes of the downloaded CSV files
#This is used to keep track of any changes to the Github repo and optimize the update process
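#One hash is stored per daily CSV URL; the map is persisted as JSON so it survives pipeline restarts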
@github_stored_hashes_path = "/etc/logstash/covid-19-hashes.json"
#Base URL for downloading the time series CSVs
@github_daily_reports_base_url = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/"
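#Daily report files are named MM-DD-YYYY.csv (e.g. 01-22-2020.csv); the download URLs are built from this base further below
#Map of US state abbreviations to full names, used to expand the two-letter codes found in early "County, ST" entries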
@states = {"AL"=>"Alabama","AK"=>"Alaska","AS"=>"American Samoa","AZ"=>"Arizona","AR"=>"Arkansas","CA"=>"California","CO"=>"Colorado","CT"=>"Connecticut","DE"=>"Delaware","DC"=>"District of Columbia","FM"=>"Federated States of Micronesia","FL"=>"Florida","GA"=>"Georgia","GU"=>"Guam","HI"=>"Hawaii","ID"=>"Idaho","IL"=>"Illinois","IN"=>"Indiana","IA"=>"Iowa","KS"=>"Kansas","KY"=>"Kentucky","LA"=>"Louisiana","ME"=>"Maine","MH"=>"Marshall Islands","MD"=>"Maryland","MA"=>"Massachusetts","MI"=>"Michigan","MN"=>"Minnesota","MS"=>"Mississippi","MO"=>"Missouri","MT"=>"Montana","NE"=>"Nebraska","NV"=>"Nevada","NH"=>"New Hampshire","NJ"=>"New Jersey","NM"=>"New Mexico","NY"=>"New York","NC"=>"North Carolina","ND"=>"North Dakota","MP"=>"Northern Mariana Islands","OH"=>"Ohio","OK"=>"Oklahoma","OR"=>"Oregon","PW"=>"Palau","PA"=>"Pennsylvania","PR"=>"Puerto Rico","RI"=>"Rhode Island","SC"=>"South Carolina","SD"=>"South Dakota","TN"=>"Tennessee","TX"=>"Texas","UT"=>"Utah","VT"=>"Vermont","VI"=>"Virgin Islands","VA"=>"Virginia","WA"=>"Washington","WV"=>"West Virginia","WI"=>"Wisconsin","WY"=>"Wyoming"}
#Input: a CSV and the date of the CSV
#Output: an array of Logstash events extracted from the CSV that will be sent to Elasticsearch
def create_events(csv, current_date)
events_array = Array.new
#For each event..
csv.each do |row|
#..create a new Logstash event that will store the data
new_event = LogStash::Event.new
#..add current row data to the current event
row.each do |k,v|
#new_event[k] = v
new_event.set(k, v&.strip)
end
#..store the date of the reporting
new_event.set("timestamp", current_date.iso8601(3).to_s)
#..store the date when Logstash processed the event
new_event.set("last_process", Time.now.iso8601(3).to_s)
#..some early events contain empty fields when the reported numbers are 0. Fix this here
new_event.set("confirmed", 0) if new_event.get("confirmed").nil?
new_event.set("recovered", 0) if new_event.get("recovered").nil?
new_event.set("deaths", 0) if new_event.get("deaths").nil?
#..calculate the number of active cases from the other counters (only stored when the result is non-negative)
active = new_event.get("confirmed").to_i - new_event.get("recovered").to_i - new_event.get("deaths").to_i
new_event.set("active", active) if active >= 0
#..calculate ratios and store them
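#..(a ratio is only stored when its denominator is non-zero, i.e. when the result is finite)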
ratio_deaths_to_confirmed = new_event.get("deaths").to_f / new_event.get("confirmed").to_f
ratio_recovered_to_confirmed = new_event.get("recovered").to_f / new_event.get("confirmed").to_f
ratio_deaths_to_recovered = new_event.get("deaths").to_f / new_event.get("recovered").to_f
new_event.set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed) if ratio_deaths_to_confirmed.finite?
new_event.set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed) if ratio_recovered_to_confirmed.finite?
new_event.set("ratio_deaths_to_recovered", ratio_deaths_to_recovered) if ratio_deaths_to_recovered.finite?
if new_event.get("province_state")&.match?(/(.*?),\s(\w{2})/)
matched = new_event.get("province_state").match(/(.*?),\s(\w{2})/)
@states[matched[2]].nil? ? new_event.set("province_state", matched[2]) : new_event.set("province_state", @states[matched[2]])
new_event.set("admin2", matched[1].gsub(/ [Cc]ounty/, ""))
new_event.tag("added_county")
end
#..generate a unique key for the current event
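#..(e.g. a Hubei, Mainland China row from 01-22-2020 yields "_hubei_mainland_china_01_22_2020")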
key = ((new_event.get("admin2")||"").downcase + "_" + (new_event.get("province_state")||"").downcase + "_" + (new_event.get("country_region")||"").downcase + "_" + (current_date.strftime("%m-%d-%Y")||"").to_s).gsub(/[^\w\d]/, "_")
new_event.set("unique_id", key)
#..add a tag to identify the event later
new_event.tag("covid_time_series")
events_array.append(new_event)
end
return events_array
end
'
code => '
#First day in the dataset
github_daily_reports_first_day = Time.gm(2020, 01, 22, 12, 00)
#Current day
github_daily_reports_last_day = Time.now
current_date = github_daily_reports_first_day
#If the CSV hashes are stored in a file, parse them..
if File.file?(@github_stored_hashes_path)
stored_hashes = JSON.parse(File.read(@github_stored_hashes_path))
#..otherwise, start with an empty map
else
stored_hashes = Hash.new
end
#For each day since the beginning of the dataset, until the current day..
while current_date <= github_daily_reports_last_day
begin
#..generate the URL based on the current_day
current_url = @github_daily_reports_base_url + current_date.strftime("%m-%d-%Y") + ".csv"
#..parse the CSV while normalizing the headers
#(e.g. until 03-21-2020 the country is stored under "Country/Region"; after that date it is "Country_Region".
#Both variations are normalized to "country_region", and the same applies to the other fields)
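#(for example, "Province/State" becomes "province_state", and the "Lat"/"Long_" columns become "lat"/"long_", which are later mapped to the "location" field)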
current_csv = CSV.parse(open(current_url).read.gsub("\uFEFF", ""), headers: true, header_converters: lambda { |h| h.downcase.gsub(/[^\w]/, "_") } )
#..generate the CSV hash
current_hash = Digest::SHA256.hexdigest(current_csv.to_s)
rescue
#The fetch will usually fail for the current day if the report has not been published yet
logger.warn? and logger.warn("Failed to fetch CSV for date " + current_date.strftime("%m-%d-%Y"))
else
#..if the hash for the current CSV is different than the stored one, process the events
if stored_hashes[current_url] != current_hash
events_array = create_events(current_csv, current_date)
#..store the new hash
stored_hashes[current_url] = current_hash
#..and send each event back through the pipeline
events_array.each do |event|
new_event_block.call(event)
end
end
end
#Check the next day
current_date += 86400
end
#Attempt to write the new CSV hashes to the disk
begin
File.write(@github_stored_hashes_path,stored_hashes.to_json)
rescue
logger.error? and logger.error("Failed to write hashes to file " + @github_stored_hashes_path + ". Does Logstash have read/write access to this location?")
ensure
#..and cancel this event. We are only interested in the time series data, which was already sent back through the pipeline.
event.cancel
end
'
}#end ruby
}#end if
#Each time series data event will be sent back through the pipeline.
#This 'if' discriminates between the original event that triggered the downloading and processing of the CSV, and the time series data
if "covid_time_series" in [tags] {
#Parse date fields
date {
#When the github dataset was last updated by the maintainers
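#The dataset has used several timestamp formats for this field over time, hence the multiple patterns below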
match => [ "last_update", "yyyy-mm-dd HH:mm:ss", "m/d/yy HH:mm", "m/d/yyyy HH:mm", "ISO8601" ]
target => "last_update"
}#end date
date {
#When the event was processed by Logstash
match => ["last_process", "ISO8601" ]
target => "last_process"
}#end date
date {
#The true date of the report
match => [ "timestamp", "ISO8601" ]
target => "@timestamp"
}#end date
#Fix inconsistencies in the data set
#Names from the latest versions of the dataset have been used
if [country_region] == "Saint Martin" or [country_region] == "St. Martin" {
mutate {
replace => {
"country_region" => "France"
"province_state" => "St Martin"
}
}
}
if [country_region] == "Channel Islands" or [country_region] == "Cayman Islands" {
mutate {
replace => {
"province_state" => "%{country_region}"
"country_region" => "United Kingdom"
}
}
}
mutate {
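#gsub matches substrings and applies its rules in order, so longer variants such as "Hong Kong SAR" appear before "Hong Kong"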
gsub => [
"country_region", "Mainland China", "China",
"country_region", "South Korea", "Korea, South",
"country_region", "Republic of Korea", "Korea, South",
"country_region", "Iran \(Islamic Republic of\)", "Iran",
"country_region", "Bahamas, The", "Bahamas",
"country_region", "Gambia, The", "Gambia",
"country_region", "Hong Kong SAR", "China",
"country_region", "Hong Kong", "China",
"country_region", "Macao SAR", "China",
"country_region", "Macau", "China",
"country_region", "Republic of Ireland", "Ireland",
"country_region", "Republic of Moldova", "Moldova",
"country_region", "Republic of the Congo", "Congo (Brazzaville)",
"country_region", "Russian Federation", "Russia",
"country_region", "Taipei and environs", "Taiwan",
"country_region", "Taiwan\*", "Taiwan",
"country_region", "UK", "United Kingdom",
"country_region", "Vatican City", "Holy See",
"country_region", "Viet Nam", "Vietnam",
"country_region", "occupied Palestinian territory", "West Bank and Gaza",
"country_region", "Ivory Coast", "Cote d'Ivoire",
"country_region", "East Timor", "Timor-Leste",
"country_region", "Czech Republic", "Czechia",
"country_region", "The Bahamas", "Bahamas",
"country_region", "The Gambia", "Gambia",
"country_region", "Cape Verde", "Cabo Verde",
"country_region", "North Ireland", "Ireland",
"country_region", "Palestine", "West Bank and Gaza"
]
#Normalize the location field
rename => {
"lat" => "[location][lat]"
"latitude" => "[location][lat]"
"lon" => "[location][lon]"
"long" => "[location][lon]"
"long_" => "[location][lon]"
"longitude" => "[location][lon]"
}
}
#Some events don't contain geo information. Send these to null island to avoid Elasticsearch indexing failures
if ![location][lat] {
mutate {
replace => { "[location][lat]" => 0 }
}
}
if ![location][lon] {
mutate {
replace => { "[location][lon]" => 0 }
}
}
#Rename the fields below if you want to correlate with other datasets. Most keep their original names; only "admin2" is renamed to "county"
mutate {
rename => {
"province_state" => "province_state"
"country_region" => "country_region"
"admin2" => "county"
"fips" => "fips"
"confirmed" => "confirmed"
"deaths" => "deaths"
"recovered" => "recovered"
"active" => "active"
"[location][lat]" => "[location][lat]"
"[location][lon]" => "[location][lon]"
"last_update" => "last_update"
"last_process" => "last_process"
"combined_key" => "combined_key"
"ratio_deaths_to_confirmed" => "ratio_deaths_to_confirmed"
"ratio_recovered_to_confirmed" => "ratio_recovered_to_confirmed"
"ratio_deaths_to_recovered" => "ratio_deaths_to_recovered"
"unique_id" => "unique_id"
}
}
}#end if
}#end filter
output {
#Send the data to Elasticsearch
elasticsearch {
#Add your Elasticsearch hosts
hosts => ["<ES_HOST>"]
#Add the index name
index => "covid-19-live-update"
#Add Elasticsearch credentials
user => "<ES_USER>"
password => "<ES_PASSWORD>"
#Add SSL details
#ssl => true
#cacert => "/path/to/cert/"
#Update documents based on the unique id that we defined
action => "update"
document_id => "%{unique_id}"
#..or create a new document if it doesn't already exist
doc_as_upsert => true
manage_template => false
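#With template management disabled, the index mapping is expected to be created separately in Elasticsearch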
}
}