Skip to content

Commit

Permalink
feat(readAsync): Read a specific number of bytes or objects from a st…
Browse files Browse the repository at this point in the history
…ream (#26)
  • Loading branch information
reconbot authored Mar 16, 2018
1 parent 95528f1 commit be80c93
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 23 deletions.
23 changes: 15 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ Originally forked from [promise-streams](https://github.com/spion/promise-stream
- `FilterStream` similar to `Array.prototype.filter` Easy stream filtering of data
- `ReduceStream` similar to `Array.prototype.reduce` but a stream that emits each step and `.promise()` resolves to the end result

- `bluestream.wait(stream)` resolves when the stream finishes
- `bluestream.collect(stream)` Concats strings and buffers, returns an array of objects.
- `bluestream.readAsync(stream, numberOfBytesOrObjects)` Reads a number of bytes or objects from a stream
- `bluestream.pipe(source, target, [target,])` Returns a promise for when the last target stream finishes

# Examples

Expand Down Expand Up @@ -142,7 +143,7 @@ Options:

The other options are also passed to node's Write stream constructor.

#### filter

`([opts:Options,] fn: async (data[, enc]) => boolean) => FilterStream`

Expand All @@ -151,7 +152,7 @@ indicate whether the data value should pass to the next stream

Options: Same as `ps.transform`

#### reduce

`([opts:Options,] fn: (acc, data[, enc]) => Promise) => ReduceStream`

Expand All @@ -170,25 +171,31 @@ process.stdin.pipe(split()).pipe(es.reduce(function(acc, el) {
});
```

#### wait

`(s: Stream) => Promise`

Wait for the stream to end. Also captures errors.

#### pipe

`(source: Stream, destination: Stream) => Promise`

Pipes s1 to s2 and forwards all errors to the resulting promise. The promise is
fulfilled without a value when the destination stream ends.

#### collect

`(source: Stream) => Promise`

Returns a Buffer, string or array of all the data events concatenated together. If no events null is returned.

#### readAsync

`(source: Stream, count: Number) => Promise`

Resolves with up to `count` bytes in a Buffer, characters in a string, or objects in an array. If no data arrives before the stream ends, `null` is returned.

#### PromiseStream.promise

`() => Promise`
Expand Down
33 changes: 18 additions & 15 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,37 @@
// Public entry point for bluestream: re-export every stream helper and also
// provide a default-export object for `import bluestream from 'bluestream'`.
import { collect } from './collect'
import { FilterStream, filter } from './filter'
import { pipe } from './pipe'
import { readAsync } from './readAsync'
import { ReadStream, read } from './read'
import { ReduceStream, reduce } from './reduce'
import { TransformStream, transform, map } from './transform'
import { wait } from './utils'
import { WriteStream, write } from './write'

export { collect } from './collect'
export { FilterStream, filter } from './filter'
export { pipe } from './pipe'
export { readAsync } from './readAsync'
export { ReadStream, read } from './read'
export { ReduceStream, reduce } from './reduce'
export { TransformStream, transform, map } from './transform'
export { wait } from './utils'
export { WriteStream, write } from './write'

export default {
  collect,
  FilterStream,
  filter,
  map,
  pipe,
  read,
  readAsync,
  ReadStream,
  reduce,
  TransformStream,
  transform,
  wait,
  WriteStream,
  write
}
57 changes: 57 additions & 0 deletions lib/readAsync.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { defer } from './utils'

// Resolve with the next chunk from `stream`: try a synchronous read first,
// otherwise wait for one 'readable' event and read again. When the second
// read still yields null (fewer than `count` bytes buffered, e.g. at end of
// stream), drain whatever remains with an unsized read.
const readOnceAsync = (stream, count) => {
  const immediate = stream.read(count)
  if (immediate !== null) {
    return Promise.resolve(immediate)
  }
  return new Promise(resolve => {
    stream.once('readable', () => {
      const chunk = stream.read(count)
      resolve(chunk === null ? stream.read() : chunk)
    })
  })
}

/**
 * Read `count` bytes/characters (or, in objectMode, `count` objects —
 * defaulting to 1) from a paused readable stream.
 *
 * Resolves with a Buffer/string, an array of objects, or `null` when the
 * stream ends before any data arrives. Rejects if the stream emits 'error'
 * while we are waiting, or if `stream` is not a paused readable stream.
 *
 * @param {stream.Readable} stream readable stream in paused (non-flowing) mode
 * @param {Number} count number of bytes/characters/objects to read
 */
export const readAsync = async (stream, count) => {
  if (!(stream && stream._readableState)) {
    throw new TypeError('"stream" is not a readable stream')
  }
  if (stream._readableState.flowing) {
    throw new TypeError('"stream" is in flowing mode, this is probably not what you want as data loss could occur. Please use stream.pause() to pause the stream before calling readAsync.')
  }

  const objectMode = stream._readableState.objectMode
  // Promise that rejects when the stream errors. Racing every read against it
  // guarantees an 'error' event propagates to the caller instead of leaving
  // readAsync hung on a 'readable' event that will never fire (and leaking an
  // unhandled rejection, as the previous resolve/return mix did).
  const { reject, promise: errored } = defer()
  stream.once('error', reject)

  try {
    if (!objectMode) {
      // Byte/string mode: a single read covers the whole request.
      return await Promise.race([readOnceAsync(stream, count), errored])
    }
    const objects = []
    for (let index = 0; index < (count || 1); index++) {
      const obj = await Promise.race([readOnceAsync(stream), errored])
      if (obj === null) {
        // Stream ended early: resolve with what we got, or null for nothing.
        return objects.length === 0 ? null : objects
      }
      objects.push(obj)
    }
    return objects
  } finally {
    // Detach the error listener on success, early end, AND failure (the old
    // code skipped cleanup on the error path).
    stream.removeListener('error', reject)
  }
}
104 changes: 104 additions & 0 deletions test/readAsync-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import fs from 'fs'
import path from 'path'
import { readAsync, read, write, collect } from '../lib'

// Resolve with `data` on the next tick of the event loop; handy for
// sequencing emits after the current synchronous code finishes.
function nextTick (data) {
  return new Promise(resolve => process.nextTick(resolve, data))
}

// Raw (Buffer-emitting) read stream over the shared text fixture.
function bufferStream () {
  const fixture = path.join(__dirname, 'test.txt')
  return fs.createReadStream(fixture)
}

// Same fixture as bufferStream, but with a utf8 decoder so chunks arrive
// as strings instead of Buffers.
function stringStream () {
  const fixture = path.join(__dirname, 'test.txt')
  return fs.createReadStream(fixture, 'utf8')
}

// Object-mode stream that wraps each queued number as `{ value }` and
// signals end-of-stream (null) once the queue is empty.
function objectStream (arr = [1, 2, 3, 4, 5, 6]) {
  return read(() => {
    const value = arr.shift()
    if (!value) {
      return null
    }
    return { value }
  })
}

// Tests for readAsync. The `_eventsCount === 1` assertions verify that the
// internal 'error' listener was removed after each call, leaving only the
// stream's own baseline listener attached.
describe('#readAsync', () => {
  it(`rejects if it's not a readable stream`, async () => {
    const writeStream = write(() => {})
    await readAsync(writeStream, 1).then(() => {
      assert.isTrue(false, 'The promise should have rejected')
    }, err => {
      assert.isNotNull(err)
      assert.equal(writeStream._eventsCount, 1)
    })
  })
  it('resolves a buffer with a number of bytes from a buffer stream', async () => {
    const stream = bufferStream()
    assert.deepEqual(await readAsync(stream, 4), Buffer.from('1\n2\n'))
    assert.deepEqual(await readAsync(stream, 4), Buffer.from('3\n4\n'))
    assert.equal(stream._eventsCount, 1)
  })
  it('resolves a string with a number of characters from a string stream', async () => {
    const stream = stringStream()
    assert.equal(await readAsync(stream, 4), '1\n2\n')
    assert.equal(await readAsync(stream, 4), '3\n4\n')
    assert.equal(stream._eventsCount, 1)
  })
  it('reads the number of objects from an object stream', async () => {
    const stream = objectStream()
    const objects = await readAsync(stream, 3)
    assert.deepEqual(objects, [{ value: 1 }, { value: 2 }, { value: 3 }])
    assert.equal(stream._eventsCount, 1)
  })
  it('resolves early if the stream ends before there is enough bytes', async () => {
    const file = await collect(bufferStream())
    const stream = bufferStream()
    const readBytes = await readAsync(stream, 500)
    assert.equal(readBytes.length, file.length)
    assert.deepEqual(readBytes, file)
    assert.equal(stream._eventsCount, 1)
  })
  it('resolves early if the stream ends before there is enough objects', async () => {
    const stream = objectStream()
    const objects = await readAsync(stream, 10)
    assert.deepEqual(objects, [{ value: 1 }, { value: 2 }, { value: 3 }, { value: 4 }, { value: 5 }, { value: 6 }])
    assert.equal(stream._eventsCount, 1)
  })
  it('resolves null if there was no data and the stream closed', async () => {
    const stream = read({ objectMode: false }, () => null)
    assert.isNull(await readAsync(stream, 5))
    assert.equal(stream._eventsCount, 1)
    const stream2 = read(() => null)
    assert.isNull(await readAsync(stream2, 5))
    assert.equal(stream2._eventsCount, 1)
  })
  it('resolves null if the stream has already ended', async () => {
    const stream = read(() => null)
    // Drain the stream fully before calling readAsync.
    stream.read()
    stream.read()
    assert.isNull(await readAsync(stream, 5))
    assert.equal(stream._eventsCount, 1)
  })
  it('rejects if the stream errors', async () => {
    const stream = read(() => 1)
    const error = new Error('Foo!')
    // Emit the error asynchronously, while readAsync is awaiting data.
    nextTick().then(() => stream.emit('error', error))
    await readAsync(stream, 5).then(() => {
      assert.isTrue(false, 'The promise should have rejected')
    }, err => {
      assert.isNotNull(err)
      assert.deepEqual(err, error)
      assert.equal(stream._eventsCount, 1)
    })
  })
  it('rejects if the stream is already in flowing mode', async () => {
    const stream = read(() => nextTick(1))
    stream.resume()
    await readAsync(stream, 1).then(() => {
      assert.isTrue(false, 'The promise should have rejected')
    }, err => {
      stream.pause()
      assert.isNotNull(err)
      assert.equal(stream._eventsCount, 1)
    })
  })
})
2 changes: 2 additions & 0 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ export function pipe(...streams: (ReadStream|WriteStream|TransformStream)[]): Pr

export function read(opts: StreamOptions, readFn: Function): ReadStream;

export function readAsync(stream: ReadableStream, count: Number): (Object[]|Buffer|String|null);

export function reduce(opts: StreamOptions, fn?: Function, initial?: any): ReduceStream;

export function transform(opts: StreamOptions, fn?: Function): TransformStream;
Expand Down

0 comments on commit be80c93

Please sign in to comment.