-
Notifications
You must be signed in to change notification settings - Fork 1
/
flask_pyesmongoengine.py
276 lines (228 loc) · 8.08 KB
/
flask_pyesmongoengine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
from __future__ import absolute_import
import pyes
import pyes.aggs
import pyes.rivers
import pyes.exceptions
import copy
def get_index_name(model):
    """Return the ElasticSearch index name for *model*.

    A plain string is taken to already be an index name and is returned
    unchanged. For a model class, the name comes from
    ``_meta['elastic_search']['name']``; when that key is missing, the
    collection name is used with every '.' replaced by '_'. Models without an
    ``elastic_search`` meta entry yield None.
    """
    if isinstance(model, str):
        return model
    meta = model._meta
    if 'elastic_search' in meta:
        es_settings = meta['elastic_search']
        return es_settings.get('name', model._get_collection_name().replace('.', '_'))
    return None
def get_type(model):
    """Return the ElasticSearch document type for *model*, or None.

    A type is only assigned to models whose ``_meta['elastic_search']``
    contains a ``river`` entry. The type is the collection name with '.'
    replaced by '_'.
    """
    meta = model._meta
    has_river = 'elastic_search' in meta and 'river' in meta['elastic_search']
    if not has_river:
        return None
    return model._get_collection_name().replace('.', '_')
def prepare_index(model):
    """Build ``(index_name, index_type, index_settings)`` for *model*.

    ``index_settings`` is a deep copy of ``model._meta['elastic_search']``
    with these adjustments:

    * a ``river`` entry is wrapped as ``{'settings': <river>, 'collection':
      <collection name>}``, and the river settings always carry an
      ``exclude_fields`` list (an empty one if the user configured none);
    * for models with ``allow_inheritance``, ``_cls`` is removed from the
      river's ``exclude_fields`` or appended to a non-empty
      ``include_fields``, and a ``_cls`` mapping property is set.

    Returns None when the model has no index name. Raises TypeError if a
    truthy ``mappings`` value is not a dict.
    """
    index_name = get_index_name(model)
    if not index_name:
        return None
    index_type = get_type(model)
    # Deep copy so the adjustments below never mutate the model's class-level meta.
    index_settings = copy.deepcopy(model._meta['elastic_search'])
    inherits = model._meta.get('allow_inheritance')

    if 'river' in index_settings:
        river_settings = index_settings['river']
        # Default to an empty exclude list without discarding one the user set.
        river_settings.setdefault('exclude_fields', [])
        index_settings['river'] = {
            'settings': river_settings,
            'collection': model._get_collection_name(),
        }
        if inherits:
            # _cls must reach the index: search() filters on it for such models.
            # NOTE(review): _get_river only forwards settings['options'] and
            # settings['script'] to the river; confirm where these field lists
            # are actually consumed.
            excluded = river_settings['exclude_fields']
            included = river_settings.get('include_fields')
            if '_cls' in excluded:
                river_settings['exclude_fields'] = [f for f in excluded if f != '_cls']
            elif included and '_cls' not in included:
                river_settings['include_fields'] = list(included) + ['_cls']

    mappings = index_settings.get('mappings')
    if mappings and not isinstance(mappings, dict):
        raise TypeError("Specified es_mappings in '{}' is not a dict, it is '{}'".format(
            model.__name__, type(mappings).__name__))

    if inherits:
        # A falsy placeholder (e.g. None) would break the setdefault chain below.
        if not mappings:
            index_settings['mappings'] = {}
        # Set (override) the mapping for _cls.
        index_settings['mappings'].setdefault('properties', {})['_cls'] = {
            'type': 'string',
            'index': 'not_analyzed',
        }
    return index_name, index_type, index_settings
def prepare_indexes():
    """Collect index settings for every concrete Document with ES metadata.

    Walks the full ``mongoengine.Document`` subclass tree, visiting each class
    once even when it has several Document-derived parents. Non-abstract
    classes with a truthy ``_meta['elastic_search']`` are passed to
    ``prepare_index``.

    Returns ``{index_name: {index_type: index_settings}}``.
    """
    from mongoengine import Document
    models = []
    seen = set()
    pending = [Document]
    while pending:
        cls = pending.pop()
        # With multiple inheritance the same class shows up under each parent.
        if cls in seen:
            continue
        seen.add(cls)
        if not cls._meta.get('abstract') and cls._meta.get('elastic_search'):
            models.append(cls)
        pending.extend(cls.__subclasses__())

    indexes = {}
    for model in models:
        index_name, index_type, index_settings = prepare_index(model)
        indexes.setdefault(index_name, {})[index_type] = index_settings
    return indexes
