Skip to content

Commit

Permalink
skip duplicates messages by inserting hostID
Browse files Browse the repository at this point in the history
Change-Id: Ie73187a051d09c8812cc6118436d428cbc300ad2
Signed-off-by: Florent BENOIT <[email protected]>
  • Loading branch information
benoitf committed Oct 21, 2018
1 parent d188b88 commit ac580dd
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 17 deletions.
55 changes: 41 additions & 14 deletions packages/plugin-ext/src/api/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ export class RPCProtocolImpl implements RPCProtocol {
private readonly invokedHandlers: { [req: string]: Promise<any>; };
private readonly pendingRPCReplies: { [msgId: string]: Deferred<any>; };
private readonly multiplexor: RPCMultiplexer;
private messageToSendHostId: string | undefined;

constructor(connection: MessageConnection) {
constructor(connection: MessageConnection, readonly remoteHostID?: string) {
this.isDisposed = false;
// tslint:disable-next-line:no-null-keyword
this.locals = Object.create(null);
Expand All @@ -72,7 +73,7 @@ export class RPCProtocolImpl implements RPCProtocol {
// tslint:disable-next-line:no-null-keyword
this.invokedHandlers = Object.create(null);
this.pendingRPCReplies = {};
this.multiplexor = new RPCMultiplexer(connection, msg => this.receiveOneMessage(msg));
this.multiplexor = new RPCMultiplexer(connection, msg => this.receiveOneMessage(msg), remoteHostID);
}
getProxy<T>(proxyId: ProxyIdentifier<T>): T {
if (!this.proxies[proxyId.id]) {
Expand Down Expand Up @@ -109,7 +110,7 @@ export class RPCProtocolImpl implements RPCProtocol {
const result = new Deferred();

this.pendingRPCReplies[callId] = result;
this.multiplexor.send(MessageFactory.request(callId, proxyId, methodName, args));
this.multiplexor.send(MessageFactory.request(callId, proxyId, methodName, args, this.messageToSendHostId));
return result.promise;
}

Expand All @@ -120,6 +121,17 @@ export class RPCProtocolImpl implements RPCProtocol {

const msg = <RPCMessage>JSON.parse(rawmsg);

// handle message that sets the Host ID
if ((<any>msg).setHostID) {
this.messageToSendHostId = (<any>msg).setHostID;
return;
}

// skip message if not matching host
if (this.remoteHostID && (<any>msg).hostID && this.remoteHostID !== (<any>msg).hostID) {
return;
}

switch (msg.type) {
case MessageType.Request:
this.receiveRequest(msg);
Expand All @@ -141,10 +153,10 @@ export class RPCProtocolImpl implements RPCProtocol {

this.invokedHandlers[callId].then(r => {
delete this.invokedHandlers[callId];
this.multiplexor.send(MessageFactory.replyOK(callId, r));
this.multiplexor.send(MessageFactory.replyOK(callId, r, this.messageToSendHostId));
}, err => {
delete this.invokedHandlers[callId];
this.multiplexor.send(MessageFactory.replyErr(callId, err));
this.multiplexor.send(MessageFactory.replyErr(callId, err, this.messageToSendHostId));
});
}

Expand Down Expand Up @@ -218,11 +230,14 @@ class RPCMultiplexer {

private messagesToSend: string[];

constructor(connection: MessageConnection, onMessage: (msg: string) => void) {
constructor(connection: MessageConnection, onMessage: (msg: string) => void, remoteHostId?: string) {
this.connection = connection;
this.sendAccumulatedBound = this.sendAccumulated.bind(this);

this.messagesToSend = [];
if (remoteHostId) {
this.send(`{"setHostID":"${remoteHostId}"}`);
}

this.connection.onMessage((data: string[]) => {
const len = data.length;
Expand Down Expand Up @@ -252,22 +267,34 @@ class RPCMultiplexer {

class MessageFactory {

public static request(req: string, rpcId: string, method: string, args: any[]): string {
return `{"type":${MessageType.Request},"id":"${req}","proxyId":"${rpcId}","method":"${method}","args":${JSON.stringify(args)}}`;
public static request(req: string, rpcId: string, method: string, args: any[], messageToSendHostId?: string): string {
let prefix = '';
if (messageToSendHostId) {
prefix = `"hostID":"${messageToSendHostId}",`;
}
return `{${prefix}"type":${MessageType.Request},"id":"${req}","proxyId":"${rpcId}","method":"${method}","args":${JSON.stringify(args)}}`;
}

public static replyOK(req: string, res: any): string {
public static replyOK(req: string, res: any, messageToSendHostId?: string): string {
let prefix = '';
if (messageToSendHostId) {
prefix = `"hostID":"${messageToSendHostId}",`;
}
if (typeof res === 'undefined') {
return `{"type":${MessageType.Reply},"id":"${req}"}`;
return `{${prefix}"type":${MessageType.Reply},"id":"${req}"}`;
}
return `{"type":${MessageType.Reply},"id":"${req}","res":${JSON.stringify(res)}}`;
return `{${prefix}"type":${MessageType.Reply},"id":"${req}","res":${JSON.stringify(res)}}`;
}

public static replyErr(req: string, err: any): string {
public static replyErr(req: string, err: any, messageToSendHostId?: string): string {
let prefix = '';
if (messageToSendHostId) {
prefix = `"hostID":"${messageToSendHostId}",`;
}
if (err instanceof Error) {
return `{"type":${MessageType.ReplyErr},"id":"${req}","err":${JSON.stringify(transformErrorForSerialization(err))}}`;
return `{${prefix}"type":${MessageType.ReplyErr},"id":"${req}","err":${JSON.stringify(transformErrorForSerialization(err))}}`;
}
return `{"type":${MessageType.ReplyErr},"id":"${req}","err":null}`;
return `{${prefix}"type":${MessageType.ReplyErr},"id":"${req}","err":null}`;
}
}

Expand Down
6 changes: 3 additions & 3 deletions packages/plugin-ext/src/hosted/browser/hosted-plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class HostedPluginSupport {
if (plugins.length === 1) {
pluginID = getPluginId(plugins[0].model);
}
const rpc = this.createServerRpc(pluginID);
const rpc = this.createServerRpc(pluginID, hostKey);
const hostedExtManager = rpc.getProxy(MAIN_RPC_CONTEXT.HOSTED_PLUGIN_MANAGER_EXT);
hostedExtManager.$init({ plugins: plugins, preferences: this.preferenceServiceImpl.getPreferences(), env: { queryParams: getQueryParameters() } });
setUpPluginApi(rpc, container);
Expand Down Expand Up @@ -121,7 +121,7 @@ export class HostedPluginSupport {
return result;
}

private createServerRpc(pluginID: string): RPCProtocol {
private createServerRpc(pluginID: string, hostID: string): RPCProtocol {
return new RPCProtocolImpl({
onMessage: this.watcher.onPostMessageEvent,
send: message => {
Expand All @@ -130,6 +130,6 @@ export class HostedPluginSupport {
wrappedMessage['content'] = message;
this.server.onMessage(JSON.stringify(wrappedMessage));
}
});
}, hostID);
}
}

0 comments on commit ac580dd

Please sign in to comment.