-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpublish.coffee
435 lines (338 loc) · 17.4 KB
/
publish.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
# Copyright 2015, Quixey Inc.
# All rights reserved.
#
# Licensed under the Modified BSD License found in the
# LICENSE file in the root directory of this source tree.
J.defineModel 'JDataSession', 'jframework_datasessions',
_id: $$.str
fields:
querySpecStrings:
type: $$.arr
Fiber = Npm.require 'fibers'
Future = Npm.require 'fibers/future'
J._inMethod = new Meteor.EnvironmentVariable
J.methods = (methods) ->
wrappedMethods = {}
for methodName, methodFunc of methods
do (methodName, methodFunc) ->
wrappedMethods[methodName] = ->
args = arguments
J._inMethod.withValue true, =>
methodFunc.apply @, args
Meteor.methods wrappedMethods
# dataSessionId: J.Dict
# updateObserversFiber: <Fiber>
# querySpecSet: {qsString: true}
# mergedQuerySpecs: [
# {
# modelName:
# selector:
# fields:
# sort:
# skip:
# limit:
# }
# ]
J._dataSessions = {}
# Stores Meteor's publisher functions
# sessionId:
# userId
# added
# changed
# removed
# ...etc
dataSessionPublisherContexts = {}
# The set of active cursors and exactly what each has sent to the client
# dataSessionId: modelName: docId: fieldName: querySpecString: value
J._dataSessionFieldsByModelIdQuery = {}
Meteor.methods
_debugPublish: ->
J._dataSessionFieldsByModelIdQuery
_updateDataQueries: (dataSessionId, addedQuerySpecs, deletedQuerySpecs) ->
log = ->
newArgs = ["[#{dataSessionId}]"].concat _.toArray arguments
console.log.apply console, newArgs
log '_updateDataQueries'
# if addedQuerySpecs.length
# log ' added:', J.util.stringify qs for qs in addedQuerySpecs
# if deletedQuerySpecs.length
# log ' deleted:', J.util.stringify qs for qs in deletedQuerySpecs
session = J._dataSessions[dataSessionId]
if not session?
console.warn "Data session not found", dataSessionId
throw new Meteor.Error "Data session not found: #{JSON.stringify dataSessionId}"
for querySpec in addedQuerySpecs
unless querySpec.modelName of J.models
throw new Meteor.Error "Invalid modelName in querySpec:
#{J.util.stringify querySpec}"
# Apply the diff to session.querySpecSet()
actualAdded = []
actualDeleted = []
for querySpec in deletedQuerySpecs
qsString = J.fetching.stringifyQs querySpec
if session.querySpecSet().hasKey(qsString)
actualDeleted.push querySpec
session.querySpecSet().delete qsString
for querySpec in addedQuerySpecs
qsString = J.fetching.stringifyQs querySpec
unless session.querySpecSet().hasKey(qsString)
actualAdded.push qsString
session.querySpecSet().setOrAdd qsString, true
Tracker.flush()
session.updateObserversFiber = Fiber.current
updateObservers.call dataSessionPublisherContexts[dataSessionId], dataSessionId
session.updateObserversFiber = null
jDataSession = new $$.JDataSession
_id: dataSessionId
querySpecStrings: session.querySpecSet().getKeys()
jDataSession.save ->
# log '..._updateDataQueries done'
Meteor.publish '_jdata', (dataSessionId) ->
check dataSessionId, String
log = ->
newArgs = ["[#{dataSessionId}]"].concat _.toArray arguments
console.log.apply console, newArgs
log 'publish _jdata'
session = J._dataSessions[dataSessionId] = J.AutoDict(
"dataSessions[#{dataSessionId}]"
querySpecSet: J.Dict() # qsString: true
mergedQuerySpecs: => getMergedQuerySpecs session.querySpecSet()
observerByQsString: J.Dict()
)
dataSessionPublisherContexts[dataSessionId] = @
J._dataSessionFieldsByModelIdQuery[dataSessionId] = {}
existingSessionInstance = $$.JDataSession.fetchOne dataSessionId
if existingSessionInstance?
existingQuerySpecs = existingSessionInstance.querySpecStrings().map(
(querySpecString) => J.fetching.parseQs querySpecString
).toArr()
Meteor.call '_updateDataQueries', dataSessionId, existingQuerySpecs, []
@onStop =>
console.log "[#{dataSessionId}] PUBLISHER STOPPED"
removeSession = ->
session.stop()
delete J._dataSessionFieldsByModelIdQuery[dataSessionId]
delete dataSessionPublisherContexts[dataSessionId]
delete J._dataSessions[dataSessionId]
$$.JDataSession.remove dataSessionId
stopObservers = ->
session.observerByQsString().forEach (qsString, observer) =>
observer.stop()
if not session.updateObserversFiber? or not session.updateObserversFiber.started
console.log "Resetting everything."
###
Current Fiber was either never set, or is stopped and ready to be reset.
We can stop observers, delete our session, and attempt to restart the Fiber if it exists.
###
stopObservers()
if session.updateObserversFiber? and not session.updateObserversFiber.started
console.warn "Uh oh, we were in the middle of updating observers."
session.updateObserversFiber?.reset()
removeSession()
else if session.updateObserversFiber?.started
# Current Fiber is still running. Stop the observers, but keep our session.
console.log "Session is still intact."
stopObservers()
@ready()
getMergedQuerySpecs = (querySpecSet) =>
mergedQuerySpecs = J.List()
querySpecSet.forEach (rawQsString) ->
# TODO: Fancier merge stuff
rawQuerySpec = J.fetching.parseQs rawQsString
mergedQuerySpecs.push rawQuerySpec
mergedQuerySpecs
updateObservers = (dataSessionId) ->
log = ->
newArgs = ["[#{dataSessionId}]"].concat _.toArray arguments
console.log.apply console, newArgs
# log "Update observers"
session = J._dataSessions[dataSessionId]
oldQsStrings = session.observerByQsString().getKeys()
newQsStrings = session.mergedQuerySpecs().map(
(qs) => J.fetching.stringifyQs qs.toObj()
).toArr()
qsStringsDiff = J.util.diffStrings oldQsStrings, newQsStrings
# console.log "qsStringsDiff", qsStringsDiff
fieldsByModelIdQuery = J._dataSessionFieldsByModelIdQuery[dataSessionId]
getMergedSubfields = (a, b) ->
return a if b is undefined
return b if a is undefined
if J.util.isPlainObject(a) and J.util.isPlainObject(b)
keySet = {}
keySet[key] = true for key of a
keySet[key] = true for key of b
ret = {}
for key of keySet
ret[key] = getMergedSubfields a[key], b[key]
ret
else
# It's possible that a != b (by value) right now because
# one cursor is triggering an observer for an updated value
# right before all the other cursors are going to trigger
# observers for the same updated value. It's fine to just
# arbitrarily pick (a) and let the merge become eventually
# consistent.
a
getField = (modelName, id, fieldName) ->
fieldValueByQsString = fieldsByModelIdQuery[modelName][id][fieldName] ? {}
value = _.values(fieldValueByQsString).reduce getMergedSubfields, undefined
value
setField = (modelName, id, fieldName, querySpec, value) ->
modelClass = J.models[modelName]
qsString = J.fetching.stringifyQs querySpec
if fieldName is '_reactives'
# log "#{J.util.stringify querySpec} sees #{JSON.stringify id}._reactives: #{JSON.stringify value}"
fieldsByModelIdQuery[modelName][id]._reactives ?= {}
reactivesObj = _.clone fieldsByModelIdQuery[modelName][id]._reactives[qsString] ? {}
fieldsByModelIdQuery[modelName][id]._reactives[qsString] = reactivesObj
inclusionSet = J.fetching._projectionToInclusionSet modelClass, querySpec.fields ? {}
for reactiveName, reactiveSpec of modelClass.reactiveSpecs
included = false
if reactiveSpec.denorm
for fieldOrReactiveSpec of inclusionSet
if fieldOrReactiveSpec.split('.')[0] is reactiveName
included = true
break
if included
reactiveValue = value[reactiveName]?.val
reactiveTs = value[reactiveName]?.ts
reactiveExpire = value[reactiveName]?.expire
reactiveDirty = not reactiveExpire? or (reactiveExpire < new Date())
needsRecalc = false
if reactiveDirty
needsRecalc = true
for qss, qssReactivesObj of fieldsByModelIdQuery[modelName][id]._reactives
continue if reactiveName not of qssReactivesObj
qssVal = qssReactivesObj[reactiveName].val
qssTs = qssReactivesObj[reactiveName].ts
qssExpire = qssReactivesObj[reactiveName].expire
qssDirty = not qssExpire? or qssExpire < new Date()
if not qssDirty and (not reactiveTs? or qssTs > reactiveTs)
# A querySpec still thinks it knows what the reactive
# value is, so we might not need to recompute it. Either qsString's
# cursor will soon observe qss's same value, or else qss and all
# the other cursors will observe undefined and the last one will
# recalculate the value of reactiveName.
needsRecalc = false
# Note that qss might be the same as qsString. This is useful
# when we've recalculated multiple reactives in the publisher
# but the db's _reactives field is still catching up from multiple
# update operations (one per reactive)
if qss is qsString
reactiveValue = qssVal
reactiveTs = qssTs
break
if needsRecalc
if reactivesObj[reactiveName] is undefined
# We know this value is dirty, but publish it anyway.
reactivesObj[reactiveName] =
val: reactiveValue
ts: reactiveTs
expire: reactiveExpire
# Else keep publishing the previous value of reactivesObj[reactiveName]
# until the recalc task gets popped off the queue asynchronously.
console.log "Publisher enqueuing <#{modelName} #{JSON.stringify id}>.#{reactiveName}"
J.denorm._enqueueReactiveCalc modelName, id, reactiveName
else
reactivesObj[reactiveName] =
val: reactiveValue
ts: reactiveTs
expire: reactiveExpire
else
fieldsByModelIdQuery[modelName][id][fieldName] ?= {}
fieldsByModelIdQuery[modelName][id][fieldName][qsString] = value
qsStringsDiff.added.forEach (qsString) =>
querySpec = J.fetching.parseQs qsString
# log "Add observer for: ", querySpec
modelClass = J.models[querySpec.modelName]
mongoSelector = J.fetching.selectorToMongoSelector modelClass, querySpec.selector
mongoOptions = J.fetching._qsToMongoOptions querySpec
# log 'mongoSelector: ', JSON.stringify mongoSelector
# log 'mongoOptions.fields: ', JSON.stringify mongoOptions.fields
cursor = modelClass.collection.find mongoSelector, mongoOptions
observer = cursor.observeChanges
added: (id, fields) =>
# log querySpec, "server says ADDED:", JSON.stringify(id), fields
fields = _.clone fields
fields._reactives ?= {}
if id of (fieldsByModelIdQuery?[querySpec.modelName] ? {})
# The set of projections being watched on this doc may have grown.
changedFields = {}
for fieldName, value of fields
oldValue = getField querySpec.modelName, id, fieldName
setField querySpec.modelName, id, fieldName, querySpec, value
newValue = getField querySpec.modelName, id, fieldName
if not EJSON.equals oldValue, newValue
changedFields[fieldName] = newValue
if not _.isEmpty changedFields
# log querySpec, "sending CHANGED:", id, changedFields
@changed modelClass.collection._name, id, changedFields
else
fieldsByModelIdQuery[querySpec.modelName] ?= {}
fieldsByModelIdQuery[querySpec.modelName][id] ?= {}
changedFields = {}
for fieldName, value of fields
setField querySpec.modelName, id, fieldName, querySpec, value
changedFields[fieldName] = getField querySpec.modelName, id, fieldName
# log querySpec, "sending ADDED:", id, changedFields
@added modelClass.collection._name, id, changedFields
changed: (id, fields) =>
# log querySpec, "server says CHANGED:", JSON.stringify(id), fields
fields = _.clone fields
fields._reactives ?= {}
changedFields = {}
for fieldName, value of fields
oldValue = getField querySpec.modelName, id, fieldName
setField querySpec.modelName, id, fieldName, querySpec, value
newValue = getField querySpec.modelName, id, fieldName
if not EJSON.equals oldValue, newValue
changedFields[fieldName] = newValue
if not _.isEmpty changedFields
# log querySpec, "sending CHANGED:", JSON.stringify(id), changedFields
@changed modelClass.collection._name, id, changedFields
removed: (id) =>
# log querySpec, "server says REMOVED:", JSON.stringify(id)
changedFields = {}
for fieldName in _.keys fieldsByModelIdQuery[querySpec.modelName][id]
oldValue = getField querySpec.modelName, id, fieldName
delete fieldsByModelIdQuery[querySpec.modelName][id][fieldName][qsString]
newValue = getField querySpec.modelName, id, fieldName
if not EJSON.equals oldValue, newValue
changedFields[fieldName] = newValue
if _.isEmpty fieldsByModelIdQuery[querySpec.modelName][id][fieldName]
delete fieldsByModelIdQuery[querySpec.modelName][id][fieldName]
if _.isEmpty fieldsByModelIdQuery[querySpec.modelName][id]
delete fieldsByModelIdQuery[querySpec.modelName][id]
# log querySpec, "sending REMOVED:", JSON.stringify(id)
@removed modelClass.collection._name, id
else if not _.isEmpty changedFields
# log querySpec, "sending CHANGED:", JSON.stringify(id)
@changed modelClass.collection._name, id, changedFields
session.observerByQsString().setOrAdd qsString, observer
qsStringsDiff.deleted.forEach (qsString) =>
querySpec = J.fetching.parseQs qsString
observer = session.observerByQsString().get qsString
observer.stop()
session.observerByQsString().delete qsString
# Send updates to undo the effect that this cursor had on the client's
# view of the overall data set.
modelClass = J.models[querySpec.modelName]
for docId in _.keys fieldsByModelIdQuery[querySpec.modelName] ? {}
cursorValues = fieldsByModelIdQuery[querySpec.modelName][docId]
changedFields = {}
for fieldName in _.keys cursorValues
valueByQsString = cursorValues[fieldName]
oldValue = getField querySpec.modelName, docId, fieldName
delete valueByQsString[qsString]
newValue = getField querySpec.modelName, docId, fieldName
if not EJSON.equals oldValue, newValue
changedFields[fieldName] = newValue
if _.isEmpty valueByQsString
delete cursorValues[fieldName]
if _.isEmpty cursorValues
delete fieldsByModelIdQuery[querySpec.modelName][docId]
# log querySpec, "sending REMOVED", docId
@removed modelClass.collection._name, docId
else if not _.isEmpty changedFields
# log querySpec, "passing along CHANGED", docId, changedFields
@changed modelClass.collection._name, docId, changedFields