Skip to content

Commit

Permalink
cli: support custom output dir (#265)
Browse files Browse the repository at this point in the history
  • Loading branch information
yorkie authored Jun 24, 2020
1 parent 8e98e6d commit 89055ed
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 110 deletions.
70 changes: 2 additions & 68 deletions packages/cli/src/actions/start.ts
Original file line number Diff line number Diff line change
@@ -1,74 +1,8 @@
import { ChildProcess } from 'child_process';
import { createGunzip } from 'zlib';
import * as path from 'path';
import tar from 'tar-stream';
import { mkdirp, createWriteStream, remove } from 'fs-extra';
import { StartHandler } from '../types';
import { listen, get, getFile } from '../request';
import { route } from '../router';
import { tunaMirrorURI } from '../config';
import { ora, tail, parseConfigFilename } from '../utils';
import { run } from '../pipeline';

const start: StartHandler = async (filename: string, opts: any) => {
const spinner = ora();
const cwd = process.cwd();

try {
filename = await parseConfigFilename(filename);
} catch (err) {
spinner.fail(err.message);
return process.exit(1);
}

const params = {
cwd,
config: filename,
pyIndex: opts.tuna ? tunaMirrorURI : undefined
};
if (!opts.verbose) {
const job = await get(`${route.job}/start`, params);
spinner.succeed(`create job(${job.id}) succeeded.`);
} else {
let stdout: ChildProcess, stderr: ChildProcess;
await listen(`${route.job}/start`, params, {
'job created': (e: MessageEvent) => {
const job = JSON.parse(e.data);
spinner.succeed(`start running ${filename}...`);
stdout = tail(job.id, 'stdout');
stderr = tail(job.id, 'stderr');
},
'job finished': async (e: MessageEvent) => {
const job = JSON.parse(e.data);
spinner.succeed(`job(${job.id}) is finished with ${e.data}`);
stdout?.kill();
stderr?.kill();

const outputRootPath = path.join(cwd, 'output');
// remove the output dir
await remove(outputRootPath);

// generate output
const extract = tar.extract();
extract.on('entry', async (header, stream, next) => {
const dist = path.join(outputRootPath, header.name);
if (header.type === 'directory') {
await mkdirp(dist);
} else if (header.type === 'file') {
stream.pipe(createWriteStream(dist));
}
stream.on('end', next);
stream.resume();
});
(await getFile(`${route.job}/${job.id}/output.tar.gz`)).pipe(createGunzip()).pipe(extract);
},
'error': (e: MessageEvent) => {
spinner.fail(`occurrs an error ${e.data}`);
stdout?.kill();
stderr?.kill();
process.exit(1);
}
});
}
return run(filename, opts);
};

export default start;
43 changes: 3 additions & 40 deletions packages/cli/src/bin/pipcook-job.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
#!/usr/bin/env node

import { ChildProcess } from 'child_process';
import program from 'commander';
import { get, listen } from '../request';
import { get } from '../request';
import { run } from '../pipeline';
import { route } from '../router';
import { tail } from '../utils';
import { tunaMirrorURI } from '../config';
import { ora } from '../utils';

const PipelineStatus = [ 'creating', 'running', 'success', 'fail' ];
Expand All @@ -18,42 +16,6 @@ async function list(): Promise<void> {
}), [ 'id', 'status', 'evaluatePass', 'createdAt' ]);
}

