Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
feat: add abstract stream class (#402)
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain authored May 16, 2023
1 parent a3afbfe commit 920aa9e
Show file tree
Hide file tree
Showing 2 changed files with 329 additions and 0 deletions.
8 changes: 8 additions & 0 deletions packages/interface-stream-muxer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
".": {
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
},
"./stream": {
"types": "./dist/src/stream.d.ts",
"import": "./dist/src/stream.js"
}
},
"eslintConfig": {
Expand Down Expand Up @@ -134,6 +138,10 @@
"dependencies": {
"@libp2p/interface-connection": "^5.0.0",
"@libp2p/interfaces": "^3.0.0",
"@libp2p/logger": "^2.0.7",
"abortable-iterator": "^5.0.1",
"any-signal": "^4.1.1",
"it-pushable": "^3.1.3",
"it-stream-types": "^2.0.1",
"uint8arraylist": "^2.4.3"
},
Expand Down
321 changes: 321 additions & 0 deletions packages/interface-stream-muxer/src/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
import { CodeError } from '@libp2p/interfaces/errors'
import { logger } from '@libp2p/logger'
import { abortableSource } from 'abortable-iterator'
import { anySignal } from 'any-signal'
import { type Pushable, pushable } from 'it-pushable'
import { Uint8ArrayList } from 'uint8arraylist'
import type { Direction, Stream, StreamStat } from '@libp2p/interface-connection'
import type { Source } from 'it-stream-types'

const log = logger('libp2p:stream')

const ERR_STREAM_RESET = 'ERR_STREAM_RESET'
const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT'
const ERR_SINK_ENDED = 'ERR_SINK_ENDED'
const ERR_DOUBLE_SINK = 'ERR_DOUBLE_SINK'

export interface AbstractStreamInit {
/**
* A unique identifier for this stream
*/
id: string

/**
* The stream direction
*/
direction: Direction

/**
* The maximum allowable data size, any data larger than this will be
* chunked and sent in multiple data messages
*/
maxDataSize: number

/**
* User specific stream metadata
*/
metadata?: Record<string, unknown>

/**
* Invoked when the stream ends
*/
onEnd?: (err?: Error | undefined) => void
}

export abstract class AbstractStream implements Stream {
public id: string
public stat: StreamStat
public metadata: Record<string, unknown>
public source: AsyncGenerator<Uint8ArrayList, void, unknown>

private readonly abortController: AbortController
private readonly resetController: AbortController
private readonly closeController: AbortController
private sourceEnded: boolean
private sinkEnded: boolean
private sinkSunk: boolean
private endErr: Error | undefined
private readonly streamSource: Pushable<Uint8ArrayList>
private readonly onEnd?: (err?: Error | undefined) => void
private readonly maxDataSize: number

constructor (init: AbstractStreamInit) {
this.abortController = new AbortController()
this.resetController = new AbortController()
this.closeController = new AbortController()
this.sourceEnded = false
this.sinkEnded = false
this.sinkSunk = false

this.id = init.id
this.metadata = init.metadata ?? {}
this.stat = {
direction: init.direction,
timeline: {
open: Date.now()
}
}
this.maxDataSize = init.maxDataSize
this.onEnd = init.onEnd

this.source = this.streamSource = pushable<Uint8ArrayList>({
onEnd: () => {
// already sent a reset message
if (this.stat.timeline.reset !== null) {
this.sendCloseRead()
}

this.onSourceEnd()
}
})

// necessary because the libp2p upgrader wraps the sink function
this.sink = this.sink.bind(this)
}

protected onSourceEnd (err?: Error): void {
if (this.sourceEnded) {
return
}

this.stat.timeline.closeRead = Date.now()
this.sourceEnded = true
log.trace('%s stream %s source end - err: %o', this.stat.direction, this.id, err)

if (err != null && this.endErr == null) {
this.endErr = err
}

if (this.sinkEnded) {
this.stat.timeline.close = Date.now()

if (this.onEnd != null) {
this.onEnd(this.endErr)
}
}
}

protected onSinkEnd (err?: Error): void {
if (this.sinkEnded) {
return
}

this.stat.timeline.closeWrite = Date.now()
this.sinkEnded = true
log.trace('%s stream %s sink end - err: %o', this.stat.direction, this.id, err)

if (err != null && this.endErr == null) {
this.endErr = err
}

if (this.sourceEnded) {
this.stat.timeline.close = Date.now()

if (this.onEnd != null) {
this.onEnd(this.endErr)
}
}
}

// Close for both Reading and Writing
close (): void {
log.trace('%s stream %s close', this.stat.direction, this.id)

this.closeRead()
this.closeWrite()
}

// Close for reading
closeRead (): void {
log.trace('%s stream %s closeRead', this.stat.direction, this.id)

if (this.sourceEnded) {
return
}

this.streamSource.end()
}

// Close for writing
closeWrite (): void {
log.trace('%s stream %s closeWrite', this.stat.direction, this.id)

if (this.sinkEnded) {
return
}

this.closeController.abort()

try {
// need to call this here as the sink method returns in the catch block
// when the close controller is aborted
this.sendCloseWrite()
} catch (err) {
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err)
}

this.onSinkEnd()
}

// Close for reading and writing (local error)
abort (err: Error): void {
log.trace('%s stream %s abort', this.stat.direction, this.id, err)
// End the source with the passed error
this.streamSource.end(err)
this.abortController.abort()
this.onSinkEnd(err)
}

// Close immediately for reading and writing (remote error)
reset (): void {
const err = new CodeError('stream reset', ERR_STREAM_RESET)
this.resetController.abort()
this.streamSource.end(err)
this.onSinkEnd(err)
}

async sink (source: Source<Uint8ArrayList | Uint8Array>): Promise<void> {
if (this.sinkSunk) {
throw new CodeError('sink already called on stream', ERR_DOUBLE_SINK)
}

this.sinkSunk = true

if (this.sinkEnded) {
throw new CodeError('stream closed for writing', ERR_SINK_ENDED)
}

const signal = anySignal([
this.abortController.signal,
this.resetController.signal,
this.closeController.signal
])

try {
source = abortableSource(source, signal)

if (this.stat.direction === 'outbound') { // If initiator, open a new stream
this.sendNewStream()
}

for await (let data of source) {
while (data.length > 0) {
if (data.length <= this.maxDataSize) {
this.sendData(data instanceof Uint8Array ? new Uint8ArrayList(data) : data)
break
}
data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data
this.sendData(data.sublist(0, this.maxDataSize))
data.consume(this.maxDataSize)
}
}
} catch (err: any) {
if (err.type === 'aborted' && err.message === 'The operation was aborted') {
if (this.closeController.signal.aborted) {
return
}

if (this.resetController.signal.aborted) {
err.message = 'stream reset'
err.code = ERR_STREAM_RESET
}

if (this.abortController.signal.aborted) {
err.message = 'stream aborted'
err.code = ERR_STREAM_ABORT
}
}

// Send no more data if this stream was remotely reset
if (err.code === ERR_STREAM_RESET) {
log.trace('%s stream %s reset', this.stat.direction, this.id)
} else {
log.trace('%s stream %s error', this.stat.direction, this.id, err)
try {
this.sendReset()
this.stat.timeline.reset = Date.now()
} catch (err) {
log.trace('%s stream %s error sending reset', this.stat.direction, this.id, err)
}
}

this.streamSource.end(err)
this.onSinkEnd(err)
return
} finally {
signal.clear()
}

try {
this.sendCloseWrite()
} catch (err) {
log.trace('%s stream %s error sending close', this.stat.direction, this.id, err)
}

this.onSinkEnd()
}

/**
* When an extending class reads data from it's implementation-specific source,
* call this method to allow the stream consumer to read the data.
*/
sourcePush (data: Uint8ArrayList): void {
this.streamSource.push(data)
}

/**
* Returns the amount of unread data - can be used to prevent large amounts of
* data building up when the stream consumer is too slow.
*/
sourceReadableLength (): number {
return this.streamSource.readableLength
}

/**
* Send a message to the remote muxer informing them a new stream is being
* opened
*/
abstract sendNewStream (): void

/**
* Send a data message to the remote muxer
*/
abstract sendData (buf: Uint8ArrayList): void

/**
* Send a reset message to the remote muxer
*/
abstract sendReset (): void

/**
* Send a message to the remote muxer, informing them no more data messages
* will be sent by this end of the stream
*/
abstract sendCloseWrite (): void

/**
* Send a message to the remote muxer, informing them no more data messages
* will be read by this end of the stream
*/
abstract sendCloseRead (): void
}

0 comments on commit 920aa9e

Please sign in to comment.