Skip to content

Commit

Permalink
refactor: update libp2p-yamux and fix breaking changes
Browse files Browse the repository at this point in the history
 - Passing a logger to MuxerInit is now required
 - Passing a logger to yamux()({logger:...}) is now required

See: #120

Update dependencies and fix various breaking changes introduced by those
dependency updates:

- Change type of Conn to Duplex<Uint8Array | Uint8ArrayList>
- Pass default logger to yamux as it's now required.
 - We need to pass a Uint8ArrayList to the WebSocket it-ws.

Related:

- ChainSafe/js-libp2p-yamux#69
- libp2p/js-libp2p#2275
- ChainSafe/js-libp2p-yamux#70

Signed-off-by: Christian Stewart <[email protected]>
  • Loading branch information
paralin committed Feb 10, 2024
1 parent 90f832e commit a2e6917
Show file tree
Hide file tree
Showing 13 changed files with 433 additions and 322 deletions.
4 changes: 0 additions & 4 deletions .github/renovate.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@
"matchManagers": ["gomod"],
"matchDepTypes": ["replace"],
"enabled": false
},
{
"matchPackageNames": ["@chainsafe/libp2p-yamux", "@libp2p/interface", "uint8arraylist"],
"enabled": false
}
]
}
2 changes: 1 addition & 1 deletion hack/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ replace google.golang.org/protobuf => github.com/aperturerobotics/protobuf-go v1

require (
github.com/golangci/golangci-lint v1.55.2 // latest
github.com/planetscale/vtprotobuf v0.6.0 // main
github.com/planetscale/vtprotobuf v0.6.0 // latest
github.com/psampaz/go-mod-outdated v0.9.0
google.golang.org/protobuf v1.32.0
)
Expand Down
3 changes: 2 additions & 1 deletion integration/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
integration
integration.js
integration.js
integration.js.map
2 changes: 1 addition & 1 deletion integration/integration.bash
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ unset GOARCH

echo "Compiling ts..."
# ../node_modules/.bin/tsc --out integration.js --project tsconfig.json
../node_modules/.bin/esbuild integration.ts --bundle --platform=node --outfile=integration.js
../node_modules/.bin/esbuild integration.ts --bundle --sourcemap --platform=node --outfile=integration.js

echo "Compiling go..."
go build -o integration -v ./
Expand Down
6 changes: 3 additions & 3 deletions integration/integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ async function runRPC() {
const channel = new WebSocketConn(ws, 'outbound')
const client = channel.buildClient()

console.log('Running RpcStream test via WebSocket..')
await runRpcStreamTest(client)

console.log('Running client test via WebSocket..')
await runClientTest(client)

console.log('Running abort controller test via WebSocket..')
await runAbortControllerTest(client)

console.log('Running RpcStream test via WebSocket..')
await runRpcStreamTest(client)
}

