Skip to content

Commit

Permalink
progress
Browse files Browse the repository at this point in the history
  • Loading branch information
make-github-pseudonymous-again committed Jul 23, 2024
1 parent f1983f5 commit 56ccada
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 76 deletions.
9 changes: 0 additions & 9 deletions imports/api/publication/useItem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,6 @@ const useItem = <T extends Document, U = T>(
deps,
);

console.debug({
what: 'USE_ITEM',
collection: collection?._collection.name,
selector,
options,
deps,
items,
});

assert(items.length <= 1, `useItem got items.length === ${items.length}`);
const result = items[0];
const found = Boolean(result);
Expand Down
208 changes: 148 additions & 60 deletions imports/api/query/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import {type Filter as MongoFilter} from 'mongodb';

import {
type ClientSessionOptions,
type ChangeStream,
type ChangeStreamOptions,
type Timestamp,
} from 'mongodb';

import debounce from 'debounce';

import {isObject} from '@functional-abstraction/type';

import type Collection from '../Collection';
Expand All @@ -16,7 +19,7 @@ import type Document from '../Document';
import {type Options} from '../transaction/TransactionDriver';
import withSession from '../transaction/withSession';

import {type EventEmitter, eventEmitter} from '../../lib/events';
import {EventEmitter, eventEmitter} from '../../lib/events';

import type Filter from './Filter';

Expand Down Expand Up @@ -54,12 +57,23 @@ const _filterToFullDocumentFilter = <T>(
]),
);

type Match = {

Check warning on line 60 in imports/api/query/watch.ts

View check run for this annotation

Codecov / codecov/patch

imports/api/query/watch.ts#L60

Added line #L60 was not covered by tests
$match: {}
};


type Pipeline = {
pipeline: Match[];
isSuperset: boolean;
};


const _fullDocumentMissingFilter = {fullDocument: undefined};
const _fullDocumentBeforeChangeMissingFilter = {
fullDocumentBeforeChange: undefined,
};

