From 7cb4b128762a0101e62c436e65954f142439e462 Mon Sep 17 00:00:00 2001 From: Contra Date: Tue, 27 Sep 2016 15:18:52 -0400 Subject: [PATCH] use streams to use less memory --- dist/index.js | 40 +++++++++++++++++++++------------------- package.json | 16 ++++++++-------- src/index.js | 33 ++++++++++++++++----------------- 3 files changed, 45 insertions(+), 44 deletions(-) diff --git a/dist/index.js b/dist/index.js index 9f162e1..8fe503c 100644 --- a/dist/index.js +++ b/dist/index.js @@ -6,8 +6,6 @@ var _async = require('async'); var _async2 = _interopRequireDefault(_async); -var _buffer = require('buffer'); - var _path = require('path'); var _path2 = _interopRequireDefault(_path); @@ -20,6 +18,14 @@ var _shp2json = require('shp2json'); var _shp2json2 = _interopRequireDefault(_shp2json); +var _JSONStream = require('JSONStream'); + +var _JSONStream2 = _interopRequireDefault(_JSONStream); + +var _through2Asyncmap = require('through2-asyncmap'); + +var _through2Asyncmap2 = _interopRequireDefault(_through2Asyncmap); + var _plural = require('plural'); var _plural2 = _interopRequireDefault(_plural); @@ -46,13 +52,14 @@ var _debug3 = _interopRequireDefault(_debug2); function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } -var debug = (0, _debug3.default)('census'); /*eslint no-console: 0 */ +/*eslint no-console: 0 */ + +var debug = (0, _debug3.default)('census'); exports.default = function (overrides, _ref) { var onBoundary = _ref.onBoundary; var onFinish = _ref.onFinish; - if (!onBoundary) throw new Error('Missing onBoundary!'); if (!onFinish) throw new Error('Missing onFinish!'); onFinish = (0, _once2.default)(onFinish); @@ -86,26 +93,21 @@ function processFilePath(context, file, cb) { cb = (0, _once2.default)(cb); var ftp = context.ftp; - ftp.get(file.path, function (err, stream) { + ftp.get(file.path, function (err, srcStream) { if (err) return cb(err); - var srcStream = (0, _shp2json2.default)(stream); - var chunks = []; - - srcStream.on('data', function (data) { - chunks.push(data); - }); - - srcStream.once('error', function (err) { + var count = 0; + (0, _shp2json2.default)(srcStream).pipe(_JSONStream2.default.parse('features.*')).pipe(_through2Asyncmap2.default.obj(function (feat, done) { + ++count; + context.onBoundary(file.type, feat, done); + })).once('error', function (err) { return cb(err); - }); - srcStream.once('end', function () { - var docs = JSON.parse(_buffer.Buffer.concat(chunks)).features; - debug(' -- ' + _chalk2.default.cyan('Parsed ' + file.path + ', inserting ' + docs.length + ' boundaries now...')); - _async2.default.forEach(docs, _async2.default.ensureAsync(context.onBoundary.bind(null, file.type)), cb); + }).once('end', function () { + debug(' -- ' + _chalk2.default.cyan('Parsed ' + file.path + ' and inserted ' + count + ' boundaries')); + cb(); }); - stream.resume(); + srcStream.resume(); }); } diff --git a/package.json b/package.json index 18e63a9..feda6b8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "census-boundaries", - "version": "1.0.6", + "version": "1.1.0", "description": "Downloads and converts US Census TIGER data representing all boundaries in the United States", "main": "dist/index.js", "keywords": [ @@ -39,18 +39,17 @@ "babel": "^6.3.26", "babel-cli": "^6.4.0", "babel-core": "^6.4.0", - "babel-eslint": "^4.1.6", + "babel-eslint": "^7.0.0", "babel-loader": "^6.2.1", - "babel-plugin-add-module-exports": "^0.1.2", + "babel-plugin-add-module-exports": "^0.2.1", "babel-plugin-transform-runtime": "^6.4.3", "babel-preset-es2015": "^6.3.13", - "babel-preset-es2015-loose": "^7.0.0", + "babel-preset-es2015-loose": "^8.0.0", "babel-preset-stage-0": "^6.3.13", "babel-register": "^6.4.3", "babelify": "^7.2.0", - "eslint": "^1.10.3", + "eslint": "^3.6.1", "eslint-cli": "^1.0.0", - "eslint-config-rackt": "^1.1.1", "github-changes": "^1.0.1", "rimraf": "^2.5.0" }, @@ -66,7 +65,6 @@ }, "eslintConfig": { "parser": "babel-eslint", - "extends": "rackt", "env": { "node": true, "es6": true @@ -76,6 +74,7 @@ } }, "dependencies": { + "JSONStream": "^1.2.1", "async": "^2.0.0-rc.3", "babel-register": "^6.8.0", "babel-runtime": "^6.3.19", @@ -86,6 +85,7 @@ "meow": "^3.7.0", "once": "^1.3.3", "plural": "^0.2.0", - "shp2json": "^1.2.1" + "shp2json": "^1.2.1", + "through2-asyncmap": "^1.1.0" } } diff --git a/src/index.js b/src/index.js index 5305509..3c320e4 100644 --- a/src/index.js +++ b/src/index.js @@ -1,10 +1,11 @@ /*eslint no-console: 0 */ import async from 'async' -import { Buffer } from 'buffer' import path from 'path' import chalk from 'chalk' import toJSON from 'shp2json' +import JSONStream from 'JSONStream' +import map from 'through2-asyncmap' import plural from 'plural' import defaultsDeep from 'lodash.defaultsdeep' import once from 'once' @@ -14,7 +15,6 @@ import _debug from 'debug' const debug = _debug('census') export default (overrides, { onBoundary, onFinish }) => { - if (!onBoundary) throw new Error('Missing onBoundary!') if (!onFinish) throw new Error('Missing onFinish!') onFinish = once(onFinish) @@ -47,24 +47,23 @@ function processObject(context, object, cb) { function processFilePath(context, file, cb) { cb = once(cb) const { ftp } = context - ftp.get(file.path, (err, stream) => { + ftp.get(file.path, (err, srcStream) => { if (err) return cb(err) - const srcStream = toJSON(stream) - const chunks = [] - - srcStream.on('data', (data) => { - chunks.push(data) - }) - - srcStream.once('error', (err) => cb(err)) - srcStream.once('end', () => { - const docs = JSON.parse(Buffer.concat(chunks)).features - debug(` -- ${chalk.cyan(`Parsed ${file.path}, inserting ${docs.length} boundaries now...`)}`) - async.forEach(docs, async.ensureAsync(context.onBoundary.bind(null, file.type)), cb) - }) + let count = 0 + toJSON(srcStream) + .pipe(JSONStream.parse('features.*')) + .pipe(map.obj((feat, done) => { + ++count + context.onBoundary(file.type, feat, done) + })) + .once('error', (err) => cb(err)) + .once('end', () => { + debug(` -- ${chalk.cyan(`Parsed ${file.path} and inserted ${count} boundaries`)}`) + cb() + }) - stream.resume() + srcStream.resume() }) }