Skip to content

Commit

Permalink
Merge pull request #617 from sethkinast/chunk.stream
Browse files Browse the repository at this point in the history
Add `chunk.stream` to allow streamables in context
  • Loading branch information
prashn64 committed Apr 14, 2015
2 parents f833b34 + 52891cf commit 98dda84
Show file tree
Hide file tree
Showing 11 changed files with 6,663 additions and 292 deletions.
80 changes: 62 additions & 18 deletions .jshintrc
Original file line number Diff line number Diff line change
@@ -1,20 +1,64 @@
{
"curly": true,
"eqeqeq": true,
"immed": true,
"latedef": true,
"newcap": true,
"noarg": true,
"sub": true,
"undef": true,
"unused": false,
"boss": true,
"eqnull": true,
"browser": true,
"globals": {
"jQuery": true,
"require": true,
"module": true,
"define": true
}
"bitwise" : true,
"curly" : true,
"eqeqeq" : true,
"forin" : false,
"immed" : true,
"latedef" : true,
"newcap" : true,
"noarg" : true,
"noempty" : true,
"nonew" : true,
"plusplus" : false,
"regexp" : true,
"undef" : true,
"strict" : false,
"trailing" : true,
"unused" : true,

"asi" : false,
"boss" : false,
"debug" : false,
"eqnull" : true,
"esnext" : false,
"evil" : false,
"expr" : false,
"funcscope" : false,
"globalstrict" : false,
"iterator" : false,
"lastsemic" : false,
"laxbreak" : false,
"laxcomma" : false,
"loopfunc" : false,
"multistr" : false,
"onecase" : false,
"proto" : false,
"regexdash" : false,
"scripturl" : false,
"smarttabs" : false,
"shadow" : false,
"sub" : false,
"supernew" : false,
"validthis" : false,

"browser" : true,
"couch" : false,
"devel" : false,
"dojo" : false,
"jquery" : false,
"mootools" : false,
"node" : true,
"nonstandard" : false,
"prototypejs" : false,
"rhino" : false,
"wsh" : false,

"nomen" : false,
"onevar" : false,
"passfail" : false,
"white" : false,

"predef" : [
"define"
]
}
2 changes: 1 addition & 1 deletion Gruntfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ module.exports = function(grunt) {
display: 'short',
helpers: ['test/jasmine-test/spec/coreTests.js'],
specs: ['test/jasmine-test/spec/testHelpers.js', 'test/jasmine-test/spec/renderTestSpec.js'],
vendor: 'node_modules/ayepromise/ayepromise.js'
vendor: ['node_modules/ayepromise/ayepromise.js', 'test/lib/highland.js']
}
},
/*tests unminified code, mostly used for debugging by `grunt dev` task*/
Expand Down
4 changes: 3 additions & 1 deletion lib/compiler.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,10 @@
if (out.length) {
context.blocks = 'ctx=ctx.shiftBlocks(blocks);';
return 'var blocks={' + out.join(',') + '};';
} else {
context.blocks = '';
}
return context.blocks = '';
return context.blocks;
}

function compileBodies(context) {
Expand Down
133 changes: 120 additions & 13 deletions lib/dust.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@
}
};

dust.loadSource = function(source, path) {
dust.loadSource = function(source) {
/*jshint evil:true*/
return eval(source);
};
Expand Down Expand Up @@ -221,6 +221,16 @@
typeof elem.then === 'function';
};

/**
* Decide very naively if something is a Stream.
* @param elem {*} object to inspect
* @return {Boolean} is `elem` a Stream?
*/
dust.isStreamable = function(elem) {
return elem &&
typeof elem.on === 'function';
};

// apply the filter chain and return the output string
dust.filter = function(string, auto, filters) {
var i, len, name;
Expand Down Expand Up @@ -421,7 +431,7 @@
return this.stack && this.stack.head;
};

