Skip to content

Commit

Permalink
🚧 progress: First working polling implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
make-github-pseudonymous-again committed Jun 5, 2024
1 parent bf39376 commit a40b17b
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 39 deletions.
61 changes: 32 additions & 29 deletions imports/api/makeObservedQueryPublication.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import {DiffSequence} from 'meteor/diff-sequence';

import schema from '../lib/schema';

import type Collection from './Collection';
import type Document from './Document';
import type Filter from './query/Filter';
import type ObserveChangesCallbacks from './ObserveChangesCallbacks';
import queryToSelectorOptionsPair from './query/queryToSelectorOptionsPair';
import {userQuery} from './query/UserQuery';
import type UserQuery from './query/UserQuery';
import watch from './query/watch';

const observeOptions = schema
.object({
added: schema.boolean().optional(),
addedBefore: schema.boolean().optional(),
movedBefore: schema.boolean().optional(),
removed: schema.boolean().optional(),
changed: schema.boolean().optional(),
})
Expand Down Expand Up @@ -39,11 +43,14 @@ const makeObservedQueryPublication = <T extends Document, U = T>(
...selector,
owner: this.userId,
};
// const callbacks: ObserveOptions = {
// added: true,
// removed: true,
// ...observe,
// };

const callbacks: ObserveOptions = {
addedBefore: true,
movedBefore: true,
removed: true,
...observe,
};

const uid = JSON.stringify({
key,
selector,
Expand All @@ -55,33 +62,29 @@ const makeObservedQueryPublication = <T extends Document, U = T>(
this.stop();
};

// NOTE We only diff ids if we do not care about change events.
const diffOptions = callbacks.changed ? undefined : {
projectionFn: ({_id}) => _id,
};

const observer: ObserveChangesCallbacks<T> = Object.fromEntries([
callbacks.addedBefore && ['addedBefore', stop],
callbacks.movedBefore && ['movedBefore', stop],
callbacks.removed && ['removed', stop],
callbacks.changed && ['changed', stop],
].filter(Boolean));

const handle = await watch<T, U>(
QueriedCollection,
selector as Filter<T>,
options,
async () => {
stop();
// switch (operationType) {
// case 'replace':
// case 'update': {
// if (callbacks.changed) stop();
// break;
// }

// case 'insert': {
// if (callbacks.added) stop();
// break;
// }

// case 'delete': {
// if (callbacks.removed) stop();
// break;
// }

// default: {
// stop();
// }
// }
async (init) => {
DiffSequence.diffQueryOrderedChanges(
handle.init,
init,
observer,
diffOptions,
);
},
);

Expand Down
30 changes: 20 additions & 10 deletions imports/api/query/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,24 @@ const _watchInit = async <T extends Document, U = T>(
sessionOptions,
);

const _filterToFullDocumentFilter = <T>(filter: Filter<T>) =>
const _filterToFullDocumentFilter = <T>(operationKey: string, filter: Filter<T>) =>
Object.fromEntries(
Object.entries(filter).map(([key, value]) => [
key.startsWith('$') ? key : `fullDocument.${key}`,
isObject(value) ? _filterToFullDocumentFilter(value as Filter<T>) : value,
key.startsWith('$') ? key : `${operationKey}.${key}`,
isObject(value) ? _filterToFullDocumentFilter(operationKey, value as Filter<T>) : value,
]),
);

const _filterToMatch = <T>(filter: Filter<T>) => ({
$match: {
$and: _filterToFullDocumentFilter(filter),
$or: [
_filterToFullDocumentFilter('fullDocument', filter),
_filterToFullDocumentFilter('fullDocumentBeforeChange', filter),
{$and: [
{fullDocument: undefined},
{fullDocumentBeforeChange: undefined},
]},
],
},
});

Expand All @@ -62,7 +69,7 @@ const _filterToPipeline = <T>({$text, ...rest}: Filter<T>) => {
};
};

Check warning on line 70 in imports/api/query/watch.ts

View check run for this annotation

Codecov / codecov/patch

imports/api/query/watch.ts#L70

Added line #L70 was not covered by tests

const _optionsToPipeline = (options: Options) => [{$project: options.project}];
const _optionsToPipeline = (options: Options) => options.project === undefined ? [] : [{$project: options.project}];

const _watchStream = <T extends Document, U = T>(
collection: Collection<T, U>,
Expand Down Expand Up @@ -136,21 +143,24 @@ const watch = async <T extends Document, U = T>(
);

let queued = 0;
const queue = new Promise((resolve) => {
let queue = new Promise((resolve) => {
resolve(undefined);
});
await queue;

const enqueue = (task) => {
// TODO Throttle.
if (queued === 0) return;
if (queued !== 0) return;
++queued;
queue.then(
queue = queue.then(
() => {
--queued;
return task();
},
}
).catch(
(error) => {
console.error({error});
},
}
);
};

Expand Down
17 changes: 17 additions & 0 deletions types/meteor/diff-sequence.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
declare module 'meteor/diff-sequence' {
import {type Mongo} from 'meteor/mongo';
import {type Document} from 'mongodb';

type Options<T> = {
projectionFn?: (document: T) => Partial<T>;
};

namespace DiffSequence {
function diffQueryOrderedChanges<T extends Document>(
old_results: T,
new_results: T,
observer: Mongo.ObserveChangesCallbacks<T>,
options?: Options<T> | undefined,
): void;
}
}

0 comments on commit a40b17b

Please sign in to comment.