Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚡️ Cache latest op version when broadcasting presence #679

Merged
merged 2 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 95 additions & 23 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ function Agent(backend, stream) {
// request if the client disconnects ungracefully. This is a
// map of channel -> id -> request
this.presenceRequests = Object.create(null);
// Keep track of the latest known Doc version, so that we can avoid fetching
// ops to transform presence if not needed
this.latestDocVersionStreams = Object.create(null);
this.latestDocVersions = Object.create(null);

// We need to track this manually to make sure we don't reply to messages
// after the stream was closed.
Expand Down Expand Up @@ -108,24 +112,21 @@ Agent.prototype._cleanup = function() {
emitter.destroy();
}
this.subscribedQueries = Object.create(null);

for (var collection in this.latestDocVersionStreams) {
var streams = this.latestDocVersionStreams[collection];
for (var id in streams) streams[id].destroy();
}
this.latestDocVersionStreams = Object.create(null);
};

/**
* Passes operation data received on stream to the agent stream via
* _sendOp()
*/
Agent.prototype._subscribeToStream = function(collection, id, stream) {
if (this.closed) return stream.destroy();

var streams = this.subscribedDocs[collection] || (this.subscribedDocs[collection] = Object.create(null));

// If already subscribed to this document, destroy the previously subscribed stream
var previous = streams[id];
if (previous) previous.destroy();
streams[id] = stream;

var agent = this;
stream.on('data', function(data) {
this._subscribeMapToStream(this.subscribedDocs, collection, id, stream, function(data) {
if (data.error) {
// Log then silently ignore errors in a subscription stream, since these
// may not be the client's fault, and they were not the result of a
Expand All @@ -135,13 +136,26 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
}
agent._onOp(collection, id, data);
});
};

Agent.prototype._subscribeMapToStream = function(map, collection, id, stream, dataHandler) {
if (this.closed) return stream.destroy();

var streams = map[collection] || (map[collection] = Object.create(null));

// If already subscribed to this document, destroy the previously subscribed stream
var previous = streams[id];
if (previous) previous.destroy();
streams[id] = stream;

stream.on('data', dataHandler);
stream.on('end', function() {
// The op stream is done sending, so release its reference
var streams = agent.subscribedDocs[collection];
var streams = map[collection];
if (!streams || streams[id] !== stream) return;
delete streams[id];
if (util.hasKeys(streams)) return;
delete agent.subscribedDocs[collection];
delete map[collection];
});
};

Expand Down Expand Up @@ -794,25 +808,83 @@ Agent.prototype._broadcastPresence = function(presence, callback) {
collection: presence.c
};
var start = Date.now();
backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, this, context, function(error) {

var subscriptionUpdater = presence.p === null ?
this._unsubscribeDocVersion.bind(this) :
this._subscribeDocVersion.bind(this);

subscriptionUpdater(presence.c, presence.d, function(error) {
if (error) return callback(error);
var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null));
var previousRequest = requests[presence.id];
if (!previousRequest || previousRequest.pv < presence.pv) {
presenceRequests[presence.ch][presence.id] = presence;
}
backend.transformPresenceToLatestVersion(agent, presence, function(error, presence) {
backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, agent, context, function(error) {
if (error) return callback(error);
var channel = agent._getPresenceChannel(presence.ch);
agent.backend.pubsub.publish([channel], presence, function(error) {
if (error) return callback(error);
backend.emit('timing', 'presence.broadcast', Date.now() - start, context);
var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null));
var previousRequest = requests[presence.id];
if (!previousRequest || previousRequest.pv < presence.pv) {
presenceRequests[presence.ch][presence.id] = presence;
}

var transformer = function(agent, presence, callback) {
callback(null, presence);
};

var latestDocVersion = util.dig(agent.latestDocVersions, presence.c, presence.d);
var presenceIsUpToDate = presence.v === latestDocVersion;
if (!presenceIsUpToDate) {
// null presence can't be transformed, so skip the database call and just
// set the version to the latest known Doc version
if (presence.p === null) {
transformer = function(agent, presence, callback) {
presence.v = latestDocVersion;
callback(null, presence);
};
} else {
transformer = backend.transformPresenceToLatestVersion.bind(backend);
}
}

transformer(agent, presence, function(error, presence) {
if (error) return callback(error);
var channel = agent._getPresenceChannel(presence.ch);
agent.backend.pubsub.publish([channel], presence, function(error) {
if (error) return callback(error);
backend.emit('timing', 'presence.broadcast', Date.now() - start, context);
callback(null, presence);
});
});
});
});
};

Agent.prototype._subscribeDocVersion = function(collection, id, callback) {
if (!collection || !id) return callback();

var latestDocVersions = this.latestDocVersions;
var isSubscribed = util.dig(latestDocVersions, collection, id) !== undefined;
if (isSubscribed) return callback();

var agent = this;
this.backend.subscribe(this, collection, id, null, function(error, stream, snapshot) {
if (error) return callback(error);

util.digOrCreate(latestDocVersions, collection, id, function() {
return snapshot.v;
});

agent._subscribeMapToStream(agent.latestDocVersionStreams, collection, id, stream, function(op) {
// op.v behind snapshot.v by 1
latestDocVersions[collection][id] = op.v + 1;
});

callback();
});
};

Agent.prototype._unsubscribeDocVersion = function(collection, id, callback) {
var stream = util.dig(this.latestDocVersionStreams, collection, id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't clean up the this.latestDocVersions, which means a long-lived client that goes through many docs could have an agent with tons of old entries in that map.

Per discussion, immediately cleaning up this.latestDocVersions may make the case of multiple null presences worse. Perhaps a delayed cleanup or something.

if (stream) stream.destroy();
util.nextTick(callback);
};

Agent.prototype._createPresence = function(request) {
return {
a: ACTIONS.presence,
Expand Down
38 changes: 38 additions & 0 deletions test/client/presence/doc-presence.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var types = require('../../../lib/types');
var presenceTestType = require('./presence-test-type');
var errorHandler = require('../../util').errorHandler;
var PresencePauser = require('./presence-pauser');
var sinon = require('sinon');
types.register(presenceTestType.type);

describe('DocPresence', function() {
Expand Down Expand Up @@ -297,6 +298,43 @@ describe('DocPresence', function() {
], done);
});

it('does not call getOps() when presence is already up-to-date', function(done) {
var localPresence1 = presence1.create('presence-1');

async.series([
doc1.fetch.bind(doc1), // Ensure up-to-date
function(next) {
sinon.spy(Backend.prototype, 'getOps');
next();
},
localPresence1.submit.bind(localPresence1, {index: 1}),
function(next) {
expect(Backend.prototype.getOps).not.to.have.been.called;
next();
}
], done);
});

it('does not call getOps() for old presence when it is null', function(done) {
var localPresence1 = presence1.create('presence-1');

async.series([
doc1.unsubscribe.bind(doc1),
doc2.submitOp.bind(doc2, {index: 5, value: 'ern'}),
function(next) {
expect(doc1.version).to.eql(1);
expect(doc2.version).to.eql(2);

sinon.spy(Backend.prototype, 'getOps');
localPresence1.submit(null, function(error) {
if (error) return next(error);
expect(Backend.prototype.getOps).not.to.have.been.called;
next();
});
}
], done);
});

// This test case attempts to force us into a tight race condition corner case:
// 1. doc1 sends presence, as well as submits an op
// 2. doc2 receives the op first, followed by the presence, which is now out-of-date
Expand Down
Loading