const _filterToMatch = <T>(filter: Filter<T>) => ({
const _filterToMatch = <T>(filter: Filter<T>): Match => ({
$match: {
$or: [
// TODO Correctly configure collections to define fullDocument*
Expand All @@ -75,23 +89,23 @@ const _filterToMatch = <T>(filter: Filter<T>) => ({
},
});

const _filterToPipeline = <T>({$text, ...rest}: Filter<T>) => {
const _filterToPipeline = <T>({$text, ...rest}: Filter<T>): Pipeline => {
return {
pipeline: [_filterToMatch(rest as Filter<T>)],
// TODO Any occurrence of $text should yield this, not just top-level.
isSuperset: $text !== undefined,
};
};

const _noFullDocumentMatch = () => ({
const _noFullDocumentMatch = (): Match => ({
$match: {
// NOTE This matches everything if pre- or post- images are not
// configured, which is very inefficient.
$or: [_fullDocumentMissingFilter, _fullDocumentBeforeChangeMissingFilter],
},
});

const _noFullDocumentPipeline = () => {
const _noFullDocumentPipeline = (): Pipeline => {

Check warning on line 108 in imports/api/query/watch.ts

View check run for this annotation

Codecov / codecov/patch

imports/api/query/watch.ts#L105-L108

Added lines #L105 - L108 were not covered by tests
return {
pipeline: [_noFullDocumentMatch()],
isSuperset: true,
Expand All @@ -101,9 +115,11 @@ const _noFullDocumentPipeline = () => {
const _optionsToPipeline = (options: Options) =>

Check warning on line 115 in imports/api/query/watch.ts

View check run for this annotation

Codecov / codecov/patch

imports/api/query/watch.ts#L111-L115

Added lines #L111 - L115 were not covered by tests
options.project === undefined ? [] : [{$project: options.project}];

let watchCount = 0;

Check warning on line 118 in imports/api/query/watch.ts

View check run for this annotation

Codecov / codecov/patch

imports/api/query/watch.ts#L118

Added line #L118 was not covered by tests

const _watchStream = <T extends Document, U = T>(
collection: Collection<T, U>,
filterPipeline,
filterPipeline: Match[],
options: Options,
startAtOperationTime: Timestamp,
changeStreamOptions?: ChangeStreamOptions,
Expand All @@ -117,14 +133,66 @@ const _watchStream = <T extends Document, U = T>(
{$changeStreamSplitLargeEvent: {}},
];

return collection.rawCollection().watch(pipeline, {
const rawCollection = collection.rawCollection();
const {collectionName} = rawCollection;

console.debug({collection: collectionName, watchCount: ++watchCount});
const stream = rawCollection.watch(pipeline, {
startAtOperationTime,
fullDocument: 'whenAvailable',
fullDocumentBeforeChange: 'whenAvailable',
...changeStreamOptions,
});

Check warning on line 146 in imports/api/query/watch.ts

View check run for this annotation

Codecov / codecov/patch

imports/api/query/watch.ts#L146

Added line #L146 was not covered by tests
return _groupFragments(stream);
};

const _groupFragments = <T extends Document>( stream: ChangeStream<T>,) => {
const emitter = eventEmitter<{ entry: ChangeStreamEvent; close: undefined }>();

let event: Fragment = {
_id: {

Check warning on line 154 in imports/api/query/watch.ts

View check run for this annotation

Codecov / codecov/patch

imports/api/query/watch.ts#L154

Added line #L154 was not covered by tests
_data: '',
},
splitEvent: {
fragment: 1,
of: 1,
},
};

stream.on('change', (fragment: ChangeStreamEvent | Fragment) => {
if (fragment.splitEvent === undefined) {
assert(fragment._id._data !== event._id._data);
assert(event.splitEvent.fragment === event.splitEvent.of);
event = {...fragment, splitEvent: {fragment: 1, of: 1}};
} else if (fragment.splitEvent.fragment === 1) {
assert(fragment._id._data !== event._id._data);
assert(event.splitEvent.fragment === event.splitEvent.of);
assert(fragment.splitEvent.fragment === 1);
event = fragment;
} else {

Check warning on line 173 in imports/api/query/watch.ts

View check run for this annotation

Codecov / codecov/patch

imports/api/query/watch.ts#L173

Added line #L173 was not covered by tests
assert(fragment._id._data === event._id._data);
assert(fragment.splitEvent.fragment === event.splitEvent.fragment + 1);
assert(fragment.splitEvent.of === event.splitEvent.of);
assert(fragment.splitEvent.fragment <= fragment.splitEvent.of);
event = {...event, ...fragment};
}

if (event.splitEvent.fragment !== event.splitEvent.of) return;

const {splitEvent, ...rest} = event;

void emitter.emitSerial('entry', rest);
});

emitter.once('close').then(
async () => stream.close()
);

return emitter;
};


const _watchSetup = async <T extends Document, U = T>(
collection: Collection<T, U>,
filter: Filter<T>,
Expand Down Expand Up @@ -169,7 +237,6 @@ const _makeQueue = async () => {
await queue;

const enqueue = (task: () => Promise<void> | void) => {
// TODO Throttle.
if (queued !== 0) return;
++queued;
queue = queue
Expand All @@ -185,12 +252,6 @@ const _makeQueue = async () => {
return enqueue;
};

export type WatchHandle<T> = EventEmitter<{
change: T[];
start: undefined;
stop: undefined;
}>;

type Fragment = {
_id: {
_data: string;
Expand All @@ -205,7 +266,76 @@ type ChangeStreamEvent = {
_id: {
_data: string;
};
splitEvent: undefined;
splitEvent?: undefined;
}


export type FilteredOplogHandle = EventEmitter<{
entry: ChangeStreamEvent;
close: undefined;
}>;

export type WatchHandle<T> = EventEmitter<{
change: T[];
start: undefined;
stop: undefined;
}>;

class Watch<T extends Document, U = T> extends EventEmitter<T>{
collection: Collection<T, U>;
filter: Filter<T>;
options: Options;
changeStreamOptions?: ChangeStreamOptions;
sessionOptions?: ClientSessionOptions;

constructor(collection, filter, options, sessionOptions) {
super();
this.collection = collection;
this.filter = filter;
this.options = options;
this.sessionOptions = sessionOptions;
}

get init() {
return [];
}

stop() {

}
}

const PIPE_DEBOUNCE = 50;
let changeCount = 0;

const _pipe = async <T extends Document, U = T>(
handle: WatchHandle<T>,
emitter: FilteredOplogHandle,
w: Watch<T, U>
) => {

const enqueue = await _makeQueue();

const onEntry = debounce(
() => {
enqueue(async () => {
const {init} = await _watchInit(
w.collection,
w.filter,
w.options,
w.sessionOptions,
);

console.debug({changeCount: ++changeCount});
await handle.emitSerial('change', init);
});
},
PIPE_DEBOUNCE
);

emitter.on('entry', onEntry);

// TODO stream.on('stop', ???)
};

const _watch = async <T extends Document, U = T>(
Expand All @@ -224,54 +354,12 @@ const _watch = async <T extends Document, U = T>(
sessionOptions,
);

const enqueue = await _makeQueue();

await handle.emitSerial('change', init);

let event: Fragment = {
_id: {
_data: '',
},
splitEvent: {
fragment: 1,
of: 1,
},
};

stream.on('change', (fragment: ChangeStreamEvent | Fragment) => {
if (fragment.splitEvent === undefined) {
assert(fragment._id._data !== event._id._data);
assert(event.splitEvent.fragment === event.splitEvent.of);
event = {...fragment, splitEvent: {fragment: 1, of: 1}};
} else if (fragment.splitEvent.fragment === 1) {
assert(fragment._id._data !== event._id._data);
assert(event.splitEvent.fragment === event.splitEvent.of);
assert(fragment.splitEvent.fragment === 1);
event = fragment;
} else {
assert(fragment._id._data === event._id._data);
assert(fragment.splitEvent.fragment === event.splitEvent.fragment + 1);
assert(fragment.splitEvent.of === event.splitEvent.of);
assert(fragment.splitEvent.fragment <= fragment.splitEvent.of);
event = {...event, ...fragment};
}

if (event.splitEvent.fragment !== event.splitEvent.of) return;

enqueue(async () => {
const {init} = await _watchInit(
collection,
filter,
options,
sessionOptions,
);
const w = new Watch<T>(collection, filter, options, sessionOptions);
await _pipe<T>(handle, stream, w);

await handle.emitSerial('change', init);
});
});

// TODO stream.on('stop', ???)
const stop = async () => stream.close();
const stop = async () => stream.emitSerial('close');

return stop;
};
Expand Down
1 change: 1 addition & 0 deletions imports/lib/events.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Emittery from 'emittery';

export type EventEmitter<T> = Emittery<T>;
export const EventEmitter = Emittery;
export const eventEmitter = <T>() => new Emittery<T>();
7 changes: 0 additions & 7 deletions types/meteor/mongo.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ declare module 'meteor/mongo' {
observeAsync(options: ObserveCallbacks<U>): Promise<ObserveHandle>;
}

// eslint-disable-next-line @typescript-eslint/consistent-type-definitions
interface Collection<T, U = T> {
_collection: {
name: string;
};
}

type ObserveHandle = {
stop(): void;
};
Expand Down

0 comments on commit 56ccada

Please sign in to comment.