#!/usr/bin/python3
"""
DESCRIPTION
Simple python script to import a csv into ElasticSearch. It can also update existing Elastic data if
only parameter --id-column is provided
HOW IT WORKS
The script creates an ElasticSearch API PUT request for
each row in your CSV. It is similar to running an bulk insert by:
$ curl -XPUT localhost:9200/_bulk -d '{index: "", type: ""}
{ data }'
In both `json-struct` and `elastic-index` path, the script will
insert your csv data by replacing the column name wrapped in '%'
with the data for the given row. For example, `%id%` will be
replaced with data from the `id` column of your CSV.
NOTES
- CSV must have headers
- insert elastic address (with port) as argument, it defaults to localhost:9200
EXAMPLES
1. CREATE example:
$ python csv_to_elastic.py \
--elastic-address 'localhost:9200' \
--csv-file input.csv \
--elastic-index 'index' \
--datetime-field=dateField \
--json-struct '{
"name" : "%name%",
"major" : "%major%"
}'
CSV:
| name | major |
|--------|------------------|
| Mike | Engineering |
| Erin | Computer Science |
2. CREATE/UPDATE example:
$ python csv_to_elastic.py \
--elastic-address 'localhost:9200' \
--csv-file input.csv \
--elastic-index 'index' \
--datetime-field=dateField \
--json-struct '{
"name" : "%name%",
"major" : "%major%"
}'
--id-column id
CSV:
| id | name | major |
|------|--------|------------------|
| 1 | Mike | Engineering |
| 2 | Erin | Computer Science |
"""
import argparse
import http.client
import os
import csv
import json
import dateutil.parser
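# Note: dateutil is the third-party `python-dateutil` package
# (pip install python-dateutil); it is not part of the standard library.
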
def main(file_path, delimiter, max_rows, elastic_index, json_struct, datetime_field,
         elastic_type, elastic_address, id_column):
    endpoint = '/_bulk'

    if max_rows is None:
        max_rows_disp = "all"
    else:
        max_rows_disp = max_rows

    print("")
    print(" ----- CSV to ElasticSearch ----- ")
    print("Importing %s rows into `%s` from '%s'" % (max_rows_disp, elastic_index, file_path))
    print("")

    count = 0
    headers = []
    headers_position = {}
    to_elastic_string = ""
    with open(file_path, 'r') as csvfile:
        reader = csv.reader(csvfile, delimiter=delimiter, quotechar='"')
        for row in reader:
            if count == 0:
                # Header row: remember each column name and its position
                for iterator, col in enumerate(row):
                    headers.append(col)
                    headers_position[col] = iterator
            elif max_rows is not None and count > max_rows:
                # `count` includes the header row, so compare with > rather
                # than >= to import exactly max_rows data rows
                print('Max rows imported - exit')
                break
            elif len(row) == 0 or len(row[0]) == 0:
                # Empty rows at the end of the document
                print("Found empty rows at the end of document")
                break
            else:
                pos = 0
                # cmd.exe quoting differs: on Windows `^` stands in for double
                # quotes in --json-struct, elsewhere single quotes do
                if os.name == 'nt':
                    _data = json_struct.replace("^", '"')
                else:
                    _data = json_struct.replace("'", '"')
                _data = _data.replace('\n', '').replace('\r', '')
                for header in headers:
                    if header == datetime_field:
                        datetime_type = dateutil.parser.parse(row[pos])
                        _data = _data.replace('"%' + header + '%"',
                                              '"{:%Y-%m-%d %H:%M}"'.format(datetime_type))
                    else:
                        try:
                            int(row[pos])
                            # Integer value: drop the surrounding quotes so it
                            # stays a JSON number
                            _data = _data.replace('"%' + header + '%"', row[pos])
                        except ValueError:
                            _data = _data.replace('%' + header + '%', row[pos])
                    pos += 1
                # Build the bulk action line; with --id-column the row's own id
                # is used, so re-imports update existing documents
                if id_column is not None:
                    index_row = {"index": {"_index": elastic_index,
                                           "_type": elastic_type,
                                           "_id": row[headers_position[id_column]]}}
                else:
                    index_row = {"index": {"_index": elastic_index, "_type": elastic_type}}
                json_string = json.dumps(index_row) + "\n" + _data + "\n"
                to_elastic_string += json_string
            count += 1

    print('Reached end of CSV - sending to Elastic')
    connection = http.client.HTTPConnection(elastic_address)
    # Elasticsearch also accepts 'application/x-ndjson' for bulk bodies
    http_headers = {"Content-type": "application/json", "Accept": "text/plain"}
    connection.request('POST', url=endpoint, headers=http_headers, body=to_elastic_string)
    response = connection.getresponse()
    print("Returned status code:", response.status)
    # _bulk can return 200 even when individual items fail; read the body to
    # check per-item results if needed
    # body = response.read()
    # print("Returned body:", body)
    return
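
# Example of the template substitution for one data row: with
# --json-struct '{"name": "%name%", "age": "%age%"}' (hypothetical columns)
# and the row `Mike;33`, the int() check above drops the quotes around 33,
# so the document line becomes:
#   {"name": "Mike", "age": 33}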

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='CSV to ElasticSearch.')
    parser.add_argument('--elastic-address',
                        required=False,
                        type=str,
                        default='localhost:9200',
                        help='Your Elasticsearch endpoint address (host:port)')
    parser.add_argument('--csv-file',
                        required=True,
                        type=str,
                        help='path to the CSV to import')
    parser.add_argument('--json-struct',
                        required=True,
                        type=str,
                        help='JSON template to be inserted')
    parser.add_argument('--elastic-index',
                        required=True,
                        type=str,
                        help='Elastic index you want to put the data in')
    parser.add_argument('--elastic-type',
                        required=False,
                        type=str,
                        default='test_type',
                        help='Your entry type for Elastic')
    parser.add_argument('--max-rows',
                        type=int,
                        default=None,
                        help='max rows to import')
    parser.add_argument('--datetime-field',
                        type=str,
                        help='datetime field for Elastic')
    parser.add_argument('--id-column',
                        type=str,
                        default=None,
                        help='column holding the document id, if the CSV has one')
    parser.add_argument('--delimiter',
                        type=str,
                        default=";",
                        help='CSV delimiter, defaults to ;')
    parsed_args = parser.parse_args()

    main(file_path=parsed_args.csv_file, delimiter=parsed_args.delimiter,
         json_struct=parsed_args.json_struct, elastic_index=parsed_args.elastic_index,
         elastic_type=parsed_args.elastic_type, datetime_field=parsed_args.datetime_field,
         max_rows=parsed_args.max_rows, elastic_address=parsed_args.elastic_address,
         id_column=parsed_args.id_column)
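
# The importer can also be driven from Python instead of the CLI; a minimal
# sketch (file name and index are illustrative, not part of the script):
#
#   main(file_path='input.csv', delimiter=';', max_rows=None,
#        elastic_index='index', json_struct='{"name": "%name%"}',
#        datetime_field=None, elastic_type='test_type',
#        elastic_address='localhost:9200', id_column=None)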