Skip to content

Commit

Permalink
feat: basic support of pipe sequences (#219)
Browse files Browse the repository at this point in the history
  • Loading branch information
dsherret authored Jan 26, 2024
1 parent 3fefbdc commit 1bcd684
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 77 deletions.
15 changes: 15 additions & 0 deletions mod.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,21 @@ Deno.test("command .lines()", async () => {
assertEquals(result, ["1", "2"]);
});

Deno.test("piping in command", async () => {
{
const result = await $`echo 1 | cat -`.text();
assertEquals(result, "1");
}
{
const result = await $`echo 1 && echo 2 | cat -`.text();
assertEquals(result, "1\n2");
}
{
const result = await $`echo 1 || echo 2 | cat -`.text();
assertEquals(result, "1");
}
});

Deno.test("redirects", async () => {
await withTempDir(async (tempDir) => {
// absolute
Expand Down
9 changes: 8 additions & 1 deletion mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,14 @@ import { createPathRef, PathRef } from "./src/path.ts";
export type { Delay, DelayIterator } from "./src/common.ts";
export { FsFileWrapper, PathRef } from "./src/path.ts";
export type { ExpandGlobOptions, PathSymlinkOptions, SymlinkOptions, WalkEntry, WalkOptions } from "./src/path.ts";
export { CommandBuilder, CommandChild, CommandResult, KillSignal, KillSignalController } from "./src/command.ts";
export {
CommandBuilder,
CommandChild,
CommandResult,
KillSignal,
KillSignalController,
type KillSignalListener,
} from "./src/command.ts";
export type { CommandContext, CommandHandler, CommandPipeReader, CommandPipeWriter } from "./src/command_handler.ts";
export type { Closer, Reader, ShellPipeReaderKind, ShellPipeWriterKind, WriterSync } from "./src/pipes.ts";
export type {
Expand Down
35 changes: 25 additions & 10 deletions src/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ function validateCommandName(command: string) {
const SHELL_SIGNAL_CTOR_SYMBOL = Symbol();

interface KillSignalState {
aborted: boolean;
abortedCode: number;
listeners: ((signal: Deno.Signal) => void)[];
}

Expand All @@ -1086,7 +1086,7 @@ export class KillSignalController {

constructor() {
this.#state = {
aborted: false,
abortedCode: 0,
listeners: [],
};
this.#killSignal = new KillSignal(SHELL_SIGNAL_CTOR_SYMBOL, this.#state);
Expand All @@ -1106,6 +1106,9 @@ export class KillSignalController {
}
}

/** Listener for when a KillSignal is killed. */
export type KillSignalListener = (signal: Deno.Signal) => void;

/** Similar to `AbortSignal`, but for `Deno.Signal`.
*
* A `KillSignal` is considered aborted if its controller
Expand All @@ -1128,7 +1131,7 @@ export class KillSignal {
* SIGKILL, SIGABRT, SIGQUIT, SIGINT, or SIGSTOP
*/
get aborted(): boolean {
return this.#state.aborted;
return this.#state.abortedCode !== 0;
}

/**
Expand All @@ -1147,39 +1150,51 @@ export class KillSignal {
};
}

addListener(listener: (signal: Deno.Signal) => void) {
addListener(listener: KillSignalListener) {
this.#state.listeners.push(listener);
}

removeListener(listener: (signal: Deno.Signal) => void) {
removeListener(listener: KillSignalListener) {
const index = this.#state.listeners.indexOf(listener);
if (index >= 0) {
this.#state.listeners.splice(index, 1);
}
}

/** @internal - DO NOT USE. Very unstable. Not sure about this. */
get _abortedExitCode() {
return this.#state.abortedCode;
}
}

function sendSignalToState(state: KillSignalState, signal: Deno.Signal) {
if (signalCausesAbort(signal)) {
state.aborted = true;
const code = getSignalAbortCode(signal);
if (code !== undefined) {
state.abortedCode = code;
}
for (const listener of state.listeners) {
listener(signal);
}
}

function signalCausesAbort(signal: Deno.Signal) {
function getSignalAbortCode(signal: Deno.Signal) {
// consider the command aborted if the signal is any one of these
switch (signal) {
case "SIGTERM":
return 128 + 15;
case "SIGKILL":
return 128 + 9;
case "SIGABRT":
return 128 + 6;
case "SIGQUIT":
return 128 + 3;
case "SIGINT":
return 128 + 2;
case "SIGSTOP":
return true;
// should SIGSTOP be considered an abort?
return 128 + 19;
default:
return false;
return undefined;
}
}

Expand Down
12 changes: 7 additions & 5 deletions src/commands/cat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ export async function catCommand(

async function executeCat(context: CommandContext) {
const flags = parseCatArgs(context.args);
let exit_code = 0;
let exitCode = 0;
const buf = new Uint8Array(1024);

for (const path of flags.paths) {
if (path === "-") { // read from stdin
if (typeof context.stdin === "object") { // stdin is a Reader
while (true) {
while (!context.signal.aborted) {
const size = await context.stdin.read(buf);
if (!size || size === 0) break;
else context.stdout.writeSync(buf.slice(0, size));
}
exitCode = context.signal._abortedExitCode;
} else {
const _assertValue: "null" | "inherit" = context.stdin;
throw new Error(`not supported. stdin was '${context.stdin}'`);
Expand All @@ -40,21 +41,22 @@ async function executeCat(context: CommandContext) {
let file;
try {
file = await Deno.open(pathUtils.join(context.cwd, path), { read: true });
while (true) {
while (!context.signal.aborted) {
// NOTE: rust supports cancellation here
const size = file.readSync(buf);
if (!size || size === 0) break;
else context.stdout.writeSync(buf.slice(0, size));
}
exitCode = context.signal._abortedExitCode;
} catch (err) {
context.stderr.writeLine(`cat ${path}: ${err}`);
exit_code = 1;
exitCode = 1;
} finally {
if (file) file.close();
}
}
}
return exit_code;
return exitCode;
}

export function parseCatArgs(args: string[]): CatFlags {
Expand Down
42 changes: 42 additions & 0 deletions src/pipes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,45 @@ export class PipedBuffer implements WriterSync {
this.#hasSet = true;
}
}

export class PipeSequencePipe implements Reader, WriterSync {
#inner = new Buffer();
#readListener: (() => void) | undefined;
#closed = false;

close() {
this.#readListener?.();
this.#closed = true;
}

writeSync(p: Uint8Array): number {
const value = this.#inner.writeSync(p);
if (this.#readListener !== undefined) {
const listener = this.#readListener;
this.#readListener = undefined;
listener();
}
return value;
}

read(p: Uint8Array): Promise<number | null> {
if (this.#readListener !== undefined) {
// doesn't support multiple read listeners at the moment
throw new Error("Misuse of PipeSequencePipe");
}

if (this.#inner.length === 0) {
if (this.#closed) {
return Promise.resolve(null);
} else {
return new Promise((resolve) => {
this.#readListener = () => {
resolve(this.#inner.readSync(p));
};
});
}
} else {
return Promise.resolve(this.#inner.readSync(p));
}
}
}
Loading

0 comments on commit 1bcd684

Please sign in to comment.