class ResultProxy(object):
    """Sequence-like view over raw search results that yields model instances.

    Hits are converted lazily, on indexing or iteration, by serialising each
    one to JSON, loading it with ``model_cls.from_json`` and rebuilding the
    instance with ``model_cls._from_son(document.to_mongo())``.
    """

    def __init__(self, model_cls, results):
        self._model_cls = model_cls
        self.es_results = results  # the raw result object, kept for callers

    def _convert(self, obj):
        # JSON round-trip into the model class; there might be a better way.
        import json
        cls = self._model_cls
        payload = json.dumps(obj)
        document = cls.from_json(payload)
        return cls._from_son(document.to_mongo())

    def __getitem__(self, i):
        if not isinstance(i, slice):
            return self._convert(self.es_results[i])
        return list(map(self._convert, self.es_results[i]))

    def __len__(self):
        return len(self.es_results)

    def __iter__(self):
        convert = self._convert
        for hit in self.es_results:
            yield convert(hit)
def _include_pyes(obj):
    """Copy each name in the ``pyes`` module namespace onto *obj*.

    Names that *obj* already has (by ``hasattr``) are left untouched, so the
    object's own attributes take precedence over pyes ones.
    """
    for name in pyes.__dict__:
        if hasattr(obj, name):
            continue
        setattr(obj, name, getattr(pyes, name))
