Skip to content

Commit

Permalink
use streams to use less memory
Browse files — browse the repository at this point in the history
  • Loading branch information
Contra committed Sep 27, 2016
1 parent 4fb2cc5 commit 7cb4b12
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 44 deletions.
40 changes: 21 additions & 19 deletions dist/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ var _async = require('async');

var _async2 = _interopRequireDefault(_async);

var _buffer = require('buffer');

var _path = require('path');

var _path2 = _interopRequireDefault(_path);
Expand All @@ -20,6 +18,14 @@ var _shp2json = require('shp2json');

var _shp2json2 = _interopRequireDefault(_shp2json);

var _JSONStream = require('JSONStream');

var _JSONStream2 = _interopRequireDefault(_JSONStream);

var _through2Asyncmap = require('through2-asyncmap');

var _through2Asyncmap2 = _interopRequireDefault(_through2Asyncmap);

var _plural = require('plural');

var _plural2 = _interopRequireDefault(_plural);
Expand All @@ -46,13 +52,14 @@ var _debug3 = _interopRequireDefault(_debug2);

function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }

var debug = (0, _debug3.default)('census'); /*eslint no-console: 0 */
/*eslint no-console: 0 */

var debug = (0, _debug3.default)('census');

exports.default = function (overrides, _ref) {
var onBoundary = _ref.onBoundary;
var onFinish = _ref.onFinish;


if (!onBoundary) throw new Error('Missing onBoundary!');
if (!onFinish) throw new Error('Missing onFinish!');
onFinish = (0, _once2.default)(onFinish);
Expand Down Expand Up @@ -86,26 +93,21 @@ function processFilePath(context, file, cb) {
cb = (0, _once2.default)(cb);
var ftp = context.ftp;

ftp.get(file.path, function (err, stream) {
ftp.get(file.path, function (err, srcStream) {
if (err) return cb(err);

var srcStream = (0, _shp2json2.default)(stream);
var chunks = [];

srcStream.on('data', function (data) {
chunks.push(data);
});

srcStream.once('error', function (err) {
var count = 0;
(0, _shp2json2.default)(srcStream).pipe(_JSONStream2.default.parse('features.*')).pipe(_through2Asyncmap2.default.obj(function (feat, done) {
++count;
context.onBoundary(file.type, feat, done);
})).once('error', function (err) {
return cb(err);
});
srcStream.once('end', function () {
var docs = JSON.parse(_buffer.Buffer.concat(chunks)).features;
debug(' -- ' + _chalk2.default.cyan('Parsed ' + file.path + ', inserting ' + docs.length + ' boundaries now...'));
_async2.default.forEach(docs, _async2.default.ensureAsync(context.onBoundary.bind(null, file.type)), cb);
}).once('end', function () {
debug(' -- ' + _chalk2.default.cyan('Parsed ' + file.path + ' and inserted ' + count + ' boundaries'));
cb();
});

stream.resume();
srcStream.resume();
});
}

Expand Down
16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "census-boundaries",
"version": "1.0.6",
"version": "1.1.0",
"description": "Downloads and converts US Census TIGER data representing all boundaries in the United States",
"main": "dist/index.js",
"keywords": [
Expand Down Expand Up @@ -39,18 +39,17 @@
"babel": "^6.3.26",
"babel-cli": "^6.4.0",
"babel-core": "^6.4.0",
"babel-eslint": "^4.1.6",
"babel-eslint": "^7.0.0",
"babel-loader": "^6.2.1",
"babel-plugin-add-module-exports": "^0.1.2",
"babel-plugin-add-module-exports": "^0.2.1",
"babel-plugin-transform-runtime": "^6.4.3",
"babel-preset-es2015": "^6.3.13",
"babel-preset-es2015-loose": "^7.0.0",
"babel-preset-es2015-loose": "^8.0.0",
"babel-preset-stage-0": "^6.3.13",
"babel-register": "^6.4.3",
"babelify": "^7.2.0",
"eslint": "^1.10.3",
"eslint": "^3.6.1",
"eslint-cli": "^1.0.0",
"eslint-config-rackt": "^1.1.1",
"github-changes": "^1.0.1",
"rimraf": "^2.5.0"
},
Expand All @@ -66,7 +65,6 @@
},
"eslintConfig": {
"parser": "babel-eslint",
"extends": "rackt",
"env": {
"node": true,
"es6": true
Expand All @@ -76,6 +74,7 @@
}
},
"dependencies": {
"JSONStream": "^1.2.1",
"async": "^2.0.0-rc.3",
"babel-register": "^6.8.0",
"babel-runtime": "^6.3.19",
Expand All @@ -86,6 +85,7 @@
"meow": "^3.7.0",
"once": "^1.3.3",
"plural": "^0.2.0",
"shp2json": "^1.2.1"
"shp2json": "^1.2.1",
"through2-asyncmap": "^1.1.0"
}
}
33 changes: 16 additions & 17 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
/*eslint no-console: 0 */

import async from 'async'
import { Buffer } from 'buffer'
import path from 'path'
import chalk from 'chalk'
import toJSON from 'shp2json'
import JSONStream from 'JSONStream'
import map from 'through2-asyncmap'
import plural from 'plural'
import defaultsDeep from 'lodash.defaultsdeep'
import once from 'once'
Expand All @@ -14,7 +15,6 @@ import _debug from 'debug'
const debug = _debug('census')

export default (overrides, { onBoundary, onFinish }) => {

if (!onBoundary) throw new Error('Missing onBoundary!')
if (!onFinish) throw new Error('Missing onFinish!')
onFinish = once(onFinish)
Expand Down Expand Up @@ -47,24 +47,23 @@ function processObject(context, object, cb) {
function processFilePath(context, file, cb) {
cb = once(cb)
const { ftp } = context
ftp.get(file.path, (err, stream) => {
ftp.get(file.path, (err, srcStream) => {
if (err) return cb(err)

const srcStream = toJSON(stream)
const chunks = []

srcStream.on('data', (data) => {
chunks.push(data)
})

srcStream.once('error', (err) => cb(err))
srcStream.once('end', () => {
const docs = JSON.parse(Buffer.concat(chunks)).features
debug(` -- ${chalk.cyan(`Parsed ${file.path}, inserting ${docs.length} boundaries now...`)}`)
async.forEach(docs, async.ensureAsync(context.onBoundary.bind(null, file.type)), cb)
})
let count = 0
toJSON(srcStream)
.pipe(JSONStream.parse('features.*'))
.pipe(map.obj((feat, done) => {
++count
context.onBoundary(file.type, feat, done)
}))
.once('error', (err) => cb(err))
.once('end', () => {
debug(` -- ${chalk.cyan(`Parsed ${file.path} and inserted ${count} boundaries`)}`)
cb()
})

stream.resume()
srcStream.resume()
})
}

Expand Down

0 comments on commit 7cb4b12

Please sign in to comment.