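"""Caching layers for gae_bingo's experiment and identity data.

BingoCache holds all experiments and alternatives; BingoIdentityCache holds a
single user's participation and conversion state. Both are cached per-request
and in memcache, and periodically persisted to the datastore.
"""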
import hashlib

from google.appengine.ext import db
from google.appengine.ext import deferred
from google.appengine.api import memcache
from google.appengine.datastore import entity_pb
from google.appengine.ext.webapp import RequestHandler

from .models import (_GAEBingoExperiment, _GAEBingoAlternative,
                     _GAEBingoIdentityRecord, _GAEBingoSnapshotLog)
from identity import identity
import request_cache
from config import QUEUE_NAME
import pickle_util

# gae_bingo relies on the deferred library, and as such it is susceptible to
# the same path manipulation weaknesses explained here:
# http://stackoverflow.com/questions/2502215/permanenttaskfailure-in-appengine-deferred-library
#
# ...if you need to run one-time configuration or path manipulation code when
# an instance is started, you may need to add that code to this file, as this
# file will become a possible instance-starting entry point. See the docs and
# the above Stack Overflow question.
#
# Example: import config_django


def init_request_cache_from_memcache():
    """Load shared caches from memcache into the request cache, once per request."""
    if not request_cache.cache.get("loaded_from_memcache"):
        # Fetch BingoCache and BingoIdentityCache in one memcache round trip
        request_cache.cache.update(memcache.get_multi(
            [BingoCache.MEMCACHE_KEY,
             BingoIdentityCache.key_for_identity(identity())]))
        request_cache.cache["loaded_from_memcache"] = True


class BingoCache(object):

    MEMCACHE_KEY = "_gae_bingo_cache"

    @staticmethod
    def get():
        init_request_cache_from_memcache()
        if not request_cache.cache.get(BingoCache.MEMCACHE_KEY):
            request_cache.cache[BingoCache.MEMCACHE_KEY] = (
                BingoCache.load_from_datastore())
        return request_cache.cache[BingoCache.MEMCACHE_KEY]
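
    # A minimal read-path sketch (assumes request_cache is cleared at the
    # start of each request, as gae_bingo's request wiring normally does):
    #
    #   cache = BingoCache.get()    # request cache -> memcache -> datastore
    #   exp = cache.get_experiment("monkeys")      # "monkeys" is hypothetical
    #   alts = cache.get_alternatives("monkeys")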

    def __init__(self):
        self.dirty = False

        # True if loading archives that shouldn't be cached
        self.storage_disabled = False

        # Protobuf version of experiments for extremely fast (de)serialization
        self.experiments = {}
        # Deserialized experiment models
        self.experiment_models = {}

        # Protobuf version of alternatives for extremely fast (de)serialization
        self.alternatives = {}
        # Deserialized alternative models
        self.alternative_models = {}

        # Mapping of conversion names to experiment names
        self.experiment_names_by_conversion_name = {}
        # Mapping of canonical names to experiment names
        self.experiment_names_by_canonical_name = {}

    def store_if_dirty(self):
        # Only write to memcache if a change has been made
        if getattr(self, "storage_disabled", False) or not self.dirty:
            return

        # Wipe out deserialized models before serialization for speed
        self.experiment_models = {}
        self.alternative_models = {}

        # No longer dirty
        self.dirty = False

        memcache.set(BingoCache.MEMCACHE_KEY, self)
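
    # Note on the design above: the pickled BingoCache that lands in memcache
    # carries only the encoded protobuf strings, not the deserialized
    # db.Model objects, which keeps memcache.set()/get() fast. Models are
    # lazily rebuilt by get_experiment()/get_alternatives() on the next read.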

    def persist_to_datastore(self):
        """Persist current state of experiment and alternative models to
        datastore. Their sums might be slightly out-of-date during any
        given persist, but not by much.
        """
        experiments_to_put = []
        for experiment_name in self.experiments:
            experiment_model = self.get_experiment(experiment_name)
            if experiment_model:
                experiments_to_put.append(experiment_model)

        alternatives_to_put = []
        for experiment_name in self.alternatives:
            alternative_models = self.get_alternatives(experiment_name)
            for alternative_model in alternative_models:
                # When persisting to datastore, we want to store the most
                # recent value we've got
                alternative_model.load_latest_counts()
                alternatives_to_put.append(alternative_model)
                self.update_alternative(alternative_model)

        # When periodically persisting to datastore, first make sure memcache
        # has relatively up-to-date participant/conversion counts for each
        # alternative.
        self.dirty = True
        self.store_if_dirty()

        # Once memcache is done, put both experiments and alternatives.
        async_experiments = db.put_async(experiments_to_put)
        async_alternatives = db.put_async(alternatives_to_put)
        async_experiments.get_result()
        async_alternatives.get_result()
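
    # The two db.put_async() calls above let the experiment and alternative
    # writes overlap in flight; calling get_result() on both before returning
    # keeps persist_to_datastore() synchronous from the caller's view.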

    def log_cache_snapshot(self):
        # Log current data on live experiments to the datastore
        log_entries = []

        for experiment_name in self.experiments:
            experiment_model = self.get_experiment(experiment_name)
            if experiment_model and experiment_model.live:
                log_entries += self.log_experiment_snapshot(experiment_model)

        db.put(log_entries)

    def log_experiment_snapshot(self, experiment_model):
        log_entries = []

        alternative_models = self.get_alternatives(experiment_model.name)
        for alternative_model in alternative_models:
            # When logging, we want to store the most recent value we've got
            log_entry = _GAEBingoSnapshotLog(
                parent=experiment_model,
                alternative_number=alternative_model.number,
                conversions=alternative_model.latest_conversions_count(),
                participants=alternative_model.latest_participants_count())
            log_entries.append(log_entry)

        return log_entries

    @staticmethod
    def load_from_datastore(archives=False):
        """Load BingoCache from the datastore, using archives if specified."""
        # This shouldn't happen often (should only happen when memcache has
        # been completely evicted), but we still want to be as fast as
        # possible.
        bingo_cache = BingoCache()

        if archives:
            # Disable cache writes if loading from archives
            bingo_cache.storage_disabled = True

        experiment_dict = {}
        alternatives_dict = {}

        # Kick both of these off w/ run() so they'll prefetch asynchronously
        experiments = _GAEBingoExperiment.all().filter(
            "archived =", archives).run(batch_size=400)
        alternatives = _GAEBingoAlternative.all().filter(
            "archived =", archives).run(batch_size=400)

        for experiment in experiments:
            experiment_dict[experiment.name] = experiment

        alternatives = sorted(list(alternatives), key=lambda alt: alt.number)

        for alternative in alternatives:
            if alternative.experiment_name not in alternatives_dict:
                alternatives_dict[alternative.experiment_name] = []
            alternatives_dict[alternative.experiment_name].append(alternative)

        for experiment_name in experiment_dict:
            ex, alts = (experiment_dict.get(experiment_name),
                        alternatives_dict.get(experiment_name))
            if ex and alts:
                bingo_cache.add_experiment(ex, alts)

        # Immediately store in memcache as soon as possible after loading from
        # datastore to minimize # of datastore loads
        bingo_cache.store_if_dirty()

        return bingo_cache
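
    # Archived experiments can be inspected without polluting the live cache
    # (a sketch; storage_disabled keeps store_if_dirty() from writing):
    #
    #   archived = BingoCache.load_from_datastore(archives=True)
    #   old_exp = archived.get_experiment("monkeys")   # hypothetical name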

    def add_experiment(self, experiment, alternatives):
        if not experiment or not alternatives:
            raise Exception("Cannot add empty experiment or empty "
                            "alternatives to BingoCache")

        self.experiment_models[experiment.name] = experiment
        self.experiments[experiment.name] = (
            db.model_to_protobuf(experiment).Encode())

        if experiment.conversion_name not in self.experiment_names_by_conversion_name:
            self.experiment_names_by_conversion_name[experiment.conversion_name] = []
        self.experiment_names_by_conversion_name[experiment.conversion_name].append(experiment.name)

        if experiment.canonical_name not in self.experiment_names_by_canonical_name:
            self.experiment_names_by_canonical_name[experiment.canonical_name] = []
        self.experiment_names_by_canonical_name[experiment.canonical_name].append(experiment.name)

        for alternative in alternatives:
            self.update_alternative(alternative)

        self.dirty = True

    def update_experiment(self, experiment):
        self.experiment_models[experiment.name] = experiment
        self.experiments[experiment.name] = (
            db.model_to_protobuf(experiment).Encode())
        self.dirty = True

    def update_alternative(self, alternative):
        if alternative.experiment_name not in self.alternatives:
            self.alternatives[alternative.experiment_name] = {}
        self.alternatives[alternative.experiment_name][alternative.number] = (
            db.model_to_protobuf(alternative).Encode())

        # Clear out alternative models cache so they'll be re-grabbed w/ next
        # .get_alternatives
        if alternative.experiment_name in self.alternative_models:
            del self.alternative_models[alternative.experiment_name]

        self.dirty = True

    def remove_from_cache(self, experiment):
        # Remove from current cache
        if experiment.name in self.experiments:
            del self.experiments[experiment.name]

        if experiment.name in self.experiment_models:
            del self.experiment_models[experiment.name]

        if experiment.name in self.alternatives:
            del self.alternatives[experiment.name]

        if experiment.name in self.alternative_models:
            del self.alternative_models[experiment.name]

        if experiment.conversion_name in self.experiment_names_by_conversion_name:
            self.experiment_names_by_conversion_name[experiment.conversion_name].remove(experiment.name)

        if experiment.canonical_name in self.experiment_names_by_canonical_name:
            self.experiment_names_by_canonical_name[experiment.canonical_name].remove(experiment.name)

        self.dirty = True

        # Immediately store in memcache as soon as possible after deleting
        # from datastore
        self.store_if_dirty()

    @db.transactional(xg=True)
    def delete_experiment_and_alternatives(self, experiment):
        """Permanently delete specified experiment and all alternatives."""
        if not experiment:
            return

        # First delete from datastore
        experiment.delete()

        for alternative in self.get_alternatives(experiment.name):
            alternative.reset_counts()
            alternative.delete()

        self.remove_from_cache(experiment)

    @db.transactional(xg=True)
    def archive_experiment_and_alternatives(self, experiment):
        """Permanently archive specified experiment and all alternatives.

        Archiving an experiment maintains its visibility for historical
        purposes, but it will no longer be loaded into the cached list of
        active experiments.

        Args:
            experiment: experiment entity to be archived.
        """
        if not experiment:
            return

        experiment.archived = True
        experiment.live = False
        experiment.put()

        alts = self.get_alternatives(experiment.name)
        for alternative in alts:
            alternative.archived = True
            alternative.live = False
        db.put(alts)

        self.remove_from_cache(experiment)

    def experiments_and_alternatives_from_canonical_name(self, canonical_name):
        experiment_names = self.get_experiment_names_by_canonical_name(
            canonical_name)

        return ([self.get_experiment(experiment_name)
                 for experiment_name in experiment_names],
                [self.get_alternatives(experiment_name)
                 for experiment_name in experiment_names])

    def get_experiment(self, experiment_name):
        if experiment_name not in self.experiment_models:
            if experiment_name in self.experiments:
                self.experiment_models[experiment_name] = db.model_from_protobuf(
                    entity_pb.EntityProto(self.experiments[experiment_name]))
        return self.experiment_models.get(experiment_name)

    def get_alternatives(self, experiment_name):
        if experiment_name not in self.alternative_models:
            if experiment_name in self.alternatives:
                self.alternative_models[experiment_name] = []
                for alternative_number in self.alternatives[experiment_name]:
                    self.alternative_models[experiment_name].append(
                        db.model_from_protobuf(entity_pb.EntityProto(
                            self.alternatives[experiment_name][alternative_number])))
        return self.alternative_models.get(experiment_name) or []

    def get_experiment_names_by_conversion_name(self, conversion_name):
        return self.experiment_names_by_conversion_name.get(conversion_name) or []

    def get_experiment_names_by_canonical_name(self, canonical_name):
        return sorted(
            self.experiment_names_by_canonical_name.get(canonical_name) or [])
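
    # The getters above lazily decode the cached protobuf strings back into
    # db.Model objects and memoize them in the *_models dicts, so repeated
    # lookups within one request pay the decode cost once. The encode/decode
    # pair is the standard App Engine idiom:
    #
    #   encoded = db.model_to_protobuf(model).Encode()
    #   model = db.model_from_protobuf(entity_pb.EntityProto(encoded))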


class BingoIdentityCache(object):

    MEMCACHE_KEY = "_gae_bingo_identity_cache:%s"

    @staticmethod
    def key_for_identity(ident):
        return BingoIdentityCache.MEMCACHE_KEY % ident

    @staticmethod
    def get():
        init_request_cache_from_memcache()
        key = BingoIdentityCache.key_for_identity(identity())
        if not request_cache.cache.get(key):
            request_cache.cache[key] = BingoIdentityCache.load_from_datastore()
        return request_cache.cache[key]

    def store_for_identity_if_dirty(self, ident):
        if not self.dirty:
            return

        # No longer dirty
        self.dirty = False

        memcache.set(BingoIdentityCache.key_for_identity(ident), self)

        # Always queue this identity for datastore persistence, since these
        # objects aren't persisted directly by a cron job the way BingoCache
        # is -- they're batched into memcache buckets below.
        self.persist_to_datastore(ident)
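
    # Write-path sketch (participate_in/convert_in are normally driven by
    # gae_bingo's ab_test()/bingo() entry points, and the flush happens via
    # the module-level store_if_dirty() at end of request):
    #
    #   identity_cache = BingoIdentityCache.get()
    #   identity_cache.participate_in("monkeys")    # hypothetical experiment
    #   identity_cache.convert_in("monkeys_conversion")
    #   identity_cache.store_for_identity_if_dirty(identity())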

    def persist_to_datastore(self, ident):
        # Add the memcache value to a memcache bucket which will be persisted
        # to the datastore when it overflows or when the periodic cron job is
        # run
        sig = hashlib.md5(str(ident)).hexdigest()
        sig_num = int(sig, base=16)
        bucket = sig_num % 51  # buckets are numbered 0..50
        key = "_gae_bingo_identity_bucket:%s" % bucket

        list_identities = memcache.get(key) or []
        list_identities.append(ident)

        if len(list_identities) > 50:
            # If over 50 identities are waiting for persistent storage, go
            # ahead and kick off a deferred task to do so in case it'll be a
            # while before the cron job runs.
            deferred.defer(persist_gae_bingo_identity_records,
                           list_identities, _queue=QUEUE_NAME)

            # There are race conditions here such that we could miss
            # persistence of some identities, but that's not a big deal as
            # long as there is no statistical correlation b/w the experiment
            # and those being lost.
            memcache.set(key, [])
        else:
            memcache.set(key, list_identities)
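
    # Bucketing sketch (the identity string is hypothetical):
    #
    #   sig = hashlib.md5("user_123").hexdigest()  # 32 hex chars
    #   bucket = int(sig, 16) % 51                 # stable value in 0..50
    #
    # A given identity always hashes to the same bucket, so pending
    # identities are spread across 51 memcache keys instead of contending
    # on a single one.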

    @staticmethod
    def persist_buckets_to_datastore():
        # Persist all memcache buckets to datastore. Note the hash above
        # buckets identities mod 51, so there are 51 buckets (0..50) to
        # flush here, not 50.
        dict_buckets = memcache.get_multi(
            ["_gae_bingo_identity_bucket:%s" % bucket
             for bucket in range(0, 51)])

        for key in dict_buckets:
            if len(dict_buckets[key]) > 0:
                deferred.defer(persist_gae_bingo_identity_records,
                               dict_buckets[key], _queue=QUEUE_NAME)
                memcache.set(key, [])

    @staticmethod
    def load_from_datastore():
        bingo_identity_cache = _GAEBingoIdentityRecord.load(identity())

        if bingo_identity_cache:
            bingo_identity_cache.purge()
            bingo_identity_cache.dirty = True
            bingo_identity_cache.store_for_identity_if_dirty(identity())
        else:
            bingo_identity_cache = BingoIdentityCache()

        return bingo_identity_cache

    def __init__(self):
        self.dirty = False

        # List of test names this identity is currently participating in
        self.participating_tests = []
        # Dict of test name -> number of times this identity has converted
        self.converted_tests = {}

    def purge(self):
        bingo_cache = BingoCache.get()

        # Iterate over copies, since we remove entries from the underlying
        # containers as we go
        for participating_test in self.participating_tests[:]:
            if participating_test not in bingo_cache.experiments:
                self.participating_tests.remove(participating_test)

        for converted_test in list(self.converted_tests.keys()):
            if converted_test not in bingo_cache.experiments:
                del self.converted_tests[converted_test]

    def participate_in(self, experiment_name):
        self.participating_tests.append(experiment_name)
        self.dirty = True

    def convert_in(self, experiment_name):
        if experiment_name not in self.converted_tests:
            self.converted_tests[experiment_name] = 1
        else:
            self.converted_tests[experiment_name] += 1
        self.dirty = True


def bingo_and_identity_cache():
    return BingoCache.get(), BingoIdentityCache.get()


def store_if_dirty():
    # Only load from the request cache here -- if a cache hasn't been loaded
    # from memcache previously, it can't be dirty.
    bingo_cache = request_cache.cache.get(BingoCache.MEMCACHE_KEY)
    bingo_identity_cache = request_cache.cache.get(
        BingoIdentityCache.key_for_identity(identity()))

    if bingo_cache:
        bingo_cache.store_if_dirty()

    if bingo_identity_cache:
        bingo_identity_cache.store_for_identity_if_dirty(identity())
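
# Typical request lifecycle under this module (a sketch; step names are
# descriptive, not API -- gae_bingo's request wiring drives them):
#
#   1. The first cache read triggers init_request_cache_from_memcache().
#   2. Handlers read and mutate state via bingo_and_identity_cache().
#   3. At end of request, store_if_dirty() flushes any dirty caches back
#      to memcache (and queues identity persistence).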


def persist_gae_bingo_identity_records(list_identities):
    dict_identity_caches = memcache.get_multi(
        [BingoIdentityCache.key_for_identity(ident)
         for ident in list_identities])

    for ident in list_identities:
        identity_cache = dict_identity_caches.get(
            BingoIdentityCache.key_for_identity(ident))

        if identity_cache:
            bingo_identity = _GAEBingoIdentityRecord(
                key_name=_GAEBingoIdentityRecord.key_for_identity(ident),
                identity=ident,
                pickled=pickle_util.dump(identity_cache),
            )
            bingo_identity.put()


class PersistToDatastore(RequestHandler):
    def get(self):
        BingoCache.get().persist_to_datastore()
        BingoIdentityCache.persist_buckets_to_datastore()


class LogSnapshotToDatastore(RequestHandler):
    def get(self):
        BingoCache.get().log_cache_snapshot()
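
# These handlers are meant to be hit by periodic jobs. A hypothetical
# cron.yaml wiring (the URL paths depend on how your app routes these
# handlers; both paths below are assumptions):
#
#   cron:
#   - description: persist gae_bingo participation/conversion counts
#     url: /gae_bingo/persist
#     schedule: every 1 minutes
#   - description: snapshot live experiment counts
#     url: /gae_bingo/log_snapshot
#     schedule: every 1 hours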