Skip to content

Commit

Permalink
Add buffered stdin to accept terminal input whilst running WASM commands
Browse files Browse the repository at this point in the history
  • Loading branch information
ianthomas23 committed Jul 22, 2024
1 parent 1841226 commit f75c2b8
Show file tree
Hide file tree
Showing 8 changed files with 957 additions and 725 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ jupyter lite build
And serve it:

```bash
jupyter lite serve
jupyter lite serve --LiteBuildConfig.extra_http_headers=Cross-Origin-Embedder-Policy=require-corp --LiteBuildConfig.extra_http_headers=Cross-Origin-Opener-Policy=same-origin
```

Note the setting of `extra_http_headers` which are required to support `SharedArrayBuffer`.

### Packaging the extension

See [RELEASE](RELEASE.md)
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
"@jupyterlab/services": "^7.2.0",
"@jupyterlab/terminal": "^4.2.0",
"@jupyterlab/terminal-extension": "^4.2.0",
"@jupyterlite/cockle": "^0.0.4",
"@jupyterlite/cockle": "^0.0.5",
"@jupyterlite/contents": "^0.3.0 || ^0.4.0-beta.0",
"@jupyterlite/server": "^0.3.0 || ^0.4.0-beta.0",
"@lumino/coreutils": "^2.1.2",
Expand Down
202 changes: 202 additions & 0 deletions src/buffered_stdin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/**
* Classes to deal with buffered stdin. Both main and webworkers have access to the same
* SharedArrayBuffer and use that to pass stdin characters from the UI (main worker) to the shell
* (webworker). This is necessary when the shell is running a WASM command that is synchronous and
* blocking, as the usual async message passing from main to webworker does not work as the received
* messages would only be processed when the command has finished.
*/

// Indexes into SharedArrayBuffer.
const MAIN = 0;
const WORKER = 1;
const LENGTH = 2;
const START_CHAR = 3;

abstract class BufferedStdin {
constructor(sharedArrayBuffer?: SharedArrayBuffer) {
if (sharedArrayBuffer === undefined) {
const length = (this._maxChars + 3) * Int32Array.BYTES_PER_ELEMENT;
this._sharedArrayBuffer = new SharedArrayBuffer(length);
} else {
this._sharedArrayBuffer = sharedArrayBuffer;
}

this._intArray = new Int32Array(this._sharedArrayBuffer);
if (sharedArrayBuffer === undefined) {
this._intArray[MAIN] = 0;
this._intArray[WORKER] = 0;
}
}

async disable(): Promise<void> {
this._enabled = false;
this._clear();
}

async enable(): Promise<void> {
this._enabled = true;
}

get enabled(): boolean {
return this._enabled;
}

protected _clear() {
this._intArray[MAIN] = 0;
this._intArray[WORKER] = 0;
this._readCount = 0;
}

/**
* Load the character from the shared array buffer and return it.
*/
protected _loadFromSharedArrayBuffer(): number[] {
const len = Atomics.load(this._intArray, LENGTH);
const ret: number[] = [];
for (let i = 0; i < len; i++) {
ret.push(Atomics.load(this._intArray, START_CHAR + i));
}
return ret;
}

protected _enabled: boolean = false;
protected _maxChars: number = 8; // Max number of actual characters in a token.
protected _sharedArrayBuffer: SharedArrayBuffer;
protected _intArray: Int32Array;
protected _readCount: number = 0;
}

export namespace MainBufferedStdin {
export interface ISendStdinNow {
(output: string): Promise<void>;
}
}

/**
* Main worker buffers characters locally, and stores just one character at a time in the
* SharedArrayBuffer so that the web worker can read it.
*/
export class MainBufferedStdin extends BufferedStdin {
constructor() {
super();
}

override async disable(): Promise<void> {
// Send all remaining buffered characters as soon as possible via the supplied sendFunction.
this._disabling = true;
if (this._storedCount !== this._readCount) {
const codes = this._loadFromSharedArrayBuffer();
let text = '';
for (const code of codes) {
text += String.fromCharCode(code);
}
await this._sendStdinNow!(text);
}
while (this._buffer.length > 0) {
await this._sendStdinNow!(this._buffer.shift()!);
}
this._disabling = false;

super.disable();
}

get sharedArrayBuffer(): SharedArrayBuffer {
return this._sharedArrayBuffer;
}

/**
* Push a character to the buffer.
* It may or may not be stored in the SharedArrayBuffer immediately.
*/
async push(char: string) {
// May be multiple characters if ANSI control sequence.
this._buffer.push(char);
this._bufferCount++;

if (char.length > this._maxChars) {
// Too big, log this and do not pass it on?
console.log(`String '${char}' is too long to buffer`);
}

if (!this._disabling && this._readCount === this._storedCount) {
this._storeInSharedArrayBuffer();
}
}

registerSendStdinNow(sendStdinNow: MainBufferedStdin.ISendStdinNow) {
this._sendStdinNow = sendStdinNow;
}

/**
* After a successful read by the worker, main checks if another character can be stored in the
* SharedArrayBuffer.
*/
private _afterRead() {
this._readCount = Atomics.load(this._intArray, 1);
if (this._readCount !== this._storedCount) {
throw new Error('Should not happen');
}

if (this._bufferCount > this._storedCount) {
this._storeInSharedArrayBuffer();
}
}

protected override _clear() {
super._clear();
this._buffer = [];
this._bufferCount = 0;
this._storedCount = 0;
}

private _storeInSharedArrayBuffer() {
const char: string = this._buffer.shift()!;
this._storedCount++;

// Store character in SharedArrayBuffer.
const len = char.length;
Atomics.store(this._intArray, LENGTH, len);
for (let i = 0; i < len; i++) {
Atomics.store(this._intArray, START_CHAR + i, char.charCodeAt(i));
}

// Notify web worker that a new character is available.
Atomics.store(this._intArray, MAIN, this._storedCount);
Atomics.notify(this._intArray, MAIN, 1);

// Async wait for web worker to read this character.
const { async, value } = Atomics.waitAsync(
this._intArray,
WORKER,
this._readCount
);
if (async) {
value.then(() => this._afterRead());
}
}

private _buffer: string[] = [];
private _bufferCount: number = 0;
private _disabling: boolean = false;
private _storedCount: number = 0;
private _sendStdinNow?: MainBufferedStdin.ISendStdinNow;
}