async function run(id: string, opts: any): Promise<void> {
const spinner = ora();
const params = {
cwd: process.cwd(),
pipelineId: id,
pyIndex: opts.tuna ? tunaMirrorURI : undefined
};
if (!opts.verbose) {
const job = await get(`${route.job}/run`, params);
spinner.succeed(`create job(${job.id}) succeeded.`);
} else {
let stdout: ChildProcess, stderr: ChildProcess;
spinner.start(`start running ${id}...`);
await listen(`${route.job}/run`, params, {
'job created': (e: MessageEvent) => {
const job = JSON.parse(e.data);
spinner.succeed(`create job(${job.id}) succeeded.`);
stdout = tail(job.id, 'stdout');
stderr = tail(job.id, 'stderr');
},
'job finished': (e: MessageEvent) => {
const job = JSON.parse(e.data);
spinner.succeed(`job(${job.id}) is finished with ${e.data}`);
stdout?.kill();
stderr?.kill();
},
'error': (e: MessageEvent) => {
spinner.fail(`occurrs an error ${e.data}`);
stdout?.kill();
stderr?.kill();
process.exit(1);
}
});
}
}

async function remove(): Promise<void> {
const spinner = ora();
spinner.start('removing jobs...');
Expand Down Expand Up @@ -82,6 +44,7 @@ program
.command('run <pipeline>')
.option('--verbose', 'prints verbose logs', true)
.option('--tuna', 'use tuna mirror to install python packages')
.option('--output', 'the output directory name', 'output')
.action(run)
.description('run a job from a pipeline id');

Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/bin/pipcook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const pkg = require('../../package.json');
.command('run <filename>')
.option('--verbose', 'prints verbose logs', true)
.option('--tuna', 'use tuna mirror to install python packages')
.option('--output', 'the output directory name', 'output')
.description('run pipeline with a json file.')
.action(start);

Expand Down
71 changes: 69 additions & 2 deletions packages/cli/src/pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import { ora, parseConfigFilename, cwd } from "./utils";
import { ChildProcess } from 'child_process';
import { createGunzip } from 'zlib';
import { join } from 'path';
import tar from 'tar-stream';
import { ora, parseConfigFilename, cwd, tail } from "./utils";
import { tunaMirrorURI } from "./config";
import { route } from "./router";
import { listen } from "./request";
import { listen, get, getFile } from "./request";
import { remove, createWriteStream, mkdirp } from 'fs-extra';

export async function install(filename: string, opts: any): Promise<void> {
const spinner = ora();
Expand Down Expand Up @@ -36,3 +41,65 @@ export async function install(filename: string, opts: any): Promise<void> {
}
});
}

export async function run(filename: string, opts: any): Promise<void> {
const spinner = ora();
const cwd = process.cwd();

try {
filename = await parseConfigFilename(filename);
} catch (err) {
spinner.fail(err.message);
return process.exit(1);
}

const params = {
cwd,
config: filename,
pyIndex: opts.tuna ? tunaMirrorURI : undefined
};
if (!opts.verbose) {
const job = await get(`${route.job}/start`, params);
spinner.succeed(`create job(${job.id}) succeeded.`);
} else {
let stdout: ChildProcess, stderr: ChildProcess;
await listen(`${route.job}/start`, params, {
'job created': (e: MessageEvent) => {
const job = JSON.parse(e.data);
spinner.succeed(`start running ${filename}...`);
stdout = tail(job.id, 'stdout');
stderr = tail(job.id, 'stderr');
},
'job finished': async (e: MessageEvent) => {
const job = JSON.parse(e.data);
spinner.succeed(`job(${job.id}) is finished with ${e.data}`);
stdout?.kill();
stderr?.kill();

const outputRootPath = join(cwd, opts.output || 'output');
// remove the output dir
await remove(outputRootPath);

// generate output
const extract = tar.extract();
extract.on('entry', async (header, stream, next) => {
const dist = join(outputRootPath, header.name);
if (header.type === 'directory') {
await mkdirp(dist);
} else if (header.type === 'file') {
stream.pipe(createWriteStream(dist));
}
stream.on('end', next);
stream.resume();
});
(await getFile(`${route.job}/${job.id}/output.tar.gz`)).pipe(createGunzip()).pipe(extract);
},
'error': (e: MessageEvent) => {
spinner.fail(`occurrs an error ${e.data}`);
stdout?.kill();
stderr?.kill();
process.exit(1);
}
});
}
}

0 comments on commit 89055ed

Please sign in to comment.