From b91d0c3493b1b408ef29b86e79e30c3b4768a366 Mon Sep 17 00:00:00 2001 From: Gustavo Henke Date: Sun, 18 Aug 2024 20:56:52 +1000 Subject: [PATCH] Inter-Process Communication (#498) --- README.md | 26 +++++- src/command.spec.ts | 165 ++++++++++++++++++++++++++++++++++- src/command.ts | 103 ++++++++++++++++++++-- src/concurrently.spec.ts | 12 +-- src/concurrently.ts | 2 + src/fixtures/fake-command.ts | 1 + src/spawn.spec.ts | 14 ++- src/spawn.ts | 45 +++++++--- 8 files changed, 340 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 7772a491..86e20270 100644 --- a/README.md +++ b/README.md @@ -318,7 +318,7 @@ For more details, visit https://github.com/open-cli-tools/concurrently ### `concurrently(commands[, options])` - `commands`: an array of either strings (containing the commands to run) or objects - with the shape `{ command, name, prefixColor, env, cwd }`. + with the shape `{ command, name, prefixColor, env, cwd, ipc }`. - `options` (optional): an object containing any of the below: - `cwd`: the working directory to be used by all commands. Can be overriden per command. @@ -405,11 +405,33 @@ It has the following properties: - `stderr`: an RxJS observable to the command's `stderr`. - `error`: an RxJS observable to the command's error events (e.g. when it fails to spawn). - `timer`: an RxJS observable to the command's timing events (e.g. starting, stopping). +- `messages`: an object with the following properties: + + - `incoming`: an RxJS observable for the IPC messages received from the underlying process. + - `outgoing`: an RxJS observable for the IPC messages sent to the underlying process. + + Both observables emit [`MessageEvent`](#messageevent)s.
+ Note that if the command wasn't spawned with IPC support, these won't emit any values. + - `close`: an RxJS observable to the command's close events. See [`CloseEvent`](#CloseEvent) for more information. -- `start()`: starts the command, setting up all +- `start()`: starts the command and sets up all of the above streams +- `send(message[, handle, options])`: sends a message to the underlying process via IPC channels, + returning a promise that resolves once the message has been sent. + See [Node.js docs](https://nodejs.org/docs/latest/api/child_process.html#subprocesssendmessage-sendhandle-options-callback). - `kill([signal])`: kills the command, optionally specifying a signal (e.g. `SIGTERM`, `SIGKILL`, etc). +### `MessageEvent` + +An object that represents a message that was received from/sent to the underlying command process.
+It has the following properties: + +- `message`: the message itself. +- `handle`: a [`net.Socket`](https://nodejs.org/docs/latest/api/net.html#class-netsocket), + [`net.Server`](https://nodejs.org/docs/latest/api/net.html#class-netserver) or + [`dgram.Socket`](https://nodejs.org/docs/latest/api/dgram.html#class-dgramsocket), + if one was sent, or `undefined`. + ### `CloseEvent` An object with information about a command's closing event.
diff --git a/src/command.spec.ts b/src/command.spec.ts index a3c14c30..509beffe 100644 --- a/src/command.spec.ts +++ b/src/command.spec.ts @@ -1,5 +1,5 @@ import { autoUnsubscribe, subscribeSpyTo } from '@hirez_io/observer-spy'; -import { SpawnOptions } from 'child_process'; +import { SendHandle, SpawnOptions } from 'child_process'; import { EventEmitter } from 'events'; import * as Rx from 'rxjs'; import { Readable, Writable } from 'stream'; @@ -16,14 +16,19 @@ import { type CommandValues = { error: unknown; close: CloseEvent; timer: unknown[] }; let process: ChildProcess; +let sendMessage: jest.Mock; let spawn: jest.Mocked; let killProcess: KillProcess; +const IPC_FD = 3; + autoUnsubscribe(); beforeEach(() => { + sendMessage = jest.fn(); process = new (class extends EventEmitter { readonly pid = 1; + send = sendMessage; readonly stdout = new Readable({ read() { // do nothing @@ -248,6 +253,164 @@ describe('#start()', () => { expect((await stderr).toString()).toBe('dang'); }); + + describe('on incoming messages', () => { + it('does not share to the incoming messages stream, if IPC is disabled', () => { + const { command } = createCommand(); + const spy = subscribeSpyTo(command.messages.incoming); + command.start(); + + process.emit('message', {}); + expect(spy.getValuesLength()).toBe(0); + }); + + it('shares to the incoming messages stream, if IPC is enabled', () => { + const { command } = createCommand({ ipc: IPC_FD }); + const spy = subscribeSpyTo(command.messages.incoming); + command.start(); + + const message1 = {}; + process.emit('message', message1, undefined); + + const message2 = {}; + const handle = {} as SendHandle; + process.emit('message', message2, handle); + + expect(spy.getValuesLength()).toBe(2); + expect(spy.getValueAt(0)).toEqual({ message: message1, handle: undefined }); + expect(spy.getValueAt(1)).toEqual({ message: message2, handle }); + }); + }); + + describe('on outgoing messages', () => { + it('calls onSent with an error if the process does not have IPC enabled', () => { + const { command } = createCommand({ ipc: IPC_FD }); + command.start(); + + Object.assign(process, { + // The TS types don't assume `send` can be undefined, + // despite the Node docs saying so + send: undefined, + }); + + const onSent = jest.fn(); + command.messages.outgoing.next({ message: {}, onSent }); + expect(onSent).toHaveBeenCalledWith(expect.any(Error)); + }); + + it('sends the message to the process', () => { + const { command } = createCommand({ ipc: IPC_FD }); + command.start(); + + const message1 = {}; + command.messages.outgoing.next({ message: message1, onSent() {} }); + + const message2 = {}; + const handle = {} as SendHandle; + command.messages.outgoing.next({ message: message2, handle, onSent() {} }); + + const message3 = {}; + const options = {}; + command.messages.outgoing.next({ message: message3, options, onSent() {} }); + + expect(process.send).toHaveBeenCalledTimes(3); + expect(process.send).toHaveBeenNthCalledWith( + 1, + message1, + undefined, + undefined, + expect.any(Function), + ); + expect(process.send).toHaveBeenNthCalledWith( + 2, + message2, + handle, + undefined, + expect.any(Function), + ); + expect(process.send).toHaveBeenNthCalledWith( + 3, + message3, + undefined, + options, + expect.any(Function), + ); + }); + + it('sends the message to the process, if it starts late', () => { + const { command } = createCommand({ ipc: IPC_FD }); + command.messages.outgoing.next({ message: {}, onSent() {} }); + expect(process.send).not.toHaveBeenCalled(); + + command.start(); + expect(process.send).toHaveBeenCalled(); + }); + + it('calls onSent with the result of sending the message', () => { + const { command } = createCommand({ ipc: IPC_FD }); + command.start(); + + const onSent = jest.fn(); + command.messages.outgoing.next({ message: {}, onSent }); + expect(onSent).not.toHaveBeenCalled(); + + sendMessage.mock.calls[0][3](); + expect(onSent).toHaveBeenCalledWith(undefined); + + const error = new Error(); + sendMessage.mock.calls[0][3](error); + expect(onSent).toHaveBeenCalledWith(error); + }); + }); +}); + +describe('#send()', () => { + it('throws if IPC is not set up', () => { + const { command } = createCommand(); + const fn = () => command.send({}); + expect(fn).toThrow(); + }); + + it('pushes the message on the outgoing messages stream', () => { + const { command } = createCommand({ ipc: IPC_FD }); + const spy = subscribeSpyTo(command.messages.outgoing); + + const message1 = { foo: true }; + command.send(message1); + + const message2 = { bar: 123 }; + const handle = {} as SendHandle; + command.send(message2, handle); + + const message3 = { baz: 'yes' }; + const options = {}; + command.send(message3, undefined, options); + + expect(spy.getValuesLength()).toBe(3); + expect(spy.getValueAt(0)).toMatchObject({ + message: message1, + handle: undefined, + options: undefined, + }); + expect(spy.getValueAt(1)).toMatchObject({ message: message2, handle, options: undefined }); + expect(spy.getValueAt(2)).toMatchObject({ message: message3, handle: undefined, options }); + }); + + it('resolves when onSent callback is called with no arguments', async () => { + const { command } = createCommand({ ipc: IPC_FD }); + const spy = subscribeSpyTo(command.messages.outgoing); + const promise = command.send({}); + spy.getFirstValue().onSent(); + await expect(promise).resolves.toBeUndefined(); + }); + + it('rejects when onSent callback is called with an argument', async () => { + const { command } = createCommand({ ipc: IPC_FD }); + const spy = subscribeSpyTo(command.messages.outgoing); + const promise = command.send({}); + spy.getFirstValue().onSent('foo'); + await expect(promise).rejects.toBe('foo'); + }); }); describe('#kill()', () => { diff --git a/src/command.ts b/src/command.ts index ef9a34f9..c700e467 100644 --- a/src/command.ts +++ b/src/command.ts @@ -1,4 +1,9 @@ -import { ChildProcess as BaseChildProcess, SpawnOptions } from 'child_process'; +import { + ChildProcess as BaseChildProcess, + MessageOptions, + SendHandle, + SpawnOptions, +} from 'child_process'; import * as Rx from 'rxjs'; import { EventEmitter, Writable } from 'stream'; @@ -33,6 +38,14 @@ export interface CommandInfo { */ prefixColor?: string; + /** + * Whether sending of messages to/from this command (also known as "inter-process communication") + * should be enabled, and using which file descriptor number. + * + * If set, must be > 2. + */ + ipc?: number; + /** * Output command in raw format. */ @@ -69,11 +82,21 @@ export interface TimerEvent { endDate?: Date; } +export interface MessageEvent { + message: object; + handle?: SendHandle; +} + +interface OutgoingMessageEvent extends MessageEvent { + options?: MessageOptions; + onSent(error?: unknown): void; +} + /** * Subtype of NodeJS's child_process including only what's actually needed for a command to work. */ export type ChildProcess = EventEmitter & - Pick; + Pick; /** * Interface for a function that must kill the process with `pid`, optionally sending `signal` to it. @@ -116,13 +139,23 @@ export class Command implements CommandInfo { /** @inheritdoc */ readonly cwd?: string; + /** @inheritdoc */ + readonly ipc?: number; + readonly close = new Rx.Subject(); readonly error = new Rx.Subject(); readonly stdout = new Rx.Subject(); readonly stderr = new Rx.Subject(); readonly timer = new Rx.Subject(); + readonly messages = { + incoming: new Rx.Subject(), + outgoing: new Rx.ReplaySubject(), + }; process?: ChildProcess; + + // TODO: Should exit/error/stdio subscriptions be added here? + private subscriptions: readonly Rx.Subscription[] = []; stdin?: Writable; pid?: number; killed = false; @@ -131,7 +164,7 @@ export class Command implements CommandInfo { state: CommandState = 'stopped'; constructor( - { index, name, command, prefixColor, env, cwd }: CommandInfo & { index: number }, + { index, name, command, prefixColor, env, cwd, ipc }: CommandInfo & { index: number }, spawnOpts: SpawnOptions, spawn: SpawnCommand, killProcess: KillProcess, @@ -142,6 +175,7 @@ export class Command implements CommandInfo { this.prefixColor = prefixColor; this.env = env || {}; this.cwd = cwd; + this.ipc = ipc; this.killProcess = killProcess; this.spawn = spawn; this.spawnOpts = spawnOpts; @@ -159,8 +193,9 @@ export class Command implements CommandInfo { const highResStartTime = process.hrtime(); this.timer.next({ startDate }); + this.subscriptions = [...this.maybeSetupIPC(child)]; Rx.fromEvent(child, 'error').subscribe((event) => { - this.process = undefined; + this.cleanUp(); const endDate = new Date(Date.now()); this.timer.next({ startDate, endDate }); this.error.next(event); @@ -169,7 +204,7 @@ export class Command implements CommandInfo { Rx.fromEvent(child, 'close') .pipe(Rx.map((event) => event as [number | null, NodeJS.Signals | null])) .subscribe(([exitCode, signal]) => { - this.process = undefined; + this.cleanUp(); // Don't override error event if (this.state !== 'errored') { @@ -204,6 +239,56 @@ export class Command implements CommandInfo { this.stdin = child.stdin || undefined; } + private maybeSetupIPC(child: ChildProcess) { + if (!this.ipc) { + return []; + } + + return [ + pipeTo( + Rx.fromEvent(child, 'message').pipe( + Rx.map((event) => { + const [message, handle] = event as [object, SendHandle | undefined]; + return { message, handle }; + }), + ), + this.messages.incoming, + ), + this.messages.outgoing.subscribe((message) => { + if (!child.send) { + return message.onSent(new Error('Command does not have an IPC channel')); + } + + child.send(message.message, message.handle, message.options, (error) => { + message.onSent(error); + }); + }), + ]; + } + + /** + * Sends a message to the underlying process once it starts. + * + * @throws If the command doesn't have an IPC channel enabled + * @returns Promise that resolves when the message is sent, + * or rejects if it fails to deliver the message. + */ + send(message: object, handle?: SendHandle, options?: MessageOptions): Promise { + if (this.ipc == null) { + throw new Error('Command IPC is disabled'); + } + return new Promise((resolve, reject) => { + this.messages.outgoing.next({ + message, + handle, + options, + onSent(error) { + error ? reject(error) : resolve(); + }, + }); + }); + } + /** * Kills this command, optionally specifying a signal to send to it. */ @@ -214,6 +299,12 @@ export class Command implements CommandInfo { } } + private cleanUp() { + this.subscriptions?.forEach((sub) => sub.unsubscribe()); + this.messages.outgoing = new Rx.ReplaySubject(); + this.process = undefined; + } + /** * Detects whether a command can be killed. * @@ -228,5 +319,5 @@ export class Command implements CommandInfo { * Pipes all events emitted by `stream` into `subject`. */ function pipeTo(stream: Rx.Observable, subject: Rx.Subject) { - stream.subscribe((event) => subject.next(event)); + return stream.subscribe((event) => subject.next(event)); } diff --git a/src/concurrently.spec.ts b/src/concurrently.spec.ts index 1a92317d..9d6aa742 100644 --- a/src/concurrently.spec.ts +++ b/src/concurrently.spec.ts @@ -272,13 +272,13 @@ it('uses raw from options for each command', () => { expect(spawn).toHaveBeenCalledWith( 'echo', expect.objectContaining({ - stdio: 'inherit', + stdio: ['inherit', 'inherit', 'inherit'], }), ); expect(spawn).toHaveBeenCalledWith( 'kill', expect.objectContaining({ - stdio: 'inherit', + stdio: ['inherit', 'inherit', 'inherit'], }), ); }); @@ -292,13 +292,13 @@ it('uses overridden raw option for each command if specified', () => { expect(spawn).toHaveBeenCalledWith( 'echo', expect.objectContaining({ - stdio: 'pipe', + stdio: ['pipe', 'pipe', 'pipe'], }), ); expect(spawn).toHaveBeenCalledWith( 'echo', expect.objectContaining({ - stdio: 'inherit', + stdio: ['inherit', 'inherit', 'inherit'], }), ); }); @@ -312,7 +312,7 @@ it('uses hide from options for each command', () => { expect(spawn).toHaveBeenCalledWith( 'echo', expect.objectContaining({ - stdio: 'pipe', + stdio: ['pipe', 'pipe', 'pipe'], }), ); expect(spawn).toHaveBeenCalledWith( @@ -333,7 +333,7 @@ it('hides output for commands even if raw option is on', () => { expect(spawn).toHaveBeenCalledWith( 'echo', expect.objectContaining({ - stdio: 'inherit', + stdio: ['inherit', 'inherit', 'inherit'], }), ); expect(spawn).toHaveBeenCalledWith( diff --git a/src/concurrently.ts b/src/concurrently.ts index b9390738..9b5ca497 100644 --- a/src/concurrently.ts +++ b/src/concurrently.ts @@ -194,6 +194,7 @@ export function concurrently( ...command, }, getSpawnOpts({ + ipc: command.ipc, stdio: hidden ? 'hidden' : command.raw ?? options.raw ? 'raw' : 'normal', env: command.env, cwd: command.cwd || options.cwd, @@ -262,6 +263,7 @@ function mapToCommandInfo(command: ConcurrentlyCommandInput): CommandInfo { name: command.name || '', env: command.env || {}, cwd: command.cwd || '', + ipc: command.ipc, ...(command.prefixColor ? { prefixColor: command.prefixColor, diff --git a/src/fixtures/fake-command.ts b/src/fixtures/fake-command.ts index 6d0107fe..95ca3d4f 100644 --- a/src/fixtures/fake-command.ts +++ b/src/fixtures/fake-command.ts @@ -30,6 +30,7 @@ export class FakeCommand extends Command { export const createFakeProcess = (pid: number): ChildProcess => Object.assign(new EventEmitter(), { pid, + send: jest.fn(), stdin: new PassThrough(), stdout: new PassThrough(), stderr: new PassThrough(), diff --git a/src/spawn.spec.ts b/src/spawn.spec.ts index 4d826b81..a4b06fef 100644 --- a/src/spawn.spec.ts +++ b/src/spawn.spec.ts @@ -28,17 +28,27 @@ describe('getSpawnOpts()', () => { }); it('sets stdio to pipe when stdio mode is normal', () => { - expect(getSpawnOpts({ stdio: 'normal' }).stdio).toBe('pipe'); + expect(getSpawnOpts({ stdio: 'normal' }).stdio).toEqual(['pipe', 'pipe', 'pipe']); }); it('sets stdio to inherit when stdio mode is raw', () => { - expect(getSpawnOpts({ stdio: 'raw' }).stdio).toBe('inherit'); + expect(getSpawnOpts({ stdio: 'raw' }).stdio).toEqual(['inherit', 'inherit', 'inherit']); }); it('sets stdio to ignore stdout + stderr when stdio mode is hidden', () => { expect(getSpawnOpts({ stdio: 'hidden' }).stdio).toEqual(['ignore', 'ignore', 'pipe']); }); + it('sets an ipc channel at the specified descriptor index', () => { + const opts = getSpawnOpts({ ipc: 3 }); + expect(opts.stdio?.[3]).toBe('ipc'); + }); + + it('throws if the ipc channel is <= 2', () => { + const fn = () => getSpawnOpts({ ipc: 0 }); + expect(fn).toThrow(); + }); + it('merges FORCE_COLOR into env vars if color supported', () => { const process = { ...baseProcess, env: { foo: 'bar' } }; expect(getSpawnOpts({ process, colorSupport: false }).env).toEqual(process.env); diff --git a/src/spawn.ts b/src/spawn.ts index 4f38e433..61645ef7 100644 --- a/src/spawn.ts +++ b/src/spawn.ts @@ -1,4 +1,5 @@ -import { ChildProcess, spawn as baseSpawn, SpawnOptions } from 'child_process'; +import assert from 'assert'; +import { ChildProcess, IOType, spawn as baseSpawn, SpawnOptions } from 'child_process'; import supportsColor from 'supports-color'; /** @@ -26,6 +27,7 @@ export const getSpawnOpts = ({ colorSupport = supportsColor.stdout, cwd, process = global.process, + ipc, stdio = 'normal', env = {}, }: { @@ -48,6 +50,12 @@ export const getSpawnOpts = ({ */ cwd?: string; + /** + * The file descriptor number at which the channel for inter-process communication + * should be set up. + */ + ipc?: number; + /** * Which stdio mode to use. Raw implies inheriting the parent process' stdio. * @@ -63,13 +71,28 @@ export const getSpawnOpts = ({ * Map of custom environment variables to include in the spawn options. */ env?: Record; -}): SpawnOptions => ({ - cwd: cwd || process.cwd(), - stdio: stdio === 'normal' ? 'pipe' : stdio === 'raw' ? 'inherit' : ['ignore', 'ignore', 'pipe'], - ...(/^win/.test(process.platform) && { detached: false }), - env: { - ...(colorSupport ? { FORCE_COLOR: colorSupport.level.toString() } : {}), - ...process.env, - ...env, - }, -}); +}): SpawnOptions => { + const stdioValues: (IOType | 'ipc')[] = + stdio === 'normal' + ? ['pipe', 'pipe', 'pipe'] + : stdio === 'raw' + ? ['inherit', 'inherit', 'inherit'] + : ['ignore', 'ignore', 'pipe']; + + if (ipc != null) { + // Avoid overriding the stdout/stderr/stdin + assert.ok(ipc > 2, '[concurrently] the IPC channel number should be > 2'); + stdioValues[ipc] = 'ipc'; + } + + return { + cwd: cwd || process.cwd(), + stdio: stdioValues, + ...(/^win/.test(process.platform) && { detached: false }), + env: { + ...(colorSupport ? { FORCE_COLOR: colorSupport.level.toString() } : {}), + ...process.env, + ...env, + }, + }; +};