export class WorkerBufferedStdin extends BufferedStdin {
constructor(sharedArrayBuffer: SharedArrayBuffer) {
super(sharedArrayBuffer);
}

get(): number[] {
// Wait for main worker to store a new character.
Atomics.wait(this._intArray, MAIN, this._readCount);
const ret = this._loadFromSharedArrayBuffer();
this._readCount++;

// Notify main worker that character has been read and a new one can be stored.
Atomics.store(this._intArray, WORKER, this._readCount);
Atomics.notify(this._intArray, WORKER, 1);

return ret;
}
}
28 changes: 25 additions & 3 deletions src/terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import {
Client as WebSocketClient
} from 'mock-socket';

import { MainBufferedStdin } from './buffered_stdin';
import { ITerminal, IRemoteWorkerTerminal } from './tokens';

export class Terminal implements ITerminal {
/**
* Construct a new Terminal.
*/
constructor(readonly options: ITerminal.IOptions) {
this._bufferedStdin = new MainBufferedStdin();
this._initWorker();
}

Expand All @@ -27,8 +29,22 @@ export class Terminal implements ITerminal {

this._remote = wrap(this._worker);
const { baseUrl } = this.options;
await this._remote.initialize({ baseUrl });
this._remote.registerCallbacks(proxy(this._outputCallback.bind(this)));
const { sharedArrayBuffer } = this._bufferedStdin;
await this._remote.initialize({ baseUrl, sharedArrayBuffer });
this._remote.registerCallbacks(
proxy(this._outputCallback.bind(this)),
proxy(this._enableBufferedStdinCallback.bind(this))
);

this._bufferedStdin.registerSendStdinNow(this._remote.input);
}

private async _enableBufferedStdinCallback(enable: boolean) {
if (enable) {
await this._bufferedStdin.enable();
} else {
await this._bufferedStdin.disable();
}
}

private async _outputCallback(text: string): Promise<void> {
Expand Down Expand Up @@ -61,7 +77,12 @@ export class Terminal implements ITerminal {
const content = data.slice(1);

if (message_type === 'stdin') {
await this._remote!.input(content[0] as string);
const text = content[0] as string;
if (this._bufferedStdin.enabled) {
await this._bufferedStdin.push(text);
} else {
await this._remote!.input(text);
}
} else if (message_type === 'set_size') {
const rows = content[0] as number;
const columns = content[1] as number;
Expand Down Expand Up @@ -89,4 +110,5 @@ export class Terminal implements ITerminal {
private _worker?: Worker;
private _remote?: IRemoteWorkerTerminal;
private _socket?: WebSocketClient;
private _bufferedStdin: MainBufferedStdin;
}
13 changes: 11 additions & 2 deletions src/tokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

import { TerminalAPI } from '@jupyterlab/services';

import { IOutputCallback } from '@jupyterlite/cockle';
import {
IEnableBufferedStdinCallback,
IOutputCallback
} from '@jupyterlite/cockle';

import { Token } from '@lumino/coreutils';

Expand Down Expand Up @@ -70,11 +73,14 @@ export namespace IWorkerTerminal {
*/
export interface IOptions {
baseUrl: string;
sharedArrayBuffer: SharedArrayBuffer;
}
}

export namespace IRemote {
export type OutputCallback = IOutputCallback & ProxyMarked;
export type EnableBufferedStdinCallback = IEnableBufferedStdinCallback &
ProxyMarked;
}

export interface IRemote extends IWorkerTerminal {
Expand All @@ -83,7 +89,10 @@ export interface IRemote extends IWorkerTerminal {
*/
initialize(options: IWorkerTerminal.IOptions): Promise<void>;

registerCallbacks(outputCallback: IRemote.OutputCallback): void;
registerCallbacks(
outputCallback: IRemote.OutputCallback,
enableBufferedStdinCallback: IRemote.EnableBufferedStdinCallback
): void;
}

/**
Expand Down
Loading

0 comments on commit f75c2b8

Please sign in to comment.