diff --git a/README.md b/README.md
index 7772a491..86e20270 100644
--- a/README.md
+++ b/README.md
@@ -318,7 +318,7 @@ For more details, visit https://github.com/open-cli-tools/concurrently
### `concurrently(commands[, options])`
- `commands`: an array of either strings (containing the commands to run) or objects
- with the shape `{ command, name, prefixColor, env, cwd }`.
+ with the shape `{ command, name, prefixColor, env, cwd, ipc }`.
- `options` (optional): an object containing any of the below:
- `cwd`: the working directory to be used by all commands. Can be overriden per command.
@@ -405,11 +405,33 @@ It has the following properties:
- `stderr`: an RxJS observable to the command's `stderr`.
- `error`: an RxJS observable to the command's error events (e.g. when it fails to spawn).
- `timer`: an RxJS observable to the command's timing events (e.g. starting, stopping).
+- `messages`: an object with the following properties:
+
+ - `incoming`: an RxJS observable for the IPC messages received from the underlying process.
+ - `outgoing`: an RxJS observable for the IPC messages sent to the underlying process.
+
+ Both observables emit [`MessageEvent`](#messageevent)s.
+ Note that if the command wasn't spawned with IPC support, these won't emit any values.
+
- `close`: an RxJS observable to the command's close events.
See [`CloseEvent`](#CloseEvent) for more information.
-- `start()`: starts the command, setting up all
+- `start()`: starts the command and sets up all of the above streams
+- `send(message[, handle, options])`: sends a message to the underlying process via IPC channels,
+ returning a promise that resolves once the message has been sent.
+ See [Node.js docs](https://nodejs.org/docs/latest/api/child_process.html#subprocesssendmessage-sendhandle-options-callback).
- `kill([signal])`: kills the command, optionally specifying a signal (e.g. `SIGTERM`, `SIGKILL`, etc).
+### `MessageEvent`
+
+An object that represents a message that was received from/sent to the underlying command process.
+It has the following properties:
+
+- `message`: the message itself.
+- `handle`: a [`net.Socket`](https://nodejs.org/docs/latest/api/net.html#class-netsocket),
+ [`net.Server`](https://nodejs.org/docs/latest/api/net.html#class-netserver) or
+ [`dgram.Socket`](https://nodejs.org/docs/latest/api/dgram.html#class-dgramsocket),
+ if one was sent, or `undefined`.
+
### `CloseEvent`
An object with information about a command's closing event.
diff --git a/src/command.spec.ts b/src/command.spec.ts
index a3c14c30..509beffe 100644
--- a/src/command.spec.ts
+++ b/src/command.spec.ts
@@ -1,5 +1,5 @@
import { autoUnsubscribe, subscribeSpyTo } from '@hirez_io/observer-spy';
-import { SpawnOptions } from 'child_process';
+import { SendHandle, SpawnOptions } from 'child_process';
import { EventEmitter } from 'events';
import * as Rx from 'rxjs';
import { Readable, Writable } from 'stream';
@@ -16,14 +16,19 @@ import {
type CommandValues = { error: unknown; close: CloseEvent; timer: unknown[] };
let process: ChildProcess;
+let sendMessage: jest.Mock;
let spawn: jest.Mocked;
let killProcess: KillProcess;
+const IPC_FD = 3;
+
autoUnsubscribe();
beforeEach(() => {
+ sendMessage = jest.fn();
process = new (class extends EventEmitter {
readonly pid = 1;
+ send = sendMessage;
readonly stdout = new Readable({
read() {
// do nothing
@@ -248,6 +253,164 @@ describe('#start()', () => {
expect((await stderr).toString()).toBe('dang');
});
+
+ describe('on incoming messages', () => {
+ it('does not share to the incoming messages stream, if IPC is disabled', () => {
+ const { command } = createCommand();
+ const spy = subscribeSpyTo(command.messages.incoming);
+ command.start();
+
+ process.emit('message', {});
+ expect(spy.getValuesLength()).toBe(0);
+ });
+
+ it('shares to the incoming messages stream, if IPC is enabled', () => {
+ const { command } = createCommand({ ipc: IPC_FD });
+ const spy = subscribeSpyTo(command.messages.incoming);
+ command.start();
+
+ const message1 = {};
+ process.emit('message', message1, undefined);
+
+ const message2 = {};
+ const handle = {} as SendHandle;
+ process.emit('message', message2, handle);
+
+ expect(spy.getValuesLength()).toBe(2);
+ expect(spy.getValueAt(0)).toEqual({ message: message1, handle: undefined });
+ expect(spy.getValueAt(1)).toEqual({ message: message2, handle });
+ });
+ });
+
+ describe('on outgoing messages', () => {
+ it('calls onSent with an error if the process does not have IPC enabled', () => {
+ const { command } = createCommand({ ipc: IPC_FD });
+ command.start();
+
+ Object.assign(process, {
+ // The TS types don't assume `send` can be undefined,
+ // despite the Node docs saying so
+ send: undefined,
+ });
+
+ const onSent = jest.fn();
+ command.messages.outgoing.next({ message: {}, onSent });
+ expect(onSent).toHaveBeenCalledWith(expect.any(Error));
+ });
+
+ it('sends the message to the process', () => {
+ const { command } = createCommand({ ipc: IPC_FD });
+ command.start();
+
+ const message1 = {};
+ command.messages.outgoing.next({ message: message1, onSent() {} });
+
+ const message2 = {};
+ const handle = {} as SendHandle;
+ command.messages.outgoing.next({ message: message2, handle, onSent() {} });
+
+ const message3 = {};
+ const options = {};
+ command.messages.outgoing.next({ message: message3, options, onSent() {} });
+
+ expect(process.send).toHaveBeenCalledTimes(3);
+ expect(process.send).toHaveBeenNthCalledWith(
+ 1,
+ message1,
+ undefined,
+ undefined,
+ expect.any(Function),
+ );
+ expect(process.send).toHaveBeenNthCalledWith(
+ 2,
+ message2,
+ handle,
+ undefined,
+ expect.any(Function),
+ );
+ expect(process.send).toHaveBeenNthCalledWith(
+ 3,
+ message3,
+ undefined,
+ options,
+ expect.any(Function),
+ );
+ });
+
+ it('sends the message to the process, if it starts late', () => {
+ const { command } = createCommand({ ipc: IPC_FD });
+ command.messages.outgoing.next({ message: {}, onSent() {} });
+ expect(process.send).not.toHaveBeenCalled();
+
+ command.start();
+ expect(process.send).toHaveBeenCalled();
+ });
+
+ it('calls onSent with the result of sending the message', () => {
+ const { command } = createCommand({ ipc: IPC_FD });
+ command.start();
+
+ const onSent = jest.fn();
+ command.messages.outgoing.next({ message: {}, onSent });
+ expect(onSent).not.toHaveBeenCalled();
+
+ sendMessage.mock.calls[0][3]();
+ expect(onSent).toHaveBeenCalledWith(undefined);
+
+ const error = new Error();
+ sendMessage.mock.calls[0][3](error);
+ expect(onSent).toHaveBeenCalledWith(error);
+ });
+ });
+});
+
+describe('#send()', () => {
+ it('throws if IPC is not set up', () => {
+ const { command } = createCommand();
+ const fn = () => command.send({});
+ expect(fn).toThrow();
+ });
+
+ it('pushes the message on the outgoing messages stream', () => {
+ const { command } = createCommand({ ipc: IPC_FD });
+ const spy = subscribeSpyTo(command.messages.outgoing);
+
+ const message1 = { foo: true };
+ command.send(message1);
+
+ const message2 = { bar: 123 };
+ const handle = {} as SendHandle;
+ command.send(message2, handle);
+
+ const message3 = { baz: 'yes' };
+ const options = {};
+ command.send(message3, undefined, options);
+
+ expect(spy.getValuesLength()).toBe(3);
+ expect(spy.getValueAt(0)).toMatchObject({
+ message: message1,
+ handle: undefined,
+ options: undefined,
+ });
+ expect(spy.getValueAt(1)).toMatchObject({ message: message2, handle, options: undefined });
+ expect(spy.getValueAt(2)).toMatchObject({ message: message3, handle: undefined, options });
+ });
+
+ it('resolves when onSent callback is called with no arguments', async () => {
+ const { command } = createCommand({ ipc: IPC_FD });
+ const spy = subscribeSpyTo(command.messages.outgoing);
+ const promise = command.send({});
+ spy.getFirstValue().onSent();
+ await expect(promise).resolves.toBeUndefined();
+ });
+
+ it('rejects when onSent callback is called with an argument', async () => {
+ const { command } = createCommand({ ipc: IPC_FD });
+ const spy = subscribeSpyTo(command.messages.outgoing);
+ const promise = command.send({});
+ spy.getFirstValue().onSent('foo');
+ await expect(promise).rejects.toBe('foo');
+ });
});
describe('#kill()', () => {
diff --git a/src/command.ts b/src/command.ts
index ef9a34f9..c700e467 100644
--- a/src/command.ts
+++ b/src/command.ts
@@ -1,4 +1,9 @@
-import { ChildProcess as BaseChildProcess, SpawnOptions } from 'child_process';
+import {
+ ChildProcess as BaseChildProcess,
+ MessageOptions,
+ SendHandle,
+ SpawnOptions,
+} from 'child_process';
import * as Rx from 'rxjs';
import { EventEmitter, Writable } from 'stream';
@@ -33,6 +38,14 @@ export interface CommandInfo {
*/
prefixColor?: string;
+ /**
+ * Whether sending of messages to/from this command (also known as "inter-process communication")
+ * should be enabled, and using which file descriptor number.
+ *
+ * If set, must be > 2.
+ */
+ ipc?: number;
+
/**
* Output command in raw format.
*/
@@ -69,11 +82,21 @@ export interface TimerEvent {
endDate?: Date;
}
+export interface MessageEvent {
+ message: object;
+ handle?: SendHandle;
+}
+
+interface OutgoingMessageEvent extends MessageEvent {
+ options?: MessageOptions;
+ onSent(error?: unknown): void;
+}
+
/**
* Subtype of NodeJS's child_process including only what's actually needed for a command to work.
*/
export type ChildProcess = EventEmitter &
- Pick;
+ Pick;
/**
* Interface for a function that must kill the process with `pid`, optionally sending `signal` to it.
@@ -116,13 +139,23 @@ export class Command implements CommandInfo {
/** @inheritdoc */
readonly cwd?: string;
+ /** @inheritdoc */
+ readonly ipc?: number;
+
readonly close = new Rx.Subject();
readonly error = new Rx.Subject();
readonly stdout = new Rx.Subject();
readonly stderr = new Rx.Subject();
readonly timer = new Rx.Subject();
+ readonly messages = {
+ incoming: new Rx.Subject(),
+ outgoing: new Rx.ReplaySubject(),
+ };
process?: ChildProcess;
+
+ // TODO: Should exit/error/stdio subscriptions be added here?
+ private subscriptions: readonly Rx.Subscription[] = [];
stdin?: Writable;
pid?: number;
killed = false;
@@ -131,7 +164,7 @@ export class Command implements CommandInfo {
state: CommandState = 'stopped';
constructor(
- { index, name, command, prefixColor, env, cwd }: CommandInfo & { index: number },
+ { index, name, command, prefixColor, env, cwd, ipc }: CommandInfo & { index: number },
spawnOpts: SpawnOptions,
spawn: SpawnCommand,
killProcess: KillProcess,
@@ -142,6 +175,7 @@ export class Command implements CommandInfo {
this.prefixColor = prefixColor;
this.env = env || {};
this.cwd = cwd;
+ this.ipc = ipc;
this.killProcess = killProcess;
this.spawn = spawn;
this.spawnOpts = spawnOpts;
@@ -159,8 +193,9 @@ export class Command implements CommandInfo {
const highResStartTime = process.hrtime();
this.timer.next({ startDate });
+ this.subscriptions = [...this.maybeSetupIPC(child)];
Rx.fromEvent(child, 'error').subscribe((event) => {
- this.process = undefined;
+ this.cleanUp();
const endDate = new Date(Date.now());
this.timer.next({ startDate, endDate });
this.error.next(event);
@@ -169,7 +204,7 @@ export class Command implements CommandInfo {
Rx.fromEvent(child, 'close')
.pipe(Rx.map((event) => event as [number | null, NodeJS.Signals | null]))
.subscribe(([exitCode, signal]) => {
- this.process = undefined;
+ this.cleanUp();
// Don't override error event
if (this.state !== 'errored') {
@@ -204,6 +239,56 @@ export class Command implements CommandInfo {
this.stdin = child.stdin || undefined;
}
+ private maybeSetupIPC(child: ChildProcess) {
+ if (!this.ipc) {
+ return [];
+ }
+
+ return [
+ pipeTo(
+ Rx.fromEvent(child, 'message').pipe(
+ Rx.map((event) => {
+ const [message, handle] = event as [object, SendHandle | undefined];
+ return { message, handle };
+ }),
+ ),
+ this.messages.incoming,
+ ),
+ this.messages.outgoing.subscribe((message) => {
+ if (!child.send) {
+ return message.onSent(new Error('Command does not have an IPC channel'));
+ }
+
+ child.send(message.message, message.handle, message.options, (error) => {
+ message.onSent(error);
+ });
+ }),
+ ];
+ }
+
+ /**
+ * Sends a message to the underlying process once it starts.
+ *
+ * @throws If the command doesn't have an IPC channel enabled
+ * @returns Promise that resolves when the message is sent,
+ * or rejects if it fails to deliver the message.
+ */
+ send(message: object, handle?: SendHandle, options?: MessageOptions): Promise {
+ if (this.ipc == null) {
+ throw new Error('Command IPC is disabled');
+ }
+ return new Promise((resolve, reject) => {
+ this.messages.outgoing.next({
+ message,
+ handle,
+ options,
+ onSent(error) {
+ error ? reject(error) : resolve();
+ },
+ });
+ });
+ }
+
/**
* Kills this command, optionally specifying a signal to send to it.
*/
@@ -214,6 +299,12 @@ export class Command implements CommandInfo {
}
}
+ private cleanUp() {
+ this.subscriptions?.forEach((sub) => sub.unsubscribe());
+ this.messages.outgoing = new Rx.ReplaySubject();
+ this.process = undefined;
+ }
+
/**
* Detects whether a command can be killed.
*
@@ -228,5 +319,5 @@ export class Command implements CommandInfo {
* Pipes all events emitted by `stream` into `subject`.
*/
function pipeTo(stream: Rx.Observable, subject: Rx.Subject) {
- stream.subscribe((event) => subject.next(event));
+ return stream.subscribe((event) => subject.next(event));
}
diff --git a/src/concurrently.spec.ts b/src/concurrently.spec.ts
index 1a92317d..9d6aa742 100644
--- a/src/concurrently.spec.ts
+++ b/src/concurrently.spec.ts
@@ -272,13 +272,13 @@ it('uses raw from options for each command', () => {
expect(spawn).toHaveBeenCalledWith(
'echo',
expect.objectContaining({
- stdio: 'inherit',
+ stdio: ['inherit', 'inherit', 'inherit'],
}),
);
expect(spawn).toHaveBeenCalledWith(
'kill',
expect.objectContaining({
- stdio: 'inherit',
+ stdio: ['inherit', 'inherit', 'inherit'],
}),
);
});
@@ -292,13 +292,13 @@ it('uses overridden raw option for each command if specified', () => {
expect(spawn).toHaveBeenCalledWith(
'echo',
expect.objectContaining({
- stdio: 'pipe',
+ stdio: ['pipe', 'pipe', 'pipe'],
}),
);
expect(spawn).toHaveBeenCalledWith(
'echo',
expect.objectContaining({
- stdio: 'inherit',
+ stdio: ['inherit', 'inherit', 'inherit'],
}),
);
});
@@ -312,7 +312,7 @@ it('uses hide from options for each command', () => {
expect(spawn).toHaveBeenCalledWith(
'echo',
expect.objectContaining({
- stdio: 'pipe',
+ stdio: ['pipe', 'pipe', 'pipe'],
}),
);
expect(spawn).toHaveBeenCalledWith(
@@ -333,7 +333,7 @@ it('hides output for commands even if raw option is on', () => {
expect(spawn).toHaveBeenCalledWith(
'echo',
expect.objectContaining({
- stdio: 'inherit',
+ stdio: ['inherit', 'inherit', 'inherit'],
}),
);
expect(spawn).toHaveBeenCalledWith(
diff --git a/src/concurrently.ts b/src/concurrently.ts
index b9390738..9b5ca497 100644
--- a/src/concurrently.ts
+++ b/src/concurrently.ts
@@ -194,6 +194,7 @@ export function concurrently(
...command,
},
getSpawnOpts({
+ ipc: command.ipc,
stdio: hidden ? 'hidden' : command.raw ?? options.raw ? 'raw' : 'normal',
env: command.env,
cwd: command.cwd || options.cwd,
@@ -262,6 +263,7 @@ function mapToCommandInfo(command: ConcurrentlyCommandInput): CommandInfo {
name: command.name || '',
env: command.env || {},
cwd: command.cwd || '',
+ ipc: command.ipc,
...(command.prefixColor
? {
prefixColor: command.prefixColor,
diff --git a/src/fixtures/fake-command.ts b/src/fixtures/fake-command.ts
index 6d0107fe..95ca3d4f 100644
--- a/src/fixtures/fake-command.ts
+++ b/src/fixtures/fake-command.ts
@@ -30,6 +30,7 @@ export class FakeCommand extends Command {
export const createFakeProcess = (pid: number): ChildProcess =>
Object.assign(new EventEmitter(), {
pid,
+ send: jest.fn(),
stdin: new PassThrough(),
stdout: new PassThrough(),
stderr: new PassThrough(),
diff --git a/src/spawn.spec.ts b/src/spawn.spec.ts
index 4d826b81..a4b06fef 100644
--- a/src/spawn.spec.ts
+++ b/src/spawn.spec.ts
@@ -28,17 +28,27 @@ describe('getSpawnOpts()', () => {
});
it('sets stdio to pipe when stdio mode is normal', () => {
- expect(getSpawnOpts({ stdio: 'normal' }).stdio).toBe('pipe');
+ expect(getSpawnOpts({ stdio: 'normal' }).stdio).toEqual(['pipe', 'pipe', 'pipe']);
});
it('sets stdio to inherit when stdio mode is raw', () => {
- expect(getSpawnOpts({ stdio: 'raw' }).stdio).toBe('inherit');
+ expect(getSpawnOpts({ stdio: 'raw' }).stdio).toEqual(['inherit', 'inherit', 'inherit']);
});
it('sets stdio to ignore stdout + stderr when stdio mode is hidden', () => {
expect(getSpawnOpts({ stdio: 'hidden' }).stdio).toEqual(['ignore', 'ignore', 'pipe']);
});
+ it('sets an ipc channel at the specified descriptor index', () => {
+ const opts = getSpawnOpts({ ipc: 3 });
+ expect(opts.stdio?.[3]).toBe('ipc');
+ });
+
+ it('throws if the ipc channel is <= 2', () => {
+ const fn = () => getSpawnOpts({ ipc: 0 });
+ expect(fn).toThrow();
+ });
+
it('merges FORCE_COLOR into env vars if color supported', () => {
const process = { ...baseProcess, env: { foo: 'bar' } };
expect(getSpawnOpts({ process, colorSupport: false }).env).toEqual(process.env);
diff --git a/src/spawn.ts b/src/spawn.ts
index 4f38e433..61645ef7 100644
--- a/src/spawn.ts
+++ b/src/spawn.ts
@@ -1,4 +1,5 @@
-import { ChildProcess, spawn as baseSpawn, SpawnOptions } from 'child_process';
+import assert from 'assert';
+import { ChildProcess, IOType, spawn as baseSpawn, SpawnOptions } from 'child_process';
import supportsColor from 'supports-color';
/**
@@ -26,6 +27,7 @@ export const getSpawnOpts = ({
colorSupport = supportsColor.stdout,
cwd,
process = global.process,
+ ipc,
stdio = 'normal',
env = {},
}: {
@@ -48,6 +50,12 @@ export const getSpawnOpts = ({
*/
cwd?: string;
+ /**
+ * The file descriptor number at which the channel for inter-process communication
+ * should be set up.
+ */
+ ipc?: number;
+
/**
* Which stdio mode to use. Raw implies inheriting the parent process' stdio.
*
@@ -63,13 +71,28 @@ export const getSpawnOpts = ({
* Map of custom environment variables to include in the spawn options.
*/
env?: Record;
-}): SpawnOptions => ({
- cwd: cwd || process.cwd(),
- stdio: stdio === 'normal' ? 'pipe' : stdio === 'raw' ? 'inherit' : ['ignore', 'ignore', 'pipe'],
- ...(/^win/.test(process.platform) && { detached: false }),
- env: {
- ...(colorSupport ? { FORCE_COLOR: colorSupport.level.toString() } : {}),
- ...process.env,
- ...env,
- },
-});
+}): SpawnOptions => {
+ const stdioValues: (IOType | 'ipc')[] =
+ stdio === 'normal'
+ ? ['pipe', 'pipe', 'pipe']
+ : stdio === 'raw'
+ ? ['inherit', 'inherit', 'inherit']
+ : ['ignore', 'ignore', 'pipe'];
+
+ if (ipc != null) {
+ // Avoid overriding the stdout/stderr/stdin
+ assert.ok(ipc > 2, '[concurrently] the IPC channel number should be > 2');
+ stdioValues[ipc] = 'ipc';
+ }
+
+ return {
+ cwd: cwd || process.cwd(),
+ stdio: stdioValues,
+ ...(/^win/.test(process.platform) && { detached: false }),
+ env: {
+ ...(colorSupport ? { FORCE_COLOR: colorSupport.level.toString() } : {}),
+ ...process.env,
+ ...env,
+ },
+ };
+};