Skip to content

Commit

Permalink
feat: async iterate readable streams, iterate function (#30)
Browse files Browse the repository at this point in the history
- async iterator on all bluestream readable streams
- `iterate(stream)` any readable stream

BREAKING CHANGE: dropped the default export
  • Loading branch information
reconbot authored May 3, 2018
1 parent de32329 commit 7c2fe7b
Show file tree
Hide file tree
Showing 13 changed files with 1,038 additions and 809 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ cache:
notifications:
email: false
node_js:
- '10'
- '9'
- '8'
after_success:
Expand Down
20 changes: 2 additions & 18 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { collect } from './collect'
import { filter, FilterStream } from './filter'
import { iterate } from './iterate'
import { pipe } from './pipe'
import { IReadableStreamOptions, read, readFunction, ReadStream } from './read'
import { readAsync } from './readAsync'
Expand Down Expand Up @@ -29,24 +30,7 @@ export {
WriteStream,
collect,
filter,
map,
pipe,
read,
readAsync,
reduce,
transform,
wait,
write,
}

export default {
FilterStream,
ReadStream,
ReduceStream,
TransformStream,
WriteStream,
collect,
filter,
iterate,
map,
pipe,
read,
Expand Down
46 changes: 46 additions & 0 deletions lib/iterate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { Readable } from 'stream'
import { readAsync } from '.'

if ((Symbol as any).asyncIterator === undefined) {
((Symbol as any).asyncIterator) = Symbol.for('asyncIterator')
}

async function* iterateObjectMode (stream) {
let data = true
while (data) {
data = await readAsync(stream, 1)
if (data) {
yield data[0]
}
}
}

async function* iterateBufferMode (stream) {
let data = true
while (data) {
data = await readAsync(stream)
if (data) {
yield data
}
}
}

export function internalIterator (stream: Readable) {
const readableState = (stream as any)._readableState
const objectMode = readableState && readableState.objectMode
if (objectMode) {
return iterateObjectMode(stream)
}
return iterateBufferMode(stream)
}

export function iterate (stream: Readable) {
if (stream[(Symbol as any).asyncIterator]) {
return stream[(Symbol as any).asyncIterator]()
}
const objectMode = (stream as any)._readableState.objectMode
if (objectMode) {
return iterateObjectMode(stream)
}
return iterateBufferMode(stream)
}
6 changes: 6 additions & 0 deletions lib/read.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Readable, ReadableOptions } from 'stream'
import { IBluestream } from './interfaces'
import { internalIterator } from './iterate'
import { readAsync } from './readAsync'
import { defer, maybeResume } from './utils'

async function readHandler (bytes) {
Expand Down Expand Up @@ -113,5 +115,9 @@ export class ReadStream extends Readable implements IBluestream {
}
}

ReadStream.prototype[(Symbol as any).asyncIterator] = function () {
return internalIterator(this)
}

export const read =
(opts: IReadableStreamOptions | readFunction = {}, readFn?: readFunction) => new ReadStream(opts, readFn)
37 changes: 27 additions & 10 deletions lib/readAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ const readOnceAsync = async (stream: Readable, count?: number) => {
if (data !== null) {
return data
}
if ((stream as any)._readableState.ended) {
return null
}
return new Promise(resolve => {
stream.once('readable', () => {
const nextData = stream.read(count)
Expand All @@ -17,17 +20,10 @@ const readOnceAsync = async (stream: Readable, count?: number) => {
})
}

export const readAsync = async (stream, count) => {
if (!(stream && stream._readableState)) {
throw new TypeError('"stream" is not a readable stream')
}
if (stream._readableState.flowing) {
// tslint:disable-next-line
throw new TypeError('"stream" is in flowing mode, this is probably not what you want as data loss could occur. Please use stream.pause() to pause the stream before calling readAsync.');
}

const objectMode = stream._readableState.objectMode
const internalReadAsync = async (stream: Readable, count?: number) => {
const { resolve, reject, promise } = defer()
const readableState = (stream as any)._readableState
const objectMode = readableState && readableState.objectMode

const cleanup = () => {
stream.removeListener('error', reject)
Expand Down Expand Up @@ -57,3 +53,24 @@ export const readAsync = async (stream, count) => {
}
return promise
}

const inflightReads = new WeakMap()
export const readAsync = async (stream: Readable, count?: number) => {
if (!(stream && (stream as any)._readableState)) {
throw new TypeError('"stream" is not a readable stream')
}
if ((stream as any)._readableState.flowing) {
// tslint:disable-next-line
throw new TypeError('"stream" is in flowing mode, this is probably not what you want as data loss could occur. Please use stream.pause() to pause the stream before calling readAsync.');
}

const inflightRead = inflightReads.get(stream)
if (inflightRead) {
const queuedRead = inflightRead.then(() => internalReadAsync(stream, count))
inflightReads.set(stream, queuedRead)
return queuedRead
}
const readOperation = internalReadAsync(stream, count)
inflightReads.set(stream, readOperation)
return readOperation
}
Loading

0 comments on commit 7c2fe7b

Please sign in to comment.