From a40b17b885e2ecabdfa65f0ed4422465fae02c44 Mon Sep 17 00:00:00 2001 From: make-github-pseudonymous-again <5165674+make-github-pseudonymous-again@users.noreply.github.com> Date: Wed, 5 Jun 2024 23:53:03 +0200 Subject: [PATCH] :construction: progress: First working polling implementation. --- imports/api/makeObservedQueryPublication.ts | 61 +++++++++++---------- imports/api/query/watch.ts | 30 ++++++---- types/meteor/diff-sequence.d.ts | 17 ++++++ 3 files changed, 69 insertions(+), 39 deletions(-) create mode 100644 types/meteor/diff-sequence.d.ts diff --git a/imports/api/makeObservedQueryPublication.ts b/imports/api/makeObservedQueryPublication.ts index 5d2a2715d..4bb776982 100644 --- a/imports/api/makeObservedQueryPublication.ts +++ b/imports/api/makeObservedQueryPublication.ts @@ -1,8 +1,11 @@ +import {DiffSequence} from 'meteor/diff-sequence'; + import schema from '../lib/schema'; import type Collection from './Collection'; import type Document from './Document'; import type Filter from './query/Filter'; +import type ObserveChangesCallbacks from './ObserveChangesCallbacks'; import queryToSelectorOptionsPair from './query/queryToSelectorOptionsPair'; import {userQuery} from './query/UserQuery'; import type UserQuery from './query/UserQuery'; @@ -10,7 +13,8 @@ import watch from './query/watch'; const observeOptions = schema .object({ - added: schema.boolean().optional(), + addedBefore: schema.boolean().optional(), + movedBefore: schema.boolean().optional(), removed: schema.boolean().optional(), changed: schema.boolean().optional(), }) @@ -39,11 +43,14 @@ const makeObservedQueryPublication = ( ...selector, owner: this.userId, }; - // const callbacks: ObserveOptions = { - // added: true, - // removed: true, - // ...observe, - // }; + + const callbacks: ObserveOptions = { + addedBefore: true, + movedBefore: true, + removed: true, + ...observe, + }; + const uid = JSON.stringify({ key, selector, @@ -55,33 +62,29 @@ const makeObservedQueryPublication = ( this.stop(); }; + // NOTE We only diff ids if we do not care about change events. + const diffOptions = callbacks.changed ? undefined : { + projectionFn: ({_id}) => _id, + }; + + const observer: ObserveChangesCallbacks = Object.fromEntries([ + callbacks.addedBefore && ['addedBefore', stop], + callbacks.movedBefore && ['movedBefore', stop], + callbacks.removed && ['removed', stop], + callbacks.changed && ['changed', stop], + ].filter(Boolean)); + const handle = await watch( QueriedCollection, selector as Filter, options, - async () => { - stop(); - // switch (operationType) { - // case 'replace': - // case 'update': { - // if (callbacks.changed) stop(); - // break; - // } - - // case 'insert': { - // if (callbacks.added) stop(); - // break; - // } - - // case 'delete': { - // if (callbacks.removed) stop(); - // break; - // } - - // default: { - // stop(); - // } - // } + async (init) => { + DiffSequence.diffQueryOrderedChanges( + handle.init, + init, + observer, + diffOptions, + ); }, ); diff --git a/imports/api/query/watch.ts b/imports/api/query/watch.ts index c3fd866fc..b83543c22 100644 --- a/imports/api/query/watch.ts +++ b/imports/api/query/watch.ts @@ -40,17 +40,24 @@ const _watchInit = async ( sessionOptions, ); -const _filterToFullDocumentFilter = (filter: Filter) => +const _filterToFullDocumentFilter = (operationKey: string, filter: Filter) => Object.fromEntries( Object.entries(filter).map(([key, value]) => [ - key.startsWith('$') ? key : `fullDocument.${key}`, - isObject(value) ? _filterToFullDocumentFilter(value as Filter) : value, + key.startsWith('$') ? key : `${operationKey}.${key}`, + isObject(value) ? _filterToFullDocumentFilter(operationKey, value as Filter) : value, ]), ); const _filterToMatch = (filter: Filter) => ({ $match: { - $and: _filterToFullDocumentFilter(filter), + $or: [ + _filterToFullDocumentFilter('fullDocument', filter), + _filterToFullDocumentFilter('fullDocumentBeforeChange', filter), + {$and: [ + {fullDocument: undefined}, + {fullDocumentBeforeChange: undefined}, + ]}, + ], }, }); @@ -62,7 +69,7 @@ const _filterToPipeline = ({$text, ...rest}: Filter) => { }; }; -const _optionsToPipeline = (options: Options) => [{$project: options.project}]; +const _optionsToPipeline = (options: Options) => options.project === undefined ? [] : [{$project: options.project}]; const _watchStream = ( collection: Collection, @@ -136,21 +143,24 @@ const watch = async ( ); let queued = 0; - const queue = new Promise((resolve) => { + let queue = new Promise((resolve) => { resolve(undefined); }); + await queue; + const enqueue = (task) => { // TODO Throttle. - if (queued === 0) return; + if (queued !== 0) return; ++queued; - queue.then( + queue = queue.then( () => { --queued; return task(); - }, + } + ).catch( (error) => { console.error({error}); - }, + } ); }; diff --git a/types/meteor/diff-sequence.d.ts b/types/meteor/diff-sequence.d.ts new file mode 100644 index 000000000..64bd3d928 --- /dev/null +++ b/types/meteor/diff-sequence.d.ts @@ -0,0 +1,17 @@ +declare module 'meteor/diff-sequence' { + import {type Mongo} from 'meteor/mongo'; + import {type Document} from 'mongodb'; + + type Options = { + projectionFn?: (document: T) => Partial; + }; + + namespace DiffSequence { + function diffQueryOrderedChanges( + old_results: T, + new_results: T, + observer: Mongo.ObserveChangesCallbacks, + options?: Options | undefined, + ): void; + } +}