process.on('unhandledRejection', (ev) => {
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@
},
"dependencies": {
"@aptre/it-ws": "^1.0.0",
"@chainsafe/libp2p-yamux": "^5.0.0",
"@libp2p/interface": "^0.1.2",
"@chainsafe/libp2p-yamux": "^6.0.2",
"@libp2p/interface": "^1.1.3",
"@libp2p/logger": "^4.0.6",
"event-iterator": "^2.0.0",
"is-promise": "^4.0.0",
"isomorphic-ws": "^5.0.0",
"it-first": "^3.0.3",
"it-length-prefixed": "^9.0.4",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.3",
"it-stream-types": "^2.0.1",
Expand Down
12 changes: 9 additions & 3 deletions srpc/conn.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { yamux } from '@chainsafe/libp2p-yamux'
import type { Direction, Stream } from '@libp2p/interface/connection'
import type {
Direction,
Stream,
StreamMuxer,
StreamMuxerFactory,
} from '@libp2p/interface/stream-muxer'
} from '@libp2p/interface'
import { pipe } from 'it-pipe'
import type { Duplex, Source } from 'it-stream-types'
import { Uint8ArrayList } from 'uint8arraylist'
import isPromise from 'is-promise'
import { pushable, Pushable } from 'it-pushable'
import { defaultLogger } from '@libp2p/logger'

import type { OpenStreamFunc, Stream as SRPCStream } from './stream.js'
import { Client } from './client.js'
Expand Down Expand Up @@ -73,7 +75,11 @@ export class Conn
if (server) {
this.server = server
}
const muxerFactory = connParams?.muxerFactory ?? yamux()()
const muxerFactory =
connParams?.muxerFactory ??
yamux({ enableKeepAlive: false })({
logger: defaultLogger(),
})
this.muxer = muxerFactory.createStreamMuxer({
onIncomingStream: this.handleIncomingStream.bind(this),
direction: connParams?.direction || 'outbound',
Expand Down
7 changes: 7 additions & 0 deletions srpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ export {
prependLengthPrefixTransform,
decodePacketSource,
encodePacketSource,
uint32LEDecode,
uint32LEEncode,
decodeUint32Le,
encodeUint32Le,
lengthPrefixDecode,
lengthPrefixEncode,
prependPacketLen,
} from './packet.js'
export { combineUint8ArrayListTransform } from './array-list.js'
export { ValueCtr } from './value-ctr.js'
Expand Down
Empty file added srpc/length-prefix.ts
Empty file.
8 changes: 4 additions & 4 deletions srpc/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export function buildDecodeMessageTransform<T>(
def: MessageDefinition<T>,
memoize?: boolean,
): DecodeMessageTransform<T> {
const decode = memoize ? memoProtoDecode(def) : def.decode.bind(def)
const decode = !memoize ? def.decode.bind(def) : memoProtoDecode(def)

// decodeMessageSource unmarshals and async yields encoded Messages.
return async function* decodeMessageSource(
Expand Down Expand Up @@ -71,11 +71,11 @@ export function buildEncodeMessageTransform<T>(
def: MessageDefinition<T>,
memoize?: boolean,
): EncodeMessageTransform<T> {
const encode = memoize
? memoProto(def)
: (msg: T): Uint8Array => {
const encode = !memoize
? (msg: T): Uint8Array => {
return def.encode(msg).finish()
}
: memoProto(def)

// encodeMessageSource encodes messages to byte arrays.
return async function* encodeMessageSource(
Expand Down
70 changes: 56 additions & 14 deletions srpc/packet.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import {
encode as lengthPrefixEncode,
decode as lengthPrefixDecode,
} from 'it-length-prefixed'
import { Uint8ArrayList } from 'uint8arraylist'
import { Source, Transform } from 'it-stream-types'

import { Packet } from './rpcproto.pb.js'
import {
buildDecodeMessageTransform,
buildEncodeMessageTransform,
} from './message.js'
import { Source, Transform } from 'it-stream-types'

// decodePacketSource decodes packets from a binary data stream.
export const decodePacketSource = buildDecodeMessageTransform<Packet>(Packet)
Expand All @@ -18,7 +14,7 @@ export const decodePacketSource = buildDecodeMessageTransform<Packet>(Packet)
export const encodePacketSource = buildEncodeMessageTransform<Packet>(Packet)

// uint32LEDecode removes the length prefix.
const uint32LEDecode = (data: Uint8ArrayList) => {
export const uint32LEDecode = (data: Uint8ArrayList) => {
if (data.length < 4) {
throw RangeError('Could not decode int32BE')
}
Expand All @@ -28,34 +24,80 @@ const uint32LEDecode = (data: Uint8ArrayList) => {
uint32LEDecode.bytes = 4

// uint32LEEncode adds the length prefix.
const uint32LEEncode = (value: number) => {
export const uint32LEEncode = (value: number) => {
const data = new Uint8ArrayList(new Uint8Array(4))
data.setUint32(0, value, true)
return data
}
uint32LEEncode.bytes = 4

// lengthPrefixEncode transforms a source to a length-prefixed Uint8ArrayList stream.
export async function* lengthPrefixEncode(
source: Source<Uint8Array | Uint8ArrayList>,
lengthEncoder: typeof uint32LEEncode,
) {
for await (const chunk of source) {
// Encode the length of the chunk.
const length = chunk instanceof Uint8Array ? chunk.length : chunk.byteLength
const lengthEncoded = lengthEncoder(length)

// Concatenate the length prefix and the data.
yield new Uint8ArrayList(lengthEncoded, chunk)
}
}

// lengthPrefixDecode decodes a length-prefixed source to a Uint8ArrayList stream.
export async function* lengthPrefixDecode(
source: Source<Uint8Array | Uint8ArrayList>,
lengthDecoder: typeof uint32LEDecode,
) {
const buffer = new Uint8ArrayList()

for await (const chunk of source) {
buffer.append(chunk)

// Continue extracting messages while buffer contains enough data for decoding.
while (buffer.length >= lengthDecoder.bytes) {
const messageLength = lengthDecoder(buffer)
const totalLength = lengthDecoder.bytes + messageLength

if (buffer.length < totalLength) break // Wait for more data if the full message hasn't arrived.

// Extract the message excluding the length prefix.
const message = buffer.sublist(lengthDecoder.bytes, totalLength)
yield message

// Remove the processed message from the buffer.
buffer.consume(totalLength)
}
}
}

// prependLengthPrefixTransform adds a length prefix to a message source.
// little-endian uint32
export function prependLengthPrefixTransform(): Transform<
export function prependLengthPrefixTransform(
lengthEncoder = uint32LEEncode,
): Transform<
Source<Uint8Array | Uint8ArrayList>,
| AsyncGenerator<Uint8Array, void, undefined>
| Generator<Uint8Array, void, undefined>
| AsyncGenerator<Uint8ArrayList, void, undefined>
| Generator<Uint8ArrayList, void, undefined>
> {
return (source: Source<Uint8Array | Uint8ArrayList>) => {
return lengthPrefixEncode(source, { lengthEncoder: uint32LEEncode })
return lengthPrefixEncode(source, lengthEncoder)
}
}

// parseLengthPrefixTransform parses the length prefix from a message source.
// little-endian uint32
export function parseLengthPrefixTransform(): Transform<
Source<Uint8Array | Uint8ArrayList>, // Allow both AsyncIterable and Iterable
export function parseLengthPrefixTransform(
lengthDecoder = uint32LEDecode,
): Transform<
Source<Uint8Array | Uint8ArrayList>,
| AsyncGenerator<Uint8ArrayList, void, unknown>
| Generator<Uint8ArrayList, void, unknown>
> {
return (source: Source<Uint8Array | Uint8ArrayList>) => {
return lengthPrefixDecode(source, { lengthDecoder: uint32LEDecode })
return lengthPrefixDecode(source, lengthDecoder)
}
}

Expand Down
11 changes: 9 additions & 2 deletions srpc/websocket.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { pipe } from 'it-pipe'
import { Direction } from '@libp2p/interface/connection'
import { Direction } from '@libp2p/interface'

import duplex from '@aptre/it-ws/duplex'
import type WebSocket from '@aptre/it-ws/web-socket'

import { Conn } from './conn.js'
import { Server } from './server.js'
import { combineUint8ArrayListTransform } from './array-list.js'

// WebSocketConn implements a connection with a WebSocket and optional Server.
export class WebSocketConn extends Conn {
Expand All @@ -16,7 +17,13 @@ export class WebSocketConn extends Conn {
super(server, { direction })
this.socket = socket
const socketDuplex = duplex(socket)
pipe(socketDuplex, this, socketDuplex)
pipe(
socketDuplex,
this,
// it-ws only supports sending Uint8Array.
combineUint8ArrayListTransform(),
socketDuplex,
)
}

// getSocket returns the websocket.
Expand Down
Loading

0 comments on commit a2e6917

Please sign in to comment.