Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use coincident if crossOriginIsolated, comlink otherwise #126

Merged
merged 6 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions packages/pyodide-kernel/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
"schema/*.json"
],
"scripts": {
"build": "jlpm build:lib && jlpm build:worker",
"build": "jlpm build:lib && jlpm build:workers",
"build:lib": "tsc -b",
"build:prod": "jlpm build",
"build:py": "python scripts/generate-wheels-js.py",
"build:worker": "esbuild --bundle --minify --sourcemap --target=es2019 --format=esm --outfile=lib/coincident.worker.js src/coincident.worker.ts",
"build:coincident:worker": "esbuild --bundle --minify --sourcemap --target=es2019 --format=esm --outfile=lib/coincident.worker.js src/coincident.worker.ts",
"build:comlink:worker": "esbuild --bundle --minify --sourcemap --target=es2019 --format=esm --outfile=lib/comlink.worker.js src/comlink.worker.ts",
"build:workers": "jlpm build:coincident:worker && jlpm build:comlink:worker",
"dist": "cd ../../dist && npm pack ../packages/pyodide-kernel",
"clean": "jlpm clean:lib && jlpm clean:py",
"clean:all": "jlpm clean",
Expand All @@ -55,7 +57,8 @@
"@jupyterlab/coreutils": "^6.1.1",
"@jupyterlite/contents": "^0.4.0-alpha.3",
"@jupyterlite/kernel": "^0.4.0-alpha.3",
"coincident": "^1.2.3"
"coincident": "^1.2.3",
"comlink": "^4.4.1"
},
"devDependencies": {
"@babel/core": "^7.22.17",
Expand Down
33 changes: 13 additions & 20 deletions packages/pyodide-kernel/src/coincident.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ import coincident from 'coincident';
import {
ContentsAPI,
DriveFS,
ServiceWorkerContentsAPI,
TDriveMethod,
TDriveRequest,
TDriveResponse,
} from '@jupyterlite/contents';

import { PyodideRemoteKernel } from './worker';
import { IPyodideWorkerKernel } from './tokens';

const workerAPI: IPyodideWorkerKernel = coincident(self) as IPyodideWorkerKernel;
import { PyodideRemoteKernel } from './worker';

const workerAPI = coincident(self) as IPyodideWorkerKernel;

/**
* An Emscripten-compatible synchronous Contents API using shared array buffers.
Expand All @@ -30,26 +30,16 @@ export class SharedBufferContentsAPI extends ContentsAPI {
}

/**
* A custom drive implementation which uses shared array buffers if available, service worker otherwise
* A custom drive implementation which uses shared array buffers if available
*/
class PyodideDriveFS extends DriveFS {
createAPI(options: DriveFS.IOptions): ContentsAPI {
if (crossOriginIsolated) {
return new SharedBufferContentsAPI(
options.driveName,
options.mountpoint,
options.FS,
options.ERRNO_CODES,
);
} else {
return new ServiceWorkerContentsAPI(
options.baseUrl,
options.driveName,
options.mountpoint,
options.FS,
options.ERRNO_CODES,
);
}
return new SharedBufferContentsAPI(
options.driveName,
options.mountpoint,
options.FS,
options.ERRNO_CODES,
);
}
}

Expand Down Expand Up @@ -83,6 +73,9 @@ export class PyodideCoincidentKernel extends PyodideRemoteKernel {

const worker = new PyodideCoincidentKernel();

const sendWorkerMessage = workerAPI.processWorkerMessage.bind(workerAPI);
worker.registerCallback(sendWorkerMessage);

workerAPI.initialize = worker.initialize.bind(worker);
workerAPI.execute = worker.execute.bind(worker);
workerAPI.complete = worker.complete.bind(worker);
Expand Down
61 changes: 61 additions & 0 deletions packages/pyodide-kernel/src/comlink.worker.ts
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could technically keep both entrypoints in a single file. But having a separate file for coincident and comlink may be easier to reason about.

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) Jupyter Development Team.
// Distributed under the terms of the Modified BSD License.

/**
* A WebWorker entrypoint that uses comlink to handle postMessage details
*/

import { expose } from 'comlink';

import { ContentsAPI, DriveFS, ServiceWorkerContentsAPI } from '@jupyterlite/contents';

import { IPyodideWorkerKernel } from './tokens';

import { PyodideRemoteKernel } from './worker';

/**
* A custom drive implementation which uses the service worker
*/
class PyodideDriveFS extends DriveFS {
createAPI(options: DriveFS.IOptions): ContentsAPI {
return new ServiceWorkerContentsAPI(
options.baseUrl,
options.driveName,
options.mountpoint,
options.FS,
options.ERRNO_CODES,
);
}
}

export class PyodideComlinkKernel extends PyodideRemoteKernel {
/**
* Setup custom Emscripten FileSystem
*/
protected async initFilesystem(
options: IPyodideWorkerKernel.IOptions,
): Promise<void> {
if (options.mountDrive) {
const mountpoint = '/drive';
const { FS, PATH, ERRNO_CODES } = this._pyodide;
const { baseUrl } = options;

const driveFS = new PyodideDriveFS({
FS,
PATH,
ERRNO_CODES,
baseUrl,
driveName: this._driveName,
mountpoint,
});
FS.mkdir(mountpoint);
FS.mount(driveFS, {}, mountpoint);
FS.chdir(mountpoint);
this._driveFS = driveFS;
}
}
}

const worker = new PyodideComlinkKernel();

expose(worker);
1 change: 1 addition & 0 deletions packages/pyodide-kernel/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

export * from './_pypi';
export * from './coincident.worker';
export * from './comlink.worker';
export * from './kernel';
export * from './tokens';
export * from './worker';
71 changes: 44 additions & 27 deletions packages/pyodide-kernel/src/kernel.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import coincident from 'coincident';

import { Remote, proxy, wrap } from 'comlink';

import { PromiseDelegate } from '@lumino/coreutils';

import { PageConfig } from '@jupyterlab/coreutils';
Expand Down Expand Up @@ -28,30 +30,8 @@ export class PyodideKernel extends BaseKernel implements IKernel {
constructor(options: PyodideKernel.IOptions) {
super(options);
this._worker = this.initWorker(options);
this._worker.onmessage = (e) => this._processWorkerMessage(e.data);
this._remoteKernel = this.initRemote(options);
this._contentsManager = options.contentsManager;
this.setupFilesystemAPIs();
}

private setupFilesystemAPIs() {
(this._remoteKernel.processDriveRequest as any) = async <T extends TDriveMethod>(
data: TDriveRequest<T>,
) => {
if (!DriveContentsProcessor) {
throw new Error(
'File system calls over Atomics.wait is only supported with jupyterlite>=0.4.0a3',
);
}

if (this._contentsProcessor === undefined) {
this._contentsProcessor = new DriveContentsProcessor({
contentsManager: this._contentsManager,
});
}

return await this._contentsProcessor.processDriveRequest(data);
};
}

/**
Expand All @@ -63,13 +43,48 @@ export class PyodideKernel extends BaseKernel implements IKernel {
* webpack to find it.
*/
protected initWorker(options: PyodideKernel.IOptions): Worker {
return new Worker(new URL('./coincident.worker.js', import.meta.url), {
type: 'module',
});
if (crossOriginIsolated) {
return new Worker(new URL('./coincident.worker.js', import.meta.url), {
type: 'module',
});
} else {
return new Worker(new URL('./comlink.worker.js', import.meta.url), {
type: 'module',
});
}
}

protected initRemote(options: PyodideKernel.IOptions): IPyodideWorkerKernel {
const remote = coincident(this._worker) as IPyodideWorkerKernel;
let remote: IPyodideWorkerKernel;
// Use coincident if crossOriginIsolated, comlink otherwise
if (crossOriginIsolated) {
remote = coincident(this._worker) as IPyodideWorkerKernel;
remote.processWorkerMessage = (msg: any) => {
this._processWorkerMessage(msg);
};

// The coincident worker uses its own filesystem API:
(remote.processDriveRequest as any) = async <T extends TDriveMethod>(
data: TDriveRequest<T>,
) => {
if (!DriveContentsProcessor) {
throw new Error(
'File system calls over Atomics.wait is only supported with jupyterlite>=0.4.0a3',
);
}

if (this._contentsProcessor === undefined) {
this._contentsProcessor = new DriveContentsProcessor({
contentsManager: this._contentsManager,
});
}

return await this._contentsProcessor.processDriveRequest(data);
};
} else {
remote = wrap(this._worker) as IPyodideWorkerKernel;
remote.registerCallback(proxy(this._processWorkerMessage.bind(this)));
}
const remoteOptions = this.initRemoteOptions(options);
remote.initialize(remoteOptions).then(this._ready.resolve.bind(this._ready));
return remote;
Expand Down Expand Up @@ -318,7 +333,9 @@ export class PyodideKernel extends BaseKernel implements IKernel {
private _contentsManager: Contents.IManager;
private _contentsProcessor: DriveContentsProcessor | undefined;
private _worker: Worker;
private _remoteKernel: IRemotePyodideWorkerKernel;
private _remoteKernel:
| IRemotePyodideWorkerKernel
| Remote<IRemotePyodideWorkerKernel>;
private _ready = new PromiseDelegate<void>();
}

Expand Down
11 changes: 11 additions & 0 deletions packages/pyodide-kernel/src/tokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ export interface IPyodideWorkerKernel extends IWorkerKernel {
processDriveRequest<T extends TDriveMethod>(
data: TDriveRequest<T>,
): TDriveResponse<T>;

/**
* Process worker message
* @param msg
*/
processWorkerMessage(msg: any): void;

/**
* Register a callback for handling messages from the worker.
*/
registerCallback(callback: (msg: any) => void): void;
}

/**
Expand Down
32 changes: 24 additions & 8 deletions packages/pyodide-kernel/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ export class PyodideRemoteKernel {
return results;
}

/**
* Register the callback function to send messages back to the main thread.
* @param callback the callback to register
*/
registerCallback(callback: (msg: any) => void): void {
this._sendWorkerMessage = callback;
}

/**
* Makes sure pyodide is ready before continuing, and cache the parent message.
*/
Expand All @@ -207,7 +215,8 @@ export class PyodideRemoteKernel {
data: this.formatResult(data),
metadata: this.formatResult(metadata),
};
postMessage({

this._sendWorkerMessage({
parentHeader: this.formatResult(this._kernel._parent_header)['header'],
bundle,
type: 'execute_result',
Expand All @@ -220,7 +229,8 @@ export class PyodideRemoteKernel {
evalue: evalue,
traceback: traceback,
};
postMessage({

this._sendWorkerMessage({
parentHeader: this.formatResult(this._kernel._parent_header)['header'],
bundle,
type: 'execute_error',
Expand All @@ -231,7 +241,8 @@ export class PyodideRemoteKernel {
const bundle = {
wait: this.formatResult(wait),
};
postMessage({

this._sendWorkerMessage({
parentHeader: this.formatResult(this._kernel._parent_header)['header'],
bundle,
type: 'clear_output',
Expand All @@ -244,7 +255,8 @@ export class PyodideRemoteKernel {
metadata: this.formatResult(metadata),
transient: this.formatResult(transient),
};
postMessage({

this._sendWorkerMessage({
parentHeader: this.formatResult(this._kernel._parent_header)['header'],
bundle,
type: 'display_data',
Expand All @@ -261,7 +273,8 @@ export class PyodideRemoteKernel {
metadata: this.formatResult(metadata),
transient: this.formatResult(transient),
};
postMessage({

this._sendWorkerMessage({
parentHeader: this.formatResult(this._kernel._parent_header)['header'],
bundle,
type: 'update_display_data',
Expand All @@ -273,7 +286,8 @@ export class PyodideRemoteKernel {
name: this.formatResult(name),
text: this.formatResult(text),
};
postMessage({

this._sendWorkerMessage({
parentHeader: this.formatResult(this._kernel._parent_header)['header'],
bundle,
type: 'stream',
Expand Down Expand Up @@ -442,7 +456,8 @@ export class PyodideRemoteKernel {
prompt,
password,
};
postMessage({

this._sendWorkerMessage({
type: 'input_request',
parentHeader: this.formatResult(this._kernel._parent_header)['header'],
content,
Expand Down Expand Up @@ -479,7 +494,7 @@ export class PyodideRemoteKernel {
* @param buffers The binary buffers.
*/
async sendComm(type: string, content: any, metadata: any, ident: any, buffers: any) {
postMessage({
this._sendWorkerMessage({
type: type,
content: this.formatResult(content),
metadata: this.formatResult(metadata),
Expand Down Expand Up @@ -511,4 +526,5 @@ export class PyodideRemoteKernel {
protected _stderr_stream: any;
protected _resolveInputReply: any;
protected _driveFS: DriveFS | null = null;
protected _sendWorkerMessage: (msg: any) => void = () => {};
}
Loading
Loading