Context.prototype.getBlock = function(key, chk, ctx) {
Context.prototype.getBlock = function(key) {
var blocks, len, fn;

if (typeof key === 'function') {
Expand Down Expand Up @@ -513,6 +523,9 @@
this.callback(null, this.out);
};

/**
* Creates an interface sort of like a Streams2 ReadableStream.
*/
function Stream() {
this.head = new Chunk(this);
}
Expand All @@ -525,6 +538,7 @@
this.emit('data', chunk.data.join('')); //ie7 perf
} else if (chunk.error) {
this.emit('error', chunk.error);
this.emit('end');
dust.log('Streaming failed with error `' + chunk.error + '`', ERROR);
this.flush = EMPTY_FUNC;
return;
Expand All @@ -537,20 +551,26 @@
this.emit('end');
};

/**
* Executes listeners for `type` by passing data. Note that this is different from a
* Node stream, which can pass an arbitrary number of arguments
* @return `true` if event had listeners, `false` otherwise
*/
Stream.prototype.emit = function(type, data) {
var events = this.events || {},
handlers = events[type] || [],
i, l;

if (!handlers.length) {
dust.log('Stream broadcasting, but no listeners for `' + type + '`', DEBUG);
return;
return false;
}

handlers = handlers.slice(0);
for (i = 0, l = handlers.length; i < l; i++) {
handlers[i](data);
}
return true;
};

Stream.prototype.on = function(type, callback) {
Expand All @@ -565,24 +585,52 @@
return this;
};

/**
* Pipes to a WritableStream. Note that backpressure isn't implemented,
* so we just write as fast as we can.
* @param stream {WritableStream}
* @return self
*/
Stream.prototype.pipe = function(stream) {
if(typeof stream.write !== 'function' ||
typeof stream.end !== 'function') {
dust.log('Incompatible stream passed to `pipe`', WARN);
return this;
}

var destEnded = false;

if(typeof stream.emit === 'function') {
stream.emit('pipe', this);
}

if(typeof stream.on === 'function') {
stream.on('error', function() {
destEnded = true;
});
}

return this
.on('data', function(data) {
if(destEnded) {
return;
}
try {
stream.write(data, 'utf8');
} catch (err) {
dust.log(err, ERROR);
}
})
.on('end', function() {
if(destEnded) {
return;
}
try {
stream.end();
destEnded = true;
} catch (err) {
dust.log(err, ERROR);
}
})
.on('error', function(err) {
stream.error(err);
});
};

Expand Down Expand Up @@ -653,10 +701,14 @@
elem = elem.apply(context.current(), [this, context, null, {auto: auto, filters: filters}]);
if (elem instanceof Chunk) {
return elem;
} else {
return this.reference(elem, context, auto, filters);
}
}
if (dust.isThenable(elem)) {
return this.await(elem, context);
return this.await(elem, context, null, auto, filters);
} else if (dust.isStreamable(elem)) {
return this.stream(elem, context, null, auto, filters);
} else if (!dust.isEmpty(elem)) {
return this.write(dust.filter(elem, auto, filters));
} else {
Expand Down Expand Up @@ -699,17 +751,17 @@
if (len > 0) {
// any custom helper can blow up the stack and store a flattened context, guard defensively
if(context.stack.head) {
context.stack.head['$len'] = len;
context.stack.head.$len = len;
}
for (i = 0; i < len; i++) {
if(context.stack.head) {
context.stack.head['$idx'] = i;
context.stack.head.$idx = i;
}
chunk = body(chunk, context.push(elem[i], i, len));
}
if(context.stack.head) {
context.stack.head['$idx'] = undefined;
context.stack.head['$len'] = undefined;
context.stack.head.$idx = undefined;
context.stack.head.$len = undefined;
}
return chunk;
}
Expand All @@ -719,6 +771,8 @@
}
} else if (dust.isThenable(elem)) {
return this.await(elem, context, bodies);
} else if (dust.isStreamable(elem)) {
return this.stream(elem, context, bodies);
} else if (elem === true) {
// true is truthy but does not change context
if (body) {
Expand Down Expand Up @@ -830,15 +884,15 @@
* @param bodies {Object} must contain a "body", may contain an "error"
* @return {Chunk}
*/
Chunk.prototype.await = function(thenable, context, bodies) {
Chunk.prototype.await = function(thenable, context, bodies, auto, filters) {
var body = bodies && bodies.block,
errorBody = bodies && bodies.error;
return this.map(function(chunk) {
thenable.then(function(data) {
if(body) {
chunk.render(body, context.push(data)).end();
} else {
chunk.end(data);
chunk.reference(data, context, auto, filters).end();
}
}, function(err) {
if(errorBody) {
Expand All @@ -851,6 +905,59 @@
});
};

/**
* Reserve a chunk to be evaluated with the contents of a streamable.
* Currently an error event will bomb out the stream. Once an error
* is received, we push it to an {:error} block if one exists, and log otherwise,
* then stop listening to the stream.
* @param streamable {Streamable} the target streamable that will emit events
* @param context {Context} context to use to render each thunk
* @param bodies {Object} must contain a "body", may contain an "error"
* @return {Chunk}
*/
Chunk.prototype.stream = function(stream, context, bodies, auto, filters) {
var body = bodies && bodies.block,
errorBody = bodies && bodies.error;
return this.map(function(chunk) {
var ended = false;
stream
.on('data', function data(thunk) {
if(ended) {
return;
}
if(body) {
// Fork a new chunk out of the blockstream so that we can flush it independently
chunk = chunk.map(function(chunk) {
chunk.render(body, context.push(thunk)).end();
});
} else {
// Don't fork, just write into the master async chunk
chunk = chunk.reference(thunk, context, auto, filters);
}
})
.on('error', function error(err) {
if(ended) {
return;
}
if(errorBody) {
chunk.render(errorBody, context.push(err));
} else {
dust.log('Unhandled stream error in `' + context.getTemplateName() + '`');
}
if(!ended) {
ended = true;
chunk.end();
}
})
.on('end', function end() {
if(!ended) {
ended = true;
chunk.end();
}
});
});
};

Chunk.prototype.capture = function(body, context, callback) {
return this.map(function(chunk) {
var stub = new Stub(function(err, out) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"grunt-jasmine-nodejs": "~1.0.2",
"grunt-shell": "~1.1.2",
"grunt-template-jasmine-istanbul": "~0.3.3",
"highland": "2.4.0",
"pegjs": "0.8.0"
},
"license": "MIT",
Expand Down
Loading

0 comments on commit 98dda84

Please sign in to comment.