class PyESMongoEngine(object):
    """Flask extension managing ElasticSearch indexes and MongoDB rivers for
    mongoengine models through pyes.

    On construction, every name in the ``pyes`` module that the instance does
    not already have is copied onto it, so e.g. ``self.ES`` is ``pyes.ES``.
    """

    def __init__(self, app=None):
        _include_pyes(self)
        if app is not None:
            self.init_app(app)

    @staticmethod
    def _parse_mongodb_hosts(uri):
        """Return a ``[{'host': str, 'port': int}, ...]`` list for *uri*.

        *uri* is either a ``mongodb://`` connection string or a bare
        comma-separated ``host[:port]`` list. Credentials (``user:pw@``) and a
        ``/database?options`` suffix are discarded because they are not hosts.
        Entries without a numeric port get the default port 27017.
        """
        hosts_part = uri.replace('mongodb://', '')
        # Drop the path/options first so an '@' in options cannot confuse the
        # credential split that follows.
        hosts_part = hosts_part.partition('/')[0].rpartition('@')[2]
        hosts = []
        for entry in hosts_part.split(','):
            entry = entry.strip()
            if not entry:
                continue
            host, sep, port = entry.rpartition(':')
            if sep and port.isdigit():
                hosts.append({'host': host, 'port': int(port)})
            else:
                # No explicit port (or not a plain number, e.g. bare IPv6 '[::1]').
                hosts.append({'host': entry, 'port': 27017})
        return hosts

    def init_app(self, app):
        """Configure the extension from ``app.config``.

        * ``ELASTICSEARCH_SETTINGS``: keys are lower-cased. ``indices`` holds
          index settings (``default`` is applied to every created index),
          ``prefix`` is prepended to index and river names, and every other
          key is passed to ``self.ES`` to build ``self.conn``.
        * ``ES_ANALYSIS`` (optional): stored as
          ``indices['default']['settings']['analysis']``.
        * ``MONGODB_SETTINGS['HOST']`` and ``['DB']``: MongoDB hosts and
          database handed to the rivers.
        """
        es_config = app.config.setdefault('ELASTICSEARCH_SETTINGS', {})
        kwargs = {k.lower(): v for k, v in es_config.items()}
        # Deep copy so the analysis injection below does not mutate app.config.
        self._index_settings = copy.deepcopy(kwargs.pop('indices', {}))
        self._index_prefix = kwargs.pop('prefix', '')
        self.conn = self.ES(**kwargs)
        # Put analyzers into the default index settings.
        if app.config.get('ES_ANALYSIS'):
            default_settings = self._index_settings.setdefault('default', {})
            default_settings.setdefault('settings', {})['analysis'] = app.config['ES_ANALYSIS']
        mongo_config = app.config['MONGODB_SETTINGS']
        self._mongodb_hosts = self._parse_mongodb_hosts(mongo_config['HOST'])
        self._mongodb_db = mongo_config['DB']
        self._indexes_data = None

    @property
    def _indexes(self):
        """``{index_name: {doc_type: settings}}`` for all models, built lazily.

        An empty result is not cached, so it is recomputed on the next access.
        """
        if not self._indexes_data:
            self._indexes_data = prepare_indexes()
        return self._indexes_data

    def _get_river(self, model):
        """Return ``(MongoDBRiver, river_name)`` for *model*, or None.

        *model* is a model class or an ``(index_name, doc_type)`` tuple. None
        is returned when there is no index name, the index or type is unknown,
        or the type has no river configured.
        """
        if isinstance(model, tuple):
            index, doc_type = model
        else:
            index = get_index_name(model)
            doc_type = get_type(model)
        if not index:
            return None
        types = self._indexes.get(index, {})
        if doc_type not in types:
            return None
        river_properties = types[doc_type].get('river')
        if not river_properties:
            return None
        river_settings = river_properties['settings']
        river = pyes.rivers.MongoDBRiver(
            self._mongodb_hosts,
            self._mongodb_db,
            river_properties['collection'],
            self._index_prefix + index,
            doc_type,
            options=river_settings.get('options'),
            script=river_settings.get('script'),
        )
        return river, self._index_prefix + doc_type

    def delete_river(self, model):
        """Delete *model*'s river; a missing type or index is ignored."""
        river = self._get_river(model)
        if not river:
            return
        _, name = river
        try:
            self.conn.delete_river(None, name)
        except (pyes.exceptions.TypeMissingException,
                pyes.exceptions.IndexMissingException):
            pass

    def create_river(self, model):
        """Create *model*'s river, if one is configured."""
        river = self._get_river(model)
        if not river:
            return
        river, name = river
        self.conn.create_river(river.serialize(), name)

    def delete_index(self, model):
        """Delete every river of *model*'s index, then the index itself.

        A missing type or index on the server is ignored.
        """
        index = get_index_name(model)
        if not index:
            return
        for doc_type in list(self._indexes.get(index, {})):
            self.delete_river((index, doc_type))
        try:
            self.conn.indices.delete_index(self._index_prefix + index)
        except (pyes.exceptions.TypeMissingException,
                pyes.exceptions.IndexMissingException):
            pass

    def create_index(self, model):
        """Create *model*'s index with its mappings, then its rivers.

        Index settings are ``indices['default']`` updated with
        ``indices['collection']``; the mappings of all types registered for
        the index are placed under ``settings['mappings']``.
        """
        index = get_index_name(model)
        if not index:
            return None
        settings = dict(self._index_settings.get('default', {}))
        # NOTE(review): the override key is the literal 'collection', not the
        # index name; confirm this is intended.
        settings.update(self._index_settings.get('collection', {}))
        types = self._indexes.get(index, {})
        mappings = {t: s['mappings'] for t, s in types.items() if s.get('mappings')}
        if mappings:
            settings['mappings'] = mappings
        self.conn.indices.create_index(self._index_prefix + index, settings)
        for doc_type in list(types):
            self.create_river((index, doc_type))

    def recreate_index(self, model):
        """Delete *model*'s index and all its rivers, then create them again."""
        self.delete_index(model)
        self.create_index(model)

    def create_indexes(self):
        """Create every known index."""
        for index in list(self._indexes):
            self.create_index(index)

    def delete_indexes(self):
        """Delete every known index."""
        for index in list(self._indexes):
            self.delete_index(index)

    def recreate_indexes(self):
        """Recreate every known index."""
        for index in list(self._indexes):
            self.recreate_index(index)

    def search(self, indices, query, *args, **kwargs):
        """Search the index of model class *indices* and wrap the hits.

        For models with ``allow_inheritance`` the query is wrapped in a
        BoolQuery that also requires ``_cls`` to start with the model's
        ``_class_name`` (presumably to restrict hits to that class and its
        subclasses). Extra arguments are forwarded to ``self.conn.search``.
        Returns a ResultProxy over the results.
        """
        index_name = get_index_name(indices)
        doc_type = get_type(indices)
        from mongoengine import Document
        if issubclass(indices, Document) and indices._meta.get('allow_inheritance'):
            query = pyes.BoolQuery(must=[query])
            query.add_must(pyes.PrefixQuery('_cls', indices._class_name))
        results = self.conn.search(query, *args, doc_types=[doc_type],
                                   indices=[self._index_prefix + index_name], **kwargs)
        return ResultProxy(indices, results)